重构 发布 / 订阅模式

victor · 2015年03月30日 · 最后由 codemonkey 回复于 2015年09月23日 · 16945 次阅读
本帖已被管理员设置为精华贴

使用场景

很多项目中都有消息分发或者事件通知机制,尤其是模块化程度高的项目。

比如:在你的系统中,很多模块都对 新建用户 感兴趣。权限模块希望给新用户设置默认权限,报表模块希望重新生成当月的报表,邮件系统希望给用户发送激活邮件...诸如此类的代码都写到新建用户的业务逻辑后面,会加大耦合度,降低可维护性,并且对于每个模块都是一个独立系统的情况,这种方式更是不可取。

对于简单的情形,观察者模式 The Observer Pattern 就足够了。如果系统中有很多地方都需要收发消息,那么它就不适用了。否则会造成类数量的膨胀,增加类的复杂性,这时候就需要一种更集中的机制来处理这些业务逻辑。

什么是发布/订阅模式 (PUB-SUB)

现实中,并不是所有请求都期待答复,而不期待答复,自然就没有了状态。广播听过吧?收音机用过吧?就这个意思。

发布/订阅模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态。

特点

  • 一个订阅者可以订阅多个发布者
  • 消息是会到达所有订阅者处,订阅者根据 filter 丢掉自己不需要的消息 (filter 是在订阅端起作用的)
  • 每个订阅者都会接收到每条消息的一个副本
  • 基于推送 push,其中消息自动地向订阅者广播,它们无须请求或轮询主题来获得新消息

发布/订阅模式内部,有多种不同类型的订阅者。

  • 非持久订阅者是临时订阅类型,它们只是在主动侦听主题时才接收消息。
  • 持久订阅者将接收到发布的每条消息的一个副本,即便在发布消息,它们处于"离线"状态时也是如此。
  • 另外还有动态持久订阅者和受管的持久订阅者等类型。

优势

  • 降低了模块间的耦合度:发布者与订阅者松散地耦合,并且不需要知道对方的存在。相关操作都集中在 Publisher 中。
  • 可扩展性强:系统复杂后,可以把消息订阅和分发机制单独作为一个模块来实现,增加新特性以满足需求

缺陷

与其说缺陷,不如说它设计本身就有如下特点。但不管怎么说,这种模式在逻辑上不可靠的。主要体现在:

  • 发布者不知道订阅者是否收到发布的消息
  • 订阅者不知道自己是否收到了发布者发出的所有消息
  • 发送者不能获知订阅者的执行情况
  • 没人知道订阅者何时开始收到消息

Wisper

你可能早就看过 RailsCasts 上的 #260 Messaging with Faye#316 Private Pub。但今天要来介绍的是另外一个 gem。

首先看看传统的利用 callback 的实现

Post 模型通过回调,与 Feed 模型和 User::NotifyFollowers 服务紧密的耦合在一起。

# app/models/post.rb
class Post
  after_create :create_feed, :notify_followers

  def create_feed
    Feed.create!(self)
  end

  def notify_followers
    User::NotifyFollowers.call(self)
  end
end

# app/controllers/api/v1/posts_controller.rb
class Api::V1::PostsController < Api::V1::ApiController
  def create
    @post = current_user.posts.build(post_params)
    if @post.save
      render_created(@post)
    else
      render_unprocessable_entity(@post.errors)
    end
  end
end

利用 Wisper 的 PUB-SUB 模式

# app/models/post.rb
class Post
  # no callbacks in the models!
end

Publishers 在对象状态改变且需要触发事件的时候发布事件。

# app/controllers/api/v1/posts_controller.rb
# corresponds to the publisher in the previous figure
class Api::V1::PostsController < Api::V1::ApiController

  include Wisper::Publisher
  def create
    @post = current_user.posts.build(post_params)
    if @post.save
      # Publish event about post creation for any interested listeners
      publish(:post_create, @post)
      render_created(@post)
    else
      # Publish event about post error for any interested listeners
      publish(:post_errors, @post)
      render_unprocessable_entity(@post.errors)
    end
  end
end

Subscribers 仅接收它们能响应的事件。

# app/listener/feed_listener.rb
class FeedListener
  def post_create(post)
    Feed.create!(post)
  end
end
# app/listener/user_listener.rb
class UserListener
  def post_create(post)
    User::NotifyFollowers.call(self)
  end
end

Event Bus 用来管理系统中订阅者都订阅哪些频道。

# config/initializers/wisper.rb

Wisper.subscribe(FeedListener.new)
Wisper.subscribe(UserListener.new)

进一步,根据单一职责原则把 PUB-SUB 模式和 Service Objects 联合起来

Publisher

# app/service/financial/order_review.rb
class Financial::OrderReview
  include Wisper::Publisher
  def self.call(order)
    if order.approved?
      publish(:order_create, order)
    else
      publish(:order_decline, order)
    end
  end
end

Subscribers

# app/listener/client_listener.rb
class ClientListener
  def order_create(order)
    # can implement transaction using different service objects
    Client::Charge.call(order)
    Inventory::UpdateStock.call(order)
  end

  def order_decline(order)
    Client::NotifyDeclinedOrder(order)
  end
end

更多用法,请参考 Wisper wiki

后记

本文的唯一作用可能就是,能让大家在设计 Service Objects 时候再多思考一点。虽然到底要不要抽出 Service Objects 也是一个见仁见智的问题。在相关连接中我有贴出 Gourmet Service Objects 一文,另外本站的 Service Object: What? Why? and How? 也请抽空读一下。

这里没有交代如何在发布/订阅模式中引入队列系统的方法,所以上面的代码你是绝对没办法直接拿去用的。

相关链接

#1 楼 @Rei 用 Rails build-in 的一套,确实比引入一个新 gem 更好。我再读一下 ActiveSupport::Notifications 相关文章,稍后将心得补充成另外一篇帖子。

我了个去 在地铁看这篇帖子 坐过站了

赞一个,上次写 Service Object 的文章只是把 Wisper 简单地看了一下。楼主这篇文章又让我把 Wisper 过了一遍。

个人感觉这是一个可以用在很多地方的 gem,十分灵活。但也是这点反而导致我不知道怎么用了。比如全局 listeners。

我是看楼主的文章才知道全局 listeners Global Listeners 的。Wisper 官方的解释是方便按关注点切分模块(报表,日志,等)。除此之外还可以设置局部的 listeners。

order_review = Financial::OrderReview
order_review.subscribe(ClientListener.new)
order_review.execute

但这里隐藏的问题是,某些情况很难判断应该用全局 listeners 还是局部 listeners(这不仅仅是职责切分的问题)。并且两个都有各自的优缺点。

全局 listeners 只用在 initializer 里面定义,看起来集中化了,但对发布者而言,它完全不知道哪些类会响应消息,开发者自己也需要去一个一个排查才知道。我不知道在同步模式下订阅者抛出异常会不会导致发布者挂掉,如果是这样的话,那就跟把逻辑塞进 ActiveRecord callback 有一样的问题了:一块逻辑有多个看不见的依赖逻辑,所以改动时牵一发动全身。我猜测 Wisper 的全局 listeners 更适合搭配异步模式,它也支持订阅者在 Sidekiq 这类异步任务里跑。

局部 listeners 面对以上的情况有很大的好转,因为代码更集中,易于查看和修改。但这里有另外的问题:

  1. subscribe 到底在哪里写比较好?如果写在调用 OrderReview 的地方,是不是说明还有其他调用 OrderReview 的地方不需要 subscribe ClientListener?
  2. 如果 subscribe 的模块基本不变,写在 OrderReview 的 initialize 里面是不是更省事呢?不过既然所有 subscribe 都写到一起去了,说明本身逻辑是有一定强耦合的,这时候可以直接抽 Service Object 是不是更简单呢?

比如上面的例子也可以改成:

class OrderReview
  def self.execute
    ClientListener.new.order_create
  end
end

BTW 感觉 Wisper 这个 API 似乎把 subscribe 理解反了,怎么看都应该是 ClientListener subscribe OrderReview 才对。

以上想法是基于代码维护性的方面思考的。也仅仅是个人看法。希望楼主分享下自己在什么场景下使用 Wisper 这种 gem 的。也让大家开阔一下视野。

对于 全局 listeners 只用在 initializer 里面定义,看起来集中化了,但对发布者而言,它完全不知道哪些类会响应消息,开发者自己也需要去一个一个排查才知道。 这个问题

发布 / 订阅模式的设计思路就是 发布者不知道哪些订阅者收到哪些消息。所以这个模式不适合做核心重要操作。我读的几篇关于 ASN 的例子也是拿来做日志,性能分析类事务。


对于代码的可维护性来说,我觉得不论全局还是局部。只要内部有个统一约定就好。我个人倾向结合 Service Object 用的时候还是局部订阅吧。就像你说的 报表,日志,统计 这些常规操作(非商业逻辑)放在全局订阅更好。


有些拿不准的,我也不乱回复了。等将来有新的实践经验再 @ 你,或坐等其他大神解答。

发布订阅模式的使用原因是:实际上是将发布方的 callback 逻辑抽离出来,单独作为一个 Service Object(即订阅端)。逻辑的信息量并没有变,但将这部分信息量分化出来以降低复杂度。

这里的 ClientListener,其实是一个总 callback 类,而到底需要 callback 哪一个,则由 ClientListener 方法决定,而非发布方决定。

@Victor 嗯,这个是 pub/sub 模式的特点,你总结的比我好,就不多说什么了。所以为了避免误会我才加上了 开发者自己也需要去一个一个排查才知道 。这点是我比较在意的。因为能够维护的前提在于“开发者知道自己在干什么”。

最开始我去翻 Service Object 的目的很简单:

  1. 代码逻辑比较集中,方便查看。
  2. 修改比较简单,不用担心触发 side effect。

Wisper 是个好东西,作者也考虑很周到,代码写在 initializer 里面还特意提醒读者这样做是 threadsafe 的。前一篇回复是想说太过灵活的工具有可能造成一些新的 trivial choices,总结相关的最佳实践又要花一番功夫了。

@whitecrow 这个例子里 ClientListener 的用途是比较泛的。它就是个总的 callback,里面调用两个 Service Object。如果本来就是强耦合关系,我会选择直接把两个 Service Object 写进 OrderReview。还可以省一个中间类。

class ClientListener
  def order_create(order)
    Client::Charge.call(order)
    Inventory::UpdateStock.call(order)
  end
end

当然,在更加灵活多变的场景下单独抽一个类是必然的选择。只是抽象层次有时候也是跟代码易读性成反比的,然后也会造成维护成本提升。好的结构是为了更好地维护代码,但维护代码并不一定需要完美的解耦和层次。

发布订阅模式在 NodeJS 中比较容易实现,而且也有一个 ruby 版本:http://faye.jcoglan.com/

Meteor是一个内置发布订阅机制的 Web 架构,实现源于Socket.io

Websocket-rails大家千万别用,坑很多!

#13 楼 @gazeldx 你肯定没注意到我在文章中提到了 #260 Messaging with Faye

#13 楼 @gazeldx 来,分享分享 Websocket-rails 的坑~

同上楼

1 虽说 wiki 中指出了支持 background jobs 中调用 websocket-rails 的事件,synchronize = true,但实践操作起来没有成功。

用 Sneakers 消费消息的时候 websocket-rails 的 synchronize = true,Sneakers 出现了报错信息。但貌似消息本身被正常的消费掉了, 但并没能成功的调用 websocket-rails,以至于我只好通过 http::get 的方式调用 websocket-rails,虽然设置了 timeout,但这也是性能上的一大缺憾,直接导致程序无法支持高并发!

2 在尝试 rails production 模式下运行 websocket-rails 貌似没法成功建立 websocket 连接,报错 500 错误。具体错误信息还需要我 进一步查看一下 production.log 的日志信息。 #16 楼 @linjunzhugg

你不会是群里的方方吧

需要 登录 后方可回复, 如果你还没有账号请 注册新账号