一年前,我曾写过我们的一个仪表板程序以及如何使用 Faye 延迟消息(现已打包成 gem)来解决性能问题。
从最开始,我们就选择了面向服务的架构,事实证明这也是我们做过的最棒的决定。套件(到写这篇文章时止已有 8 个应用)中的每一个程序都只负责业务的一部分。仪表板程序用来显示其他应用的信息给用户。
不过,这并不是全部。这个项目最重要的特性之一就是可以在应用之间导入、导出数据。而且,这要求在各应用之间互相提供读写接口,我们需要处理网状结构,而不仅是星型结构。
特性与用户的同步增长导致了许多性能问题。应用间的连接都通过 HTTP API 以“我要某某某”这种方式进行,套件开始变得缓慢。
我们考虑过重新设计架构,减少依赖,但是由于每个应用的核心功能都是独立且精心设计过的,所以实际上能提升的空间很有限。并且,以后我们还可能将一些功能抽出来,变成新的应用。
一个武断的解决方法是:
我们只要采用 HTTP 缓存就可以了 其实不然。
HTTP cache 的最大问题就是最大生命周期
。
系统中有两种场景下使用缓存:
几乎没法用定长时间来确定资源需要被缓存多久。如果时间太短,请求就会太多,尽管数据可能压根没变过。如果时间太长,我们可能会收到成堆的用户邮件,抱怨系统没有运行,其实只是因为他们看到的是未及时更新的数据。
缓存的出发点是为了提升性能,不是为了破坏用户体验。
那么我们该
怎么办?
答案是:回到架构
。
面向对象的编程原则推崇 告诉,而不是问 的模式。如果我们将之引申到架构的高度,解决思路就自然出现了。
无需让消费者每次问
生产者要资源,我们让生产者告诉消费者发生了什么变化。
废话少说,我们上手实战用 RabbitMQ 消息系统来代替老旧的 HTTP API 吧。
RabbitMQ 是实现了 AMQP 协议的开源消息中介、队列系统,由 Erlang 写就。信息标签包括:
管用的消息系统
这一点我发现是确实没错。
要了解 RabbitMQ 能做什么,我们得看看示例拓扑图。
我们将采用发布/订阅拓扑,并采用多个 fanout 交换与队列。
这个部分因操作系统而异。RabbitMQ 的官网有许多指南供参考。 如果恰巧你的系统是 Mac OS X,且安装了 Homebrew,你只需:
brew install rabbitmq
用以下命令启动:
/usr/local/opt/rabbitmq/sbin/rabbitmq-server
启动 rabbitmq-server
之后,就可以通过http://localhost:15672 来访问管理界面。默认用户是 guest
,密码也是 guest
。管理界面很有用。后面的小节我们会展开涉及一些。
想像两个应用:一个带博文的博客应用,一个显示最近 5 条博文的仪表板程序。仪表板应用无需向博客应用的 HTTP API 索要
最近的博文,我们让博客应用主动说出
每一个最新发布的博文。
上图中:
Blog
- 用 SQL 数据库的典型 Rails 应用P
- RabbitMQ 生产者X
- RabbitMQ 交换Queue
- RabbitMQ 队列C
- RabbitMQ 消费者Dashboard
- 用 Redis 的 Rails 应用博客应用中的博文在创建后,会让生产者向交换发出一条消息。交换将消息放入队列。然后,连接到此队列的消费者会抓取这条消息,并更新仪表板的 Redis 缓存。 尽管听起来有点复杂,但借助强大的 Ruby 库,我们其实只需很少的工作。
首先,创建基本的博文脚手架
rails new blog
cd blog
bundle
rails generate scaffold post title:string body:text
rake db:migrate
rails server
管理博文的地址在: http://localhost:3000/posts
现在,我们需创建一个 RabbitMQ 生产者。我们称之为发布者。我们将使用 bunny 这个超易用的 RabbitMQ Ruby 客户端。
Gemfile
中加入以下代码:
# blog/Gemfile
gem "bunny"
并运行 bundle install
.
下面实现我们的发布者:
# blog/app/services/publisher.rb
class Publisher
# In order to publish message we need a exchange name.
# Note that RabbitMQ does not care about the payload -
# we will be using JSON-encoded strings
def self.publish(exchange, message = {})
# grab the fanout exchange
x = channel.fanout("blog.#{exchange}")
# and simply publish message
x.publish(message.to_json)
end
def self.channel
@channel ||= connection.create_channel
end
# We are using default settings here
# The `Bunny.new(...)` is a place to
# put any specific RabbitMQ settings
# like host or port
def self.connection
@connection ||= Bunny.new.tap do |c|
c.start
end
end
end
现在,每条博文创建后,我们都要调用 Publisher.publish
。
# blog/app/controllers/posts_controller.rb
class PostsController < ApplicationController
# ...
def create
@post = Post.new(post_params)
if @post.save
# Publish post data
Publisher.publish("posts", @post.attributes)
redirect_to @post, notice: 'Post was successfully created.'
else
render :new
end
end
# ...
end
就是这样。
进行下面的操作前,别忘了重启 Rails 服务器。
现在可以创建一条新的博文,来到 RabbitMQ 管理界面,选择“Exchanges”, 选择 blog.posts
,可以看到下图:
往下滚动页面,我们可以发现此交换没有绑定。
也就是发送到此交换的消息没有去往任何地方。
现在我们需在交换与仪表板应用间建立一个队列,以便消费者更新其本地缓存。
需要留心持久性: RabbitMQ 发布/订阅示例中使用了按需创建的随机队列。这种方案在某些情况下很好,但是在我们这里并不很适用。假使出于未知原因,我们的仪表板应用挂掉了,这个临时队列就会被删除,其中发自博客应用的消息就再也不会到达仪表板应用了。这就是为什么我们需要一个静态的可持续的队列来保存消息,以便仪表板应用断线并重新连接后也能收到之前的所有消息。
如果你比较熟悉 Sidekiq 或 Resque,下面的内容就很容易。 还有一个特别棒的 Ruby 库用来处理 RabbitMQ 消息队列里的消息。这个工具由 @jondot 开发,名叫 sneakers(作者博客)。
我们来创建仪表板应用:
rails new dashboard
cd dashboard
添加一些 gems:
# dashboard/Gemfile
gem 'redis-rails'
gem 'redis-namespace'
gem 'sneakers'
运行 bundle install
.
Redis 和 sneakers 都需要设置一番:
# dashboard/config/initializers/redis.rb
$redis = Redis::Namespace.new("dashboard:#{Rails.env}", redis: Redis.new)
# dashboard/Rakefile
# load sneakers tasks
require 'sneakers/tasks'
Rails.application.load_tasks
# dashboard/config/initializers/sneakers.rb
Sneakers.configure({})
Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy
既然我们不用 ActiveRecord,我们需要地方来保存涉及最近博文的功能。我们创建一个叫 RecentPosts
的服务。
# app/services/recent_posts.rb
class RecentPosts
KEY = "recent_posts" # redis key
STORE_LIMIT = 5 # how many posts should be kept
# Get list of recent posts from redis
# Since redis stores data in binary text format
# we need to parse each list item as JSON
def self.list(limit = STORE_LIMIT)
$redis.lrange(KEY, 0, limit-1).map do |raw_post|
JSON.parse(raw_post).with_indifferent_access
end
end
# Push new post to list and trim it's size
# to limit required storage space
# `raw_post` is already a JSON string
# so there is no need to encode it as JSON
def self.push(raw_post)
$redis.lpush(KEY, raw_post)
$redis.ltrim(KEY, 0, STORE_LIMIT-1)
end
end
仪表板需要视图来查看是否工作正常。
# dashboard/app/controllers/home_controller.rb
class HomeController < ApplicationController
def index
@posts = RecentPosts.list
end
end
# dashboard/app/views/home/index.html.erb
<h2>Recently updated posts</h2>
<table>
<thead>
<tr>
<th>Title</th>
</tr>
</thead>
<tbody>
<% @posts.each do |post| %>
<tr>
<td><%= post[:title] %></td>
</tr>
<% end %>
</tbody>
</table>
# dashboard/config/routes.rb
Rails.application.routes.draw do
root to: "home#index"
end
最后,我们来建立 sneakers 工人。你可能已经注意到了,这与 sidekiq 的工人很像。
# dashboard/app/workers/posts_worker.rb
class PostsWorker
include Sneakers::Worker
# This worker will connect to "dashboard.posts" queue
# env is set to nil since by default the actuall queue name would be
# "dashboard.posts_development"
from_queue "dashboard.posts", env: nil
# work method receives message payload in raw format
# in our case it is JSON encoded string
# which we can pass to RecentPosts service without
# changes
def work(raw_post)
RecentPosts.push(raw_post)
ack! # we need to let queue know that message was received
end
end
就是这样,仪表板应用已经就绪。 要启用工人,运行:
WORKERS=PostsWorker rake sneakers:run
在 RabbitMQ 管理页面,我们可以看到 dashboard.posts
队列已经创建
现在,如果你新发布一条博文的话,在 blog.posts
交换中就会出现一条消息,但 dashboard.posts
队列依然是空的。
为什么呢?因为我们需要在交换与队列间建立一个绑定。
我们需责成 blog.posts
交换将收到的消息发送给 dashboard.posts
队列。尽管这个操作可以在 RabbitMQ 管理页面完成,但是我们最好将此操作做成可被自动执行(比如,部署时自动执行)的配置文件的形式。
我们还是使用 bunny 库:
# config/Rakefile
namespace :rabbitmq do
desc "Setup routing"
task :setup do
require "bunny"
conn = Bunny.new
conn.start
ch = conn.create_channel
# get or create exchange
x = ch.fanout("blog.posts")
# get or create queue (note the durable setting)
queue = ch.queue("dashboard.posts", durable: true)
# bind queue to exchange
queue.bind("blog.posts")
conn.close
end
end
运行:
rake rabbitmq:setup
在 RabbitMQ 管理页面查看 blog.posts
交换的绑定。
这下,每个新建的博文
,都会被作为消息发送到 blog.posts
交换中去,然后路由到 dashboard.posts
队列,被 PostsWorker
读取到,再由 RecentPosts
服务放进 redis。
上面的示例演示了如何连接两个应用。现实世界里,应用间的连接可就复杂多了。请看下图:
与上一示例比,有如下不同:
Blog
现在发布多条信息到多个交换Dashboard
从多个队列获取消息Admin
另外一个应用,同时兼任生产者与消费者blog.posts
)或多对一(*.page_views
)Admin
用 SQL 取代了 redis 的消费者应用我们过一遍其中几个点
为缓存选择合适的存储是个很宽泛的话题。我只举几个例子。
在仪表板里,我们只关心最新的五条博文。这很适合用 redis 列表。我们用 LPUSH
将新条目原子性地放到列表的前端,用 LRANGE
来获取最先的几个条目,用 LTRIM
限制存储。无需做任何排序或过滤。Redis 够用了。
另一个使用 redis 的地方是获取页面访问统计。用 INCR
, INCRBY
或 HINCR
,很容易原子性地增长计数器,而无需顾虑竞争条件。猛击这儿来看示例。
当然,redis 也不是银弹。有时我们也需要对缓存进行过滤或排序。这种情况下,我们可以创建一个普通的 SQL 数据库模型,并在其中存储必要的信息。见 Admin 应用中存储博文的示例,用到了 Blog::Post 模型与 Blog::PostsWorker。
复杂的架构需要复杂的绑定。幸运的是,我们可以用上面示例中的方法来解决。 我们只需修改下启动任务:
# config/Rakefile
namespace :rabbitmq do
desc "Setup routing"
task :setup do
require "bunny"
conn = Bunny.new
conn.start
ch = conn.create_channel
# connect one exchange to multiple queues
x = ch.fanout("blog.posts")
ch.queue("dashboard.posts", durable: true).bind("blog.posts")
ch.queue("admin.posts", durable: true).bind("blog.posts")
# connect mutliple exchanges to the same queue
x = ch.fanout("admin.page_views")
ch.queue("dashboard.page_views", durable: true).bind("admin.page_views")
x = ch.fanout("blog.page_views")
ch.queue("dashboard.page_views", durable: true).bind("blog.page_views")
conn.close
end
end
RabbitMQ 处理复杂的面向服务的系统有很大的潜能。用可持续的队列,我们可以在一部分应用挂掉的时候,依然可以保持数据的一致性。对于仪表板式的应用,用户依然可以访问缓存过的数据,即使下层应用挂掉的时候。
与不断地访问 API 相比,消息传递性能提升明显。速度,持续性与可用性都有提高。理论上,我们放弃一些临时的一致性和实时数据更新。然而大多数情况下,我们发现用户在应用间访问时,缓存已经得到更新,最新的数据也是可用的。
可用的示例程序托管在此。