Rails Rails 集成 RabbitMQ 消息队列实践

luolinae86 · 2018年03月14日 · 最后由 Zhang-jia-chen 回复于 2019年08月20日 · 7045 次阅读

什么是消息队列,什么是 RabbitMQ,重点推荐以下书籍,能够让你真正理解消息队列的发展历史,以及 RabbitMQ 的方方面面。

参考文献

《RabbitMQ 实战》高效部署分布式消息队列 美 Alvaro Videla / Jason J.W William 著 汪佳南 译
购买链接: https://item.jd.com/11790530.html

下面,我主要介绍一下 RabbitMQ server 以及 Client 端的安装配置,以及在 Ruby On Rails 中利用 Bunny 发布消息,用 Sneakers 消费消息的详细介绍及使用说明,希望对正在使用 RabbitMQ 以及即将使用 RabbitMQ 的朋友一些参考。

RabbitMQ

安装

Ubuntu 环境下的安装方法

#更新包
sudo apt-get update
#安装
sudo apt-get install rabbitmq-server
#启动rabbit-server 服务默认启动在5672端口
sudo rabbitmq-server start

启用 Rabbitmq_management 插件

#需要从web上面管理控制RabbitMQ
sudo rabbitmq-plugins enable rabbitmq_management
#重启服务以让其生效,控制台默认运行在15672端口
sudo rabbitmqctl stop
sudo rabbitmq-server -detached

创建用户

#创建用户
rabbitmqctl  add_user user_name password
#给权限
rabbitmqctl set_user_tags user_name administrator
rabbitmqctl set_permissions -p / user_name ".*" ".*" ".*"

访问 http://rabbitmq-server-ip:15672user_namepassword登陆 web 控制台

注:访问 15672 或者 5672 端口以前,请确保本地防火墙,或者阿里云的安全组打开了端口的访问限制

生产者 (使用 Bunny)

Bunny 是一个使用 Ruby 连接和操作 RabbitMQ 的客户端,参考

https://github.com/ruby-amqp/bunny

其中,Bunny 的标准使用,可以参考 Bunny 官方 Gem 使用手册,按步骤:

  • 创建一条和 RabbitMQ 消息服务器的真实 TCP 连接
  • 在 TCP 链接上面创建 AMQP 信道(channel)
  • 在信道上面申明队列
  • 队列将消息发布到交换器

官方代码示例(实际生产环境中使用,需要包装代码,后面会有详细说明原因

require "bunny"

# Start a communication session with RabbitMQ
conn = Bunny.new
conn.start

# open a channel
ch = conn.create_channel

# declare a queue
q  = ch.queue("test1")

# publish a message to the default exchange which then gets routed to this queue
q.publish("Hello, everybody!")

# fetch a message from the queue
delivery_info, metadata, payload = q.pop

puts "This is the message: #{payload}"

# close the connection
conn.stop

业务代码包装

新建config/initializers/rabbitmq.rb文件,代码如下

class RabbitMQ
  # 可以前提前在MQ Web上面创建好exchange和queue,并创建好绑定和路由的关系
  class << self
    #全局创建一个单例@connection,这样一个进程只会和RabbmitMQ服务器建立一条真实的TCP连接
    #这里的ENV['mq_connection'],配置在环境变量里面,例如: amqp://user_name:password@rabbitmq_server_ip
    def connection
      @connection ||= Bunny.new(ENV['mq_connection']).start
    end

    def channel
      @channel = connection.create_channel
    end

    # for example: exchange = RabbitMQ.exchange("exchange_name", {durable: true_or_false, queue: :queue_name})
    def exchange(name, option = {})
      channel.direct(name, option)
    end

    def publish(routing_key, message = {})
      exchange.publish(message, routing_key: routing_key)
      channel.close
    end
  end
end

下面说一下为什么一个应用创建一个 TCP 链接(connection),多个信道(channel)。

首先,我们必须先连接到 RabbitMQ,才能消费或者发布消息。应用程序和 Rabbit 代理服务器之间创建一条 TCP 连接。一旦 TCP 连接打开(通过了认证),应用程序就可以创建一条 AMQP 信道。信道是建立在“真实的”TCP 连接内的虚拟连接。不论是发布消息、订阅消息还是接收消息,都是通过信道完成的。

为什么用信道而不是直接通过 TCP 连接发送 AMQP 命令呢?主要原因是频繁地建立和销毁 TCP,操作系统的开销非常昂贵。比如,当业务高峰的时候,那么每秒都会有大量的 TCP 连接的创建和销毁,这会造成系统 TCP 连接资源的浪费,很快就出遇到系统的性能瓶颈。

因此,引入了信道 channel 的概念,在一条认证过 TCP 的连接上面每秒可以创建成百上千个信道,而不会影响操作系统,并且,在一条 TCP 连接上面创建多少条信道是没有限制的。

业务代码调用

根据上面业务包装的代码,可以简单地实现向 MQ 的消息队列生产消息了

exchange = RabbitMQ.exchange("exchange_name", {durable: true, queue: :queue_name})
exchange.publish({site: :rubychina}.to_json, "your_routing_key")

消费者 (使用 Sneakers)

业务通过 Bunny 在 Rails 中简易、快速地生产发布了消息,就需要有消费者来接收和消费消息,Sneakers 是一个处理 RabbitMQ 消息队列的高性能 Ruby 框架

https://github.com/jondot/sneakers

Rails 集成 Sneakers

下面介绍怎么在 Rails 里面使用 Sneaker

1.添加 sneakers gem 包

# Gemfile
gem 'sneakers'

2.Rakefile 引用 seakers/tasks

#Rakefile
require 'sneakers/tasks'

3.配置 sneakers

# 新建 config/initializers/seakers.rb
opts = {
  daemonize: true,
  amqp: ENV['mq_connection'],
  log: "log/sneakers.log",
  pid_path: "tmp/pids/sneakers.pid",
  threads: 1,
  workers: 1,
}

Sneakers.configure(opts)
Sneakers.logger.level = Logger::INFO

4.添加 worker

# 新建文件 app/workers/galaxy_worker.rb
# Seaker 作为消费者,消费binding 为 galaxy.queue的队列
class GalaxyWorker
  include Sneakers::Worker
  # worker从galaxy.queue队列里面取数据
  from_queue "galaxy.queue", env: nil

  def work(msg)
    logger.info "GalaxyWorker::work msg #{msg}"
    # 给队列发送确认信息
    ack!
  end
end

5.启动 Sneaker 消费队列

WORKERS=GalaxyWorker rake sneakers:run

这样 sneaker 就以 GalaxyWorker 这个 worker 在后台接收和处理消息

样例展示

生产消息

启动 Rails 项目,在 console 里面执行

Loading development environment (Rails 6.0.1)
[1] pry(main)> exchange = RabbitMQ.exchange("print.exchange", {durable: true,queue: :print})
=> #<Bunny::Exchange:0x00007fb88fb0c6e8
 @arguments=nil,
 @auto_delete=nil,
 @bindings=#<Set: {}>,
 @channel=
  #<Bunny::Channel:70215330701360 @id=1 @connection=#<Bunny::Session:0x7fb88fae7348 deployer@*.*.*.*:5672, vhost=/, addresses=[*.*.*.*:5672]>> @open=true,
 @durable=true,
 @internal=nil,
 @name="print.exchange",
 @options={:queue=>:print, :nowait=>false, :durable=>true},
 @type=:direct>
[2] pry(main)> exchange.publish({site: :rubychina}.to_json,{routing_key: 'print.key'})
=> #<Bunny::Exchange:0x00007fb88fb0c6e8
 @arguments=nil,
 @auto_delete=nil,
 @bindings=#<Set: {}>,
 @channel=
  #<Bunny::Channel:70215330701360 @id=1 @connection=#<Bunny::Session:0x7fb88fae7348 deployer@*.*.*.*:5672, vhost=/, addresses=[*.*.*.*:5672]>> @open=true,
 @durable=true,
 @internal=nil,
 @name="print.exchange",
 @options={:queue=>:print, :nowait=>false, :durable=>true},
 @type=:direct>

消费消息

启动 Sneakers Worker

~/workspace/hesheng/galaxy % WORKERS=GalaxyWorker rake sneakers:run

查看 Seakers 日志

~/workspace/hesheng/galaxy % tail -f log/sneakers.log
018-03-14T02:35:01Z p-79500 t-oval8l8bg INFO: Heartbeat: running threads [7]
2018-03-14T02:35:06Z p-79500 t-ovam12vfo INFO: GalaxyWorker::work msg {"site":"rubychina"}

从日志来看,GalaxyWorker 成功收到了消息, {"site":"rubychina"},因此,拿到消息后,可以根据业务对消息进行相应的处理。

感谢作者的这句 # Seaker 作为消费者,消费 binding 为 galaxy.queue 的队列

需要 登录 后方可回复, 如果你还没有账号请 注册新账号