Erlang 最为人称道的就是其出色的并发能力。随之而来的是各种疑问,为什么 Erlang 并发能力如此强大,为什么 Erlang 的 Process 要比系统的 Thread 更轻量,Erlang 调度器是如何工作的。
想要理解一个事物,除了阅读资料以外,就是自己动手做一个玩。
Erlang 的并发能力主要由 Erlang Process 和 Scheduler 提供。我们先来看 Process。
Erlang Process 是一个独立运行的单元,外界通过消息传递和 Process 交互。比如 Pid ! hello
。每个 Process 又有自己的状态。如果把 !
改成 send
,那么完全可以把 Erlang Process 当做一个对象来看。所以在这里用 ErlProcess 这个类来实现 Erlang Process。
Erlang Process 收到消息时,消息会被先放到 mailbox 里,等 process 获得执行权的时候,在会对其进行处理。
也就是说 Erlang 同步接收消息,异步处理消息。而消息处理又一条条处理的,所以在 Erlang Process 内不存在任何线程安全、锁之类的问题。
mailbox 非常好实现,只要给 ErlProcess 加个实例变量 @mailbox 即可。
class ErlProcess
def initialize
@mailbox = []
end
Erlang 使用 receive 方法,阻塞当前 process,并申明期望接收的消息,和对应的操作。等到 process 有执行权的时候,如果有匹配的消息,则调用对应的操作进行处理。
这种先声明、让权、延后执行的能力,Ruby 至少有两种实现方式。
第一种是用 fiber,比如庄生梦蝶就用 fiber 实现了 actor 模型(我对里面的代码进行了整理,代码在这里)。fiber 本身比较复杂,而且 fiber 只能在一个线程内进行切换,无法利用多核多线程的能力。
所以这里选用更直观的 block 来实现。
ErlProcess#receive
接收一个 block 参数,receive
被调用完之后,会让出执行权。
Erlang 的 receive 可以有多条语句,比如
receive
hello -> io:format("hello");
word -> io:format("word")
end
这其实是利用了 Erlang 的 pattern match,Ruby 不支持 pattern match。但我们可以做一个简单的。
我们让 ErlProcess#receive
多接收一个 type
参数,type
是一个 symbol,用来表明消息的类别。之后再利用 block 参数的个数,就实现了一个简单版本 pattern match。
def receive(type, &block)
@receivers ||= []
@receivers << [type, block.arity + 1, block]
end
使用的例子
def loop(count)
receive(:inc) do
puts "inc counter"
loop(count + 1)
end
receive(:stop) do
puts "Stop! Count is #{count}"
end
end
ErlProcess 有了 receive 之后,需要能被唤醒 (resume)。
唤醒实质上就是,检查 @mailbox
里是否有期望的消息。既检查消息第一个参数是否等于 receive 声明的 type,参数的个数是否和 block 声明的相等。
def resume
# xxxx
# loop through all message in mailbox
msg = @mailbox[i]
@receivers.each do |receiver|
if msg[0] == receiver[0] && msg.count == receiver[1]
@receivers = []
receiver[2].call *msg[1..-1]
@mailbox.delete_at i
return nil
end
end
# xxxx
end
我们看到,一个 process 的状态有,从创建后声明 receive,等待 (waiting) 消息传入,消息传入后,还没有执行,process 处于 runnable,
执行过后,没有 receivers
,既没有需要执行的代码,process 进入 dead 状态。
process 的状态如下图
并行是指同一时间处理多个事情的能力。并发是指逻辑上同时处理多个事情的能力。并发要做的事情是,让每个任务执行一段时间后,切换到另一个任务执行。
Erlang 使用 reductions 做为切换的依据。
Erlang 给每个 process 分配了 4000 个 reductions(之前的版本是 2000),每一次方法的调用,都消耗一个 reduction(nif,I/O 会多些,这里不做深入讨论,具体看 The BEAM Book)。如果 4000 个 reductions 被用完了,就执行下一个 runnable process。
ErlScheduler
的调度就简单多了,ErlProcess
每次调用 receive
的时候,只是是把要执行的语句存下来,之后就可以执行下一个 ErlProcess。每次 ErlProcess#resume
会处理一条消息,然后让权。所以 ErlScheduler
根据消息进行调度。
Erlang 的调度器,使用一个队列来存放 runnable 的 processes(实际上有多个队列,具体还是看 Elrang Runtime System),
ErlScheduler
里使用 loop
方法遍历对应的队列 @runnable_pids
。当 process 接到消息的时候,则把 process 入队,process resume 的时候则出队。
def loop
while pid = @runnable_pids.shift
process = @pid_to_object[pid]
state = process.resume
@pid_to_object.delete(pid) if state == :dead
end
end
我们注意到,@runnable_pids
没有直接存 object 的引用,而是存了 pid。
这样做的原因是,一个 process 执行完,需要释放内存。Ruby 会自动回收没有引用的对象的内存,所以不能把对象的引用直接暴露出去,而是每次 spwan 的时候,都返回一个 pid,使用 @pid_to_object
来记录 pid 到 object 的映射。当一个 process 处于 dead 的时候,直接从 @pid_to_object
里面删掉就可以了。
没有,目前也不需要。因为目前的 scheduler 都是在一个线程里执行的,代码切换,都是在完成一个方法调用之后再切换的。 所以不需要考虑线程切换的问题,也就不需要锁。
这样做的缺点是,无法利用多核资源。Erlang 处理的方法是,一个 Erlang 虚拟机根据 cpu 物理进程数,创建多个 scheduler。 每个 scheduler 独立执行 process,如果某个 scheduler 闲下来的时候,就去拿其他 scheduler 的 process。如果所有 scheduler 都闲着,就 spin io。
即使有多个 scheduler,同一个时间,同一个 process 只能在一个 scheduler 内被执行,所以在 process 内依然不存在 thread safe 之类的问题。
目前 ErlProcess
和 ErlScheduler
虽然可以说明一些 Erlang 的原理,但仍然很简陋,只能算是一个玩具,有很多缺陷。
除了上面说到的不能利用多核外,还没有提供异步 I/O 的机制。没有异步 I/O 的话,虽然有并发能力,但没有效率。是最大的缺陷。
Elang 使用 Driver + Port,有单独的线程池做处理。在 Ruby 里,可以使用异步 I/O 发起 I/O 操作,之后将 file descriptor + callback 挂到 nio 上,让权给其他 process,同时 nio 使用 select 查找 ready 的 file descriptor,之后再执行 file descriptor 对应的 callback 即可。ErlProcess
还需要一个这样的机制。
ErlProcess 没有优先级。目前 ErlProcess 里的 process 都是平等的,而实际应用中,有些 process 起到了枢纽作用,负载更高,需要可以优先执行。
Erlang 有 timeout 机制,等待的时间超过 timeout 的时间,会触发 timeout 事件。可以让一次调用的时间在可控的范围内,也可以用来解决 process 间的死锁问题。
Erlang Process 还可以 link,和 monitor,这样一个 process 可以知道自己依赖的 process 的状态,当所依赖的 process 出现状况的时候,可以做相应的处理。
再比如,可以考虑用 lambda 来实现 process 而不是对象等等。
我们来看一个例子
class Counter < ErlProcess
def initialize
super
loop(0)
end
def loop(count)
receive(:inc) do
puts "inc counter"
loop(count + 1)
end
receive(:stop) do
puts "Stop! Count is #{count}"
end
end
end
10000.times do |i|
scheduler.send_message(counter, :inc)
end
scheduler.send_message(counter, :stop)
scheduler.loop
目前,可以用 ErlProcess 实现 counter,还可以 ping,pong。基本的能力都有了,我们来跑段代码,看下性能。
我用 ErlProcess
和 ErlScheduler
实现了 Skynet,
简单来说 Skynet 就是 spawn 1M 的 processes,每 spawn process 都给一个 number,这个 number 从 0 开始递增。spawn 完所有 process 之后,再把 number 传到 process 对应的 parent 里,并相加。
在本地,Erlang 的 skynet 用了 2872 ms
,而 Ruby 版用了 8118 ms
,速度大约是 Erlang 三分之一左右。个人觉得速度还是可以接受的,当然目前的实现非常简陋,这个比较并不公平。