1216 字
6 分钟
SSE 在大语言模型中的流式数据处理简单封装
2025-04-01

效果展示#

背景#

最近我在使用大语言模型做一些简单、好玩的工具应用,比如 算命生成名人大事记等,然而大语言模型返回的数据量比较大,如果一次性返回,那么用户体验会比较差,所以需要使用 SSE 来实现 流式数据传输,提升用户体验。

SSE 是什么?#

SSE(Server-Sent Events)是一种 单向服务器推送技术,即:

服务器 → 客户端

• 长连接(保持 HTTP 连接) • 逐步推送数据,适用于 流式响应(如大模型返回 Token)

与 WebSocket 的区别

特性SSEWebSocket
连接方向单向(服务器到客户端)双向
适用场景流式数据实时聊天、多人协作
兼容性所有现代浏览器需要服务器和客户端都支持
连接数受浏览器限制无限制

LLM 中的 SSE 工作流程#

应用场景:ChatGPT、DeepSeek Chat、Gemini 等对话模型返回数据 逐步传输,而不是一次性返回整个 JSON。

步骤#

  1. 客户端发起请求 • 使用 EventSource(浏览器)或 fetch + ReadableStream(Node.js)创建 SSE 连接
  2. 服务器逐步返回数据 • 服务器端不断向 SSE 连接推送 Token
  3. 客户端逐步解析数据 • 浏览器或前端代码监听 SSE 事件,逐步渲染消息

SSE 代码示例#

服务端 Node.js 代码示例

import OpenAI from 'openai';

const openai = new OpenAI({
  baseURL: 'https://api.deepseek.com',
  apiKey: process.env.DEEPSEEK_KEY,
});

const stream = await openai.chat.completions.create({
  model: "deepseek-chat",
  messages: [{ role: "user", content: "Hello, world!" }],
  stream: true,
});

客户端代码示例 JavaScript

const stream = await fetch('http://localhost:3000/sse', {
  method: 'GET',
}).then(res => res.body);

const reader = stream?.getReader();
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader!.read();
  if (done) break;
  const text = decoder.decode(value);
  console.log(text);
}

简单封装#

基于大模型返回的 SSE 数据,进行简单封装,实现流式数据传输以及页面渲染,包含思考模型处理。

服务端#

函数定义#

// @/utils/server.ts

/**
 * 获取大模型响应流
 * @param completion 大模型响应流 await service.chat.completions.create 返回的流式数据 create({ stream: true })
 * @param model 模型名称
 * @param type 类型
 * @returns 流式数据
 */
export const getStreamData = (completion: Stream<ChatCompletionChunk>) => {
  let count = 0;
  let thinkingCount = 0;
  const stream = new ReadableStream({
    async start(controller) {
      try {
        console.log('Starting stream processing...');
        for await (const chunk of completion) {
          const { choices } = chunk;
          const delta = choices[0]?.delta as Delta ?? {};
          
          if (!delta) continue;

          const { reasoning_content = null, content = null } = delta;
          // 添加 thinking 标签
          if (count === 0 && reasoning_content) {
            controller.enqueue(new TextEncoder().encode('<thinking>'));
            // 等待 100ms 后避免批处理
            await new Promise(resolve => setTimeout(resolve, 100));
          }
          if (reasoning_content) {
            count++;
            thinkingCount++;
            controller.enqueue(new TextEncoder().encode(reasoning_content));
          }
          if (count - thinkingCount === 0 && thinkingCount !== 0 && content) {
            controller.enqueue(new TextEncoder().encode('</thinking>'));
            // 等待 100ms 后避免批处理
            await new Promise(resolve => setTimeout(resolve, 100));
          }
          if (content) {
            count++;
            controller.enqueue(new TextEncoder().encode(content));
          }
        }
      } catch (error) {
        console.error('Stream processing error:', error);
        controller.error(error);
      } finally {
        if (count === 0) {
          controller.enqueue(new TextEncoder().encode('[服务器繁忙,请稍后再试。]'));
        } else {
          controller.enqueue(new TextEncoder().encode('[DONE]'));
        }
        controller.close();
      }
    },
  });
  return stream;
};

使用#

import OpenAI from 'openai';
import { getStreamData } from '@/utils/server';

const openai = new OpenAI({
  baseURL: 'https://api.deepseek.com',
  apiKey: process.env.DEEPSEEK_KEY,
});

const completion = await openai.chat.completions.create({
  model: "deepseek-chat",
  messages: [{ role: "user", content: "Hello, world!" }],
  stream: true,
});
const stream = await getStreamData(completion);

// 返回流式数据
return new Response(stream, {
  headers: {
    'Content-Type': 'text/event-stream',
  },
});

客户端#

函数定义#

// @/utils/client.ts

/**
 * 解析流式数据
 * @param stream 流式数据 ReadableStream
 * @param options 选项
 * @param options.output 输出类型 默认 text 如果 output 为 json 则onResult 的 result 返回解析后的 json 对象
 * @param options.onStart 流式数据开始
 * @param options.onEnd 流式数据结束 形参 thinking 为思考内容,result 为结果
 * @param options.onchange 实时更新最新内容 形参 thinking 为思考内容,result 为结果
 * @param options.onThinkingStart 如果是思考模型,则 onThinkingStart 会触发
 * @param options.onThinkingEnd 如果是思考模型,则 onThinkingEnd 会触发
 */
export const parseReadableStream = async (stream: ReadableStream<Uint8Array<ArrayBufferLike>>, options: {
  output?: 'text' | 'json';
  onStart?: () => void;
  onEnd?: (thinking: string, result: string | Record<string, unknown>) => void;
  onchange?: (thinking: string, result: string) => void;
  onThinkingStart?: () => void;
  onThinkingEnd?: () => void;
}) => {
  const { output = 'text', onStart = () => {}, onEnd = () => {}, onThinkingStart = () => {}, onThinkingEnd = () => {}, onchange = () => {} } = options;
  const reader = stream?.getReader();
  const decoder = new TextDecoder();
  let isReasoning = false;
  let thinking = '';
  let content = '';
  onStart();
  while (true) {
    const { done, value } = await reader!.read();
    if (done) break;
    const text = decoder.decode(value);
    if (text.includes('<thinking>')) {
      isReasoning = true;
      onThinkingStart();
      continue;
    }
    if (text.includes('</thinking>')) {
      isReasoning = false;
      onThinkingEnd();
      continue;
    }
    if (text.includes('[DONE]')) {
      let result = content;
      if (output === 'json') {
        // 取出 ```json 和 ``` 之间的内容
        const jsonContent = result.match(/```json\s*([\s\S]*?)\s*```/)?.[1] || '';
        try {
          result = JSON.parse(jsonContent);
        } catch (error) {
          console.error(error);
        }
      }
      onEnd(thinking, result);
      break;
    }
    if (isReasoning) {
      thinking += text;
    } else {
      content += text;
    }
    onchange(thinking, content);
  }
};

使用#

import { parseReadableStream } from '@/utils/client';

const stream = await fetch('http://localhost:3000/sse', {
  method: 'GET',
}).then(res => res.body);

try {
  parseReadableStream(stream, {
  output: 'json',
  onStart: () => {
    console.log('开始');
  },
  onEnd: (thinking, result) => {
    console.log('结束', thinking, result);
  },
  onchange: (thinking, result) => {
    // 如果 output 为 json 则 result 为 已经解析后对象
    console.log('实时更新', thinking, result);
  },
  onThinkingStart: () => {
    console.log('思考开始');
  },
    onThinkingEnd: () => {
      console.log('思考结束');
    },
  });
} catch (error) {
  console.error(error);
}
SSE 在大语言模型中的流式数据处理简单封装
https://www.mihouo.com/posts/front/sse-for-streaming-data-processing-in-large-language-models/
作者
发布于
2025-04-01
许可协议
CC BY-NC-SA 4.0