LLM流式方案解决方案和客户端解决方案

news2024/11/15 19:51:15

背景

接上一篇《LLM大模型统一封装接口解决方案》架构确定后,流式方案非常规请求,需要特殊处理。

本解决方案就是针对上一篇中所需要的流式(打字机效果进行编码)

什么是SSE

SSE(Server-Sent Events,服务器发送事件)是一种基于HTTP的服务器到客户端的单向通信技术,用于实现服务器向客户端推送数据的功能。SSE协议标准由HTML5规范定义,并且其定义被包含在HTML Living Standard中。

SSE允许服务器通过HTTP连接向客户端发送数据,而无需客户端发起请求。这使得SSE非常适合于实时通信或推送通知给客户端的应用程序,例如实时股票报价、即时通讯、实时监控等场景。

基本上,SSE由以下要素组成:

  1. 服务器:负责向客户端发送事件流的HTTP服务器。
  2. 客户端:通过浏览器中的EventSource API与服务器建立连接,接收服务器发送的事件。
  3. 事件流(Event Stream):服务器向客户端发送的数据流,格式为纯文本,使用一种特定的格式进行编码,例如MIME类型为"text/event-stream"。

SSE的优点包括简单易用、实现方便、跨浏览器支持良好等。然而,它也有一些限制,例如不能支持双向通信,与WebSocket相比,SSE的实时性稍逊一筹。

Java框架说明

pom 文件引入的核心依赖包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>aip.com</groupId>
    <artifactId>aip-com</artifactId>
    <version>0.0.1</version>
    <name>aip-com</name>
    <description>aip com project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Java后端核心代码

本方法是标准的SSE协议标准


    private final ExecutorService executorService = Executors.newFixedThreadPool(5);

    
    /**
     * 会话请求
     *
     * @return String
     */
    @PostMapping(value = "/completions", consumes = MediaType.APPLICATION_JSON_VALUE)
    @Operation(summary = "会话请求")
    public SseEmitter completions(@RequestBody CompletionRequest completionRequest) {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
        SseEmitter emitter = new SseEmitter();

        executorService.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // 向客户端发送事件
                    emitter.send(
                            SseEmitter.event()
                                    .name("message")
                                    .data(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                            .ended(false)
                                             .message(String.valueOf(i))
                                            .build()))
                    );
                    Thread.sleep(1000);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;

    /**
     * 会话请求
     *
     * @return String
     */
    @GetMapping(value = "/stream")
    @Operation(summary = "会话请求")
    public SseEmitter stream() {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
        SseEmitter emitter = new SseEmitter();

        executorService.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // 向客户端发送事件
                    emitter.send(
                            SseEmitter.event()
                                    .name("message")
                                    .data(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                            .ended(false)
                                             .message(String.valueOf(i))
                                            .build()))
                    );
                    Thread.sleep(1000);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;

Flux 和 Flowable 对比

Flux 和 Flowable 都是响应式编程库中的数据流类型,用于处理异步和基于事件的流式数据。它们分别来自于不同的库,Flux 是 Reactor 库的一部分,而 Flowable 则是 RxJava 库的一部分。以下是它们之间的一些区别:

  1. 库的来源:

    • Flux 来自于 Reactor 库,是 Reactor 的核心组件之一,React的核心模块用于基于反应式流规范处理数据流。
    • Flowable 来自于 RxJava 库,是 RxJava 的核心类之一,RxJava 是 Java 平台的反应式扩展库,用于处理异步和基于事件的编程。
  2. 背压策略:

    • Flux 默认采用背压策略为 BUFFER,可以通过 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等方法来指定不同的背压策略。
    • Flowable 默认也是支持背压的,但是相比 Flux,Flowable 提供了更多的背压策略,如 BUFFER、DROP、LATEST、ERROR、MISSING。
  3. 反应式规范:

    • Flux 遵循 Reactor 库的反应式流规范,使用 Mono 和 Flux 来表示异步流和单个结果。
    • Flowable 遵循 RxJava 库的反应式流规范,使用 Observable 和 Flowable 来表示异步流和单个结果。
  4. 生态系统:

    • Reactor 生态系统主要用于基于 Reactor 的应用程序。
    • RxJava 生态系统则更广泛,它是 ReactiveX 的一部分,支持多种语言和平台,并有许多衍生项目。

总的来说,Flux 和 Flowable 在概念上很相似,都用于处理异步和基于事件的流式数据,但它们来自于不同的库,并且有一些细微的区别,如背压策略和生态系统支持。您可以根据项目需求选择适合的库和数据流类型。

Java后端Flowable方式

本方法是Flowable方式,非标准流式规则

    /**
     * 会话请求
     *
     * @return String
     */
    @GetMapping(value = "/stream")
    @Operation(summary = "会话请求")
    public Flowable<String> stream() {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);

        Flowable<String> typingFlow = Flowable.create(emitter -> {
            executorService.execute(() -> {
                try {
                    for (int i = 0; i < 10; i++) {

                        emitter.onNext(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                .ended(false)
                                .message(String.valueOf(i))
                                .build()));

                        Thread.sleep(1000);
                    }
                    emitter.onComplete();
                } catch (Exception e) {

                }
            });
        }, BackpressureStrategy.BUFFER);

        return typingFlow;
    }

Java后端Flux方式

本方法是Flux方式,非标准流式规则

    /**
     * 会话请求
     *
     * @return String
     */
    @GetMapping(value = "/stream")
    @Operation(summary = "会话请求")
    public Flux<String> stream() {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);

        Flux<String> typingFlow = Flux.create(emitter -> {
            executorService.execute(() -> {
                try {
                    for (int i = 0; i < 10; i++) {

                        emitter.next(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                .ended(false)
                                .message(String.valueOf(i))
                                .build()));

                        Thread.sleep(1000);
                    }
                    emitter.complete();
                } catch (Exception e) {

                }
            });
        }, FluxSink.OverflowStrategy.BUFFER);

        return typingFlow;
    }
}

HTML 客户端接收示例程序

function EventSourceGetRequest() SSE 默认方法,只支持GET请求,适合演示用途以及后端包装好服务

function fetchPostRequest() fetch POST 请求实现SSE,支持所有请求(POST,GET等)以及传递参数

sse.html 内容

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SEE Example</title>
    <script>

        // SSE 默认方法,只支持GET请求
        function EventSourceGetRequest() {
            if(typeof(EventSource)!=="undefined")
            {
                var eventSource = new EventSource('http://127.0.0.1:8090/v1/chat/stream');
                eventSource.onmessage = function(event)
                {
                    document.getElementById('result').insertAdjacentHTML('beforeend', `${event.data}<br/><br/>`);
                    console.log(event)
                };
            }
            else
            {
                document.getElementById("result").innerHTML="抱歉,你的浏览器不支持 server-sent 事件...";
            }
        }

        // fetch POST 请求实现SSE
        function fetchPostRequest() {
            fetch('http://127.0.0.1:8090/v1/chat/completions', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json'
                },
                body: JSON.stringify({}),
            })
            .then(response => {
                // 检查响应是否成功
                if (!response.ok) {
                    throw new Error('Network response was not ok');
                }
                // 返回 ReadableStream 对象
                return response.body;
            })
            .then(stream => {
                // 创建一个新的文本解码器
                const decoder = new TextDecoder();
                
                // 获取一个 reader 对象
                const reader = stream.getReader();
                
                let chunk = ''
                
                // 逐块读取数据
                function read() {
                    reader.read().then(({ done, value }) => {
                        if (done) {
                            document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);
                            console.log('Stream has ended');
                            return;
                        }
                        // 将数据块转换为字符串并显示
                        const tmp = decoder.decode(value, { stream: true });
                        if (tmp.startsWith('event:') && chunk!='') {
                            document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);
                            chunk = tmp
                        }else{
                            chunk = chunk + tmp
                        }
                        // 继续读取下一块数据
                        read();
                    });
                }
                // 开始读取数据
                read();
            })
            .catch(error => {
                // 处理错误
                console.error('There was a problem with the fetch operation:', error);
            });
        }

        // EventSourceGetRequest();
        fetchPostRequest();
    </script>
</head>
<body>
	<h1>SEE result</h1>
    <div id="result"></div>
</body>
</html>
  • 标准SSE示例

标准SSE

  • 扩展SSE

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1528255.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【靶机测试--PHOTOGRAPHER: 1【php提权】】

前期准备 靶机下载地址&#xff1a; https://vulnhub.com/entry/photographer-1%2C519/ 信息收集 nmap 扫描同网段 ┌──(root㉿kali)-[/home/test/桌面] └─# nmap -sP 192.168.47.0/24 --min-rate 3333 Starting Nmap 7.92 ( https://nmap.org ) at 2024-03-19 07:37 …

数学建模软件及算法模型典型问题汇总

一、 软件篇 编程、MATLAB&#xff08;物理建模&#xff09;、python&#xff08;数据分析&#xff09;、R、其他&#xff08;SPSS、Stata、Origin&#xff09; 这里其实还有一个 Lingo 软件&#xff0c;不过我不推荐&#xff0c;有更好的替代方案&#xff0c;就是 Yalmip 工…

React的基本使用

安装VSCode插件 ES7 Reactopen in browser React基本使用 基本使用步骤 引入两个JS文件&#xff08; 注意引入顺序 &#xff09; <!-- react库, 提供React对象 --> //本地 <script src"../js/react.development.js"></script> //线上 //<scr…

理解和调试深度学习模型:探索人工智能可解释性方法

关键要点 深度学习模型可能非常复杂&#xff0c;理解其内部原理可能具有挑战性在机器学习中&#xff0c;提供可解释性的方法有多种为了确保这些自动化系统的可靠性&#xff0c;可以使用可解释性工具来深入了解模型的决策过程模型不可知的可解释性工具在不同模型之间是模块化的…

如何判断竞价托管代运营公司或SEM营销优化师水平高低

竞价托管代运营公司或营销优化师的能力评估需要从多个角度来考虑&#xff0c;通常有以下几种评估方式&#xff0c;一般来说&#xff0c;按照遨游建站多年经验来分析评估比较靠谱&#xff0c;对于不懂SEM的人来说也最适合&#xff0c;不需要许多专业的知识&#xff0c;也能判断出…

深度强化学习05策略学习

蒙特卡洛近似 梯度上升 总结

C语言项目:数组与函数实践:扫雷游戏

目录 目录&#xff1a; 1.扫雷游戏分析与设计 1.1扫雷游戏的功能说明&#xff1a; 1.1.1使用控制台实现经典扫雷的游戏 1.1.2游戏可以通过菜单实现继续玩或者退出游戏 1.1.3扫雷棋盘是9*9的格子 1.1.4默认随机布置10个雷 1.1.5 可以排查雷 2.扫雷游戏的代码实现 1.遇到的问题…

Latex插入pdf图片,去除空白部分

目录 参考链接&#xff1a; 流程&#xff1a; 参考链接&#xff1a; ​科研锦囊之Latex-如何插入图片、表格、参考文献 http://t.csdnimg.cn/vpSJ3 流程&#xff1a; Latex的图片插入支持PDF文件&#xff0c;这里笔者建议都使用PDF文件进行图片的插入&#xff0c;因为PDF作…

SinoDB数据库运行分析

SinoDB数据库运行主要从数据库互斥资源等待、数据库写类型、备份文件有效性、Chunk状态等15个方向进行分析&#xff0c;具体说明如下&#xff1a; 一、数据库互斥资源等待 检查项目 数据库互斥资源等待 检查命令 onstat -g con |head -20 说明 onstat -g con 查看目前数据处…

【C++练级之路】【Lv.14】二叉搜索树(进化的二叉树——BST)

快乐的流畅&#xff1a;个人主页 个人专栏&#xff1a;《C语言》《数据结构世界》《进击的C》 远方有一堆篝火&#xff0c;在为久候之人燃烧&#xff01; 文章目录 引言一、二叉搜索树介绍二、二叉搜索树的模拟实现2.1 结点2.2 成员变量2.3 默认成员函数2.3.1 constructor2.3.2…

汽车功能安全整体方法

摘 要 ISO26262道路车辆功能安全标准已经制定实践了多年&#xff0c;主要目标是应对车辆的电子和电气&#xff08;E/E&#xff09;系统失效。该方法践行至今&#xff0c;有些系统功能安全方法已经成熟&#xff0c;例如电池管理系统&#xff08;BMS&#xff09;&#xff0c;并且…

MindGraph:文字生成知识图

欢迎来到MindGraph&#xff0c;这是一个概念验证、开源的、以API为先的基于图形的项目&#xff0c;旨在通过自然语言的交互&#xff08;输入和输出&#xff09;来构建和定制CRM解决方案。该原型旨在便于集成和扩展。以下是关于X的公告&#xff0c;提供更多背景信息。开始之前&a…

每日OJ题_牛客HJ75 公共子串计算(IO型OJ)

目录 牛客HJ75 公共子串计算 解析代码 牛客HJ75 公共子串计算 公共子串计算_牛客题霸_牛客网 解析代码 #include <iostream> using namespace std; int main() {string str1 "", str2 "";cin >> str1 >> str2;int n1 str1.size()…

【Selenium(一)】

简介 Selenium是一个开源的自动化测试工具&#xff0c;主要用于Web应用程序的自动化测试。它支持多种浏览器&#xff0c;包括Chrome、Firefox、Internet Explorer等&#xff0c;以及多种编程语言&#xff0c;如Java、Python、C#、Ruby等&#xff0c;使得它成为Web自动化测试中…

一个用稳压二极与MOS管构成的过压保护电路

一个用稳压二极与MOS管构成的过压保护电路 如图&#xff0c;利用稳压管和PMOS管组成一个保护电路&#xff0c;起过压保护和防反接的的作用。 分析&#xff1a; 1.当输入端是5V左右的电压的时候&#xff08;VDD-IN5V&#xff09;&#xff0c;稳压二极管D1没有被反向击穿&#…

【异常处理】SpringMVC无法跳转视图问题

浏览器发送请求给控制器&#xff0c;但是结果是404报错&#xff0c;又试了一下返回json字符串&#xff0c;json可以获取到&#xff0c;所以应该springmvc出了问题。 查看controller&#xff0c;发现无法加载视图

RealBasicVSR使用记录

对各种场景图片、视频超分结果都很不错的模型。 paper&#xff1a;https://arxiv.org/pdf/2111.12704.pdf code&#xff1a;https://github.com/ckkelvinchan/RealBasicVSR 一、使用步骤 1. git clone https://github.com/ckkelvinchan/RealBasicVSR.git 2. 我的环境已安装…

问界汽车提车全流程及注意点【伸手党福利】

问界汽车提车全流程及注意点 目录 说明为没买车和没提车的小伙伴提供参考全程必须车主办理&#xff08;人必须在场&#xff09;&#xff0c;如果不是车主授权书很难办。时间&#xff1a;提车用时4小时&#xff0c;2个人 提车提前联系-交付专员做好需求调研当天-到店验车-千万不…

并发编程Semaphore(信号量)浅析

目录 一、简介二、API三、使用3.1 demo13.1 demo2 四、适用场景 一、简介 Semaphore&#xff08;信号量&#xff09;是 Java 中用于控制同时访问特定资源的线程数量的工具类。Semaphore 维护了一组许可证&#xff0c;线程在访问资源之前必须先获取许可证&#xff0c;访问完毕后…

前端 -- 基础 表单标签 -- 表单域

表单域 # 表单域是一个包含 表单元素 的区域 在 HTML 标签中&#xff0c; <form> 标签 用于定义表单域&#xff0c; 以实现用户信息的收集和传递 简单通俗讲&#xff0c; 就是 <form> 会把它范围内的表单元素信息提交给后台&#xff08;服务器) 对于上面讲…