1216 字
6 分钟
SSE 在大语言模型中的流式数据处理简单封装
效果展示
背景
最近我在使用大语言模型做一些简单、好玩的工具应用,比如 算命、生成名人大事记等,然而大语言模型返回的数据量比较大,如果一次性返回,那么用户体验会比较差,所以需要使用 SSE 来实现 流式数据传输,提升用户体验。
SSE 是什么?
SSE(Server-Sent Events)是一种 单向服务器推送技术,即:
服务器 → 客户端
• 长连接(保持 HTTP 连接) • 逐步推送数据,适用于 流式响应(如大模型返回 Token)
与 WebSocket 的区别
特性 | SSE | WebSocket |
---|---|---|
连接方向 | 单向(服务器到客户端) | 双向 |
适用场景 | 流式数据 | 实时聊天、多人协作 |
兼容性 | 所有现代浏览器 | 需要服务器和客户端都支持 |
连接数 | 受浏览器限制 | 无限制 |
LLM 中的 SSE 工作流程
应用场景:ChatGPT、DeepSeek Chat、Gemini 等对话模型返回数据 逐步传输,而不是一次性返回整个 JSON。
步骤
- 客户端发起请求 • 使用 EventSource(浏览器)或 fetch + ReadableStream(Node.js)创建 SSE 连接
- 服务器逐步返回数据 • 服务器端不断向 SSE 连接推送 Token
- 客户端逐步解析数据 • 浏览器或前端代码监听 SSE 事件,逐步渲染消息
SSE 代码示例
服务端 Node.js 代码示例
import OpenAI from 'openai';
const openai = new OpenAI({ baseURL: 'https://api.deepseek.com', apiKey: process.env.DEEPSEEK_KEY,});
const stream = await openai.chat.completions.create({ model: "deepseek-chat", messages: [{ role: "user", content: "Hello, world!" }], stream: true,});
客户端代码示例 JavaScript
const stream = await fetch('http://localhost:3000/sse', { method: 'GET',}).then(res => res.body);
const reader = stream?.getReader();const decoder = new TextDecoder();while (true) { const { done, value } = await reader!.read(); if (done) break; const text = decoder.decode(value); console.log(text);}
简单封装
基于大模型返回的 SSE 数据,进行简单封装,实现流式数据传输以及页面渲染,包含思考模型处理。
服务端
函数定义
// @/utils/server.ts
/** * 获取大模型响应流 * @param completion 大模型响应流 await service.chat.completions.create 返回的流式数据 create({ stream: true }) * @param model 模型名称 * @param type 类型 * @returns 流式数据 */export const getStreamData = (completion: Stream<ChatCompletionChunk>) => { let count = 0; let thinkingCount = 0; const stream = new ReadableStream({ async start(controller) { try { console.log('Starting stream processing...'); for await (const chunk of completion) { const { choices } = chunk; const delta = choices[0]?.delta as Delta ?? {};
if (!delta) continue;
const { reasoning_content = null, content = null } = delta; // 添加 thinking 标签 if (count === 0 && reasoning_content) { controller.enqueue(new TextEncoder().encode('<thinking>')); // 等待 100ms 后避免批处理 await new Promise(resolve => setTimeout(resolve, 100)); } if (reasoning_content) { count++; thinkingCount++; controller.enqueue(new TextEncoder().encode(reasoning_content)); } if (count - thinkingCount === 0 && thinkingCount !== 0 && content) { controller.enqueue(new TextEncoder().encode('</thinking>')); // 等待 100ms 后避免批处理 await new Promise(resolve => setTimeout(resolve, 100)); } if (content) { count++; controller.enqueue(new TextEncoder().encode(content)); } } } catch (error) { console.error('Stream processing error:', error); controller.error(error); } finally { if (count === 0) { controller.enqueue(new TextEncoder().encode('[服务器繁忙,请稍后再试。]')); } else { controller.enqueue(new TextEncoder().encode('[DONE]')); } controller.close(); } }, }); return stream;};
使用
import OpenAI from 'openai';import { getStreamData } from '@/utils/server';
const openai = new OpenAI({ baseURL: 'https://api.deepseek.com', apiKey: process.env.DEEPSEEK_KEY,});
const completion = await openai.chat.completions.create({ model: "deepseek-chat", messages: [{ role: "user", content: "Hello, world!" }], stream: true,});const stream = await getStreamData(completion);
// 返回流式数据return new Response(stream, { headers: { 'Content-Type': 'text/event-stream', },});
客户端
函数定义
// @/utils/client.ts
/** * 解析流式数据 * @param stream 流式数据 ReadableStream * @param options 选项 * @param options.output 输出类型 默认 text 如果 output 为 json 则onResult 的 result 返回解析后的 json 对象 * @param options.onStart 流式数据开始 * @param options.onEnd 流式数据结束 形参 thinking 为思考内容,result 为结果 * @param options.onchange 实时更新最新内容 形参 thinking 为思考内容,result 为结果 * @param options.onThinkingStart 如果是思考模型,则 onThinkingStart 会触发 * @param options.onThinkingEnd 如果是思考模型,则 onThinkingEnd 会触发 */export const parseReadableStream = async (stream: ReadableStream<Uint8Array<ArrayBufferLike>>, options: { output?: 'text' | 'json'; onStart?: () => void; onEnd?: (thinking: string, result: string | Record<string, unknown>) => void; onchange?: (thinking: string, result: string) => void; onThinkingStart?: () => void; onThinkingEnd?: () => void;}) => { const { output = 'text', onStart = () => {}, onEnd = () => {}, onThinkingStart = () => {}, onThinkingEnd = () => {}, onchange = () => {} } = options; const reader = stream?.getReader(); const decoder = new TextDecoder(); let isReasoning = false; let thinking = ''; let content = ''; onStart(); while (true) { const { done, value } = await reader!.read(); if (done) break; const text = decoder.decode(value); if (text.includes('<thinking>')) { isReasoning = true; onThinkingStart(); continue; } if (text.includes('</thinking>')) { isReasoning = false; onThinkingEnd(); continue; } if (text.includes('[DONE]')) { let result = content; if (output === 'json') { // 取出 ```json 和 ``` 之间的内容 const jsonContent = result.match(/```json\s*([\s\S]*?)\s*```/)?.[1] || ''; try { result = JSON.parse(jsonContent); } catch (error) { console.error(error); } } onEnd(thinking, result); break; } if (isReasoning) { thinking += text; } else { content += text; } onchange(thinking, content); }};
使用
import { parseReadableStream } from '@/utils/client';
const stream = await fetch('http://localhost:3000/sse', { method: 'GET',}).then(res => res.body);
try { parseReadableStream(stream, { output: 'json', onStart: () => { console.log('开始'); }, onEnd: (thinking, result) => { console.log('结束', thinking, result); }, onchange: (thinking, result) => { // 如果 output 为 json 则 result 为 已经解析后对象 console.log('实时更新', thinking, result); }, onThinkingStart: () => { console.log('思考开始'); }, onThinkingEnd: () => { console.log('思考结束'); }, });} catch (error) { console.error(error);}