Ruby RabbitMQ Ruby 客户端 Bunny 部分源码解读

so_zengtao · 2018年08月03日 · 最后由 imtoken 回复于 2018年09月11日 · 2543 次阅读
本帖已被设为精华帖!

Debug的时候顺便看了一下源码、就按照自己的思维对源码有个复盘
这里记录一下也是顺便安利一下我们项目组在项目中对于某个逻辑常见的复盘方法
ConsumerWorkPool 拉出来是想给大家分享一下 Bunny 制作 Pool 的一个思路、我们的一些逻辑也借用了这个思路
然后总的来说 Bunny 的源码对 面向对象的封装也有很多取舍、代码也谈不上特别漂亮的(一些方法太长没拆分之类的)

下面三部分一起介绍了 Queue Subscribe & Consume 的流程


ConsumerWorkPool 开始说起 (交代背景 😉

ConsumerWorkPool 顾名思义也就是 消费者工作的池子
ConsumerWorkPool 通过 死循环 利用 线程(Thread) 去 消费(consume) 队列中生产者积攒的消息

三个比较重要的Instance Method

submit: 整个池子动起来之后唯一能被外界塞入待处理消息的地方
start:  初始化整个池子的运转 可以看到创建线程 并且准备好了循环体    
join:   threads.map(:&join)
class ConsumerWorkPool
  attr_reader :threads
  attr_reader :size

  def initialize()
    @queue = ::Queue.new
  end

  # submit 方法可以说是这个工作池启动工作的外界因素了 (顺便,&block 本来就是 callable 的)
  def submit(callable = nil, &block)
    @queue.push(callable || block)
  end

  def start
    @threads = []

    @size.times do
      t = Thread.new(&method(:run_loop))               
      @threads << t
    end

    @running = true
  end

  def join(timeout = nil)
    (@threads || []).each { |t| t.join(timeout) }
  end

  protected
    def run_loop
      catch(:terminate) do
        loop do
          Thread.stop if @paused      
          callable = @queue.pop

          begin
            callable.call
          rescue ::StandardError => e

          end
        end
      end
    end
end

然后就是 QueueConsumer

  1. Queues store and forward messages to consumers.
  2. Messages flow from producer to exchanges that route them to queues & queues deliver the message to consumer

队列的概念存在于消费者这边 。消费者的客户端需要去订阅队列,例如官方案例:

queue.subscribe(block: true) do |_delivery_info, _properties, body|
  puts " [x] Received #{body}"
end

可以看到 subscribe 方法创建了一个 Comsumer 的实例

然后将业务逻辑的代码块 - 也就是 &block(顺便也是上文提到的 callable)作为参数传给 Comsumer#on_delivery

等待 consumer 被上文提到的 ConsumerWorkPool call ( 也就是执行业务逻辑) 即可

class Queue
  # Adds a consumer to the queue
  def subscribe(opts = {}, &block)

    consumer = Comsumer.new(
      @channel,
      self
    )

    consumer.on_delivery(&block)
    @channel.basic_consume_with(consumer)
    if opts[:block]
      @channel.work_pool.join           # 这里是直接启动线程了
    end
    consumer
  end
end
class Consumer
  def on_delivery(&block)
    @on_delivery = block
    self
  end

  def call(*args)
    @on_delivery.call(*args) if @on_delivery
  end

  alias handle_delivery call
end

继续往下我们可以看到 Channel

Channel 可以说是 RabbitMQ 交互的核心、因为 Connection 的建立、销毁代价太高,使用了 Channel 来多路复用一个 TCP 连接

在这里我们关心以下几个方法

basic_consume_with(consumer)     Register a consumer for queue as Consumer instance 
maybe_start_consumer_work_pool!  保证启动 consumer_work_pool
register_consumer
handle_frameset(basic_deliver, properties, content)
class Channel

  def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
    @consumers  = Hash.new
  end

  def basic_consume_with(consumer)
    maybe_start_consumer_work_pool!
    ...
    register_consumer(consumer.consumer_tag, consumer)
  end

  def maybe_start_consumer_work_pool!
    if @work_pool && !@work_pool.running?
      @work_pool.start
    end
  end

  def generate_consumer_tag(name = "bunny")
    "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
  end

  def register_consumer(consumer_tag, consumer)
    @consumer_mutex.synchronize do
      @consumers[consumer_tag] = consumer
    end
  end

  # 可以看到这里调用了 ConsumerWorkPool 的 submit 方法、 consumer 自己也 call 了
  def handle_frameset(basic_deliver, properties, content)
    consumer = @consumers[basic_deliver.consumer_tag]
    if consumer
      @work_pool.submit do
        begin
          consumer.call(DeliveryInfo.new(basic_deliver, consumer, self), MessageProperties.new(properties), content)
        rescue StandardError => e
          @uncaught_exception_handler.call(e, consumer) if @uncaught_exception_handler
        end
      end
    else
      @logger.warn "No consumer for tag #{basic_deliver.consumer_tag} on channel #{@id}!"
    end
  end
end

共收到 2 条回复
jasl 屏蔽了此话题:没写完,私聊先隐藏 08月03日 16:33
jasl 将本帖设为了精华贴 08月04日 10:33

为什么不用kafka?

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册