Ruby Ruby 基于 RabbitMQ 场景的应用

stanwang · 2016年10月01日 · 最后由 betterthornbird 回复于 2016年10月03日 · 4395 次阅读

RabbitMQ 是基于 AMQP 的一种开源实现,使用 Erlang 语言开发,是一种重量级的消息中间件。特别适用于 SOA 场景下的消息异步处理,通过订阅者和发布者模式解耦各个模块的强依赖关系,一定程度上缓解分布式事务处理的复杂场景,通过 Ack 和 Nack 机制保证消息的相对必达性。

RabbitMQ 基础概念

概念 解释
Conection 对应一个 TCP 连接
Channel 相对 Conection 更高层次的连接抽象,约等于进程线程的关系,建立和关闭的代价小于 Connection
Exchange 消息中转节点,一般会在这里设置各种路由规则
Queue 消息队列通道
Key 即 routing_key,消息路由依据,Queue 没有 bind routing_key 时默认路由 Queue 的 name

路由规则

  • Direct Exchange

直接路由即根据绑定的 routing_key 完全匹配时才将消息路由到指定的 Queue

直接路由

  • Fanout Exchange

广播即路由消息至所有绑定到当前 Exchange 的所有队列

  • Topic Exchange

主题订阅,通过特定的匹配规则模糊路由消息,# 匹配 0 个或更多关键字,* 匹配一个关键字

场景策略

RabbitMQ 支持分布式消息负载路由,同级 N 个 Server 消费同一个 Queue,消息会根据策略(Round Robin)自动路由至一个消费者,如果期望多个消费者同时收取同一条消息,请每个消费者创建各自 Queue,并绑定至指定的 routing_key.

Ruby 使用 RabbitMQ

Ruby 中可以通过Bunny访问 RabbitMQ,这个 Gem 包提供了 RabbitMQ 的一些基础访问方法和很好的异常处理机制。

消息发送代码:

require "bunny"

conn = Bunny.new
conn.start

#创建连接通道
ch   = conn.create_channel

q    = ch.queue("hello")

#向指定Topic发送消息
ch.default_exchange.publish("Hello World!", :routing_key => q.name)

puts " [x] Sent 'Hello World!'"

conn.close

消息接收代码:

require "bunny"

conn = Bunny.new
conn.start

ch = conn.create_channel

#声明消费主题为hello
q = ch.queue("hello")

puts " [*] Waiting for messages in #{q.name}. To exit press CTRL+C"

#启用block避免手动loop
q.subscribe(:block => true) do |delivery_info, properties, body|
    puts "[x] Received #{body}"

    # 此处代码在接收到消息时将退出,注释掉可以一直消费直到中断
    # delivery_info.consumer.cancel
end

Bunny 支持各种容错性恢复,包括网络异常,如果因为特别场景需要关闭连接异常自动重连,可以在Bunny.new

:automatic_recovery => false

参考地址

这是对某 document 的一个笔记?

需要 登录 后方可回复, 如果你还没有账号请 注册新账号