Rails Rails 集成 RabbitMQ 消息队列实践

luolinae86 · 2018年03月14日 · 2652 次阅读

什么是消息队列,什么是RabbitMQ,重点推荐以下书籍,能够让你真正理解消息队列的发展历史,以及RabbitMQ的方方面面。

参考文献

《RabbitMQ实战》高效部署分布式消息队列 美 Alvaro Videla / Jason J.W William 著 汪佳南 译
购买链接: https://item.jd.com/11790530.html

下面,我主要介绍一下RabbitMQ server 以及 Client端的安装配置,以及在Ruby On Rails 中利用Bunny发布消息,用Sneakers消费消息的详细介绍及使用说明,希望对正在使用RabbitMQ以及即将使用RabbitMQ的朋友一些参考。

RabbitMQ

安装

Ubuntu 环境下的安装方法

#更新包
sudo apt-get update
#安装
sudo apt-get install rabbitmq-server
#启动rabbit-server 服务默认启动在5672端口
sudo rabbitmq-server start

启用Rabbitmq_management插件

#需要从web上面管理控制RabbitMQ
sudo rabbitmq-plugins enable rabbitmq_management
#重启服务以让其生效,控制台默认运行在15672端口
sudo rabbitmqctl stop
sudo rabbitmq-server -detached

创建用户

#创建用户
rabbitmqctl  add_user user_name password
#给权限
rabbitmqctl set_user_tags user_name administrator
rabbitmqctl set_permissions -p / user_name ".*" ".*" ".*"

访问 http://rabbitmq-server-ip:15672user_namepassword登陆web控制台

生产者 (使用 Bunny)

Bunny是一个使用Ruby连接和操作RabbitMQ的客户端,参考

https://github.com/ruby-amqp/bunny

其中,Bunny的标准使用,可以参考Bunny官方Gem使用手册,按步骤:

  • 创建一条和RabbitMQ消息服务器的真实TCP连接
  • 在TCP链接上面创建AMQP信道(channel)
  • 在信道上面申明队列
  • 队列将消息发布到交换器

官方代码示例(实际生产环境中使用,需要包装代码,后面会有详细说明原因

require "bunny"

# Start a communication session with RabbitMQ
conn = Bunny.new
conn.start

# open a channel
ch = conn.create_channel

# declare a queue
q  = ch.queue("test1")

# publish a message to the default exchange which then gets routed to this queue
q.publish("Hello, everybody!")

# fetch a message from the queue
delivery_info, metadata, payload = q.pop

puts "This is the message: #{payload}"

# close the connection
conn.stop

业务代码包装

新建config/initializers/rabbitmq.rb文件,代码如下

class RabbitMQ
  #注意下面名为galaxy(根据自己业务起名字)的Exchange是提前在web控制台上面创建好的,并和名为galaxy.queue的队列做好
  了类型为direct的绑定,这样exchangegalaxy的消息,就会发送到名为galaxy.queuequeue中,提供给后续的seakers消费
  EXCHANGE = "galaxy"
  TYPE = "direct"
  DURABLE = true
  class << self
    #全局创建一个单例@conn,这样一个应用只会和RabbmitMQ服务器建立一条TCP连接
    #这里的ENV['MQ_PD_CNN'],配置在环境变量里面,例如: amqp://user_name:password@rabbitmq_server_ip
    def connection
      @conn ||= Bunny.new(ENV['MQ_PD_CNN']).start
    end

    # 一个TCP connnection上面可以创建多个信道channel
    def channel
      @ch = connection.create_channel
    end

    def publish(exchange,message={})
      exchange.publish(message)
    end
  end
end

下面说一下为什么一个应用创建一个TCP链接(connection),多个信道(channel)。

首先,我们必须先连接到RabbitMQ,才能消费或者发布消息。应用程序和Rabbit代理服务器之间创建一条TCP连接。一旦TCP连接打开(通过了认证),应用程序就可以创建一条AMQP信道。信道是建立在“真实的”TCP连接内的虚拟连接。不论是发布消息、订阅消息还是接收消息,都是通过信道完成的。

为什么用信道而不是直接通过TCP连接发送AMQP命令呢?主要原因是频繁地建立和销毁TCP,操作系统的开销非常昂贵。比如,当业务高峰的时候,那么每秒都会有大量的TCP连接的创建和销毁,这会造成系统TCP连接资源的浪费,很快就出遇到系统的性能瓶颈。

因此,引入了信道channel的概念,在一条认证过TCP的连接上面每秒可以创建成百上千个信道,而不会影响操作系统,并且,在一条TCP连接上面创建多少条信道是没有限制的。

业务代码调用

根据上面业务包装的代码,可以简单地实现向MQ的消息队列生产消息了

conn = RabbitMQ.connection
ch = RabbitMQ.channel
exchange = ch.direct("galaxy",:durable => true)
exchange.publish({site: :rubychina}.to_json)
ch.close

消费者 (使用 Sneakers)

业务通过Bunny在Rails中简易、快速地生产发布了消息,就需要有消费者来接收和消费消息,Sneakers是一个处理RabbitMQ消息队列的高性能Ruby框架

https://github.com/jondot/sneakers

Rails 集成 Sneakers

下面介绍怎么在Rails里面使用Sneaker

1.添加sneakers gem包

# Gemfile
gem 'sneakers'

2.Rakefile 引用 seakers/tasks

#Rakefile
require 'sneakers/tasks'

3.配置sneakers

# 新建 config/initializers/seakers.rb
opts = {
  daemonize: true,
  amqp: ENV['MQ_PD_CNN'],
  log: "log/sneakers.log",
  pid_path: "tmp/pids/sneakers.pid",
  threads: 1,
  workers: 1,
}

Sneakers.configure(opts)
Sneakers.logger.level = Logger::INFO

4.添加worker

# 新建文件 app/workers/galaxy_worker.rb
# Seaker 作为消费者,消费binding 为 galaxy.queue的队列
class GalaxyWorker
  include Sneakers::Worker
  # worker从galaxy.queue队列里面取数据
  from_queue "galaxy.queue", env: nil

  def work(msg)
    logger.info "GalaxyWorker::work msg #{msg}"
    # 给队列发送确认信息
    ack!
  end
end

5.启动Sneaker消费队列

WORKERS=GalaxyWorker rake sneakers:run

这样sneaker就以GalaxyWorker这个worker在后台接收和处理消息

样例展示

生产消息

启动Rails 项目,在console里面执行

Loading development environment (Rails 5.0.2)
2.3.0 :001 > conn = RabbitMQ.connection
 => #<Bunny::Session:0x7fcab3399240 user_name@*.*.*.*:5672, vhost=/, addresses=[*.*.*.*:5672]>
2.3.0 :002 > ch = RabbitMQ.channel
 => #<Bunny::Channel:70254257290940 @id=1 @connection=#<Bunny::Session:0x7fcab3399240 user_name@*.*.*.*:5672, vhost=/, addresses=[*.*.*.*:5672]>>
2.3.0 :003 > exchange = ch.direct("galaxy",:durable => true)
 => #<Bunny::Exchange:0x007fcab114b918 @channel=#<Bunny::Channel:70254257290940 @id=1 @connection=#<Bunny::Session:0x7fcab3399240 user_name@*.*.*.*:5672, vhost=/, addresses=[*.*.*.*:5672]>>, @name="galaxy", @type=:direct, @options={:queue=>"galaxy", :nowait=>false, :durable=>true}, @durable=true, @auto_delete=nil, @internal=nil, @arguments=nil>
2.3.0 :004 > exchange.publish({site: :rubychina}.to_json)
 => #<Bunny::Exchange:0x007fcab114b918 @channel=#<Bunny::Channel:70254257290940 @id=1 @connection=#<Bunny::Session:0x7fcab3399240 user_name@*.*.*.*:5672, vhost=/, addresses=[*.*.*.*:5672]>>, @name="galaxy", @type=:direct, @options={:queue=>"galaxy", :nowait=>false, :durable=>true}, @durable=true, @auto_delete=nil, @internal=nil, @arguments=nil>
2.3.0 :005 > ch.close

消费消息

启动Sneakers Worker

~/workspace/hesheng/galaxy % WORKERS=GalaxyWorker rake sneakers:run

查看Seakers日志

~/workspace/hesheng/galaxy % tail -f log/sneakers.log
018-03-14T02:35:01Z p-79500 t-oval8l8bg INFO: Heartbeat: running threads [7]
2018-03-14T02:35:06Z p-79500 t-ovam12vfo INFO: GalaxyWorker::work msg {"site":"rubychina"}

从日志来看,GalaxyWorker成功收到了消息, {"site":"rubychina"},因此,拿到消息后,可以根据业务对消息进行相应的处理。

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册