Rails 利用 ActiveSupport::Notifications 在 Rails 中实现 PUB/SUB 模式

victor · March 30, 2015 · Last by shaojunda replied at October 18, 2016 · 11031 hits
Topic has been selected as the excellent topic by the admin.

利用 ActiveSupport::Notifications 在 Rails 中实现 PUB/SUB 模式

前言

自从 Rails 4 从核心中移除了 Observers 之后,对于复杂的业务逻辑和依赖关系该放在哪,大家就开始各显神通了。

有人建议利用 Concerns 和 callback 来搞定。

module MyConcernModule
  extend ActiveSupport::Concern

  included do
    after_save :do_something
  end

  def do_something
     ...
  end
end

class MyModel < ActiveRecord::Base
  include MyConcernModule
end

更多的人认为,依据单一职责原则抽出一个 Service Object 才是王道。

在此基础上,Wisper确实是一个很好的解决方案。

你可以这么玩:

class PostsController < ApplicationController
  def create
    @post = Post.new(params[:post])

    @post.subscribe(PusherListener.new)
    @post.subscribe(ActivityListener.new)
    @post.subscribe(StatisticsListener.new)

    @post.on(:create_post_successful) { |post| redirect_to post }
    @post.on(:create_post_failed)     { |post| render :action => :new }

    @post.create
  end
end

也可以这么玩:

class ApplicationController < ActionController::Base
  around_filter :register_event_listeners

  def register_event_listeners(&around_listener_block)
    Wisper.with_listeners(UserListener.new) do
      around_listener_block.call
    end
  end
end

class User
  include Wisper::Publisher
  after_create{ |user| publish(:user_registered, user) }
end

class UserListener
  def user_registered(user)
    Analytics.track("user:registered", user.analytics)
  end
end

但是 @Rei 一句 Rails 有 ActiveSupport::Notifications 竟让我无言以对。

ActiveSupport::Notifications

在阅读了相关阅读中给出的链接,以及 Notifications in Rails 3 和官方文档之后,感觉这东西设计出来完全是为了进行统计、日志、性能分析之类的事情啊。

那么用它能不能实现一个 PUB/SUB 模式呢?先来回顾一下 PUB/SUB 模式核心的两个要点。

  1. Publishers 在对象状态改变且需要触发事件的时候发布事件。
  2. Subscribers 仅接收它们能响应的事件,并且在每个事件中可以接收到被监控的对象。

The Basics of AS::Notifications

ActiveSupport::Notifications 主要核心就是两个方法:instrumentsubscribe

你可以把 instrument 理解为发布事件。instrument 会在代码块执行完毕并返回结果之后,发布事件 my.custom.event,同时会自动把相关的一组参数:开始时间、结束时间、每个事件的唯一 ID 等,放入 payload 对象。

ActiveSupport::Notifications.instrument "my.custom.event", this: :data do
  # do your custom stuff here
end

现在你可以监听这个事件:

ActiveSupport::Notifications.subscribe "my.custom.event" do |name, started, finished, unique_id, data|
  puts data.inspect # {:this=>:data}
end

理解了这两个方法,我们可以试着实现一个 PUB/SUB 模式。

Publisher

# app/pub_sub/publisher.rb
module Publisher
  extend self

  # delegate to ActiveSupport::Notifications.instrument
  def broadcast_event(event_name, payload={})
    if block_given?
      ActiveSupport::Notifications.instrument(event_name, payload) do
        yield
      end
    else
      ActiveSupport::Notifications.instrument(event_name, payload)
    end
  end
end

首先我们定了 Publisher。它把 payload 夹在事件中广播出去,代码块也可以当做一个可选参数传递进来。

我们可以在 model 或 controller 中发布具体事件。

if user.save
  # publish event 'user.created', with payload {user: user}
  Publisher.broadcast_event('user.created', user: user)
end

def create_user(params)
  user = User.new(params)
  # publish event 'user.created', with payload {user: user}, using block syntax
  # now the event will have additional data about duration and exceptions
  Publisher.broadcast_event('user.created', user: user) do
    User.save!
    # do some more important stuff here
  end
end

Subscriber

Subscriber 可以订阅事件,并将代码块当做参数,传递给 ActiveSupport::Notifications::Event 的实例。

# app/pub_sub/subscriber.rb
module Subscriber
  # delegate to ActiveSupport::Notifications.subscribe
  def self.subscribe(event_name)
    if block_given?
      ActiveSupport::Notifications.subscribe(event_name) do |*args|
        event = ActiveSupport::Notifications::Event.new(*args)
        yield(event)
      end
    end
  end
end
# subscriber example usage
Subscriber.subscribe('user.created') do |event|
  error = "Error: #{event.payload[:exception].first}" if event.payload[:exception]
  puts "#{event.transaction_id} | #{event.name} | #{event.time} | #{event.duration} | #{event.payload[:user].id} | #{error}"
end

经典场景

用户注册后,为该用户发送欢迎邮件。

# app/pub_sub/publisher.rb
module Publisher
  extend ::ActiveSupport::Concern
  extend self

  included do
    # add support for namespace, one class - one namespace
    class_attribute :pub_sub_namespace

    self.pub_sub_namespace = nil
  end

  # delegate to class method
  def broadcast_event(event_name, payload={})
    if block_given?
      self.class.broadcast_event(event_name, payload) do
        yield
      end
    else
      self.class.broadcast_event(event_name, payload)
    end
  end

  module ClassMethods
    # delegate to ASN
    def broadcast_event(event_name, payload={})
      event_name = [pub_sub_namespace, event_name].compact.join('.')
      if block_given?
        ActiveSupport::Notifications.instrument(event_name, payload) do
          yield
        end
      else
        ActiveSupport::Notifications.instrument(event_name, payload)
      end
    end
  end
end
# app/pub_sub/publishers/registration.rb
module Publishers
  class Registration
    include Publisher

    self.pub_sub_namespace = 'registration'
  end
end

# broadcast event
if user.save
  Publishers::Registration.broadcast_event('user_signed_up', user: user)
end
# app/pub_sub/subscribers/base.rb
module Subscribers
  class Base
    class_attribute :subscriptions_enabled
    attr_reader :namespace

    def initialize(namespace)
      @namespace = namespace
    end

    # attach public methods of subscriber with events in the namespace
    def self.attach_to(namespace)
      log_subscriber = new(namespace)
      log_subscriber.public_methods(false).each do |event|
        ActiveSupport::Notifications.subscribe("#{namespace}.#{event}", log_subscriber)
      end
    end

    # trigger methods when an even is captured
    def call(message, *args)
      method  = message.gsub("#{namespace}.", '')
      handler = self.class.new(namespace)
      handler.send(method, ActiveSupport::Notifications::Event.new(message, *args))
    end
  end
end
# app/pub_sub/subscribers/registration_mailer.rb
module Subscribers
  class RegistrationMailer < ::Subscribers::Base
    def user_signed_up(event)
      # lets delay the delivery using delayed_job
      RegistrationMailer.delay(priority: 1).welcome_email(event.payload[:user])
    end
  end
end

# config/initializers/subscribers.rb
Subscribers::RegistrationMailer.attach_to('registration')

醉了吗?详细的解释可以看相关阅读中的文章。如果你的队友没有把 ASN 吃透,肯定会掀桌子。

所以是引入一个 gem 还是自己根据 ASN 来实现同样的功能,还是由你自己想吧。

相关阅读


4.22 补充

22 楼的 @satzcoal 提了如下几个问题,我也答应 @billy 在踩坑之后过来补充一下此文。@satzcoal 面对的问题有下面这些:

ActiveSupport::Notifications 在 sub 的管理上非常困难

ASN 对于 sub 的管理和 Wisper 其实并无太大差异。仍然可以进行全局订阅和临时订阅。如果你觉得难以管理,那我建议你跟我一样,在 initializes 文件夹下面建立一个 subscribers.rb 用来统一进行订阅。

subsribers = {
  todos: [ 'started', 'paused', 'completed', 'deleted' ],
  topics: ['created', 'deleted', 'sticked', 'unsticked', 'commented'],
  documents: ['created', 'deleted', 'updated', 'commented', 'recovered'],
}

subsribers.each do |key, value|
  value.each do |action|
    ActiveSupport::Notifications.subscribe("#{key}.#{action}") do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      WebhookService.call(event.name, event.transaction_id, event.payload)
    end
  end
end

很难验证 sub 的重复性

如果你使用了上面那种集中式管理 sub 的方法,这个问题就略过吧。

很难进行 unsub

可以利用 unsubscribe 来取消订阅,但是我在使用中,没有遇到这个场景。

很难管理 sub 的顺序

我猜你问得是很难定义 instrument 的顺序吧。从别人那里摘抄一个代码,你看看是不是你需要的?

ActiveSupport::Notifications.instrument 'user.signup' do
  # create user record
  ActiveSupport::Notifications.instrument 'twitter.location' do
    user.last_location = Twitter.user(user.twitter_username).location
  end
  # do more work
end

很难对 sub 中的异常进行处理

同上直接上代码了

ActiveSupport::Notifications.instrument 'twitter.location', :user => user.id do
  user.last_location = Twitter.user(user.twitter_username).location
end

# instrument message data - name, start, end, id, payload
=> ['twitter.location', 2015-04-22 17:15:44, 2015-04-22 17:15:44, 'LAiKEjiMCy8XYY9Y' { :user => 1023, :exception => ["Twitter::Error:NotFound", "not found"]}]

ASN 会在 payload 中返回 Exception Type 和 Exception Message 的。

小结

@billy 的报告。过去几周我使用 ASN 为我司的项目添加了 Webhook 功能。还是原来的答案,核心逻辑不该用 ASN。

ASN 可以使用在下列场景:

  • Custom Logging
  • New Relic Tracking
  • Google Analytics events
  • Gampfire / HipChat alerts
  • Schedule background jobs
  • Fire web hooks

好贴 :plus1:

排版很赞,文档很用心,赞

我个人是不太喜欢在同步代码中引入这些不必要的复杂性。在 Javascript 里面写消息非常自然,但在 Ruby 里面还是直接呼叫比较好理解和调试。

#3 楼 @billy 不是的,主要是按需选择。 直接呼叫适用于 你知道你需要谁来执行什么,而 P/S 模型适用于你不知道谁需要做些什么 或者说你不关注谁想继续做些什么

@taojay315 这种情况确实很多,但我觉得用 RabittMQ 或者 Sidekiq 或类似真正的异步方案才合适。同一个请求里面还有很多收听消息的,太复杂了。我觉得这应该也是 Rails 放弃 Observer pattern 的原因之一。

ActiveSupport::Notifications 真实应用场景我只见过 rack-attack 用来记日志

https://github.com/kickstarter/rack-attack#logging--instrumentation

#5 楼 @billy 还是看情况吧 比如下面的例子 这种 非功能性需求 上 RM 之类的就没有必要了 #6 楼 @Rei 还有 AR 的 LogSubscriber 应该还有性能的 subscriber

#5 楼 @billy 还是看情况吧 比如下面的例子 这种 非功能性需求 上 RM 之类的就没有必要了 #6 楼 @Rei 还有 AR 的 LogSubscriber 应该还有性能的 subscriber

我们在项目中用它来打 log 分析数据

@taojay315 进了 model, controller 的正常代码就不能算是非功能性的了。性能之类是确实非常有用,我们用过 sql.active_record 做索引的比较,这个又准又方便。

#11 楼 @billy 非功能性 也可以叫 非商业逻辑代码 比如 @huacnlee 说的那个情况。你可以读一下我附加的链接,有人也认为像 发送欢迎邮件 之类的不属于核心功能,可以用广播来发送。

归根结底它实现的功能就是大吼一嗓子:兄弟们,注意,我变形了!

至于其他人听到这声之后干啥,那就随便了。

@Victor 打 log,各种分析都是非常合适的用途。但欢迎邮件这些是不合适的,原因就在于你说的不属于核心功能,所以你不需要把所有的过程都加在一个请求里面,比如 UsersController#create。你看着代码是分散了,但实际上他们都是在一个进程,一个请求里面实现的。

编造和发出邮件需要时间,这些本来不需要加在#create 里面的。处理邮件过程中可能会出错,那么#create 可能还要想着怎么处理错误,这些本来也是不需要它关心的。

还有一个问题就是调试。本来 Rails 都是直来直去的代码,很好调试,但加入 Observer pattern 调试起来就比较麻烦。

我们处理邮件是用 RM,其他 Sidekiq, delayed_job 也都很合适,这些异步的才是真正的不关心。

#13 楼 @billy 异步队列和 Pub/Sub 模式不冲突。我在另外一篇帖子中提到这个问题了:Pub/Sub 模式中具体怎么处理事件,是直接打印输入,还是扔到队列中都可以啊。像你说的发送邮件这种事情,扔到队列中是毫无二异的。

我再来描述一下这个场景:

  1. 事件发布者广播:兄弟们,我要这注册了一个新人。
  2. 订阅者收到广播:靠,又要发邮件了,邮差你把这封信发一下。
  3. 邮差:扔到队列 (Sidekiq) 当中慢慢等着吧你。

这回理清了 Pub/Sub 和 Sidekiq 的关系了吗?

@Victor 你不需要和我解释 pub/sub 模式是怎么回事,基本的 pattern 没人不懂。你是看着这个好,没吃过苦头。要想分开责任,直接在 Controller 或者 Service Object 里面呼叫 EmailService 就可以了,单线程里面弄这么复杂是自找麻烦。

#15 楼 @billy 我是没看懂你在 #13 楼的回复。好像哪句都连不到一起。。。

我换个问法:注册用户触发发送邮件,你觉得写到哪里或者在哪调用发送邮件的方法好?

最简单就直接在 Controller 里面了。如果逻辑不多,Controller 也是最好的。

EmailService.send_welcome_email(@user) if @user.save
render @user

#17 楼 @billy 我也觉得你这样好。只是在研究 Pub/Sub 的时候,有人建议把这个模式和 Service Object 结合到一起。我确实没有实践过这种结合法,不知道有什么坑。不过我今天就要踩踩看了,如果有新进展,我再 @ 你,并补充此文。

@Victor 好,鼓励多尝试 :plus1:

#18 楼 @Victor 提醒:sub / pub 模式建议用 NodeJS 的解决方案。至少目前自己用的很顺。

我更倾向于在应用间使用 pub/sub 模式进行沟通. 现在实际应用中是使用队列进行沟通。

报告踩坑情况 本人的应用场景大致和注册发邮件类似,考虑到这个 pub 会有可能被多次应用到,就寻求一个类似 Observer 的模式,发现了 Observer 被干掉了之后搜到了本篇文章,然后就小实践了一下遇到的坑如下:

ActiveSupport::Notificationssub的管理上非常困难
很难验证sub的重复性
很难进行unsub
很难管理sub的顺序
很难对sub中的异常进行处理

大致就是这样,故不得不寻求其他的类 observer 方式

当然,就像楼上各位大神说的,ActiveSupport::Notifications 确实是一个非常好 P/S 的方式,但适用的场景可能更倾向于 单 sub 多 pub 的情况,对于单 pub 多 sub 的传统 Observer 模式似乎不太适合

我在正文中添加了 4.22 补充 用来回答 @satzcoal 的问题。并把尝试使用 ASN 实际开发项目之后的小结汇报给 @billy

@Victor 太客气了,可不敢当什么汇报,大家都是交流讨论 :) 我觉得你的总结很精彩,学到了很多。另外,关于管理的问题,确实分散了不好找,有一个土办法就是尽量写全名,而且名字尽量长一点点和特殊,类似于asn_twiiter.location, 这样 ROP(Regex Oriented Progamming) 就好用啦 :)

一直都很少用,因为觉得这种写法太罗嗦。而且封装的层太多,调试和测试难度加大。

你好,可以学习一下你实现 webhook 的思路吗?

You need to Sign in before reply, if you don't have an account, please Sign up first.