一个java项目中,如何使用sse协议,构造一个chatgpt的流式对话接口

news2024/10/7 6:37:41

前言
如何注册chatGPT,怎么和它交互,本文就不讲了;因为网上教程一大堆,而且你要使用的话,通常会再包一个算法服务,用来做一些数据训练和过滤处理之类的,业务服务基本不会直接与原生chatGPT交互。
而下面阐述的,就是业务服务与算法服务的交互。

业务需求-需要实现什么样的功能

想要一个类似与AI问答助手的机器人,可以实现根据某些场景对话提问的功能

  1. 可以直接提问,类似直接使用chatGPT,只不过这个提问的过程会做一些业务通用处理,比如问答数据的归纳反馈、敏感词过滤等等。
  2. 也可以给它喂一篇论文,喂一批近期的资讯,或者是一本小说之类的,根据指定的上下文去进行问答(这种场景需要先投递数据建立相关索引)
  3. ai的回答要求和chatGPT一样保持流式返回(也就是一个字一个字,一边生成一边返回,而不是等整个回答生成完之后一股脑返回)

剖析

重点是流式,这里我们预设算法侧已经有了一个流式返回的接口,整体的交互如下图所示
在这里插入图片描述
下面分别介绍几个关键节点的数据交互设计,仅供参考

q1

简述:页面发送问答数据给业务服务端

{
  "chatId": 233,
  "question": "这篇论文有几个论点?"
}
  • 这里的chatId可以理解为一个对话框id,业务服务端可以根据这个来进行问答归类、批量删除收藏、问答上下文查询等操作。
  • question就是问题的内容

这里需要注意就是,交互数据格式尽可能简单、易拓展,有些产品的页面交互设计的非常复杂,什么历史问答、角色信息之类的,套了一层又一层,其实很多都没必要的,这样前端组装起来也麻烦,也不利于数据的管理与后期功能的拓展。

q2

简述:就是业务服务根据前端传来的问题和所属的对话框,把相应的上下文查询出来(甚至可以前端维护一个是否发送上下文的开关,更动态一点),包装成算法服务所需要的问题数据,发给它。

{
          "chatId":  233,
          "userName":  "张三",
          "messageKey":  "0a795f6a-a029-435f-8d67-6f6f8e078cfe",
          "message":  "这篇论文有几个论点?",
          "chatHistory":  [
          	{
                              "messageKey":  "0a795f6a-a029-435f-8d67-6f6f8e07dasd",
                              "question":  "这篇论文的作者是谁",
                              "answer":  "这篇论文的作者是李四博士。"
                    }
          ],
          "callbackUrl":  "http://xxx/chat/question/callback"
}
  • 上述的messageKey就是一个消息的key,用以常规的接口调试
  • chatHistory就是历史问答记录,即上下文,众所周知chatGPT带不带上下文,回答的结果可能截然不同
  • callbackUrl是业务定义的一个回调接口,用来回调一些算法侧异步生产的信息,比如原文的定位信息、根据当前问题生成的推荐问答等,这些和流式的回答是不会一起返回的,所以额外提供一个接口来接收。

q3和a3

这两步不详述(主要我也不是研究算法模型的哈哈,不是很清楚细节)
我们只需要定义好a2返回的结果即可

a2

简述:主要是算法侧返回给业务服务的同步的流式回答,同时还可能有异步的额外信息的回调(q2的callbackUrl来接收)。所以a2的返回结果分为两个response
response1:同步的流式回答,一般在2-7s内返回第一个字符

data:data:data:... // 省略一些输出
data:data:

流式问答的规范可以参考:流式接口协议规范

response2:异步的拓展信息(可有可无)

{
    "messageKey":"0a795f6a-a029-435f-8d67-6f6f8e078cfe", //必传 回调的消息key,每次问答唯一
    "expand":{
        "recommendedQuestions":[ // 推荐问题
            "这篇论文的主要论点是什么?"
        ],
        "originalIndex":[{
            "sourceId":3432,
            "text":"首先第一个论点是......",
            "textIndex":90
        }]
    }
}

a1

简述:a1要返回的格式很好理解,就是把a2中的两个response组合在一起,需要注意的有几点

  1. a2的response2不一定有,需要设置超时策略,且需要在流式回答最后输出
  2. a2中的response1是流式,response2不是;但输出到a1的时候,需要保证都在流中
  3. 最好需要约定一些event来作为标识符
event:messageKey // 消息key事件

data:0a795f6a-a029-435f-8d67-6f6f8e078cfe

event:answer // 流式回答开始事件

data:"在论文"

data:"中"

data:","

data:"我"

data:"们"

data:"一"

data:"共"
...
data:"几"

data:"个"

data:"论"

data:"点"

event:endTime

data:2024-02-27 17:05:24

event:expand // 拓展信息开始事件,此处等待15s超时

data:{"recommendedQuestions":["这篇论文的主要论点是什么"],"originalIndex":{"sourceId":32133,"text":"首先第一个论点是......","textIndex":90}}

代码

代码省略了一些无关紧要的业务特有的部分,只保留通用的部分
工具类:SSEUtils,用来操作SSE客户端

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * description
 *
 * @author luhui
 * @date 2024/1/25
 */
@Slf4j
public class SSEUtils {
    /**
     * timeout 30min
     */
    private static final Long DEFAULT_TIME_OUT = 30 * 60 * 1000L;
    /**
     * 订阅表
     */
    private static final Map<String, EvaEmitter> EMITTER_MAP = new ConcurrentHashMap<>();

    public static final String MSG_DATA_PREFIX = "data:";
    public static final String MSG_EVENT_PREFIX = "event:";

    /**
     * description: 创建流
     *
     * @param messageKey 本次问答的消息key
     * @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
     * @author luhui
     * @date 2024/2/23 17:09
     */
    public static EvaEmitter getEmitter(String messageKey) {
        if (null == messageKey || "".equals(messageKey)) {
            return null;
        }

        EvaEmitter emitter = EMITTER_MAP.get(messageKey);
        if (null == emitter) {
            emitter = new EvaEmitter(DEFAULT_TIME_OUT);
            EMITTER_MAP.put(messageKey, emitter);
        }

        return emitter;
    }

    /**
     * description: 发消息
     *
     * @param messageKey 本次问答的消息key
     * @param msg        消息
     * @author luhui
     * @date 2024/2/23 17:09
     */
    public static void pushMsg(String messageKey, String msg) throws IOException {
        EvaEmitter emitter = EMITTER_MAP.get(messageKey);
        if (null != emitter) {
            emitter.send(EvaEmitter.event().data(msg));
        }
    }

    public static void pushEvent(String messageKey, String eventDesc) throws IOException{
        EvaEmitter emitter = EMITTER_MAP.get(messageKey);
        if (null != emitter) {
            emitter.send(EvaEmitter.event().name(eventDesc));
        }
    }

    /**
     * description: 关闭流
     *
     * @param messageKey 本次问答的消息key
     * @author luhui
     * @date 2024/2/23 17:08
     */
    public static void closeEmitter(String messageKey) {
        EvaEmitter emitter = EMITTER_MAP.get(messageKey);
        if (null != emitter) {
            try {
                emitter.complete();
                EMITTER_MAP.remove(messageKey);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

工具类:SSEClient ,用来获取SSE流

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;

/**
 * description
 *
 * @author luhui
 * @date 2024/1/25
 */
@Slf4j
public class SSEClient {
    // timeout
    public static Integer DEFAULT_TIME_OUT = 60 * 1000;

    /**
     * 获取SSE输入流
     */
    public static InputStream getSseInputStream(String urlPath, JSONObject param, int timeoutMill) {
        HttpURLConnection urlConnection = null;
        try {
            urlConnection = getHttpURLConnection(urlPath, timeoutMill);
            putData(urlConnection, param);
            InputStream inputStream = urlConnection.getInputStream();
            return new BufferedInputStream(inputStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 读流数据
     */
    public static void readStream(InputStream is, MsgHandler msgHandler) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
        try {
            String line = "";
            while ((line = reader.readLine()) != null) {
                if ("".equals(line)) {
                    continue;
                }
                msgHandler.handleMsg(line);
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 目前这里抛出的显式异常来自与用户手动关闭的连接,此时服务端与算法端的连接也捕获并关闭,无需存储
        } finally {
            // 服务器端主动关闭时,客户端手动关闭
            reader.close();
            is.close();
        }
    }

    private static HttpURLConnection getHttpURLConnection(String urlPath, int timeoutMill) throws IOException {
        URL url = new URL(urlPath);
        HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
        urlConnection.setDoOutput(true);
        urlConnection.setDoInput(true);
        urlConnection.setUseCaches(false);
        urlConnection.setRequestMethod("POST");
        urlConnection.setRequestProperty("Connection", "Keep-Alive");
        urlConnection.setRequestProperty("Charset", "UTF-8");
        urlConnection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
        urlConnection.setRequestProperty("accept", "text/event-stream");
        // 读过期时间
        urlConnection.setReadTimeout(timeoutMill);
        return urlConnection;
    }

    public static void putData(HttpURLConnection connection, JSONObject jsonStr) throws IOException {
        byte[] writebytes = jsonStr.toJSONString().getBytes();
        connection.setRequestProperty("Content-Length", String.valueOf(writebytes.length));
        DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
        wr.write(jsonStr.toJSONString().getBytes());
        wr.flush();
        wr.close();
    }

    /**
     * 消息处理接口
     */
    public interface MsgHandler {
        void handleMsg(String line) throws IOException;
    }
}

工具类:EvaEmitter,用来封装一些流信息

import cn.hutool.core.date.DateTime;
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
 * description EvaEmitter
 *
 * @author luhui
 * @date 2024/02/22
 */
@Data
public class EvaEmitter extends SseEmitter {
    public EvaEmitter(Long timeout) {
        super(timeout);
    }

    @ApiModelProperty("版本id")
    private Long versionId;
    @ApiModelProperty("用户问题")
    private String question;
    @ApiModelProperty("唯一消息key")
    private String messageKey;
    @ApiModelProperty("当前用户")
    private Long currentUid;
    @ApiModelProperty("当前用户名")
    private String currentUserName;
    @ApiModelProperty("项目id")
    private Long projectId;
    @ApiModelProperty("ai回答")
    private String aiAnswer;
    @ApiModelProperty("拓展信息")
    private JSONObject expand;
    @ApiModelProperty("错误信息")
    private JSONObject error;
    @ApiModelProperty("提问开始时间")
    private DateTime startTime;

    public JSONObject getHistory() {
        JSONObject history = new JSONObject();
        history.put("question", question);
        history.put("answer", aiAnswer);
        history.put("expand", expand);
        history.put("error", error);
        return history;
    }
}

具体的chat交互方法

	String messageKey = UUID.randomUUID().toString();
	EvaEmitter emitter = SSEUtils.getEmitter(messageKey);
	emitter.setProjectId(111);
	// 初始化相关字段
	
	sseService.chatTransfer(messageKey);
    @Async
    @Override
    public void chatTransfer(String messageKey) {
        EvaEmitter emitter = SSEUtils.getEmitter(messageKey);

        // 正式参数
        JSONObject params = new JSONObject(true);
        params.put("versionId", emitter.getVersionId().toString());
        params.put("userName", emitter.getCurrentUserName());
        params.put("messageKey", emitter.getMessageKey());
        params.put("message", emitter.getQuestion());
        params.put("chatHistory", chatHistoryService.getChatHistory(emitter));
        params.put("callbackUrl", gateway + "/xxxchat/question/callback");

        InputStream inputStream = SSEClient.getSseInputStream(aiChatUrl, params, SSEClient.DEFAULT_TIME_OUT);
        try {
            StringBuilder answer = new StringBuilder();
            SSEUtils.pushEvent(messageKey, "messageKey");
            SSEUtils.pushMsg(messageKey, messageKey);
            SSEUtils.pushEvent(messageKey, "answer");
            AtomicReference<Boolean> sdkError = new AtomicReference<>(false);
            SSEClient.readStream(inputStream, line -> {
                log.info("messageKey:{}, chatTransfer:{}", emitter.getMessageKey(), line);
                String message = "";
                if (sdkError.get()) {
                    String errorStr = line.split(SSEUtils.MSG_DATA_PREFIX)[1].trim();
                    if (StringUtils.isNotBlank(errorStr)) {
                        // 做一些错误处理
                        message = "算法未知错误,请稍后再试";
                        emitter.setError(message);
                    }
                } else if (line.contains(SSEUtils.MSG_DATA_PREFIX)) {
                    message = line.split(SSEUtils.MSG_DATA_PREFIX)[1].trim();
                } else if (line.contains(SSEUtils.MSG_EVENT_PREFIX)) {
                    sdkError.set(true);
                } else {
                    message = "";
                }
                if (StringUtils.isNotBlank(message)) {
                    answer.append(message.replaceAll("\"", ""));
                    SSEUtils.pushMsg(messageKey, message);
                }
            });
            emitter.setAiAnswer(answer.toString());
            // 保存当前问答消息,自行实现
            ChatHistoryEntity message = chatHistoryService.saveHistory(messageKey);
            SSEUtils.pushEvent(messageKey, "endTime");
            SSEUtils.pushMsg(messageKey, DateUtil.formatDateTime(message.getEndTime()));
            SSEUtils.pushEvent(messageKey, "expand");
            chatHistoryService.pushExpand(messageKey);
        } catch (IllegalStateException | IOException e) {
            log.error("pushMsg error, web端流已被关闭");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 消息发送完或者出现异常的话,存储当前的消息,然后关闭流
            try {
                chatHistoryService.saveHistory(messageKey);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                SSEUtils.closeEmitter(messageKey);
            }
        }
    }
	@Override
    @Retryable(value = Exception.class, maxAttempts = 6, backoff = @Backoff(delay = 500, multiplier = 2))
    public void pushExpand(String messageKey) throws IOException {
       // 如果异步的拓展信息,即a2中的response2回调成功的话,会存储到这里
        Object expandObj = redisService.hGet(RedisConstants.CHAT_AI_RECOMMENDED_QUESTIONS, messageKey);
        if (expandObj == null) {
            log.error("未获取到相关拓展信息, 稍后重试");
            throw new RuntimeException("未获取到相关拓展信息");
        } else {
            JSONObject expand = JSONObject.parseObject(expandObj.toString());
            EvaEmitter emitter = SSEUtils.getEmitter(messageKey);
            emitter.setExpand(expand);
            SSEUtils.pushMsg(messageKey, expand.toJSONString());
            log.info("messageKey:{}, chatTransfer:{}", emitter.getMessageKey(), expand);
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1618927.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

刷代码随想录有感(44):对称二叉树

题干&#xff1a; 代码&#xff1a; class Solution { public:bool compare(TreeNode* left, TreeNode* right){//传入左右子树if(left NULL && right ! NULL) return false;//子else if(left ! NULL && right NULL) return false;//子else if(left NULL &…

8.4.3 使用3:配置单臂路由实现VLAN间路由

1、实验目的 通过本实验可以掌握&#xff1a; 路由器以太网接口上的子接口配置和调试方法。单臂路由实现 VLAN间路由的配置和调试方法。 2、实验拓扑 实验拓扑如下图所示。 3、实验步骤 &#xff08;1&#xff09;配置交换机S1 S1(config)#vlan 2 S1(config-vlan)#exit S…

用户请求经过哪些处理(公网)

DNS服务器之间协作&#xff1a; 递归DNS查询&#xff1a;用户的请求首先发送到递归DNS服务器。 查询根DNS服务器&#xff1a;递归DNS服务器查询根DNS服务器&#xff0c;以找到管理.com顶级域的TLD DNS服务器。 查询TLD DNS服务器&#xff1a;根DNS服务器响应带有TLD DNS服务器…

40. UE5 RPG给火球术增加特效和音效

前面&#xff0c;我们将火球的转向和人物的转向问题解决了&#xff0c;火球术可以按照我们的想法朝向目标发射。现在&#xff0c;我们解决接下来的问题&#xff0c;在角色释放火球术时&#xff0c;会产生释放音效&#xff0c;火球也会产生对应的音效&#xff0c;在火球击中目标…

Prompt Engineering,提示工程

什么是提示工程&#xff1f; 提示工程也叫【指令工程】。 Prompt发送给大模型的指令。比如[讲个笑话]、[用Python编个贪吃蛇游戏]、[给男/女朋友写情书]等看起来简单&#xff0c;但上手简单精通难 [Propmpt]是AGI时代的[编程语言][Propmpt]是AGI时代的[软件工程][提示工程]是…

ROS 话题订阅模型之自定义消息类型 C++实现

ROS 话题订阅模型之自定义消息类型 1.自定义消息类型好处 ROS提供了许多标准的消息类型&#xff0c;如 std_msgs/String、sensor_msgs/Image 等&#xff0c;涵盖了很多常见的数据类型和传感器数据。但是&#xff0c;在实际的开发中&#xff0c;我们经常会遇到需要传输的数据类…

云计算中的过度授权:安全隐患与应对策略

云计算凭借其弹性、可扩展等优势&#xff0c;已经成为诸多企业组织拓展业务的重要基础设施之一。然而&#xff0c;与传统IT架构相比&#xff0c;云计算环境的安全管理也面临着新的挑战。过度授权 (Overprivileging) 便是云安全领域亟待解决的主要问题之一&#xff0c;本文将带领…

CPPTest实例分析(C++ Test)

1 概述 CppTest是一个可移植、功能强大但简单的单元测试框架&#xff0c;用于处理C中的自动化测试。重点在于可用性和可扩展性。支持多种输出格式&#xff0c;并且可以轻松添加新的输出格式。 CppTest下载地址&#xff1a;下载地址1  下载地址2 下面结合实例分析下CppTest如…

【Linux】解决ubuntu20.04版本插入无线网卡没有wifi显示【无线网卡Realtek 8811cu】

ubuntu为Realtek 8811cu安装驱动&#xff0c;解决wifi连接问题 1、确认无线网卡的型号-Realtek 8810cu2、下载并配置驱动 一句话总结&#xff1a;先确定网卡的型号&#xff0c;然后根据网卡的型号区寻找对应的驱动下载&#xff0c;下载完成之后在ubuntu系统中进行编译&#xff…

【STM32+HAL+Proteus】系列学习教程4---GPIO输入模式(独立按键)

实现目标 1、掌握GPIO 输入模式控制 2、学会STM32CubeMX配置GPIO的输入模式 3、具体目标&#xff1a;1、按键K1按下&#xff0c;LED1点亮&#xff1b;2、按键K2按下&#xff0c;LED1熄灭&#xff1b;2、按键K3按下&#xff0c;LED2状态取反&#xff1b; 一、STM32 GPIO 输入…

C语言 字符类型

下面 我们来说字符类型 我们来看这个 保险单 金额 和 总额 都可以用数字类型 而性别则需要字符型 字符数据的存储 – ASCI码 字符类型 char 就是专为存储字符(如字母&#xff0c;标点和数字)而设计的类型。 使用单引号包含单个字符或转义字符去表示一个 char 类型的常量。 …

mac电脑搭建vue环境(上篇)

第一步&#xff1a;mac电脑要有homebrew&#xff0c;如何安装homebrew 点击下方 MAC安装homebrew-CSDN博客 第二步&#xff1a;homebrew安装node.js 第三步&#xff1a;安装npm 第四步&#xff1a;安装webpack 第五步&#xff1a;安装vue脚手架 第六步&#xff1a;可以在…

解决双击PDF文件出现打印的问题【Adobe DC】

问题描述 电脑安装Adobe Acrobat DC之后&#xff0c;双击PDF文件就会出现打印&#xff0c;而无法直接打开。 右键PDF文件就会发现&#xff0c;第一栏出现的不是用Adobe打开&#xff0c;而是打印。 重装软件多次仍然无法解决。 原因 右键菜单被改写了。双击其实是执行右键菜…

idea上传项目到gitee(码云)

1、打开码云&#xff0c;新建仓库 2、创建 3、这就是创建成功的页面 4、复制仓库地址&#xff0c;后面需要用到 2、打开我们的项目&#xff1a;例如我现在的项目 1、idea创建git仓库 2、选择我们项目文件夹的目录 3、查看文件是否变色&#xff0c;变色表示成功了 4、添加到缓…

Amazon云计算AWS之[2]弹性计算云EC2

文章目录 说明EC2基本架构Amazon机器映象&#xff08;AMI&#xff09;实例&#xff08;Instance&#xff09;弹性块存储&#xff08;EBS&#xff09; EC2关键技术地理区域和可用区域EC2通信机制弹性负载均衡监控服务自动缩放服务管理控制台 EC2安全及容错机制EC2弹性IP地址 说明…

网盘——查看文件

本文主要讲解文件操作过程中&#xff0c;查看文件如何实现&#xff0c;实现步骤如下&#xff1a; 1、实现步骤&#xff1a; A、首先客户端发送查看请求&#xff08;包含目录信息&#xff09; B、服务器将文件名字还有文件的类型发送给客户端&#xff08;只发送文件的名字&am…

FebHost:科技企业如何规划并注册.AI域名?

为确保企业使用.AI域名的方式准确反映其对人工智能技术的关注&#xff0c;企业应考虑以下步骤&#xff1a; 了解法律和合规要求&#xff1a; 第一步是了解与 .AI 域名相关的独特法律和合规要求。由于.AI域名源于安圭拉&#xff0c;企业必须遵守安圭拉的限制和法律规定。这包括…

简单谈谈URL过滤在网络安全中的作用

用户花在网络上的时间越来越多&#xff0c;浏览他们最喜欢的网站&#xff0c;点击电子邮件链接&#xff0c;或利用各种基于网络的 SaaS 应用程序供个人和企业使用。虽然这种不受约束的网络活动对提高企业生产力非常有用&#xff0c;但也会使组织面临一系列安全和业务风险&#…

学习Rust第14天:HashMaps

今天我们来看看Rust中的hashmaps&#xff0c;在 std::collections crate中可用&#xff0c;是存储键值对的有效数据结构。本文介绍了创建、插入、访问、更新和迭代散列表等基本操作。通过一个计算单词出现次数的实际例子&#xff0c;我们展示了它们在现实世界中的实用性。Hashm…

笔记本电脑耗电和发热比较厉害怎么处理

工作中会遇到有同事反馈笔记本电脑耗电和发热比较厉害&#xff0c;主要检查以下几个地方 1、CPU频率 很多人觉得是cpu使用率高就代表电脑跑得快&#xff0c;发热量就大&#xff0c;其实不是的&#xff0c;主要是看的cpu频率&#xff0c;频率越高&#xff0c;电脑发热量越大。如…