public/worker.js
部分后端代码:
app/controllers/stream_controller.rb
models/concerns/notifiable.rb
当然也可以用 Postgres 的 trigger 和 function 实现在页面中调用 SharedWorker 进行 SSE 连接,打开多个标签页只需保持一个连接
const bc = new BroadcastChannel('/events');
const worker = new SharedWorker('worker.js');
worker.port.start();
bc.addEventListener('message', e => {
console.log(e);
}, false);
class EventsController < StreamController
def index
on_stream do |stream|
Comment.on_notify(interval: 3) do |payload|
stream.write(payload, event: payload.key?(:ping) ? 'hearbeat' : 'message')
end
end
end
end
Rails.application.routes.draw do
...
resources :events, only: %i[index], constraints: ->(request){ request.headers['HTTP_ACCEPT'].include?('text/event-stream') }
...
end
{
"type": "UPDATE",
"key": "id",
"value": "fffb5065-80ca-4d75-bcb2-d946693196de",
"columns": [...]
}
{
"type": "DELETE",
"key": "id",
"value": "fffb5065-80ca-4d75-bcb2-d946693196de",
"columns": [...]
}
{
"type": "INSERT",
"key": "id",
"value": "fffb5065-80ca-4d75-bcb2-d946693196de",
"columns": [...]
}
# typed: false
# frozen_string_literal: true
class StreamController < ApplicationController
include ActionController::Live
private
def on_stream(&block)
set_stream_headers
# https://www.wjwh.eu/posts/2018-10-29-double-hijack.html
response.headers['rack.hijack'] = proc do |stream|
Thread.new { perform_stream(stream, block) }
end
render nothing: true
end
def perform_stream(stream, block)
sse = SSE.new(stream, retry: 300, event: 'open')
block.call(sse)
rescue ActionController::Live::ClientDisconnected, Errno::EPIPE
sse.close
ensure
sse.close
end
def set_stream_headers
response.headers['Connection'] = 'keep-alive'
response.headers['Content-Type'] = 'text/event-stream'
response.headers['Cache-Control'] = 'no-cache'
response.headers['Last-Modified'] = Time.current.httpdate # Disables eTag middleware buffering
response.headers['X-Accel-Buffering'] = 'no' # Disable buffering for nginx
end
end
# typed: false
# frozen_string_literal: true
module Notifiable
extend ActiveSupport::Concern
included do
after_commit :notify_callbacks
end
class_methods do
def on_notify(interval: 10, channel: nil, &block)
raise 'No block given' unless block
channel ||= table_name
connection.execute sanitize_sql("LISTEN #{channel}\;")
hearbeat_at = Time.current
loop do
connection.raw_connection.wait_for_notify(interval) do |channel_name, _pid, payload|
block.call MultiJson.load(payload, symbolize_keys: true) if channel_name == channel
end
if Time.current - hearbeat_at >= interval
block.call(ping: true)
hearbeat_at = Time.current
end
end
ensure
connection.execute sanitize_sql("UNLISTEN #{channel}\;")
end
end
def notify(payload)
return if payload[:type] == 'UPDATE' && payload[:columns].empty?
connection.execute sanitize_sql("NOTIFY #{channel}, '#{MultiJson.dump(payload)}'\;")
end
delegate :table_name, :primary_key, :connection, :sanitize_sql, :connection_pool, to: :class
private
alias channel table_name
def notify_callbacks
notify event_payload
end
def event_payload
{
type: event_type,
key: primary_key,
value: send(primary_key),
columns: previous_changes.keys
}
end
def event_type
(destroyed? ? 'DELETE' : (previously_new_record? ? 'INSERT' : 'UPDATE'))
end
end
var STATE = { connected: false }
// Subscribe to the event source at `uri` with exponential backoff reconnect.
function subscribe(uri) {
var retryTime = 3;
function connect(uri) {
const bc = new BroadcastChannel(uri);
const evtSource = new EventSource(uri, {
withCredentials: true
});
evtSource.addEventListener('message', (ev) => {
const msg = JSON.parse(ev.data);
bc.postMessage(msg);
}, false);
evtSource.addEventListener('open', () => {
setConnectedStatus(true);
retryTime = 3;
console.log(`connected to event stream at ${uri}`);
}, false);
evtSource.addEventListener('error', () => {
evtSource.close();
bc.close(); // Disconnect the channel
setConnectedStatus(false);
let timeout = retryTime;
retryTime = Math.min(64, retryTime * 2);
console.log(`connection lost. attempting to reconnect in ${timeout}s`);
setTimeout(() => connect(uri), (() => timeout * 1000)());
}, false);
}
connect(uri);
}
// Set the connection status: `true` for connected, `false` for disconnected.
function setConnectedStatus(status) {
STATE.connected = status;
}
// Subscribe to server-sent events.
subscribe('/events');