RabbitMQ 是基于 AMQP 的一种开源实现,使用 Erlang 语言开发,是一种重量级的消息中间件。特别适用于 SOA 场景下的消息异步处理,通过订阅者和发布者模式解耦各个模块的强依赖关系,一定程度上缓解分布式事务处理的复杂场景,通过 Ack 和 Nack 机制保证消息的相对必达性。
概念 | 解释 |
---|---|
Conection | 对应一个 TCP 连接 |
Channel | 相对 Conection 更高层次的连接抽象,约等于进程和线程的关系,建立和关闭的代价小于 Connection |
Exchange | 消息中转节点,一般会在这里设置各种路由规则 |
Queue | 消息队列通道 |
Key | 即 routing_key,消息路由依据,Queue 没有 bind routing_key 时默认路由 Queue 的 name |
直接路由即根据绑定的 routing_key 完全匹配时才将消息路由到指定的 Queue
广播即路由消息至所有绑定到当前 Exchange 的所有队列
主题订阅,通过特定的匹配规则模糊路由消息,# 匹配 0 个或更多关键字,* 匹配一个关键字
RabbitMQ 支持分布式消息负载路由,同级 N 个 Server 消费同一个 Queue,消息会根据策略(Round Robin)自动路由至一个消费者,如果期望多个消费者同时收取同一条消息,请每个消费者创建各自 Queue,并绑定至指定的 routing_key.
Ruby 中可以通过Bunny访问 RabbitMQ,这个 Gem 包提供了 RabbitMQ 的一些基础访问方法和很好的异常处理机制。
消息发送代码:
require "bunny"
conn = Bunny.new
conn.start
#创建连接通道
ch = conn.create_channel
q = ch.queue("hello")
#向指定Topic发送消息
ch.default_exchange.publish("Hello World!", :routing_key => q.name)
puts " [x] Sent 'Hello World!'"
conn.close
消息接收代码:
require "bunny"
conn = Bunny.new
conn.start
ch = conn.create_channel
#声明消费主题为hello
q = ch.queue("hello")
puts " [*] Waiting for messages in #{q.name}. To exit press CTRL+C"
#启用block避免手动loop
q.subscribe(:block => true) do |delivery_info, properties, body|
puts "[x] Received #{body}"
# 此处代码在接收到消息时将退出,注释掉可以一直消费直到中断
# delivery_info.consumer.cancel
end
Bunny 支持各种容错性恢复,包括网络异常,如果因为特别场景需要关闭连接异常自动重连,可以在Bunny.new中
:automatic_recovery => false