原文发布于个人博客站点:https://renny.ren/ch/articles/40
在使用 ChatGPT 的时候,你会注意到这个回复不是一次性生成完的,而是边生成边返回,像打字一样的效果:
那么这是如何实现的呢,这篇来研究一下相关的技术细节。
其实这种效果叫 streaming response (流式传输的回复),很形象。
提到 streaming response 就不得不提到 SSE
如果你看一下 OenAI API 文档,就会发现有一个参数叫 stream
If set, partial message deltas will be sent, like in ChatGPT. Tokens will be sent as data-only server-sent events as they become available, with the stream terminated by a
data: [DONE]
message.
所以什么是 SSE 呢?
简单来说,SSE (Server-Sent Event) 是一种从服务器流式传输事件的简单方式。它通过单个 HTTP 连接将实时更新从服务器发送到客户端。使用 SSE,只要建立连接后,服务器就可以将实时数据推送到客户端,无需靠客户端不断轮询来获取更新。
步骤如下:
https://www.host.com/stream
Connection: keep-alive
(从 HTTP/1.1 起,默认就使用的是长连接)Content-Type: text/event-stream
response headerevent: add
data: This is the first message, it
data: has two lines.
就是这么简单
那么 SSE 和 WebSocket 是不是差不多呢?总结了一下,它们都是可以用来在客户端和服务端做实时通信的,但有一些小的区别:
总的来说,SSE 使用简单,更适合传输小量的数据,特别是只需要服务端到客户端单向通信的时候。WebSocket 要更强大一些,可以用于更多的复杂场景,比如多人实时聊天、多人游戏等。
接下来以 Rails 提供后端接口为例,看看怎么调用 OpenAI 的接口 接收 SSE 事件,然后转发到我们的客户端。
工作流程是这样的:
stream: true
参数[Done]
,就可以关闭服务器与 OpenAI 之间的连接,然后客户端关闭到我们服务器的连接理解了 SSE 和工作流之后,接下来就是代码来实现整个过程。总共三个部分:
const fetchResponse = () => {
const evtSource = new EventSource(`/v1/completions/live_stream?prompt=${prompt}`)
evtSource.onmessage = (event) => {
if (event) {
const response = JSON.parse(event.data)
setMessage(response)
} else {
evtSource.close()
}
}
evtSource.onerror = () => {
evtSource.close()
}
}
使用上面提到的 EventSource
API 来建立 SSE 链接。
当收到新消息的时候,onmessage
事件会被触发
class CompletionsController < ApplicationController
include ActionController::Live
def live_stream
response.headers["Content-Type"] = "text/event-stream"
response.headers["Last-Modified"] = Time.now.httpdate
sse = SSE.new(response.stream, retry: 300)
ChatCompletion::LiveStreamService.new(sse, live_stream_params).call
ensure
sse.close
end
end
ActionController::Live
module 来开启 streaming responsetext/event-stream
response headerRails 7 默认是引入了 Rack::ETag
的,而这玩意会把 response 缓存起来,导致实时的 streaming response 就不能实现了。
这个问题我看 issue 里面讨论了好久,到最后也没有解决,不过有 hack 的解决方案,具体可以参考这里
总之如果你的 rack 版本是 2.2.x
就需要加下面这一行:
response.headers["Last-Modified"] = Time.now.httpdate
接下来是请求 OpenAI 接口的部分,自己封装了一个简单的 gem,支持流式传输
module ChatCompletion
class LiveStreamService
def call
client.create_chat_completion(request_body) do |chunk, overall_received_bytes, env|
data = chunk[/data: (.*)\n\n$/, 1]
send_message(data)
end
end
def send_message(data)
response = JSON.parse(data)
if response.dig("choices", 0, "delta", "content")
@result = @result + response.dig("choices", 0, "delta", "content")
end
sse.write(status: 200, content: @result)
end
private
def client
@client ||= OpenAI::Client.new(OPENAI_API_KEY)
end
end
end
最后,我上面写的是前后端分离的方案,如果你在用 Hotwire 的话,可以看看这篇 和 https://ruby-china.org/topics/42959
最后,我做了一个 demo,大家可以体验效果:https://aiichat.cn/chats/new (登录账号密码皆为 rubychina)