Ruby 利用 Postgres Pub/Sub机制,透过 EventSource 推送通知

mimosa · December 05, 2023 · Last by Angel17 replied at December 09, 2023 · 649 hits
  • 随便玩玩让大家见笑了😀
  • 前端需要解决的问题,用户开过个 tab 占用连接
  • 用 BroadcastChannel 和 SharedWorker
    • public/worker.js
  • 部分后端代码:

    • app/controllers/stream_controller.rb
    • models/concerns/notifiable.rb 当然也可以用 Postgres 的 trigger 和 function 实现
  • 在页面中调用 SharedWorker 进行 SSE 连接,打开多个标签页只需保持一个连接

const bc = new BroadcastChannel('/events');
const worker = new SharedWorker('worker.js');
worker.port.start();

bc.addEventListener('message', e => {
  console.log(e);
}, false);
class EventsController < StreamController
  def index
    on_stream do |stream|
      Comment.on_notify(interval: 3) do |payload|
        stream.write(payload, event: payload.key?(:ping) ? 'hearbeat' : 'message')
      end
    end
  end
end

Rails.application.routes.draw do
  ...
  resources :events, only: %i[index], constraints: ->(request){ request.headers['HTTP_ACCEPT'].include?('text/event-stream') }
  ...
end
  • 收到的 MessageEvent data
{
    "type": "UPDATE",
    "key": "id",
    "value": "fffb5065-80ca-4d75-bcb2-d946693196de",
    "columns": [...]
}
{
    "type": "DELETE",
    "key": "id",
    "value": "fffb5065-80ca-4d75-bcb2-d946693196de",
    "columns": [...]
}
{
    "type": "INSERT",
    "key": "id",
    "value": "fffb5065-80ca-4d75-bcb2-d946693196de",
    "columns": [...]
}

  • 代码:
# typed: false
# frozen_string_literal: true

class StreamController < ApplicationController
  include ActionController::Live

  private

  def on_stream(&block)
    set_stream_headers

    # https://www.wjwh.eu/posts/2018-10-29-double-hijack.html
    response.headers['rack.hijack'] = proc do |stream|
      Thread.new { perform_stream(stream, block) }
    end

    render nothing: true
  end

  def perform_stream(stream, block)
    sse = SSE.new(stream, retry: 300, event: 'open')
    block.call(sse)
  rescue ActionController::Live::ClientDisconnected, Errno::EPIPE
    sse.close
  ensure
    sse.close
  end

  def set_stream_headers
    response.headers['Connection']    = 'keep-alive'
    response.headers['Content-Type']  = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Last-Modified'] = Time.current.httpdate # Disables eTag middleware buffering
    response.headers['X-Accel-Buffering'] = 'no' # Disable buffering for nginx
  end
end

# typed: false
# frozen_string_literal: true

module Notifiable
  extend ActiveSupport::Concern

  included do
    after_commit :notify_callbacks
  end

  class_methods do
    def on_notify(interval: 10, channel: nil, &block)
      raise 'No block given' unless block

      channel ||= table_name
      connection.execute sanitize_sql("LISTEN #{channel}\;")
      hearbeat_at = Time.current

      loop do
        connection.raw_connection.wait_for_notify(interval) do |channel_name, _pid, payload|
          block.call MultiJson.load(payload, symbolize_keys: true) if channel_name == channel
        end

        if Time.current - hearbeat_at >= interval
          block.call(ping: true)
          hearbeat_at = Time.current
        end
      end
    ensure
      connection.execute sanitize_sql("UNLISTEN #{channel}\;")
    end
  end

  def notify(payload)
    return if payload[:type] == 'UPDATE' && payload[:columns].empty?

    connection.execute sanitize_sql("NOTIFY #{channel}, '#{MultiJson.dump(payload)}'\;")
  end

  delegate :table_name, :primary_key, :connection, :sanitize_sql, :connection_pool, to: :class

  private

  alias channel table_name

  def notify_callbacks
    notify event_payload
  end

  def event_payload
    {
      type: event_type,
      key: primary_key,
      value: send(primary_key),
      columns: previous_changes.keys
    }
  end

  def event_type
    (destroyed? ? 'DELETE' : (previously_new_record? ? 'INSERT' : 'UPDATE'))
  end
end

var STATE = { connected: false }

// Subscribe to the event source at `uri` with exponential backoff reconnect.
function subscribe(uri) {
  var retryTime = 3;

  function connect(uri) {
    const bc = new BroadcastChannel(uri);
    const evtSource = new EventSource(uri, {
      withCredentials: true
    });

    evtSource.addEventListener('message', (ev) => {
      const msg = JSON.parse(ev.data);

      bc.postMessage(msg);
    }, false);

    evtSource.addEventListener('open', () => {
      setConnectedStatus(true);
      retryTime = 3;
      console.log(`connected to event stream at ${uri}`);
    }, false);

    evtSource.addEventListener('error', () => {
      evtSource.close();
      bc.close(); // Disconnect the channel
      setConnectedStatus(false);

      let timeout = retryTime;
        retryTime = Math.min(64, retryTime * 2);
      console.log(`connection lost. attempting to reconnect in ${timeout}s`);

      setTimeout(() => connect(uri), (() => timeout * 1000)());
    }, false);
  }

  connect(uri);
}

// Set the connection status: `true` for connected, `false` for disconnected.
function setConnectedStatus(status) {
  STATE.connected = status;
}

// Subscribe to server-sent events.
subscribe('/events');

好厉害啊大佬

Reply to Rei

ActionCable 是 WebSocket 协议最好单独起服务,从兼容性考虑:

  • WebSocket 下沉至
    • Server-sent events (SSE) 再下沉至
      • HTTP Long Polling

Truly, this article is really one of the very best in the history of articles. I am a antique ’Article’ collector and I sometimes read some new articles if I find them interesting. And I found this one pretty fascinating and it should go into my collection. Very good work! emoji

Reply to kendy

我看你像个 bot,你的 IP 172.71.210.171 也来自香港机房,所以我把你禁言了。

你对楼主的溢美之词,就先留下。

You need to Sign in before reply, if you don't have an account, please Sign up first.