什么是消息队列,什么是 RabbitMQ,重点推荐以下书籍,能够让你真正理解消息队列的发展历史,以及 RabbitMQ 的方方面面。
《RabbitMQ 实战》高效部署分布式消息队列 美 Alvaro Videla / Jason J.W William 著 汪佳南 译
购买链接: https://item.jd.com/11790530.html
下面,我主要介绍一下 RabbitMQ server 以及 Client 端的安装配置,以及在 Ruby On Rails 中利用 Bunny 发布消息,用 Sneakers 消费消息的详细介绍及使用说明,希望对正在使用 RabbitMQ 以及即将使用 RabbitMQ 的朋友一些参考。
Ubuntu 环境下的安装方法
#更新包
sudo apt-get update
#安装
sudo apt-get install rabbitmq-server
#启动rabbit-server 服务默认启动在5672端口
sudo rabbitmq-server start
#需要从web上面管理控制RabbitMQ
sudo rabbitmq-plugins enable rabbitmq_management
#重启服务以让其生效,控制台默认运行在15672端口
sudo rabbitmqctl stop
sudo rabbitmq-server -detached
#创建用户
rabbitmqctl add_user user_name password
#给权限
rabbitmqctl set_user_tags user_name administrator
rabbitmqctl set_permissions -p / user_name ".*" ".*" ".*"
访问 http://rabbitmq-server-ip:15672 以 user_name
和password
登陆 web 控制台
注:访问 15672 或者 5672 端口以前,请确保本地防火墙,或者阿里云的安全组打开了端口的访问限制
Bunny 是一个使用 Ruby 连接和操作 RabbitMQ 的客户端,参考
其中,Bunny 的标准使用,可以参考 Bunny 官方 Gem 使用手册,按步骤:
官方代码示例(实际生产环境中使用,需要包装代码,后面会有详细说明原因)
require "bunny"
# Start a communication session with RabbitMQ
conn = Bunny.new
conn.start
# open a channel
ch = conn.create_channel
# declare a queue
q = ch.queue("test1")
# publish a message to the default exchange which then gets routed to this queue
q.publish("Hello, everybody!")
# fetch a message from the queue
delivery_info, metadata, payload = q.pop
puts "This is the message: #{payload}"
# close the connection
conn.stop
新建config/initializers/rabbitmq.rb
文件,代码如下
class RabbitMQ
# 可以前提前在MQ Web上面创建好exchange和queue,并创建好绑定和路由的关系
class << self
#全局创建一个单例@connection,这样一个进程只会和RabbmitMQ服务器建立一条真实的TCP连接
#这里的ENV['mq_connection'],配置在环境变量里面,例如: amqp://user_name:password@rabbitmq_server_ip
def connection
@connection ||= Bunny.new(ENV['mq_connection']).start
end
def channel
@channel = connection.create_channel
end
# for example: exchange = RabbitMQ.exchange("exchange_name", {durable: true_or_false, queue: :queue_name})
def exchange(name, option = {})
channel.direct(name, option)
end
def publish(routing_key, message = {})
exchange.publish(message, routing_key: routing_key)
channel.close
end
end
end
下面说一下为什么一个应用创建一个 TCP 链接(connection),多个信道(channel)。
首先,我们必须先连接到 RabbitMQ,才能消费或者发布消息。应用程序和 Rabbit 代理服务器之间创建一条 TCP 连接。一旦 TCP 连接打开(通过了认证),应用程序就可以创建一条 AMQP 信道。信道是建立在“真实的”TCP 连接内的虚拟连接。不论是发布消息、订阅消息还是接收消息,都是通过信道完成的。
为什么用信道而不是直接通过 TCP 连接发送 AMQP 命令呢?主要原因是频繁地建立和销毁 TCP,操作系统的开销非常昂贵。比如,当业务高峰的时候,那么每秒都会有大量的 TCP 连接的创建和销毁,这会造成系统 TCP 连接资源的浪费,很快就出遇到系统的性能瓶颈。
因此,引入了信道 channel 的概念,在一条认证过 TCP 的连接上面每秒可以创建成百上千个信道,而不会影响操作系统,并且,在一条 TCP 连接上面创建多少条信道是没有限制的。
根据上面业务包装的代码,可以简单地实现向 MQ 的消息队列生产消息了
exchange = RabbitMQ.exchange("exchange_name", {durable: true, queue: :queue_name})
exchange.publish({site: :rubychina}.to_json, "your_routing_key")
业务通过 Bunny 在 Rails 中简易、快速地生产发布了消息,就需要有消费者来接收和消费消息,Sneakers 是一个处理 RabbitMQ 消息队列的高性能 Ruby 框架
下面介绍怎么在 Rails 里面使用 Sneaker
1.添加 sneakers gem 包
# Gemfile
gem 'sneakers'
2.Rakefile 引用 seakers/tasks
#Rakefile
require 'sneakers/tasks'
3.配置 sneakers
# 新建 config/initializers/seakers.rb
opts = {
daemonize: true,
amqp: ENV['mq_connection'],
log: "log/sneakers.log",
pid_path: "tmp/pids/sneakers.pid",
threads: 1,
workers: 1,
}
Sneakers.configure(opts)
Sneakers.logger.level = Logger::INFO
4.添加 worker
# 新建文件 app/workers/galaxy_worker.rb
# Seaker 作为消费者,消费binding 为 galaxy.queue的队列
class GalaxyWorker
include Sneakers::Worker
# worker从galaxy.queue队列里面取数据
from_queue "galaxy.queue", env: nil
def work(msg)
logger.info "GalaxyWorker::work msg #{msg}"
# 给队列发送确认信息
ack!
end
end
5.启动 Sneaker 消费队列
WORKERS=GalaxyWorker rake sneakers:run
这样 sneaker 就以 GalaxyWorker 这个 worker 在后台接收和处理消息
启动 Rails 项目,在 console 里面执行
Loading development environment (Rails 6.0.1)
[1] pry(main)> exchange = RabbitMQ.exchange("print.exchange", {durable: true,queue: :print})
=> #<Bunny::Exchange:0x00007fb88fb0c6e8
@arguments=nil,
@auto_delete=nil,
@bindings=#<Set: {}>,
@channel=
#<Bunny::Channel:70215330701360 @id=1 @connection=#<Bunny::Session:0x7fb88fae7348 deployer@*.*.*.*:5672, vhost=/, addresses=[*.*.*.*:5672]>> @open=true,
@durable=true,
@internal=nil,
@name="print.exchange",
@options={:queue=>:print, :nowait=>false, :durable=>true},
@type=:direct>
[2] pry(main)> exchange.publish({site: :rubychina}.to_json,{routing_key: 'print.key'})
=> #<Bunny::Exchange:0x00007fb88fb0c6e8
@arguments=nil,
@auto_delete=nil,
@bindings=#<Set: {}>,
@channel=
#<Bunny::Channel:70215330701360 @id=1 @connection=#<Bunny::Session:0x7fb88fae7348 deployer@*.*.*.*:5672, vhost=/, addresses=[*.*.*.*:5672]>> @open=true,
@durable=true,
@internal=nil,
@name="print.exchange",
@options={:queue=>:print, :nowait=>false, :durable=>true},
@type=:direct>
启动 Sneakers Worker
~/workspace/hesheng/galaxy % WORKERS=GalaxyWorker rake sneakers:run
查看 Seakers 日志
~/workspace/hesheng/galaxy % tail -f log/sneakers.log
018-03-14T02:35:01Z p-79500 t-oval8l8bg INFO: Heartbeat: running threads [7]
2018-03-14T02:35:06Z p-79500 t-ovam12vfo INFO: GalaxyWorker::work msg {"site":"rubychina"}
从日志来看,GalaxyWorker 成功收到了消息, {"site":"rubychina"},因此,拿到消息后,可以根据业务对消息进行相应的处理。