Ruby ractor 有没有更简洁的写法

tomanderson · 2021年01月28日 · 最后由 aaline57963 回复于 2021年02月18日 · 780 次阅读

这两天在测试 ractor。一般的写法如下:

def ractor
  threads = 4  #线程数
  t = []

  (1..threads).each do
    r = Ractor.new do
      B.xxx  #执行方法
    end
    t << r
  end

  render json: t.map { |r| r.take }.reduce(:+)
end

但是感觉有点太繁琐了。以前用 gem parallel,写法是:

arr_new = Parallel.map(arr, in_processes:4) do |sub_arr| ...

自动把 arr 拆分,分配到多进程/多线程去运算,结果合并输出。

ractor 有没有类似的方法?

本来指望 gem parralle 升级一下,把多线程方法改为 ractor。然而并没有。找了下也没发现有别的 gem 可以做类似功能。

自己封一个一样的 api 不就好了?

想了个 Array#ractor_map 方法

class Array
  def ractor_map(handler, ractors_count: 1)
    Ractor.make_shareable(handler)

    array_length = length
    min_count = [ractors_count, array_length].min

    ractors = min_count.times.map do
      Ractor.new handler do |handler|
        loop do
          recv = Ractor.receive

          break if recv.is_a?(Interrupt)

          Ractor.yield handler.call(recv)
        end
      end
    end

    (0...min_count).each { |index| ractors[index].send self[index] }

    next_index = ractors_count
    result = []

    loop do
      ractor, ret = Ractor.select(*ractors)
      result << ret

      if result.length >= array_length
        ractors.each { |r| r.send Interrupt.new }
        break
      end

      ractor.send(self[next_index]) if next_index < array_length

      next_index += 1
    end

    result
  end
end

handler = proc { |x| x / 2 }
result = [1, 2, 3, 4, 5].ractor_map(handler, ractors_count: 2)
puts result.inspect # => [1, 0, 1, 2, 2]
zhengpd 回复

非常感谢,试了很管用!我有几个想法,如果方便的话,请您指教一下: 1、这个方法每次输出数组的顺序是随机的,最好能够和原数组对应。这个很容易解决,我把 ractor 处理的数据改为 hash,带上原数组 index 就可以了。 2、如果 handler 中含有类方法,如 handler = proc { |x| A.handle(x) },此方法是无效的,因为 A 类没有传到 ractor 里面去。至于怎样修改,我尝试了半天也没解决…… 3、如果 n 个线程中,有 1 个特别慢,其它线程都执行完了它还没完,那么剩余待处理的数据会继续放到 n-1 个线程中继续运行吗?看代码我认为是的,但是不敢确定。

tomanderson 回复

方法里边的第二个 loop 我的想法是用 select 去取先完成的 ractor,然后执行数组的下一个待处理元素,理论上是你说的第 3 点的逻辑,不过也没仔细验证过。

1、2 不太清楚,ractor 我只是简单看了下,还不如你有经验

需要 登录 后方可回复, 如果你还没有账号请 注册新账号