最近在恶补并发的知识,看了不少代码和博客,稍微有了一点点感觉。趁着印象还比较深刻,先记下来目前对 eventmachine 和 fiber 使用的理解。
通过读 em-synchrony 的代码,发现其中连接池的实现是一个很好的例子,下面就通过模仿 em-synchrony 线程池的实现来说明我所理解的使用模式。
require 'fiber'
require 'eventmachine'
class Connection
end
class Pool
def initialize(size)
# 构造连接的数组,这里用虚构的Connection代替,可以抽象的理解为各种资源,如mysql连接,redis连接...
@connections = size.times.map { Connection.new }
# 等待队列,存放等待资源的fiber
@pending = []
end
def execute
# 获取资源,当没有空闲资源的时候,会让出执行,而不会阻塞进程
conn = acquire
# 只有在拿到资源后才会执行到这里
'get conn, do job'
# 传过来的block应该是一个异步的任务
yield conn if block_given?
release(conn)
end
def acquire
f = Fiber.current
# 试图去连接池中拿
c = @connections.pop
# 拿到了就直接返回
return c if c
# 如果没拿到,就需要等待
puts 'waiting for connection'
# 但是这里的等待并不是阻塞等待的,因为有了fiber,我们可以在这里让出执行权
# 只要将当前的fiber加入到等待队列中
Fiber.yield @pending.push f
# 当fiber被唤醒时,会在这里继续执行,再次尝试acquire
acquire
end
def release(conn)
# 把连接放回连接池
@connections.push conn
# 当资源被释放时,会从等待队列中唤醒一个fiber
p = @pending.shift and p.resume
end
end
怎么用?
EM.run do
# 容量为3的资源池
pool = Pool.new(3)
# 其他事件
EM.add_periodic_timer(2) { puts 'concurrent running!' }
# 执行10件任务,10个fiber协作
10.times do
Fiber.new do
pool.execute do |conn|
f = Fiber.current
time = rand(10)
# 这里注册了事件后就会继续向下执行
# 相当于模拟了一个需要执行time时间的异步任务
# 任务的执行不会阻塞进程,而是在任务完成后的callback中才唤醒自身
EM.add_timer(time) do
# 在callback中恢复执行
f.resume("mission complete in #{time}")
end
# 立刻让出执行权
puts Fiber.yield
end
end.resume
end
end
可以执行上面的代码,通过输出可以对整个过程有更好的理解。
这种基于 fiber 的连接池一定要配合 eventmachine 一起用才能发挥作用,其实就是 em-synchrony 的原理。em-synchrony 将整个块放到 fiber 中执行,注册事件后就Fiber.yield
让权,并在 callback 中唤醒。这样通过包装将 callback 隐藏起来了。
节选一段 em-http 的实现,可以看出相同的实现模式:
module EventMachine
module HTTPMethods
%w[get head post delete put patch options].each do |type|
class_eval %[
alias :a#{type} :#{type}
def #{type}(options = {}, &blk)
f = Fiber.current
conn = setup_request(:#{type}, options, &blk)
if conn.error.nil?
conn.callback { f.resume(conn) }
conn.errback { f.resume(conn) }
Fiber.yield
else
conn
end
end
]
end
end
end
为什么事件驱动很重要?因为如果在拿到资源后同步执行,那么等待返回的过程会阻塞整个进程,也就是说 fiber 只能一个接一个的执行,那么就退化到了跟不用 fiber 一样,都是顺序执行了。
因为使用 eventmachine,或者说 reactor 模式。每个 fiber 在拿到连接后其实只需要注册一个事件,然后就把自己挂起(让出执行权),在事件完成后会执行 callback,并在 callback 中唤醒之前的任务,并赋给它异步请求的结果。
而没拿到连接的 fiber 会让出执行权,在其他 fiber 释放了连接后才被唤醒。因为大家都是事件驱动,不会因为 IO 或是等待连接而阻塞,从而让所有任务可以并发的执行。
以上就是我的理解,欢迎指正与讨论。