项目地址:https://github.com/mperham/connection_pool,官方介绍如下
Generic connection pooling for Ruby.
如果你使用过 sidekiq 或者 dalli(多线程模式,需要自行设置),那你已经使用了这个 Gem。而且你猜的没错,他们的作者都是 Mike Perham(Github 地址)。
作为一个通用的连接池项目,它的代码足够简单易读。
$redis = ConnectionPool.new(size: 5, timeout: 5) { Redis.connect }
$redis.with do |conn|
conn.sadd('foo', 1)
conn.smembers('foo')
end
|--lib
|--connection_pool
|--timed_stack.rb
|--version.rb
|--connection_pool.rb
整个项目只有 4 个 class
ConnectionPool # Pool class
ConnectionPool::TimedStack # Stack,用于生成和保存连接
ConnectionPool::PoolShuttingDownError # 继承自 RuntimeError 的 Error
ConnectionPool::Wrapper # ConnectionPool 的简单包装,可以忽略
整个项目的运行原理,大致如下:
@key = :"current-#{@available.object_id}"
来标记自己Mutex
线程锁来保证线程安全,另外使用了 ConditionVariable
来保证资源的可用或者 raise Timeout::Error。Thread.new { pool.with {|con| con.do_something} }
@key
来找到对应的 stack,::Thread.current[@key]
代码中 从 timed_stack 到 调用线程的 stack 被称之为 checkout,相应的,从线程的 stack 返回到 timed_stack 被称之为 checkin
# 使用,创建一个连接池
$redis = ConnectionPool.new(size: 5, timeout: 5) { Redis.connect }
initialize
def initialize(options = {}, &block) # Line #44, lib/connection_pool.rb
# block 是用于生成连接的代码,比方 Redis.connect
raise ArgumentError, 'Connection pool requires a block' unless block
# 配置部分,默认是 DEFAULTS = {size: 5, timeout: 5}
options = DEFAULTS.merge(options)
@size = options.fetch(:size)
@timeout = options.fetch(:timeout)
@available = TimedStack.new(@size, &block) # 实例了一个栈,用于保存连接
@key = :"current-#{@available.object_id}" # 分配一个 pool 的 key,以后方便找回
end
# 使用 pool 来执行操作
$redis.with do |conn|
conn.sadd('foo', 1)
end
with
def with(options = {}) # Line #59, lib/connection_pool.rb
# 延后 interrupt,以免出现 connection leak,比方主线程中断时
Thread.handle_interrupt(Exception => :never) do
conn = checkout(options) # 从 timed_stack checkout 一个 connection
begin
Thread.handle_interrupt(Exception => :immediate) do
yield conn # 执行连接上的操作,比方 conn.sadd('foo', 1)
end
ensure
checkin # 保证 checkin,归还连接给 timed_stack
end
end
end
check out
def checkout(options = {}) # Line #86, lib/connection_pool.rb
conn = if stack.empty?
timeout = options[:timeout] || @timeout
@available.pop(timeout: timeout) # 从 timed_stack 取一个连接
else
stack.last # 如果已经有,直接使用线程的 stack 里最后一个连接
end
stack.push conn # 把连接 push 线程的 stack
conn
end
check in
def checkin # Line #98, lib/connection_pool.rb
conn = pop_connection # 从线程的 stack pop 出连接
@available.push(conn) if stack.empty? # 如果线程的 stack 已经清空,那么把连接放回到 timed_stack
nil
end
shutdown
# 关闭整个连接池
$redis.shutdown { |conn| conn.quit }
def shutdown(&block) # Line #105, lib/connection_pool.rb
@available.shutdown(&block)
end
# 使用,创建一个保存连接的 stack
TimedStack.new(size) { Redis.connect }
def initialize(size = 0, &block) # Line #35, lib/connection_pool/timed_stack.rb
@create_block = block # 用来创建连接的代码块
@created = 0 # 已经创建的连接数量
@que = [] # 真正用来保存连接的 stack,用 Ruby 的数组实现
@max = size # 最大连接数
@mutex = Mutex.new # 线程锁
@resource = ConditionVariable.new # Mutex使用时资源的控制变量
@shutdown_block = nil # 用来关闭连接的代码块
end
pop
def pop(timeout = 0.5, options = {}) # Line #71, lib/connection_pool/timed_stack.rb
# 允许传递 {timeout: 0.7} 这样的参数
options, timeout = timeout, 0.5 if Hash === timeout
timeout = options.fetch :timeout, timeout
deadline = Time.now + timeout
# 加上线程锁开始操作
@mutex.synchronize do
loop do
# 如果已经开始关闭连接了,那么就不该再 pop 出去连接了
raise ConnectionPool::PoolShuttingDownError if @shutdown_block
# 如果 @que 中有可用的连接,取一个出来,返回
return fetch_connection(options) if connection_stored?(options)
# 如果 @que 中没有可用的连接
# 试着创建一个,成功的话就返回
connection = try_create(options)
return connection if connection
# 创建失败的话,比方已经达到最大连接数后不允许继续创建,那么继续往下走
# 如果已经超时,抛错
to_wait = deadline - Time.now
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
# 释放 @mutex 锁,等待有可用连接。等到再次唤醒后再来重新执行这个 loop
@resource.wait(@mutex, to_wait)
end
end
end
push
def push(obj, options = {}) # Line #49, lib/connection_pool/timed_stack.rb
# 加锁开始操作
@mutex.synchronize do
# 如果已经收到了关闭的代码块,那么关闭这个 push 回来的连接
if @shutdown_block
@shutdown_block.call(obj)
else
# 不然就存起来
store_connection obj, options
end
# 释放锁,唤醒等待的操作(比方pop)
@resource.broadcast
end
end
alias_method :<<, :push
shutdown
def shutdown(&block) # Line #95, lib/connection_pool/timed_stack.rb
raise ArgumentError, "shutdown must receive a block" unless block_given?
# 加锁开始执行
@mutex.synchronize do
# 保存关闭连接的代码块
@shutdown_block = block
# 释放锁,唤醒其他所有等待这个锁的操作(比方pop)
# 这个时候如果还在执行 pop 里的 loop 就会 raise PoolShuttingDownError
# 后续使用这些连接的操作都会收到这个 error
@resource.broadcast
# 关闭所有的连接
shutdown_connections
end
end