Ruby 利用 Postgres Pub/Sub机制,透过 EventSource 推送通知

mimosa · 2023年12月05日 · 最后由 nonpareilboy 回复于 2024年12月05日 · 817 次阅读
  • 随便玩玩让大家见笑了😀
  • 前端需要解决的问题,用户开过个 tab 占用连接
  • 用 BroadcastChannel 和 SharedWorker
    • public/worker.js
  • 部分后端代码:

    • app/controllers/stream_controller.rb
    • models/concerns/notifiable.rb 当然也可以用 Postgres 的 trigger 和 function 实现
  • 在页面中调用 SharedWorker 进行 SSE 连接,打开多个标签页只需保持一个连接

const bc = new BroadcastChannel('/events');
const worker = new SharedWorker('worker.js');
worker.port.start();

bc.addEventListener('message', e => {
  console.log(e);
}, false);
class EventsController < StreamController
  def index
    on_stream do |stream|
      Comment.on_notify(interval: 3) do |payload|
        stream.write(payload, event: payload.key?(:ping) ? 'hearbeat' : 'message')
      end
    end
  end
end

Rails.application.routes.draw do
  ...
  resources :events, only: %i[index], constraints: ->(request){ request.headers['HTTP_ACCEPT'].include?('text/event-stream') }
  ...
end
  • 收到的 MessageEvent data
{
    "type": "UPDATE",
    "key": "id",
    "value": "fffb5065-80ca-4d75-bcb2-d946693196de",
    "columns": [...]
}
{
    "type": "DELETE",
    "key": "id",
    "value": "fffb5065-80ca-4d75-bcb2-d946693196de",
    "columns": [...]
}
{
    "type": "INSERT",
    "key": "id",
    "value": "fffb5065-80ca-4d75-bcb2-d946693196de",
    "columns": [...]
}

  • 代码:
# typed: false
# frozen_string_literal: true

class StreamController < ApplicationController
  include ActionController::Live

  private

  def on_stream(&block)
    set_stream_headers

    # https://www.wjwh.eu/posts/2018-10-29-double-hijack.html
    response.headers['rack.hijack'] = proc do |stream|
      Thread.new { perform_stream(stream, block) }
    end

    render nothing: true
  end

  def perform_stream(stream, block)
    sse = SSE.new(stream, retry: 300, event: 'open')
    block.call(sse)
  rescue ActionController::Live::ClientDisconnected, Errno::EPIPE
    sse.close
  ensure
    sse.close
  end

  def set_stream_headers
    response.headers['Connection']    = 'keep-alive'
    response.headers['Content-Type']  = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Last-Modified'] = Time.current.httpdate # Disables eTag middleware buffering
    response.headers['X-Accel-Buffering'] = 'no' # Disable buffering for nginx
  end
end

# typed: false
# frozen_string_literal: true

module Notifiable
  extend ActiveSupport::Concern

  included do
    after_commit :notify_callbacks
  end

  class_methods do
    def on_notify(interval: 10, channel: nil, &block)
      raise 'No block given' unless block

      channel ||= table_name
      connection.execute sanitize_sql("LISTEN #{channel}\;")
      hearbeat_at = Time.current

      loop do
        connection.raw_connection.wait_for_notify(interval) do |channel_name, _pid, payload|
          block.call MultiJson.load(payload, symbolize_keys: true) if channel_name == channel
        end

        if Time.current - hearbeat_at >= interval
          block.call(ping: true)
          hearbeat_at = Time.current
        end
      end
    ensure
      connection.execute sanitize_sql("UNLISTEN #{channel}\;")
    end
  end

  def notify(payload)
    return if payload[:type] == 'UPDATE' && payload[:columns].empty?

    connection.execute sanitize_sql("NOTIFY #{channel}, '#{MultiJson.dump(payload)}'\;")
  end

  delegate :table_name, :primary_key, :connection, :sanitize_sql, :connection_pool, to: :class

  private

  alias channel table_name

  def notify_callbacks
    notify event_payload
  end

  def event_payload
    {
      type: event_type,
      key: primary_key,
      value: send(primary_key),
      columns: previous_changes.keys
    }
  end

  def event_type
    (destroyed? ? 'DELETE' : (previously_new_record? ? 'INSERT' : 'UPDATE'))
  end
end

var STATE = { connected: false }

// Subscribe to the event source at `uri` with exponential backoff reconnect.
function subscribe(uri) {
  var retryTime = 3;

  function connect(uri) {
    const bc = new BroadcastChannel(uri);
    const evtSource = new EventSource(uri, {
      withCredentials: true
    });

    evtSource.addEventListener('message', (ev) => {
      const msg = JSON.parse(ev.data);

      bc.postMessage(msg);
    }, false);

    evtSource.addEventListener('open', () => {
      setConnectedStatus(true);
      retryTime = 3;
      console.log(`connected to event stream at ${uri}`);
    }, false);

    evtSource.addEventListener('error', () => {
      evtSource.close();
      bc.close(); // Disconnect the channel
      setConnectedStatus(false);

      let timeout = retryTime;
        retryTime = Math.min(64, retryTime * 2);
      console.log(`connection lost. attempting to reconnect in ${timeout}s`);

      setTimeout(() => connect(uri), (() => timeout * 1000)());
    }, false);
  }

  connect(uri);
}

// Set the connection status: `true` for connected, `false` for disconnected.
function setConnectedStatus(status) {
  STATE.connected = status;
}

// Subscribe to server-sent events.
subscribe('/events');

好厉害啊大佬

Rei 回复

ActionCable 是 WebSocket 协议最好单独起服务,从兼容性考虑:

  • WebSocket 下沉至
    • Server-sent events (SSE) 再下沉至
      • HTTP Long Polling

Truly, this article is really one of the very best in the history of articles. I am a antique ’Article’ collector and I sometimes read some new articles if I find them interesting. And I found this one pretty fascinating and it should go into my collection. Very good work! emoji

kendy 回复

我看你像个 bot,你的 IP 172.71.210.171 也来自香港机房,所以我把你禁言了。

你对楼主的溢美之词,就先留下。

需要 登录 后方可回复, 如果你还没有账号请 注册新账号