Gem 源码阅读系列 - connection_pool

serco · 2015年08月20日 · 最后由 serco 回复于 2015年09月16日 · 6012 次阅读
本帖已被设为精华帖!

项目地址:https://github.com/mperham/connection_pool,官方介绍如下

Generic connection pooling for Ruby.

如果你使用过 sidekiq 或者 dalli(多线程模式,需要自行设置),那你已经使用了这个 Gem。而且你猜的没错,他们的作者都是 Mike Perham(Github 地址)。

作为一个通用的连接池项目,它的代码足够简单易读。

1-基本用法

$redis = ConnectionPool.new(size: 5, timeout: 5) { Redis.connect }
$redis.with do |conn|
  conn.sadd('foo', 1)
  conn.smembers('foo')
end

2-目录结构

|--lib
   |--connection_pool
     |--timed_stack.rb
     |--version.rb
   |--connection_pool.rb

整个项目只有 4 个 class

ConnectionPool # Pool class
ConnectionPool::TimedStack # Stack,用于生成和保存连接
ConnectionPool::PoolShuttingDownError # 继承自 RuntimeError 的 Error
ConnectionPool::Wrapper # ConnectionPool 的简单包装,可以忽略

3-原理

整个项目的运行原理,大致如下:

  1. ConnectionPool 会在实例化(下面称该实例为 pool)时同时创建一个 TimedStack 的实例对象(下面称该实例为 timed_stack)用来保存可用的连接。
    • pool 有一个实例变量 @key = :"current-#{@available.object_id}" 来标记自己
  2. 每当 pool 调用 with 方法时,会从 timed_stack pop 出一个可用的连接。
    • timed_stack 中的连接是在使用时动态创建的,最大数为配置参数 size。
    • 在操作 timed_stack 时,使用了 Mutex 线程锁来保证线程安全,另外使用了 ConditionVariable 来保证资源的可用或者 raise Timeout::Error。
  3. 在步骤 2 中使用 pool 的线程也会生成一个 stack 用来保存正在使用的连接,并且在使用完毕后归还给 timed_stack。
    • 可以每次生成一个新的线程来执行,比如 Thread.new { pool.with {|con| con.do_something} }
    • 使用 pool 的线程的 stack 可以用 pool 的 @key 来找到对应的 stack,::Thread.current[@key]

代码中 从 timed_stack 到 调用线程的 stack 被称之为 checkout,相应的,从线程的 stack 返回到 timed_stack 被称之为 checkin

4-ConnectionPool 部分代码解释

步骤 1

# 使用,创建一个连接池
$redis = ConnectionPool.new(size: 5, timeout: 5) { Redis.connect }

initialize

def initialize(options = {}, &block) # Line #44, lib/connection_pool.rb
  # block 是用于生成连接的代码,比方 Redis.connect
  raise ArgumentError, 'Connection pool requires a block' unless block

  # 配置部分,默认是 DEFAULTS = {size: 5, timeout: 5}
  options = DEFAULTS.merge(options)

  @size = options.fetch(:size)
  @timeout = options.fetch(:timeout)

  @available = TimedStack.new(@size, &block) # 实例了一个栈,用于保存连接
  @key = :"current-#{@available.object_id}" # 分配一个 pool 的 key,以后方便找回
end

步骤 2&3

# 使用 pool 来执行操作
$redis.with do |conn|
  conn.sadd('foo', 1)
end

with

def with(options = {}) # Line #59, lib/connection_pool.rb
  # 延后 interrupt,以免出现 connection leak,比方主线程中断时
  Thread.handle_interrupt(Exception => :never) do
    conn = checkout(options) # 从 timed_stack checkout 一个 connection
    begin
      Thread.handle_interrupt(Exception => :immediate) do
        yield conn # 执行连接上的操作,比方 conn.sadd('foo', 1)
      end
    ensure
      checkin # 保证 checkin,归还连接给 timed_stack
    end
  end
end

check out

def checkout(options = {}) # Line #86, lib/connection_pool.rb
  conn = if stack.empty?
    timeout = options[:timeout] || @timeout
    @available.pop(timeout: timeout) # 从 timed_stack 取一个连接
  else
    stack.last # 如果已经有,直接使用线程的 stack 里最后一个连接
  end

  stack.push conn # 把连接 push 线程的 stack
  conn
end

check in

def checkin # Line #98, lib/connection_pool.rb
  conn = pop_connection # 从线程的 stack pop 出连接
  @available.push(conn) if stack.empty? # 如果线程的 stack 已经清空,那么把连接放回到 timed_stack

  nil
end

shutdown

# 关闭整个连接池
$redis.shutdown { |conn| conn.quit }
def shutdown(&block) # Line #105, lib/connection_pool.rb
  @available.shutdown(&block)
end

5-TimedStack 部分代码解释

# 使用,创建一个保存连接的 stack
TimedStack.new(size) { Redis.connect }
def initialize(size = 0, &block) # Line #35, lib/connection_pool/timed_stack.rb
  @create_block = block # 用来创建连接的代码块
  @created = 0 # 已经创建的连接数量
  @que = [] # 真正用来保存连接的 stack,用 Ruby 的数组实现
  @max = size # 最大连接数
  @mutex = Mutex.new # 线程锁
  @resource = ConditionVariable.new # Mutex使用时资源的控制变量
  @shutdown_block = nil # 用来关闭连接的代码块
end

pop

def pop(timeout = 0.5, options = {}) # Line #71, lib/connection_pool/timed_stack.rb
  # 允许传递 {timeout: 0.7} 这样的参数
  options, timeout = timeout, 0.5 if Hash === timeout
  timeout = options.fetch :timeout, timeout

  deadline = Time.now + timeout
  # 加上线程锁开始操作
  @mutex.synchronize do
    loop do
      # 如果已经开始关闭连接了,那么就不该再 pop 出去连接了
      raise ConnectionPool::PoolShuttingDownError if @shutdown_block
      # 如果 @que 中有可用的连接,取一个出来,返回
      return fetch_connection(options) if connection_stored?(options)

      # 如果 @que 中没有可用的连接
      # 试着创建一个,成功的话就返回
      connection = try_create(options)
      return connection if connection

      # 创建失败的话,比方已经达到最大连接数后不允许继续创建,那么继续往下走

      # 如果已经超时,抛错
      to_wait = deadline - Time.now
      raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0

      # 释放 @mutex 锁,等待有可用连接。等到再次唤醒后再来重新执行这个 loop
      @resource.wait(@mutex, to_wait)
    end
  end
end

push

def push(obj, options = {}) # Line #49, lib/connection_pool/timed_stack.rb
  # 加锁开始操作
  @mutex.synchronize do
    # 如果已经收到了关闭的代码块,那么关闭这个 push 回来的连接
    if @shutdown_block
      @shutdown_block.call(obj)
    else
    # 不然就存起来
      store_connection obj, options
    end

    # 释放锁,唤醒等待的操作(比方pop) 
    @resource.broadcast
  end
end
alias_method :<<, :push

shutdown

def shutdown(&block) # Line #95, lib/connection_pool/timed_stack.rb
  raise ArgumentError, "shutdown must receive a block" unless block_given?

  # 加锁开始执行
  @mutex.synchronize do
    # 保存关闭连接的代码块
    @shutdown_block = block
    # 释放锁,唤醒其他所有等待这个锁的操作(比方pop)
    # 这个时候如果还在执行 pop 里的 loop 就会 raise PoolShuttingDownError
    # 后续使用这些连接的操作都会收到这个 error
    @resource.broadcast

    # 关闭所有的连接
    shutdown_connections
  end
end

虽然还看不懂,但应该是很有含量的,慢慢体会!

前几天看到也看到这个 ,顺便把 Ruby MongoDB Driver 和 ActiveRecord 的连接池 也看了看。

这个库值得学习,写的简洁,看起来很容易懂

没看明白的是为什么在 ConnectionPool instance 里面还要整一个 stack 出来放 connection 呢?是为了防止多次运行 checkout 吗?

对 ConditionVariable 也有一点疑问,然后我找到了之前的一个老贴 关于 Ruby 线程之间控制的一个问题 接着对贴子里 7 楼 的代码做了一次测试,发现结果也不及预期,各位也可以看看 https://github.com/crhan/ruby_condition_variable_test

#5 楼 @ruohanc 直接回复在老帖子里了。

#4 楼 @ruohanc 其实这部分我也有疑惑

def checkout(options = {}) # Line #86, lib/connection_pool.rb
  conn = if stack.empty? # 什么时候会出现不是 empty?
    timeout = options[:timeout] || @timeout
    @available.pop(timeout: timeout) 
  else
    stack.last 
  end

  stack.push conn # 比较奇怪的是这里,如果是取的stack.last,这里再push回去就同一个conn放了两个了。 
  conn
end

#7 楼 @serco 找到了修改这行的 commit: 387013

关键还是在于他的测试用例,他想做的是在一个嵌套的 #with 中可以不用重复的 pop 出新连接。虽然不知道为什么要这么设计,但是在这个设计里面这个代码是合理的

#8 楼 @ruohanc 懂了,修改前的代码,嵌套的 with 操作会在结束后把连接放回去,如果外层的 with 还想操作这个连接就会出错。 所以才会有奇怪的这行 stack.push conn, 即使是同一个 conn,每次 checkout 都要把它 push 到 thread 的 stack 上。

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册