Rails Rails 中用 RabbitMQ 做消息队列 [译]

cisolarix · October 29, 2014 · Last by liurui12w replied at December 04, 2018 · 24997 hits
Topic has been selected as the excellent topic by the admin.

一年前,我曾写过我们的一个仪表板程序以及如何使用 Faye 延迟消息(现已打包成 gem)来解决性能问题。

从最开始,我们就选择了面向服务的架构,事实证明这也是我们做过的最棒的决定。套件(到写这篇文章时止已有 8 个应用)中的每一个程序都只负责业务的一部分。仪表板程序用来显示其他应用的信息给用户。

不过,这并不是全部。这个项目最重要的特性之一就是可以在应用之间导入、导出数据。而且,这要求在各应用之间互相提供读写接口,我们需要处理网状结构,而不仅是星型结构

特性与用户的同步增长导致了许多性能问题。应用间的连接都通过 HTTP API 以“我要某某某”这种方式进行,套件开始变得缓慢。

我们考虑过重新设计架构,减少依赖,但是由于每个应用的核心功能都是独立且精心设计过的,所以实际上能提升的空间很有限。并且,以后我们还可能将一些功能抽出来,变成新的应用。

一个武断的解决方法是:

我们只要采用 HTTP 缓存就可以了 其实不然。

HTTP cache 的最大问题就是最大生命周期。 系统中有两种场景下使用缓存:

  • 用户执行了某种操作,跳到其它的应用中且希望能看到更新之后的数据
  • 用户来到仪表板,希望看到数据的当前状态(这些状态可能已经数天未变了)

几乎没法用定长时间来确定资源需要被缓存多久。如果时间太短,请求就会太多,尽管数据可能压根没变过。如果时间太长,我们可能会收到成堆的用户邮件,抱怨系统没有运行,其实只是因为他们看到的是未及时更新的数据。

缓存的出发点是为了提升性能,不是为了破坏用户体验。

那么我们怎么办?

答案是:回到架构

消息传递是解决之道

面向对象的编程原则推崇 告诉,而不是问 的模式。如果我们将之引申到架构的高度,解决思路就自然出现了。

无需让消费者每次生产者要资源,我们让生产者告诉消费者发生了什么变化。

废话少说,我们上手实战用 RabbitMQ 消息系统来代替老旧的 HTTP API 吧。

RabbitMQ 是什么

RabbitMQ 是实现了 AMQP 协议的开源消息中介、队列系统,由 Erlang 写就。信息标签包括:

管用的消息系统

这一点我发现是确实没错。

要了解 RabbitMQ 能做什么,我们得看看示例拓扑图

我们将采用发布/订阅拓扑,并采用多个 fanout 交换与队列。

安装 RabbitMQ

这个部分因操作系统而异。RabbitMQ 的官网有许多指南供参考。 如果恰巧你的系统是 Mac OS X,且安装了 Homebrew,你只需:

brew install rabbitmq

用以下命令启动:

/usr/local/opt/rabbitmq/sbin/rabbitmq-server

启动 rabbitmq-server 之后,就可以通过http://localhost:15672 来访问管理界面。默认用户是 guest,密码也是 guest。管理界面很有用。后面的小节我们会展开涉及一些。

简单的架构:博客的仪表板

想像两个应用:一个带博文的博客应用,一个显示最近 5 条博文的仪表板程序。仪表板应用无需向博客应用的 HTTP API 索要最近的博文,我们让博客应用主动说出每一个最新发布的博文。

上图中:

  • Blog - 用 SQL 数据库的典型 Rails 应用
  • P - RabbitMQ 生产者
  • X - RabbitMQ 交换
  • Queue - RabbitMQ 队列
  • C - RabbitMQ 消费者
  • Dashboard - 用 Redis 的 Rails 应用

博客应用中的博文在创建后,会让生产者向交换发出一条消息。交换将消息放入队列。然后,连接到此队列的消费者会抓取这条消息,并更新仪表板的 Redis 缓存。 尽管听起来有点复杂,但借助强大的 Ruby 库,我们其实只需很少的工作。

博文发布者

首先,创建基本的博文脚手架

rails new blog
cd blog
bundle
rails generate scaffold post title:string body:text
rake db:migrate
rails server

管理博文的地址在: http://localhost:3000/posts

现在,我们需创建一个 RabbitMQ 生产者。我们称之为发布者。我们将使用 bunny 这个超易用的 RabbitMQ Ruby 客户端。

Gemfile 中加入以下代码:

# blog/Gemfile
gem "bunny"

并运行 bundle install. 下面实现我们的发布者:

# blog/app/services/publisher.rb
class Publisher
  # In order to publish message we need a exchange name.
  # Note that RabbitMQ does not care about the payload -
  # we will be using JSON-encoded strings
  def self.publish(exchange, message = {})
    # grab the fanout exchange
    x = channel.fanout("blog.#{exchange}")
    # and simply publish message
    x.publish(message.to_json)
  end

  def self.channel
    @channel ||= connection.create_channel
  end

  # We are using default settings here
  # The `Bunny.new(...)` is a place to
  # put any specific RabbitMQ settings
  # like host or port
  def self.connection
    @connection ||= Bunny.new.tap do |c|
      c.start
    end
  end
end

现在,每条博文创建后,我们都要调用 Publisher.publish

# blog/app/controllers/posts_controller.rb
class PostsController < ApplicationController
  # ...

  def create
    @post = Post.new(post_params)

    if @post.save
      # Publish post data
      Publisher.publish("posts", @post.attributes)

      redirect_to @post, notice: 'Post was successfully created.'
    else
      render :new
    end
  end

  # ...
end

就是这样。 进行下面的操作前,别忘了重启 Rails 服务器。 现在可以创建一条新的博文,来到 RabbitMQ 管理界面,选择“Exchanges”, 选择 blog.posts,可以看到下图:

往下滚动页面,我们可以发现此交换没有绑定。

也就是发送到此交换的消息没有去往任何地方。

现在我们需在交换与仪表板应用间建立一个队列,以便消费者更新其本地缓存。

需要留心持久性: RabbitMQ 发布/订阅示例中使用了按需创建的随机队列。这种方案在某些情况下很好,但是在我们这里并不很适用。假使出于未知原因,我们的仪表板应用挂掉了,这个临时队列就会被删除,其中发自博客应用的消息就再也不会到达仪表板应用了。这就是为什么我们需要一个静态的可持续的队列来保存消息,以便仪表板应用断线并重新连接后也能收到之前的所有消息。

仪表板消费者

如果你比较熟悉 SidekiqResque,下面的内容就很容易。 还有一个特别棒的 Ruby 库用来处理 RabbitMQ 消息队列里的消息。这个工具由 @jondot 开发,名叫 sneakers作者博客)。

我们来创建仪表板应用:

rails new dashboard
cd dashboard

添加一些 gems:

# dashboard/Gemfile
gem 'redis-rails'
gem 'redis-namespace'
gem 'sneakers'

运行 bundle install. Redis 和 sneakers 都需要设置一番:

设置 Redis

# dashboard/config/initializers/redis.rb
$redis = Redis::Namespace.new("dashboard:#{Rails.env}", redis: Redis.new)

设置 Sneakers

# dashboard/Rakefile

# load sneakers tasks
require 'sneakers/tasks'

Rails.application.load_tasks
# dashboard/config/initializers/sneakers.rb
Sneakers.configure({})
Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy

最近博文服务

既然我们不用 ActiveRecord,我们需要地方来保存涉及最近博文的功能。我们创建一个叫 RecentPosts 的服务。

# app/services/recent_posts.rb
class RecentPosts
  KEY = "recent_posts" # redis key
  STORE_LIMIT = 5      # how many posts should be kept

  # Get list of recent posts from redis
  # Since redis stores data in binary text format
  # we need to parse each list item as JSON
  def self.list(limit = STORE_LIMIT)
    $redis.lrange(KEY, 0, limit-1).map do |raw_post|
      JSON.parse(raw_post).with_indifferent_access
    end
  end

  # Push new post to list and trim it's size
  # to limit required storage space
  # `raw_post` is already a JSON string
  # so there is no need to encode it as JSON
  def self.push(raw_post)
    $redis.lpush(KEY, raw_post)
    $redis.ltrim(KEY, 0, STORE_LIMIT-1)
  end
end

仪表板视图

仪表板需要视图来查看是否工作正常。

# dashboard/app/controllers/home_controller.rb
class HomeController < ApplicationController
  def index
    @posts = RecentPosts.list
  end
end
# dashboard/app/views/home/index.html.erb
<h2>Recently updated posts</h2>

<table>
  <thead>
    <tr>
      <th>Title</th>
    </tr>
  </thead>

  <tbody>
    <% @posts.each do |post| %>
      <tr>
        <td><%= post[:title] %></td>
      </tr>
    <% end %>
  </tbody>
</table>
# dashboard/config/routes.rb
Rails.application.routes.draw do
  root to: "home#index"
end

工人

最后,我们来建立 sneakers 工人。你可能已经注意到了,这与 sidekiq 的工人很像。

# dashboard/app/workers/posts_worker.rb
class PostsWorker
  include Sneakers::Worker
  # This worker will connect to "dashboard.posts" queue
  # env is set to nil since by default the actuall queue name would be
  # "dashboard.posts_development"
  from_queue "dashboard.posts", env: nil

  # work method receives message payload in raw format
  # in our case it is JSON encoded string
  # which we can pass to RecentPosts service without
  # changes
  def work(raw_post)
    RecentPosts.push(raw_post)
    ack! # we need to let queue know that message was received
  end
end

就是这样,仪表板应用已经就绪。 要启用工人,运行:

WORKERS=PostsWorker rake sneakers:run

RabbitMQ 管理页面,我们可以看到 dashboard.posts 队列已经创建

现在,如果你新发布一条博文的话,在 blog.posts 交换中就会出现一条消息,但 dashboard.posts 队列依然是空的。 为什么呢?因为我们需要在交换与队列间建立一个绑定。

全部联系到一起

我们需责成 blog.posts 交换将收到的消息发送给 dashboard.posts 队列。尽管这个操作可以在 RabbitMQ 管理页面完成,但是我们最好将此操作做成可被自动执行(比如,部署时自动执行)的配置文件的形式。 我们还是使用 bunny 库:

# config/Rakefile
namespace :rabbitmq do
  desc "Setup routing"
  task :setup do
    require "bunny"

    conn = Bunny.new
    conn.start

    ch = conn.create_channel

    # get or create exchange
    x = ch.fanout("blog.posts")

    # get or create queue (note the durable setting)
    queue = ch.queue("dashboard.posts", durable: true)

    # bind queue to exchange
    queue.bind("blog.posts")

    conn.close
  end
end

运行:

rake rabbitmq:setup

RabbitMQ 管理页面查看 blog.posts 交换的绑定。

这下,每个新建的博文,都会被作为消息发送到 blog.posts 交换中去,然后路由到 dashboard.posts 队列,被 PostsWorker 读取到,再由 RecentPosts 服务放进 redis。

复杂架构

上面的示例演示了如何连接两个应用。现实世界里,应用间的连接可就复杂多了。请看下图:

与上一示例比,有如下不同:

  • Blog 现在发布多条信息到多个交换
  • Dashboard 从多个队列获取消息
  • Admin 另外一个应用,同时兼任生产者与消费者
  • Exchange ↔ Queue 绑定复杂多了 - 可能是一对多(blog.posts)或多对一(*.page_views
  • Admin 用 SQL 取代了 redis 的消费者应用

我们过一遍其中几个点

缓存存储

为缓存选择合适的存储是个很宽泛的话题。我只举几个例子。

最近博文

在仪表板里,我们只关心最新的五条博文。这很适合用 redis 列表。我们用 LPUSH 将新条目原子性地放到列表的前端,用 LRANGE 来获取最先的几个条目,用 LTRIM 限制存储。无需做任何排序或过滤。Redis 够用了。

页面访问统计

另一个使用 redis 的地方是获取页面访问统计。用 INCR, INCRBYHINCR,很容易原子性地增长计数器,而无需顾虑竞争条件。猛击这儿来看示例。

带过滤与排序的结构化缓存

当然,redis 也不是银弹。有时我们也需要对缓存进行过滤或排序。这种情况下,我们可以创建一个普通的 SQL 数据库模型,并在其中存储必要的信息。见 Admin 应用中存储博文的示例,用到了 Blog::Post 模型与 Blog::PostsWorker

Exchange ↔ Queue 绑定

复杂的架构需要复杂的绑定。幸运的是,我们可以用上面示例中的方法来解决。 我们只需修改下启动任务:

# config/Rakefile
namespace :rabbitmq do
  desc "Setup routing"
  task :setup do
    require "bunny"

    conn = Bunny.new
    conn.start

    ch = conn.create_channel

    # connect one exchange to multiple queues
    x = ch.fanout("blog.posts")
    ch.queue("dashboard.posts", durable: true).bind("blog.posts")
    ch.queue("admin.posts", durable: true).bind("blog.posts")

    # connect mutliple exchanges to the same queue
    x = ch.fanout("admin.page_views")
    ch.queue("dashboard.page_views", durable: true).bind("admin.page_views")

    x = ch.fanout("blog.page_views")
    ch.queue("dashboard.page_views", durable: true).bind("blog.page_views")

    conn.close
  end
end

结语

RabbitMQ 处理复杂的面向服务的系统有很大的潜能。用可持续的队列,我们可以在一部分应用挂掉的时候,依然可以保持数据的一致性。对于仪表板式的应用,用户依然可以访问缓存过的数据,即使下层应用挂掉的时候。

与不断地访问 API 相比,消息传递性能提升明显。速度,持续性与可用性都有提高。理论上,我们放弃一些临时的一致性和实时数据更新。然而大多数情况下,我们发现用户在应用间访问时,缓存已经得到更新,最新的数据也是可用的。

可用的示例程序托管在

非常棒的文章,还没来得及细看

有个不太理解的问题望赐教,这个队列可以用例如 Sidekiq,或者 Redis 的 PubSub 来解决,用 RabbitMQ 的优势在哪

#1 楼 @sanivbyfish 这也就是我翻译这篇文章的目的所在,抛砖引玉

已保存到 pocket

#6 楼 @limkurn 谢谢,链接里将两者的区别讲得很清楚。

#1 楼 @sanivbyfish MQ 相对 redis 来说,提供的功能更多点

碉堡!!晚上来拜读。

牛 X!!!!

save to pocket +1 😄

涨知识啦,值得实践一下~~👍

没有细看,不错

#16 楼 @xiaohuxu 请看 6 楼贴的链接

18 Floor has deleted
19 Floor has deleted

#6 楼 感谢 @limkurn 理解了

请教这个图是用什么画的

原文在哪?

最近刚好想学学消息队列方面的知识结构,介绍得灰常详细,赞赞~

不明觉厉!最近有做兼职或者跳槽的打算么

#25 楼 @eryu 可以邮件进一步沟通

#26 楼 @cisolarix 好滴,可以加我的 Q2369410907

ruby conf 前,我表正看你的这个帖子,若知道是我面前这位写的。靠,立马问问。。。

看完了。写的很赞。有个小问题,rabbitmq:setup 不是写在 config/Rakefile 里的吧,应该写在 lib/task/*.rake 里面。

#29 楼 @sickate 谢谢你的建议,还是保留原文,希望有疑问的人能看到你的回帖

您好 请问你的这些图是用什么软件或者网站画的啊?

#31 楼 @dplord 图是原作者画的

bunny 或 sneakers 的 publish 能否定期?比如 5min 后 插入到交换区

写的真好。谢谢。正好本人也在搞 rabbitmq,hutch 不建议使用,消费者用 sneakers,Publisher 直接用本文介绍。

@cisolarix 每次调用 self.connection,不是会新建 connection 么,没看到 close,不会到之连接过多么


类方法,不会的,我又错了

#35 楼 @as181920 你好。我也很关注这个问题。@connection.close没有。

#36 楼 @gazeldx 类方法,加载类的时候就新建一个 connection,本来就要的。程序停止后,connection 也会被关闭掉。期间断线会自动重连,可设置 heartbeat 参数。 queue 要清理的话,在建 queue 的时候有个 auto_delete 的配置项。

据说 RabbitMQ 吞吐量低,楼主怎么看?

需要留心持久性: RabbitMQ 发布/订阅示例中使用了按需创建的随机队列。这种方案在某些情况下很好,但是在我们这里并不很适用。假使出于未知原因,我们的仪表板应用挂掉了,这个临时队列就会被删除,其中发自博客应用的消息就再也不会到达仪表板应用了。这就是为什么我们需要一个静态的可持续的队列来保存消息,以便仪表板应用断线并重新连接后也能收到之前的所有消息。

楼主我对这里不是很明白,在我的项目里面,我的 exchange 附带了参数 durable:true,还有 queue 也附带了 durable:true 让 RabbitMQ 做了持久化,这种情况下我 down 了多个订阅应用以及尝试 down 了 RabbitMQ 本身,但是我重启的时候还是能收到消息,不知道楼主说的临时队列是不是指的是这个。

#40 楼 @jason_wu 应该是这个意思。本文确实不错。sneakers 不错,我用的是 standalone 的方式,没和 rails 整合在一起。

请大家注意,bunny 连 rabbitMQ 的 gem 可能存在版本问题,比如我就遇到了用 bunny1.7.0 无法连现有的 rabbitMQ。 如果你遇到了同样的问题,请在 Gemfile 中加入:gem 'bunny', '>= 2.2.1'试试看。

#42 楼 @gazeldx 手上有运行中的程序,是基于 1.7.0 的,连接 ok。记得配置参数 key 是 sym/string,其中有个不支持,可以在命令行返回信息中查看详细。

请问,队列消息存在 unique 机制吗?或者提供 queue.query(message) API 吗? @cisolarix

👍 照着实践了一遍。谢谢 @cisolarix .

@cisolarix bunny 客户端运行一段时间后,连接 error: Bunny::ConnectionClosedError -> Trying to send frame through a closed connection. Frame is ..., method class is AMQ::Protocol::Basic::Publish

是 rescue 后怎么 reconnect?有遇到类似问题么?

#46 楼 @as181920 你用的是 bunny 1.6.3?低版本 bunny 存在这个 bug,见: https://github.com/ruby-amqp/bunny/issues/240 我也遇见了,我升级到 bunny 2.2.1 后没有这个问题了。

#47 楼 @gazeldx 之前 bunny 从 1.x 升级到 2.x 没有解决问题,倒是 rabbitmq server 版本升级后没再出现这个问题。 但新出现了 SubscribeEventPublisher publish error: AMQ::Protocol::EmptyResponseError -> Empty response received from the server.这个错误,还没来得及仔细查。

打算出异常时,重建 connection/channel/exchange,只是代码还没想好怎么写。

#48 楼 @as181920 bunny2.2.1 要和 rabbitmq 3.3+ 配合使用,就没有问题。

在 dashboard 如何实时监听数据数据更新并展示到页面。。。。。 x = ch.fanout("admin.page_views") ch.queue("dashboard.page_views", durable: true).bind("admin.page_views")

x = ch.fanout("blog.page_views") ch.queue("dashboard.page_views", durable: true).bind("blog.page_views") 这段话没明白。作用时干嘛的。

RabbitMQ 吃内存多吗?官网推荐最少 128M 内存,不会做 profile,有没有替代品?

不错,参考学习中

我最近也在做 rabbitmq 的调用问题;有时慢有时快,找不到原因

管理界面需要手动启用才有 sudo rabbitmq-plugins enable rabbitmq_management

您好 看完了还是不太明白,Exchange ↔ Queue 绑定 这个一定要绑定才可以用吗,我可不可以只用 rabbitMQ 的队列,做成类似的异步任务

You need to Sign in before reply, if you don't have an account, please Sign up first.