Ruby 使用 Ruby 实现 Erlang Process -- 8 秒 spawn 一百万 “process”

yfractal · 2018年11月11日 · 最后由 lanzhiheng 回复于 2018年11月19日 · 5584 次阅读
本帖已被管理员设置为精华贴

Erlang 最为人称道的就是其出色的并发能力。随之而来的是各种疑问,为什么 Erlang 并发能力如此强大,为什么 Erlang 的 Process 要比系统的 Thread 更轻量,Erlang 调度器是如何工作的。

想要理解一个事物,除了阅读资料以外,就是自己动手做一个玩。

Process

Erlang 的并发能力主要由 Erlang Process 和 Scheduler 提供。我们先来看 Process。

Erlang Process 是一个独立运行的单元,外界通过消息传递和 Process 交互。比如 Pid ! hello。每个 Process 又有自己的状态。如果把 ! 改成 send,那么完全可以把 Erlang Process 当做一个对象来看。所以在这里用 ErlProcess 这个类来实现 Erlang Process。

Erlang Process 收到消息时,消息会被先放到 mailbox 里,等 process 获得执行权的时候,在会对其进行处理。

也就是说 Erlang 同步接收消息,异步处理消息。而消息处理又一条条处理的,所以在 Erlang Process 内不存在任何线程安全、锁之类的问题。

mailbox 非常好实现,只要给 ErlProcess 加个实例变量 @mailbox 即可。

class ErlProcess
  def initialize
    @mailbox = []
  end

Erlang 使用 receive 方法,阻塞当前 process,并申明期望接收的消息,和对应的操作。等到 process 有执行权的时候,如果有匹配的消息,则调用对应的操作进行处理。

这种先声明、让权、延后执行的能力,Ruby 至少有两种实现方式。

第一种是用 fiber,比如庄生梦蝶就用 fiber 实现了 actor 模型(我对里面的代码进行了整理,代码在这里)。fiber 本身比较复杂,而且 fiber 只能在一个线程内进行切换,无法利用多核多线程的能力。

所以这里选用更直观的 block 来实现。

ErlProcess#receive 接收一个 block 参数,receive 被调用完之后,会让出执行权。

Erlang 的 receive 可以有多条语句,比如

receive
  hello -> io:format("hello");
  word -> io:format("word")
end

这其实是利用了 Erlang 的 pattern match,Ruby 不支持 pattern match。但我们可以做一个简单的。

我们让 ErlProcess#receive 多接收一个 type 参数,type 是一个 symbol,用来表明消息的类别。之后再利用 block 参数的个数,就实现了一个简单版本 pattern match。

def receive(type, &block)
    @receivers ||= []
    @receivers << [type, block.arity + 1, block]
end

使用的例子

def loop(count)
  receive(:inc) do
    puts "inc counter"
    loop(count + 1)
  end
  receive(:stop) do
    puts "Stop! Count is #{count}"
  end
end

ErlProcess 有了 receive 之后,需要能被唤醒 (resume)。

唤醒实质上就是,检查 @mailbox 里是否有期望的消息。既检查消息第一个参数是否等于 receive 声明的 type,参数的个数是否和 block 声明的相等。

def resume
  # xxxx
  # loop through all message in mailbox
  msg = @mailbox[i]
  @receivers.each do |receiver|
    if msg[0] == receiver[0] && msg.count == receiver[1]
      @receivers = []

      receiver[2].call *msg[1..-1]

      @mailbox.delete_at i

      return nil
    end
  end
  # xxxx
end

我们看到,一个 process 的状态有,从创建后声明 receive,等待 (waiting) 消息传入,消息传入后,还没有执行,process 处于 runnable, 执行过后,没有 receivers,既没有需要执行的代码,process 进入 dead 状态。

process 的状态如下图

Scheduler

并行是指同一时间处理多个事情的能力。并发是指逻辑上同时处理多个事情的能力。并发要做的事情是,让每个任务执行一段时间后,切换到另一个任务执行。

Erlang 使用 reductions 做为切换的依据。

Erlang 给每个 process 分配了 4000 个 reductions(之前的版本是 2000),每一次方法的调用,都消耗一个 reduction(nif,I/O 会多些,这里不做深入讨论,具体看 The BEAM Book)。如果 4000 个 reductions 被用完了,就执行下一个 runnable process。

ErlScheduler 的调度就简单多了,ErlProcess 每次调用 receive 的时候,只是是把要执行的语句存下来,之后就可以执行下一个 ErlProcess。每次 ErlProcess#resume 会处理一条消息,然后让权。所以 ErlScheduler 根据消息进行调度。

Erlang 的调度器,使用一个队列来存放 runnable 的 processes(实际上有多个队列,具体还是看 Elrang Runtime System),

ErlScheduler里使用 loop 方法遍历对应的队列 @runnable_pids。当 process 接到消息的时候,则把 process 入队,process resume 的时候则出队。

def loop
  while pid = @runnable_pids.shift
    process = @pid_to_object[pid]
    state = process.resume
    @pid_to_object.delete(pid) if state == :dead
  end
end

我们注意到,@runnable_pids 没有直接存 object 的引用,而是存了 pid。

这样做的原因是,一个 process 执行完,需要释放内存。Ruby 会自动回收没有引用的对象的内存,所以不能把对象的引用直接暴露出去,而是每次 spwan 的时候,都返回一个 pid,使用 @pid_to_object 来记录 pid 到 object 的映射。当一个 process 处于 dead 的时候,直接从 @pid_to_object 里面删掉就可以了。

Thread safe? Lock?

没有,目前也不需要。因为目前的 scheduler 都是在一个线程里执行的,代码切换,都是在完成一个方法调用之后再切换的。 所以不需要考虑线程切换的问题,也就不需要锁。

这样做的缺点是,无法利用多核资源。Erlang 处理的方法是,一个 Erlang 虚拟机根据 cpu 物理进程数,创建多个 scheduler。 每个 scheduler 独立执行 process,如果某个 scheduler 闲下来的时候,就去拿其他 scheduler 的 process。如果所有 scheduler 都闲着,就 spin io。

即使有多个 scheduler,同一个时间,同一个 process 只能在一个 scheduler 内被执行,所以在 process 内依然不存在 thread safe 之类的问题。

缺陷

目前 ErlProcessErlScheduler 虽然可以说明一些 Erlang 的原理,但仍然很简陋,只能算是一个玩具,有很多缺陷。

除了上面说到的不能利用多核外,还没有提供异步 I/O 的机制。没有异步 I/O 的话,虽然有并发能力,但没有效率。是最大的缺陷。

Elang 使用 Driver + Port,有单独的线程池做处理。在 Ruby 里,可以使用异步 I/O 发起 I/O 操作,之后将 file descriptor + callback 挂到 nio 上,让权给其他 process,同时 nio 使用 select 查找 ready 的 file descriptor,之后再执行 file descriptor 对应的 callback 即可。ErlProcess 还需要一个这样的机制。

ErlProcess 没有优先级。目前 ErlProcess 里的 process 都是平等的,而实际应用中,有些 process 起到了枢纽作用,负载更高,需要可以优先执行。

Erlang 有 timeout 机制,等待的时间超过 timeout 的时间,会触发 timeout 事件。可以让一次调用的时间在可控的范围内,也可以用来解决 process 间的死锁问题。

Erlang Process 还可以 link,和 monitor,这样一个 process 可以知道自己依赖的 process 的状态,当所依赖的 process 出现状况的时候,可以做相应的处理。

再比如,可以考虑用 lambda 来实现 process 而不是对象等等。

Example

我们来看一个例子

class Counter < ErlProcess
  def initialize
    super
    loop(0)
  end

  def loop(count)
    receive(:inc) do
      puts "inc counter"
      loop(count + 1)
    end
    receive(:stop) do
      puts "Stop! Count is #{count}"
    end
  end
end

10000.times do |i|
  scheduler.send_message(counter, :inc)
end

scheduler.send_message(counter, :stop)
scheduler.loop

Benchmark

目前,可以用 ErlProcess 实现 counter,还可以 ping,pong。基本的能力都有了,我们来跑段代码,看下性能。

我用 ErlProcessErlScheduler 实现了 Skynet

简单来说 Skynet 就是 spawn 1M 的 processes,每 spawn process 都给一个 number,这个 number 从 0 开始递增。spawn 完所有 process 之后,再把 number 传到 process 对应的 parent 里,并相加。

在本地,Erlang 的 skynet 用了 2872 ms,而 Ruby 版用了 8118 ms,速度大约是 Erlang 三分之一左右。个人觉得速度还是可以接受的,当然目前的实现非常简陋,这个比较并不公平。

链接

不错,通过 Ruby 可以更快捷地了解 Erlang。学习下。

jasl 将本帖设为了精华贴。 11月12日 04:03

天壤之别,erlang 的精髓在实现上。

lilijreey 回复

一看你就特别了解 Erlang,来来具体说说?

erlang 的调度是 cpu 时间片的,你不可以用 for 循环占住 cpu,这是 erlang 的 vm 实现的,ruby 这种完全做不到。

另外,erlang 有一个大不一样的地方是 actor 的崩溃不会危害整个 vm,而且 actor 可以建立监督机制,监督的 actor 会收到子 actor 崩溃的信息。ruby 没有这种机制,弄不好,整个程序栈都崩溃了,会让整个进程崩掉。

jimrokliu 回复

erlang 的调度是 cpu 时间片的

这个是怎么得到的?The beam book 里写的是根据 reductions 做的,而 reductions 简单来说,是方法调用次数。OTP 里也能找到 erts_current_reductions 之类的方法。

jimrokliu 回复

哈哈,是这样的。如果 process 有异常的话,整个程序都会挂掉。不过处理起来也比较容易,执行的时候 catch 所有的异常就可以了,有异常,直接让 process 挂掉就可以。

moitor 和 link 其实也可以实现。process resume 之后的状态如果是 dead 的话,这个 process 有 monitor 的话,就发 EXIT 消息就可以了。

这些问题有想过,不过都没去实现。。。我目前觉得异步 I/O 是缺少的最关键的 feature。

😆 刚上 Erlang 的车 7 天。

lanzhiheng 回复

欢迎入坑,其实 Elixir 挺好的。Erlang 语法太个性了。

yfractal 回复

erlang 是 reductions,不是时间片,跟操作系统的调度混了。

jjym 回复

thx,第一个跑 demo 跑不起来,就放弃了…

yfractal 回复

应该没问题啊,这个库还是很稳定的,别用 master 分支,用 release 的 gem 来跑试试

jjym 回复

哈,可能是 master 的原因,thx again。

@jasl @huacnlee 楼上广告赶紧删掉😂

gehao 回复

处理了。。。

yfractal 回复

之前有看过一下 Elixir,看起来比较舒服。不过感觉如果上 Elixir 早晚还得回来看 Erlang 就干脆上 Erlang 先了。

需要 登录 后方可回复, 如果你还没有账号请 注册新账号