分享 在 Rails 中使用 SSE 来实现一个 ChatGPT 应用

rennyallen · 2023年05月05日 · 最后由 ias 回复于 2024年07月02日 · 2190 次阅读
本帖已被管理员设置为精华贴

原文发布于个人博客站点:https://renny.ren/ch/articles/40


前言

在使用 ChatGPT 的时候,你会注意到这个回复不是一次性生成完的,而是边生成边返回,像打字一样的效果:

那么这是如何实现的呢,这篇来研究一下相关的技术细节。

其实这种效果叫 streaming response (流式传输的回复),很形象。

提到 streaming response 就不得不提到 SSE

关于 SSE

如果你看一下 OenAI API 文档,就会发现有一个参数叫 stream

If set, partial message deltas will be sent, like in ChatGPT. Tokens will be sent as data-only server-sent events as they become available, with the stream terminated by a data: [DONE] message.

所以什么是 SSE 呢?

简单来说,SSE (Server-Sent Event) 是一种从服务器流式传输事件的简单方式。它通过单个 HTTP 连接将实时更新从服务器发送到客户端。使用 SSE,只要建立连接后,服务器就可以将实时数据推送到客户端,无需靠客户端不断轮询来获取更新。

步骤如下:

  1. 客户端发送 GET 请求到服务器:https://www.host.com/stream
  2. 建立长连接,响应头里会有 Connection: keep-alive (从 HTTP/1.1 起,默认就使用的是长连接)
  3. 服务端设置 Content-Type: text/event-stream response header
  4. 服务端可以开始发送事件 (event) 了,类似这样:
event: add
data: This is the first message, it
data: has two lines.

就是这么简单

比较一下 SSE 和 WebSocket

那么 SSE 和 WebSocket 是不是差不多呢?总结了一下,它们都是可以用来在客户端和服务端做实时通信的,但有一些小的区别:

  1. SSE 提供的是单向通信渠道 (server -> client);而 WebSockets 是双向沟通,客户端也可以随时给服务器发消息
  2. SSE 是基于 HTTP 的,本质上还是使用长轮询技术来实现实时通信;而 WebSocket 则直接在 TCP 连接上发送和接收数据
  3. SSE 在连接丢失的时候会自动尝试重连,重连失败又会重连,无限重连。。所以你在浏览器看到的就是 GET 请求无限发送,一直到服务器返回连接成功为止,所以需要加上处理异常的代码,在 client 端关闭连接;而 WebSocket 如果连接丢失了一般是需要 client 重新建立一个新的连接

总的来说,SSE 使用简单,更适合传输小量的数据,特别是只需要服务端到客户端单向通信的时候。WebSocket 要更强大一些,可以用于更多的复杂场景,比如多人实时聊天、多人游戏等。

Workflow

接下来以 Rails 提供后端接口为例,看看怎么调用 OpenAI 的接口 接收 SSE 事件,然后转发到我们的客户端。

工作流程是这样的:

  1. 客户端使用 EventSource 接口向服务端发送请求
  2. 服务端收到请求,发送请求到 OpenAI 接口,带上 stream: true 参数
  3. 服务端收到来自 OpenAI 的 event,然后转发给客户端
  4. 当事件发送完毕后,OpenAI 会发送一个特殊的消息来告诉我们可以关闭连接了。比如当我们收到 [Done],就可以关闭服务器与 OpenAI 之间的连接,然后客户端关闭到我们服务器的连接

用 Rails 提供后端接口

理解了 SSE 和工作流之后,接下来就是代码来实现整个过程。总共三个部分:

  • client
const fetchResponse = () => {
  const evtSource = new EventSource(`/v1/completions/live_stream?prompt=${prompt}`)
  evtSource.onmessage = (event) => {
    if (event) {
      const response = JSON.parse(event.data)
      setMessage(response)
    } else {
      evtSource.close()
    }
  }
  evtSource.onerror = () => {
    evtSource.close()
  }
}

使用上面提到的 EventSource API 来建立 SSE 链接。 当收到新消息的时候,onmessage 事件会被触发

  • server
class CompletionsController < ApplicationController
  include ActionController::Live

  def live_stream
    response.headers["Content-Type"] = "text/event-stream"
    response.headers["Last-Modified"] = Time.now.httpdate
    sse = SSE.new(response.stream, retry: 300)
    ChatCompletion::LiveStreamService.new(sse, live_stream_params).call
  ensure
    sse.close
  end
end
  • 这里引入了 ActionController::Live module 来开启 streaming response
  • 上面提到了,需要设置 text/event-stream response header
  • 这里需要特别注意的是,如果你使用的是 Rails 7, Rails 7 默认是不支持 stream repsonse 的,这个问题我找了好久,最后发现是 rack 的问题

Rails 7 默认是引入了 Rack::ETag 的,而这玩意会把 response 缓存起来,导致实时的 streaming response 就不能实现了。

这个问题我看 issue 里面讨论了好久,到最后也没有解决,不过有 hack 的解决方案,具体可以参考这里

总之如果你的 rack 版本是 2.2.x 就需要加下面这一行:

response.headers["Last-Modified"] = Time.now.httpdate
  • OpenAI API

接下来是请求 OpenAI 接口的部分,自己封装了一个简单的 gem,支持流式传输

module ChatCompletion
  class LiveStreamService
    def call
      client.create_chat_completion(request_body) do |chunk, overall_received_bytes, env|
        data = chunk[/data: (.*)\n\n$/, 1]
        send_message(data)
      end
    end

    def send_message(data)
      response = JSON.parse(data)
      if response.dig("choices", 0, "delta", "content")
        @result = @result + response.dig("choices", 0, "delta", "content")
      end
      sse.write(status: 200, content: @result)
    end

    private

    def client
      @client ||= OpenAI::Client.new(OPENAI_API_KEY)
    end
  end
end

最后,我上面写的是前后端分离的方案,如果你在用 Hotwire 的话,可以看看这篇https://ruby-china.org/topics/42959

最后,我做了一个 demo,大家可以体验效果:https://aiichat.cn/chats/new (登录账号密码皆为 rubychina)

我好奇你那里有 ai 算命怎么实现的?

femto 回复

就是给模型加一个相应的 prompt,设定角色

支持支持。

我是定位到是 rack 的问题,之后懒得看了,直接起了个 sinatra 完事。。。😀

Rei 将本帖设为了精华贴。 05月07日 01:01

ruby-openai 已经支持了 这个

nine 回复

对,这个问题有点坑,刚开始没想到和 rack 有关,我还以为是我哪里配置错了

rennyallen 回复
headers = {"Cache-Control": "no-cache"}

会不会有效

zhugexinxin 回复

不行的,这个就是问题所在。即使设置了 "Cache-Control": "no-cache" 还是会生成 ETag,导致 stream 不可用。

可以看看这个 https://github.com/rack/rack/pull/1416

rennyallen 回复

暴风哭泣

这块 Nginx 应该怎么配置一下吗,我本地跑没问题,Nginx 转发之后就歇菜了,一次性把结果仍会来

rennyallen 回复

rails 7.0.4.3 puma (5.6.5)

开发环境下,我发现 sse 请求,返回大小为 5-8kb 左右时,会输出一大段然后卡住,耗时 20 秒之多,请问您这边有没有这样情况

应该是一次性发送太多的问题,测了下。

willx 回复

Nginx 不用配置的,你这个情况看下是不是你 Nginx 配置了什么缓存机制,比如设置了 ETag add_header ETag 或者 Cache-Control 之类的

zhugexinxin 回复

是会有偶尔卡住的问题,还没研究怎么解决。或许可以控制一下返回给客户端的速率,先把一大段存起来,然后一个字一个字地发送 event

rennyallen 回复

一种方法是设置头

response.headers['Last-Modified'] = Time.now.httpdate

一种方法是移除 Etag

config.middleware.delete Rack::ETag
zhugexinxin 回复

你好,最后怎么解决的,我也遇到了这个问题

需要 登录 后方可回复, 如果你还没有账号请 注册新账号