文章目录
- 前言
- 一、非流式输出设计
- 二、stream流式输出设计
- 三、手撸一个流式输出项目
- 总结
前言
之前对接过OpenAi大模型的官方API,可以看到它有一个Stream参数,设置成true的时候就是流式的对话输出,现象就是一段一段的往外崩。
官方手册的地址:https://platform.openai.com/docs/api-reference/chat
基本知识点是,在用户的提问以后,大模型底层的TransFromer架构能够根据自然语言理解逐词分析概率,继续预测下一部分的输出,这个我们理解为模型的推流过程。那么就会有两种方式进行输出:
(1)同步:等待这部分推流全部结束,通过HTTP响应一次性发送应答内容。
(2)异步:只要模型有输出,就针对推流的内容进行服务端的推送。
所以其实可以理解大模型本身就是异步推流的,区别在于是否等它全部生成完对话再推送。对于流式的服务端推送,整体上有两种实现方案:WebSocket和SSE
那么我们这个流式对话场景用的哪一种呢?
思考一下WebSocket和SSE的区别:
WebSocket全双工,适用于客户端和服务端双向交互,并且WebSocket是独立于HTTP协议之外的另一套协议,实现起来也相对复杂一些。SSE可以理解为单向的HTTP长连接,只需要前端或者浏览器发起一次HTTP get请求,后续服务端就可以向通道传输流式的数据了,SSE比较适合类似流式输出和股票信息实时推送这种场景。
顺便引入一下常见的几种实时通讯技术的对比
https://blog.csdn.net/xwh3165037789/article/details/128137023
另外其实官方说的比较直接了,设置之后如果数据推流的部分准备好了,就会通过SSE的方式进行服务端推送。
还有需要注意服务端对这个协议的实现要求,协议的描述参考下面地址
https://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
https://www.ruanyifeng.com/blog/2017/05/websocket.html
通过PostMan请求返回如下,数据内容在Content里面,最后以DONE结尾。
一、非流式输出设计
stream参数为默认false,所以默认就是同步的。接入OpenAi的API以后,向对应地址发起HTTP请求,同步等待响应的结果就行了。所以本质上还是一次HTTP请求的过程。
但是一般为了进行后续的处理,比如对话做Summary、保存历史记录信息或者利用缓存的话,是可以把这次对话的内容入库的。
二、stream流式输出设计
理解一下整个流程设计:
假如做的是一个对话平台,项目的入口是一个Web页面,通过HTTP的POST请求发起对话,经过网关鉴权身份认证路由到接口,如果body携带的stream参数为true,进入streamChat的处理流程,走主服务转去请求OpenAi的对话接口地址,如果需要进行对话入库的话,这个时候我们的服务端作为大模型服务的客户端,Spring框架的WebFlux部分其实集成好了SseEmitter这个工具方便我们实现SSE功能,需要通过EventSource的factory建立sse连接,并且设置EventSourceListener监听器,重写它的一系列方法,包括onOpen、onEvent、onFailure、onClosed等。比如onEvent这个回调函数的作用就是当大模型输出事件触发的时候,将读取到的数据写入数据库,并且写入是可以通过请求内部的另外的写入HTTP的API触发的。在处理onClosed的时候需要考虑是否计算大模型的TokenSize占用作为附加信息入库和返回。
随后另外开启一个异步任务,线程池里面主要负责以下部分:
(1)心跳检测,SSE中断重连
(2)从数据库里面分页加载数据
(3)通过SSE通道推送数据给前端控制台
三、手撸一个流式输出项目
为了更好理解,我们可以手动实践,写一个这样的项目,做完了的话,估计大家也都能做GPT代理了,项目放在GitHub里面做个了记录。
后端(Spring Boot)
主应用代码
package com.example.ssedemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication(scanBasePackages = {"com.example.*", "web"})
public class SsEdemoApplication {
public static void main(String[] args) {
SpringApplication.run(SsEdemoApplication.class, args);
}
}
Controller层代码
package web;// 导入相关依赖
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
@RequestMapping("/api")
public class EventController {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private int i = 0;
@GetMapping("/stream")
public SseEmitter streamData() {
SseEmitter sseEmitter = new SseEmitter();
// 模拟大模型计算并分批次推送数据
executor.execute(() -> {
while (true) {
try {
// 这里是模拟计算过程,实际项目中可能是从数据库、文件或其他来源获取数据
String dataChunk = "Data Chunk " + ++i;
// 发送一个事件给前端
sseEmitter.send(SseEmitter.event()
.id(Long.toString(i)) // 可选,用于标识事件ID
.name("data") // 事件类型名
.data(dataChunk, MediaType.TEXT_PLAIN));
// 模拟延迟,比如每次间隔1秒
Thread.sleep(1000);
} catch (IOException | InterruptedException e) {
// 关闭 emitter 并处理异常
sseEmitter.completeWithError(e);
}
}
// 计算结束或达到某个条件时关闭 emitter
// sseEmitter.complete();
});
return sseEmitter;
}
}
CROS的配置
package web;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class CorsConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/api/**")
.allowedOrigins("http://yourfrontenddomain.com") // 替换为您的前端域名
.allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
.allowedHeaders("*")
.allowCredentials(true)
.maxAge(3600);
}
}
在使用SpringBoot3版本的时候可能会出现JDK版本报错,处理方式如下:
https://blog.csdn.net/giveupgivedown/article/details/134841844
https://blog.csdn.net/weixin_47889104/article/details/135289440
如果报错404了,记得添加包扫描路径,类似下面这种
@SpringBootApplication(scanBasePackages = {“com.example.demo”, “com.example.another.package”})
运行起来项目结构可能是下面这样
后端启动以后,直接浏览器访问80端口的API地址,
http://localhost:8080/api/stream
就能看到有流式的输出
其实不写下面的前端代码也行,利用SpringBoot内置的Tomcat直接查看浏览器页面。
前端(使用JavaScript,这里以原生Fetch API为例)
// 创建一个新的EventSource实例,连接到后端提供的SSE端点
var eventSource = new EventSource('http://localhost:8080/api/stream');
// 处理接收到的事件
eventSource.addEventListener('data', function(event) {
var dataChunk = event.data; // 接收的数据
console.log('Received chunk:', dataChunk);
// 在这里你可以更新UI,例如将数据添加到页面上
document.getElementById('output').innerText += dataChunk + '\n';
});
// 监听连接错误
eventSource.onerror = function(error) {
console.error('Error occurred:', error);
};
// 如果需要,在适当的时候可以关闭连接
// eventSource.close();
上述提供的JavaScript代码片段可以在浏览器环境下执行,它创建了一个与服务器建立连接的EventSource对象,用于接收来自服务器的Server-Sent Events(SSE)。只需要确保你在一个支持SSE的现代浏览器环境中,并且服务器端(这里是Spring Boot后端)已经正确设置了SSE服务接口。将这段JavaScript代码嵌入到HTML文档中,例如在 <\script> 标签内部,以便在浏览器加载页面时执行:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE Streaming Demo</title>
<style>
#output {
height: 200px; /* 设置高度以限制输出区域 */
overflow-y: scroll; /* 添加滚动条 */
word-wrap: break-word; /* 自动换行 */
border: 1px solid #ccc; /* 边框仅用于区分展示区域 */
padding: 10px;
}
</style>
</head>
<body>
<div id="output"></div>
<script>
// JavaScript 代码放在这里
var eventSource = new EventSource('http://localhost:8080/api/stream');
eventSource.addEventListener('message', function(event) { // 修改为 'message' 事件,SSE 的标准事件类型
var dataChunk = event.data;
var outputElement = document.getElementById('output');
var newLineNode = document.createElement('p'); // 使用段落元素以保证格式清晰
newLineNode.textContent = dataChunk;
outputElement.appendChild(newLineNode);
outputElement.scrollTop = outputElement.scrollHeight; // 滚动到底部,展示最新内容
});
eventSource.onerror = function(error) {
console.error('Error occurred:', error);
};
</script>
</body>
</html>
其实IDEA支持直接打开或者转到浏览器打开的
如果出现这个CROS报错,需要给服务端加跨域支持的配置
关于如何在浏览器执行参考下面
https://www.runoob.com/js/js-chrome.html
https://jingyan.baidu.com/article/e4d08ffdc25e584ed3f60d48.html
https://www.yzktw.com.cn/post/644689.html
先启动服务端的SpringBoot工程,如果跨域配置好了,然后运行前端JS代码
就可以看到服务端推送的流式输出了。这个项目其实可以做得更复杂一点,后续有机会我们继续往里加功能加技术点进去。
其他参考地址
https://blog.csdn.net/fyk844645164/article/details/126680347
https://blog.csdn.net/xwh3165037789/article/details/128137023
https://juejin.cn/post/7209226686373609533
https://blog.csdn.net/weixin_40951507/article/details/135354852
http://wed.xjx100.cn/news/236173.html?action=onClick
https://zhuanlan.zhihu.com/p/674994371
https://blog.csdn.net/qq_45399396/article/details/130972849
https://www.cnblogs.com/1996-Chinese-Chen/p/17913287.html
https://blog.csdn.net/fengtaokelly/article/details/130702235
https://blog.csdn.net/Larry_Lee88/article/details/130995475
总结
以上就是对大模型流式和非流式输出的解析,做完这个真有点想基于GPT代理搞点额外东西了,大模型厂家也挺多的,有人喜欢chatGPT,最近感觉通义千问也挺好,问了很多人是如何利用大模型的,一个技术TL说他经常用大模型来写测试用例,也有人用它帮忙理清思路查Bug或者写Demo或者命令等,未来还是有比较广阔的应用场景的。