Debug 的时候顺便看了一下源码、就按照自己的思维对源码有个复盘
这里记录一下也是顺便安利一下我们项目组在项目中对于某个逻辑常见的复盘方法
ConsumerWorkPool 拉出来是想给大家分享一下 Bunny 制作 Pool 的一个思路、我们的一些逻辑也借用了这个思路
然后总的来说 Bunny 的源码对 面向对象的封装也有很多取舍、代码也谈不上特别漂亮的 (一些方法太长没拆分之类的)
下面三部分一起介绍了 Queue Subscribe & Consume 的流程
ConsumerWorkPool
顾名思义也就是 消费者工作的池子
ConsumerWorkPool
通过 死循环
利用 线程 (Thread) 去 消费(consume) 队列中生产者积攒的消息
三个比较重要的 Instance Method
submit: 整个池子动起来之后唯一能被外界塞入待处理消息的地方
start: 初始化整个池子的运转 可以看到创建线程 并且准备好了循环体
join: threads.map(:&join)
class ConsumerWorkPool
attr_reader :threads
attr_reader :size
def initialize()
@queue = ::Queue.new
end
# submit 方法可以说是这个工作池启动工作的外界因素了 (顺便,&block 本来就是 callable 的)
def submit(callable = nil, &block)
@queue.push(callable || block)
end
def start
@threads = []
@size.times do
t = Thread.new(&method(:run_loop))
@threads << t
end
@running = true
end
def join(timeout = nil)
(@threads || []).each { |t| t.join(timeout) }
end
protected
def run_loop
catch(:terminate) do
loop do
Thread.stop if @paused
callable = @queue.pop
begin
callable.call
rescue ::StandardError => e
end
end
end
end
end
队列的概念存在于消费者这边。消费者的客户端需要去订阅队列,例如官方案例:
queue.subscribe(block: true) do |_delivery_info, _properties, body|
puts " [x] Received #{body}"
end
可以看到 subscribe
方法创建了一个 Comsumer
的实例
然后将业务逻辑的代码块 - 也就是 &block(顺便也是上文提到的 callable)作为参数传给 Comsumer#on_delivery
等待 consumer
被上文提到的 ConsumerWorkPool
call
( 也就是执行业务逻辑) 即可
class Queue
# Adds a consumer to the queue
def subscribe(opts = {}, &block)
consumer = Comsumer.new(
@channel,
self
)
consumer.on_delivery(&block)
@channel.basic_consume_with(consumer)
if opts[:block]
@channel.work_pool.join # 这里是直接启动线程了
end
consumer
end
end
class Consumer
def on_delivery(&block)
@on_delivery = block
self
end
def call(*args)
@on_delivery.call(*args) if @on_delivery
end
alias handle_delivery call
end
Channel 可以说是 RabbitMQ 交互的核心、因为 Connection 的建立、销毁代价太高,使用了 Channel 来多路复用一个 TCP 连接
在这里我们关心以下几个方法
basic_consume_with(consumer) Register a consumer for queue as Consumer instance
maybe_start_consumer_work_pool! 保证启动 consumer_work_pool
register_consumer
handle_frameset(basic_deliver, properties, content)
class Channel
def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@consumers = Hash.new
end
def basic_consume_with(consumer)
maybe_start_consumer_work_pool!
...
register_consumer(consumer.consumer_tag, consumer)
end
def maybe_start_consumer_work_pool!
if @work_pool && !@work_pool.running?
@work_pool.start
end
end
def generate_consumer_tag(name = "bunny")
"#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end
def register_consumer(consumer_tag, consumer)
@consumer_mutex.synchronize do
@consumers[consumer_tag] = consumer
end
end
# 可以看到这里调用了 ConsumerWorkPool 的 submit 方法、 consumer 自己也 call 了
def handle_frameset(basic_deliver, properties, content)
consumer = @consumers[basic_deliver.consumer_tag]
if consumer
@work_pool.submit do
begin
consumer.call(DeliveryInfo.new(basic_deliver, consumer, self), MessageProperties.new(properties), content)
rescue StandardError => e
@uncaught_exception_handler.call(e, consumer) if @uncaught_exception_handler
end
end
else
@logger.warn "No consumer for tag #{basic_deliver.consumer_tag} on channel #{@id}!"
end
end
end