分享 em-synchrony 中连接池的实现原理

loveltyoic · April 03, 2015 · Last by yukihiro_matz replied at April 07, 2015 · 2985 hits

最近在恶补并发的知识,看了不少代码和博客,稍微有了一点点感觉。趁着印象还比较深刻,先记下来目前对 eventmachine 和 fiber 使用的理解。

通过读 em-synchrony 的代码,发现其中连接池的实现是一个很好的例子,下面就通过模仿 em-synchrony 线程池的实现来说明我所理解的使用模式。

require 'fiber'
require 'eventmachine'

class Connection
end

class Pool
  def initialize(size)
    # 构造连接的数组,这里用虚构的Connection代替,可以抽象的理解为各种资源,如mysql连接,redis连接...
    @connections = size.times.map { Connection.new }
    # 等待队列,存放等待资源的fiber
    @pending = []
  end

  def execute
    # 获取资源,当没有空闲资源的时候,会让出执行,而不会阻塞进程
    conn = acquire
    # 只有在拿到资源后才会执行到这里
    'get conn, do job'
    # 传过来的block应该是一个异步的任务
    yield conn if block_given?
    release(conn)
  end

  def acquire
    f = Fiber.current
    # 试图去连接池中拿
    c = @connections.pop
    # 拿到了就直接返回
    return c if c
    # 如果没拿到,就需要等待
    puts 'waiting for connection'
    # 但是这里的等待并不是阻塞等待的,因为有了fiber,我们可以在这里让出执行权
    # 只要将当前的fiber加入到等待队列中
    Fiber.yield @pending.push f
    # 当fiber被唤醒时,会在这里继续执行,再次尝试acquire
    acquire
  end

  def release(conn)
    # 把连接放回连接池
    @connections.push conn
    # 当资源被释放时,会从等待队列中唤醒一个fiber
    p = @pending.shift and p.resume
  end
end

怎么用?

EM.run do 
  # 容量为3的资源池
  pool = Pool.new(3)

  # 其他事件
  EM.add_periodic_timer(2) { puts 'concurrent running!' }

  # 执行10件任务,10个fiber协作
  10.times do
    Fiber.new do
      pool.execute do |conn|
        f = Fiber.current
        time = rand(10)
        # 这里注册了事件后就会继续向下执行
        # 相当于模拟了一个需要执行time时间的异步任务
        # 任务的执行不会阻塞进程,而是在任务完成后的callback中才唤醒自身
        EM.add_timer(time) do 
          # 在callback中恢复执行
          f.resume("mission complete in #{time}")
        end
        # 立刻让出执行权
        puts Fiber.yield
      end
    end.resume
  end

end

可以执行上面的代码,通过输出可以对整个过程有更好的理解。

这种基于 fiber 的连接池一定要配合 eventmachine 一起用才能发挥作用,其实就是 em-synchrony 的原理。em-synchrony 将整个块放到 fiber 中执行,注册事件后就Fiber.yield让权,并在 callback 中唤醒。这样通过包装将 callback 隐藏起来了。 节选一段 em-http 的实现,可以看出相同的实现模式:

module EventMachine
  module HTTPMethods
     %w[get head post delete put patch options].each do |type|
       class_eval %[
         alias :a#{type} :#{type}
         def #{type}(options = {}, &blk)
           f = Fiber.current

           conn = setup_request(:#{type}, options, &blk)
           if conn.error.nil?
             conn.callback { f.resume(conn) }
             conn.errback  { f.resume(conn) }

             Fiber.yield
           else
             conn
           end
         end
      ]
    end
  end
end

为什么事件驱动很重要?因为如果在拿到资源后同步执行,那么等待返回的过程会阻塞整个进程,也就是说 fiber 只能一个接一个的执行,那么就退化到了跟不用 fiber 一样,都是顺序执行了。

因为使用 eventmachine,或者说 reactor 模式。每个 fiber 在拿到连接后其实只需要注册一个事件,然后就把自己挂起(让出执行权),在事件完成后会执行 callback,并在 callback 中唤醒之前的任务,并赋给它异步请求的结果。

而没拿到连接的 fiber 会让出执行权,在其他 fiber 释放了连接后才被唤醒。因为大家都是事件驱动,不会因为 IO 或是等待连接而阻塞,从而让所有任务可以并发的执行。

以上就是我的理解,欢迎指正与讨论。

fiber 用的场景比较少

#1 楼 @yukihiro_matz 嗯,一提到并发就用别的语言去了~ 不过了解一下 Fiber 对理解 actor 模式的实现也是挺有帮助的http://www.blogjava.net/killme2008/archive/2010/04/13/318182.html 不论用什么语言,总是存在共性的。

#2 楼 @loveltyoic

Matz is not a threading guy

You need to Sign in before reply, if you don't have an account, please Sign up first.