SpringBoot项目整合智谱AI + SSE推送流式数据到前端展示 + RxJava得浅显理解

news2025/1/22 16:49:52

项目背景:

项目背景是一个这个AI答题应用平台,我引入AI得作用就是让AI根据我指定得这个题目的标题和描述来生成一些列的题目。(主要功能是这个,但是还用了AI给我评分,不过这个功能比较简单,在本文就简单介绍)。

引入并整合智谱AI功能:

引入:

首先还是先贴一个官方文档:

智谱AI开放平台 (bigmodel.cn)

大概的步骤就是:

引入依赖

直接复制官方文档的客户端代码(同步调用)

引入之后我们可以不着急去在项目中使用,可以先跑一个单元测试

@SpringBootTest
public class AItest {


    private final String Key = "自己的Key";
    //测试是否可以连接
    @Test
    public void test() {
        ClientV4 client = new ClientV4.Builder(Key).build();
        List<ChatMessage> messages = new ArrayList<>();
        ChatMessage chatMessage = new ChatMessage(ChatMessageRole.USER.value(), "作为一名营销专家,请为智谱开放平台创作一个吸引人的slogan");
        messages.add(chatMessage);
//        String requestId = String.format(requestIdTemplate, System.currentTimeMillis());

        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                .model(Constants.ModelChatGLM4)
                .stream(Boolean.FALSE)
                .invokeMethod(Constants.invokeMethod)
                .messages(messages)
                .build();
        ModelApiResponse invokeModelApiResp = client.invokeModelApi(chatCompletionRequest);
        try {
            System.out.println("model output:" + invokeModelApiResp.getData().getChoices().get(0));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

整体的代码逻辑:

首先先创建一个AI的客户端

接着拼接需要传给AI的信息列表,这里是ChatMessage

然后就是拼接这个需要发送的请求了

        里面有蛮多的参数

        model:模型,这个直接用最新的就行

        stream:就是指定流式还是非流式

        调用方法:这个默认就行

        message:就是你要发送的信息

最后发送调用接收返回值

对AI的这种工具的一个封装和简化的思想

对于这种工具类,就可以把这个封装成自己的SDK,以后自己直接引入即可

1:编写一个配置类将这个AI的客户端交给IOC容器管理:

@Configuration
@Data
public class AiConfig {

    private final String aiKey = "自己的key";


    @Bean
    public ClientV4 clientV4() {
        return new ClientV4.Builder(aiKey).build();
    }
}

2:编写这个工具类:

这里叫不叫工具类都可以,鱼皮老师的定义是需要引入Bean对象的可以在Manager包下管理

首先可以只有一个最简单的方法:

我们可以利用Java的重载来一层一层向上封装

我们可以考虑到,我们和AI对话,一般都是直接问他对吧,所以作为我们用户,我们最乐意的应该是直接传一个文本,但是这里我们还需要注意,我们为了让这个返回的答案更准确,我们可以封装一个系统的prompt,给AI一个背景。

所以我们向上封装:

我们在这个方法中只需要传系统的prompt和用户的prompt就可以了

还可以再往上封装

我们可以根据传递的参数

我们可以封装同步和异步:

其它都是同理了,直接看代码:

package com.ljh.aiplatform.manager;

import com.ljh.aiplatform.common.ErrorCode;
import com.ljh.aiplatform.exception.BusinessException;
import com.zhipu.oapi.ClientV4;
import com.zhipu.oapi.Constants;
import com.zhipu.oapi.service.v4.model.*;
import io.reactivex.Flowable;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;

/**
 * AI的请求
 */
@Component
public class AiManager {

    @Resource
    private ClientV4 clientV4;

    // 稳定的随机数
    private static final float STABLE_TEMPERATURE = 0.05f;

    // 不稳定的随机数
    private static final float UNSTABLE_TEMPERATURE = 0.99f;

    /**
     * 同步请求(答案不稳定)
     *
     * @param systemMessage
     * @param userMessage
     * @return
     */
    public String doSyncUnstableRequest(String systemMessage, String userMessage) {
        return doRequest(systemMessage, userMessage, Boolean.FALSE, UNSTABLE_TEMPERATURE);
    }

    /**
     * 同步请求(答案较稳定)
     *
     * @param systemMessage
     * @param userMessage
     * @return
     */
    public String doSyncStableRequest(String systemMessage, String userMessage) {
        return doRequest(systemMessage, userMessage, Boolean.FALSE, STABLE_TEMPERATURE);
    }

    /**
     * 同步请求
     *
     * @param systemMessage
     * @param userMessage
     * @param temperature
     * @return
     */
    public String doSyncRequest(String systemMessage, String userMessage, Float temperature) {
        return doRequest(systemMessage, userMessage, Boolean.FALSE, temperature);
    }

    /**
     * 通用请求(简化消息传递)
     *
     * @param systemMessage
     * @param userMessage
     * @param stream
     * @param temperature
     * @return
     */
    public String doRequest(String systemMessage, String userMessage, Boolean stream, Float temperature) {
        List<ChatMessage> chatMessageList = new ArrayList<>();
        ChatMessage systemChatMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), systemMessage);
        chatMessageList.add(systemChatMessage);
        ChatMessage userChatMessage = new ChatMessage(ChatMessageRole.USER.value(), userMessage);
        chatMessageList.add(userChatMessage);
        return doRequest(chatMessageList, stream, temperature);
    }

    /**
     * 通用请求
     *
     * @param messages
     * @param stream
     * @param temperature
     * @return
     */
    public String doRequest(List<ChatMessage> messages, Boolean stream, Float temperature) {
        // 构建请求
        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                .model(Constants.ModelChatGLM4)
                .stream(stream)
                .temperature(temperature)
                .invokeMethod(Constants.invokeMethod)
                .messages(messages)
                .build();
        try {
            ModelApiResponse invokeModelApiResp = clientV4.invokeModelApi(chatCompletionRequest);
            return invokeModelApiResp.getData().getChoices().get(0).toString();
        } catch (Exception e) {
            e.printStackTrace();
            throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
        }
    }

    /**
     * 通用请求(流式输出)(简化消息传递)
     * @param systemMessage
     * @param userMessage
     * @param temperature
     * @return
     */
    public Flowable<ModelData> doStreamRequest(String systemMessage, String userMessage,Float temperature) {
        List<ChatMessage> chatMessageList = new ArrayList<>();
        ChatMessage systemChatMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), systemMessage);
        chatMessageList.add(systemChatMessage);
        ChatMessage userChatMessage = new ChatMessage(ChatMessageRole.USER.value(), userMessage);
        chatMessageList.add(userChatMessage);
        return doStreamRequest(chatMessageList,temperature);
    }

    /**
     * 通用请求(流式输出)
     * @param messages
     * @param temperature
     * @return
     */
    public Flowable<ModelData> doStreamRequest(List<ChatMessage> messages,Float temperature) {
        // 构建请求
        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                .model(Constants.ModelChatGLM4)
                .stream(Boolean.TRUE)
                .temperature(temperature)
                .invokeMethod(Constants.invokeMethod)
                .messages(messages)
                .build();
        try {
            ModelApiResponse invokeModelApiResp = clientV4.invokeModelApi(chatCompletionRequest);
            final Flowable<ModelData> flowable = invokeModelApiResp.getFlowable();
            return flowable;
        } catch (Exception e) {
            e.printStackTrace();
            throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
        }
    }
}

将AI结合自己的业务

先讲一个这个宏观的步骤:

如何将AI结合自己的业务

1:我们需要确定我们需要给AI什么样的prompt,AI会给我们更准确的答案(这个可能需要遵循一些给AI提问的技巧和长时间的调试)

2:我们接收到AI的请求之后,我们应该如何处理(我觉得这一步也很难)

秉承着上面的步骤,我们来将这个AI具体结合到我们的业务中

我们的业务目的是生成题目,所以我们要给AI的肯定是生成题目的标题和题目的描述

还有一个最重要的,一定要指定返回题目的类型,一定要和我们数据库中的题目格式一样

系统prompt实例:

你是一位严谨的出题专家(让GLM进行角色扮演),我会给你如下信息:(分隔符标示不同内容)

应用名称,
【【【应用描述】】】,(对于描述多行内容,可以通过特殊字符包起来,防止用户的输入干扰生成结果)
应用类别,
要生成的题目数,
每个题目的选项数

请你根据上述信息,按照以下步骤来出题:链式思考,将复杂任务拆解成简单的任务)

  1. 要求:题目和选项尽可能地短,题目不要包含序号,每题的选项数以我提供的为主,题目不能重复指定输出长度的示例

  2. 严格按照下面的 json 格式输出题目和选项

[{"options":[{"value":"选项内容","key":"A"},{"value":"","key":"B"}],"title":"题目标题"}]
(少样本学习)

title 是题目,options 是选项,每个选项的 key 按照英文字母序(比如 A、B、C、D)以此类推,value 是选项内容

  1. 检查题目是否包含序号,若包含序号则去除序号

  2. 返回的题目列表格式必须为 JSON 数组指定固定的输出格式

标红的技巧都是在智谱AI的官方文档中的Prompt工程的文档中有写 

用户prompt实例:

小学数学测验, 【【【小学三年级的数学题】】】, 得分类, 10, 3

MBTI 性格测试, 【【【快来测测你的 MBTI 性格】】】, 测评类, 10, 3

用户的prompt就根据这个系统的prompt给就好。 

具体的代码实现:

首先想清楚这个我们如果要生成题目,我们需要几个参数?

根据AiManager(默认非流式且题目较为稳定),我们只要传两个参数,一个是系统prompt,一个是用用户的prompt。

系统prompt那就是这个默认值,用户的prompt是由应用的标题,应用的描述,应用题目数量,每个题目的选项组成的,所以我们需要一个封装用户prompt的方法

首先先来个dto接收前端参数:

/***
 * ai生成题目请求
 */
@Data
public class AiGenerateQuestionRequest implements Serializable {


    /**
     * 应用id
     */
    private Long appId;

    /**
     * 要生成的题目数
     */
    private Integer questionsNum = 10;
    /**
     * 每个题目的选项数
     */
    private Integer optionsNum = 4;

    private static final long serialVersionUID = 1L;
}

我们只需要应用的id,我们就可以自己在数据库中查出对应的应用

还有生成的题目数,和对应题目的选线

private static final String SystemPromptQuestion = "你是一位严谨的出题专家,我会给你如下信息:\n" +
            "```\n" +
            "应用名称,\n" +
            "【【【应用描述】】】,\n" +
            "应用类别,\n" +
            "要生成的题目数,\n" +
            "每个题目的选项数\n" +
            "```\n" +
            "\n" +
            "请你根据上述信息,按照以下步骤来出题:\n" +
            "1. 要求:题目和选项尽可能地短,题目不要包含序号,每题的选项数以我提供的为主,题目不能重复\n" +
            "2. 严格按照下面的 json 格式输出题目和选项\n" +
            "```\n" +
            "[{\"options\":[{\"value\":\"选项内容\",\"key\":\"A\"},{\"value\":\"\",\"key\":\"B\"}],\"title\":\"题目标题\"}]\n" +
            "```\n" +
            "title 是题目,options 是选项,每个选项的 key 按照英文字母序(比如 A、B、C、D)以此类推,value 是选项内容\n" +
            "3. 检查题目是否包含序号,若包含序号则去除序号\n" +
            "4. 返回的题目列表格式必须为 JSON 数组\n";

    /**
     * 封装用户的prompt
     *
     * @param app
     * @param questionsNum
     * @param optionsNum
     * @return
     */
    public String getUserPrompt(App app, Integer questionsNum, Integer optionsNum) {
        ThrowUtils.throwIf(app == null, ErrorCode.NOT_FOUND_ERROR, "应用不存在");
        StringBuilder userPrompt = new StringBuilder();
        userPrompt.append("应用名称" + app.getAppName()).append("\n");
        userPrompt.append("应用描述" + app.getAppDesc()).append("\n");
        userPrompt.append("应用类别" + AppTypeEnum.getEnumByValue(app.getAppType()).getText()).append("\n");
        userPrompt.append(questionsNum).append("\n");
        userPrompt.append(optionsNum);
        return userPrompt.toString();
    }

系统的常量和封装用户的prompt方法

/**
     * AI生成题目(非流式)
     *
     * @param aiGenerateQuestionRequest
     * @return
     */
    @PostMapping("/ai_generate")
    public BaseResponse<List<QuestionContentDTO>> aiGenerateQuestion(@RequestBody AiGenerateQuestionRequest aiGenerateQuestionRequest) {
        ThrowUtils.throwIf(aiGenerateQuestionRequest == null, ErrorCode.PARAMS_ERROR, "ai参数为空");
        Long appId = aiGenerateQuestionRequest.getAppId();
        App app = appService.getById(appId);
        ThrowUtils.throwIf(app == null, ErrorCode.NOT_FOUND_ERROR, "应用不存在");
        Integer questionsNum = aiGenerateQuestionRequest.getQuestionsNum();
        Integer optionsNum = aiGenerateQuestionRequest.getOptionsNum();
        String userPrompt = getUserPrompt(app, questionsNum, optionsNum);
        String result = aiManager.doSyncStableRequest(SystemPromptQuestion,userPrompt);
        int startIndex = result.indexOf("[");
        int endIndex = result.lastIndexOf("]");
        String jsonStr = result.substring(startIndex, endIndex + 1);
        List<QuestionContentDTO> questionContentDTOS = JSONUtil.toList(jsonStr, QuestionContentDTO.class);
        return ResultUtils.success(questionContentDTOS);
    }

整体的代码逻辑其实很简单

就是先从数据库中查出对应的app

接着封装用户的prompt

然后调用自己封装的ai接口生成结果

最后处理结果

不过最后这个处理结果需要注意:
```json
[
  {"options":[{"value":"12 + 15", "key":"A"},{"value":"14 + 17", "key":"B"},{"value":"13 + 16", "key":"C"}], "title":"小学数学测验:哪个选项的结果是29?"},
  {"options":[{"value":"5 x 6", "key":"A"},{"value":"4 x 7", "key":"B"},{"value":"6 x 5", "key":"C"}], "title":"小学数学测验:哪个选项的结果是30?"}
]

这个是AI返回的result

[{"options":[{"result":"","score":0,"value":"","key":"A"}],"title":""}]

这个是我们数据库中存储的格式

我们发现,AI返回的时候多了什么 ```json

这种奇奇怪怪的字符

所以我们做一个处理获取第一个 "[" 和 最后一个"]"的位置,取一个字串即可

优化:

这里的优化点是什么呢:

我们生成题目会有重复

​​​我们每次都给AI发送同样的系统prompt和用户prompt,这样AI就很容易生成重复的题目

虽然说我们可以查询数据库然后将存入数据库中的重复题目删除

但是这样做用户体验并不好

我们最好的办法就是让这个AI不要生成重复的题目

这里就涉及到一个保存AI上下文的方法了

平常我们和AI聊天,都在一个界面中聊,但是你关闭了这个客户端之后,之前的记录就没了,如果你想引用之前的内容,就不太行了。

我们的解决办法就是保存用户的上下文

具体代码:

List<ChatMessage> chatMessageList = new ArrayList<>();
/**
     * AI生成题目(非流式)
     *
     * @param aiGenerateQuestionRequest
     * @return
     */
    @PostMapping("/ai_generate")
    public BaseResponse<List<QuestionContentDTO>> aiGenerateQuestion(@RequestBody AiGenerateQuestionRequest aiGenerateQuestionRequest) {
        ThrowUtils.throwIf(aiGenerateQuestionRequest == null, ErrorCode.PARAMS_ERROR, "ai参数为空");
        Long appId = aiGenerateQuestionRequest.getAppId();
        App app = appService.getById(appId);
        ThrowUtils.throwIf(app == null, ErrorCode.NOT_FOUND_ERROR, "应用不存在");
        Integer questionsNum = aiGenerateQuestionRequest.getQuestionsNum();
        Integer optionsNum = aiGenerateQuestionRequest.getOptionsNum();
        String userPrompt = getUserPrompt(app, questionsNum, optionsNum);
        chatMessageList.add(new ChatMessage(ChatMessageRole.SYSTEM.value(), SystemPromptQuestion));
        chatMessageList.add(new ChatMessage(ChatMessageRole.USER.value(), userPrompt));
        String result = aiManager.doRequest(chatMessageList, Boolean.FALSE, null);
        System.out.println(result);
        int startIndex = result.indexOf("[");
        int endIndex = result.lastIndexOf("]");
        String jsonStr = result.substring(startIndex, endIndex + 1);
        chatMessageList.add(new ChatMessage(ChatMessageRole.ASSISTANT.value(), jsonStr));
        List<QuestionContentDTO> questionContentDTOS = JSONUtil.toList(jsonStr, QuestionContentDTO.class);
        return ResultUtils.success(questionContentDTOS);
    }

我们保存一个List<ChatMessage> chatMessageList = new ArrayList<>();这样的对话列表

每次和AI对话完就保存进去,然后传递给AI,那AI就知道了,就不会生成重复的题目了

注意点:

1:tokens的长度不要太小,在项目中,我们处理Ai返回的数据,是因为这段数据前面有一个 [,后面有一个 ],如果把token设置得太小,后面得 ] 可能就会消失。

2:就是关于temperature这个值得设置,多尝试

3:注意保存这个上下文,这个上下文进行保存是会消耗tokens的,所以不能保存过长。

将生成题目的功能改造成流式输出并且用SSE的方式推送到前端

SSE技术:

Server-Sent Events(SSE)是一种技术,用于在客户端和服务器之间建立持久的单向通信通道,从而允许服务器主动推送更新到客户端。与 WebSocket 和其他实时通信技术相比,SSE 主要用于简单的、单向的数据流传输,特别适用于需要从服务器向客户端发送实时更新的场景。

SSE是单向的(从服务端到客户端),特别适用于这种AI流式生成答案返回

WebSocket是双向的,比较适合于网页聊天这种

这就是AI的优化了

我们每次生成的时候都要等很长的事件

为了提高用户的体验,我们可以生成一道题就输出一道题

这个需求最难的是什么?

我们知道就是,我们只要在后端的参数一改

把stream的参数改成true就是流式输出了(最多我们再封装一个方法)。

但是我们如何处理流式的数据呢?

[
{"options":[{"score":0,"value":"15","key":"A"},{"score":0,"value":"20","key":"B"}],"title":"3 + 12 等于多少?"},{"options":[{"score":0,"value":"6","key":"A"},{"score":0,"value":"7","key":"B"}],"title":"9 减去几等于 3?"}
]

 这个是Ai给我们的answer

问题是:我们肯定不能说一个题目都没取完整就返回给前端

但是,我们什么时候能确定这个一个题目是取完整了呢?

首先我们要知道,AI返回的流不一定是一个子一个字返回的

有可能是一个 :tions":[    也有可能是一个: lue":"6",

这种奇奇怪怪的字符串。

这里鱼皮老师用了一个方法:就是将每次返回的一块进行分流

将一个大的流分成n个小流,并且呢,将每个字符串转成一个字符列表就是List<Charcter>

然后我们在 “下游” 就可以简单进行处理了。

后端代码:

/**
     * 通用请求(流式输出)(简化消息传递)
     * @param systemMessage
     * @param userMessage
     * @param temperature
     * @return
     */
    public Flowable<ModelData> doStreamRequest(String systemMessage, String userMessage,Float temperature) {
        List<ChatMessage> chatMessageList = new ArrayList<>();
        ChatMessage systemChatMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), systemMessage);
        chatMessageList.add(systemChatMessage);
        ChatMessage userChatMessage = new ChatMessage(ChatMessageRole.USER.value(), userMessage);
        chatMessageList.add(userChatMessage);
        return doStreamRequest(chatMessageList,temperature);
    }

    /**
     * 通用请求(流式输出)
     * @param messages
     * @param temperature
     * @return
     */
    public Flowable<ModelData> doStreamRequest(List<ChatMessage> messages,Float temperature) {
        // 构建请求
        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                .model(Constants.ModelChatGLM4)
                .stream(Boolean.TRUE)
                .temperature(temperature)
                .invokeMethod(Constants.invokeMethod)
                .messages(messages)
                .build();
        try {
            ModelApiResponse invokeModelApiResp = clientV4.invokeModelApi(chatCompletionRequest);
            final Flowable<ModelData> flowable = invokeModelApiResp.getFlowable();
            return flowable;
        } catch (Exception e) {
            e.printStackTrace();
            throw new BusinessException(ErrorCode.SYSTEM_ERROR, e.getMessage());
        }
    }
/**
     * AI生成题目(流式)
     *+
     * @param aiGenerateQuestionRequest
     * @return
     */
    @GetMapping("/ai_generate/sse")
    public SseEmitter aiGenerateQuestionSSE(AiGenerateQuestionRequest aiGenerateQuestionRequest) {
        ThrowUtils.throwIf(aiGenerateQuestionRequest == null, ErrorCode.PARAMS_ERROR, "ai参数为空");
        Long appId = aiGenerateQuestionRequest.getAppId();
        App app = appService.getById(appId);
        ThrowUtils.throwIf(app == null, ErrorCode.NOT_FOUND_ERROR, "应用不存在");
        Integer questionsNum = aiGenerateQuestionRequest.getQuestionsNum();
        Integer optionsNum = aiGenerateQuestionRequest.getOptionsNum();
        String userPrompt = getUserPrompt(app, questionsNum, optionsNum);
        Flowable<ModelData> modelDataFlowable = aiManager.doStreamRequest(SystemPromptQuestion, userPrompt, null);
        AtomicInteger count = new AtomicInteger();//用AtomicInteger可以保证线程安全(因为在使用流的时候,其实是个多线程的环境,普通的int无法保证线程安全)
        SseEmitter sseEmitter = new SseEmitter();
        StringBuilder stringBuilder = new StringBuilder();
        //todo gpt提供了另一种方法
        modelDataFlowable.observeOn(Schedulers.io())
                .map(modelData -> modelData.getChoices().get(0).getDelta().getContent())
                .map(modelData -> modelData.replaceAll("\\s", ""))//\\s 是一个正则表达式,表示匹配任意空白字符。 replaceAll("\\s", "") 这行代码用于去除字符串中的所有空白字符,包括空格、制表符(Tab)、换行符和其他空白字符
                .filter(StringUtils::isNotBlank)
                .flatMap(message -> {
                    List<Character> characterList = new ArrayList<>();
                    for (char c : message.toCharArray()) {
                        characterList.add(c);
                    }
                    return Flowable.fromIterable(characterList);
                })//latMap 是 ReactiveX(RxJava)库中的一个非常重要且常用的操作符。它的作用是将源 Observable 或 Flowable 发送的每个数据项映射为一个新的 Observable 或 Flowable
                .doOnNext(character -> {
                    if (character.equals('{')) {
                        count.addAndGet(1);
                    }
                    if(count.get() > 0){
                        stringBuilder.append(character);
                    }
                    if (character.equals('}')) {
                        count.addAndGet(-1);
                        if (count.get() == 0){
                            sseEmitter.send(JSONUtil.toJsonStr(stringBuilder.toString()));
                            stringBuilder.setLength(0);
                        }
                    }
                })
                .doOnError(throwable -> sseEmitter.completeWithError(throwable))
                .doOnComplete(sseEmitter::complete)
                .subscribe();
        return sseEmitter;

    }

前面的逻辑和上面一样

整体的代码逻辑:

首先,这个AI流式返回是返回了一个Flowable<ModelData>这种流式对象

我们可以对这个流式对象链式调用,就和Java8中的stream一样

接着继续分析:

.map(modelData -> modelData.getChoices().get(0).getDelta().getContent())

将这个数据取出来

.map(modelData -> modelData.replaceAll("\\s", ""))//\\s 是一个正则表达式,表示匹配任意空白字符。 replaceAll("\\s", "") 这行代码用于去除字符串中的所有空白字符,包括空格、制表符(Tab)、换行符和其他空白字符

将这个一些无效字符进行过滤

.filter(StringUtils::isNotBlank)

然后将空字符串再进行一个过滤

.flatMap(message -> {
                    List<Character> characterList = new ArrayList<>();
                    for (char c : message.toCharArray()) {
                        characterList.add(c);
                    }
                    return Flowable.fromIterable(characterList);
                })

latMap 是 ReactiveX(RxJava)库中的一个非常重要且常用的操作符。它的作用是将源 Observable 或 Flowable 发送的每个数据项映射为一个新的 Observable 或 Flowable

就是上面说的进行了一个分流

.doOnNext(character -> {
                    if (character.equals('{')) {
                        count.addAndGet(1);
                    }
                    if(count.get() > 0){
                        stringBuilder.append(character);
                    }
                    if (character.equals('}')) {
                        count.addAndGet(-1);
                        if (count.get() == 0){
                            sseEmitter.send(JSONUtil.toJsonStr(stringBuilder.toString()));
                            stringBuilder.setLength(0);
                        }
                    }
                })

到这个地方就剩下这个字符列表了

我们怎么处理,这里涉及到一个算法(其实就是力扣上的括号匹配)

但是这里可以做一个简化,因为我们知道 "{" 先出来,并且"{}"两个数量是对称的

所以我们来一个计数器当有左大括号 ++ ,有右大括号 -- ,然后等到计数器为0 的时候,通过SSE的sseEmitter返回给前端即可。

最后订阅一下就行。

还有一个注意点:需要注意这里的返回值是一个sseEmitter这个对象。

前端代码:

const doSubmitSSE = async () => {
  if (!props.appId) {
    return;
  }
  submitting.value = true;
  //创建SSE请求
  const eventRouce = new EventSource(
    //填写后端的完整的地址
    `http://localhost:8100/api/question/ai_generate/sse?appId=${props.appId}&optionsNum=${form.optionsNum}&questionNum=${form.questionsNum}`
  );
  eventRouce.onmessage = function (event) {
    console.log(event.data);
    props.aiOnSSESuccess?.(JSON.parse(event.data));
  };
  eventRouce.onopen = function () {
    console.log("建立连接");
    props.aiOnSSEStart?.(event);
    handleCancel();
  };
  eventRouce.onerror = function () {
    if (event?.eventPhase == EventSource.CLOSED) {
      props.aiOnSSEClose?.(event);
      console.log("正常关闭");
      eventRouce.close();
    }
  };
  submitting.value = false;
};
整体的代码逻辑:

首先先创建一个SSE对象来接收请求(SSE接收请求和axios没关系,我们这里需要指定后端的完整路径名称,并且把参数带上)

接着SSE对象的三个方法:onmessage,onopen ,onerror

这里就是接收参数,然后把这个event.data的数据传递给父组件,所以其实没有什么逻辑

关于这个流式和异步处理的自我思考:

 首先智谱AI底层的流式处理用的是RxJava响应式编程的库

先说一下什么是响应式编程:

响应式编程(Reactive Programming)是一种编程范式,用于处理异步数据流和变化的事件

它特别适合处理用户界面、实时数据流、事件驱动系统等需要高效处理异步操作的场景。

结合我们项目中的AI答案的流式返回,就刚好很适合这个响应式编程

根据上面那个项目的整个流程,我觉得处理整个流式答案返回最重要的就是整个流的概念和RxJava提供了很多的操作符可以直接来操作这个流(代码案例如上)

就和这个Java8中的新特性一样

毕竟这个地方叫自我思考

我问了gpt一个问题,因为我在看响应式编程的时候老是看到一个词叫:异步处理

但是在我第一次接触 “异步” 这个概念的时候,我见到的是:异步调用

我一开始就把这两个东西搞混了,我以为是一个东西

首先异步调用:

异步调用 是指在程序中发起一个操作或任务时,不需要等待这个操作或任务完成就能继续执行后续代码。异步调用的主要目的是避免阻塞主线程或当前执行上下文,以提高应用的响应能力和性能。

换成人话说就是,向我们平常的代码都是一行一行执行下来的,都是同步的,如果有一行卡住了,那后面的都不能执行

但是异步的不一样,异步的啊,不能让你堵住,我就先执行后面的,在你这个地方留一个回调函数,等你执行完了我再回来执行你

之前在API开发平台的接口中有提到,就是在springgateway的过滤器默认应该用的是springFlux的异步调用, 我向记录日志,我需要等,这个调用的接口的人调用结束了,我才能记录日志。

但是这个默认应该是异步的,所以就先记录了空日志(因为我什么都没干,所以是空的)。

前端天生就是可以异步调用的

绑定一个按钮,然后点击一下,就调用

接着就是异步处理了

异步处理 是指处理任务或数据的方式,使得任务可以在后台执行,而主线程可以继续执行其他操作。异步处理通常涉及到多个异步操作的组合、管理和协调。它的目标是高效地处理异步任务,并管理任务之间的关系。

说人话:

我们上面的流就是异步处理的,比如AI返回给我们的数据是 : a,b,c,d

AI是流式返回给我们的

然后我们搞了两个map 第一个map 给每个数 +1 第二个map给每个数  ×2

当a来了,a就执行第一个map,这个时候a开始+1,加完1,之后,a就流到 “下游”了,b就来了

b就+1,a这个时候就在×2了

所以我们也能看出来,这种异步处理数据流的感觉了

这样也可以提高效率对吧,类似多线程同时运行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2080284.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python可视化-条形图

1、加载数据 import pandas as pd import seaborn as sns import matplotlib.pyplot as plt# 导入数据 df pd.read_csv(E:/workspace/dataset/seaborn-data-master/tips.csv) df.head()2、基于seaborn的条形图 # 利用barplot函数快速绘制 sns.barplot(x"total_bill&quo…

Python从0到100(五十三):机器学习-决策树及决策树分类器

前言: 零基础学Python:Python从0到100最新最全教程。 想做这件事情很久了,这次我更新了自己所写过的所有博客,汇集成了Python从0到100,共一百节课,帮助大家一个月时间里从零基础到学习Python基础语法、Python爬虫、Web开发、 计算机视觉、机器学习、神经网络以及人工智能…

中微8S6990 EEPROM踩坑记录

中微8S6990 EEPROM内容丢失解决记录 问题描述: 问题程序如下: void temp_remember(uint16_t temperature,uint16_t address_H,uint16_t address_L) {uint8_t temp,temp1 0;temp temperature>>8;temp1 temperature;FLASH_UnLock();FLASH_Erase_DataArea(address_H);…

虹科方案 | 领航智能交通革新:虹科PEAK智行定位车控系统Demo版亮相

导读&#xff1a; 在智能汽车技术发展浪潮中&#xff0c;车辆控制系统的智能化、网络化已成为行业发展的必然趋势。虹科PEAK智行定位车控系统&#xff0c;集成了尖端科技&#xff0c;能够实现车辆全方位监控与控制的先进系统。从实时GPS定位到CAN/CANFD信号处理&#xff0c;虹科…

漏洞挖掘 | 记一次Spring横向渗透

0x1 前言 这篇文章给师傅们分享下&#xff0c;前段时间的一个渗透测试的一个项目&#xff0c;开始也是先通过各种的手段和手法利用一些工具啊包括空间引擎等站点对该目标公司进行一个渗透测试。前面找的突破口很少&#xff0c;不太好搞&#xff0c;但是后面找到了spring全家桶…

2024.8.27

130124202408271012 DATE #:20240827 ITEM #:DOC WEEK #:TUESDAY DAIL #:捌月廿肆 TAGS < BGM "Dragonflame--Kirara Magic" > < theme oi-contest > < theme oi-data structure Segment > < [空] > < [空] > 渊沉鳞潜&#xff0c…

搜维尔科技:Manus VR高精度手部动作捕捉数据手套为人形机器人、人工智能和人机交互赋能

Manus Quantum数据手套能够提供实时端到端的手部动作数据流与高精度数据集&#xff0c;助力人形机器人实现快速发展。 Quantum量子数据手套采用毫米级精度的磁性指尖跟踪传感器&#xff0c;融入尖端的EMF磁性定位追踪技术&#xff0c;无漂移&#xff0c;能提供高度准确且可靠的…

波导阵列天线学习笔记5 工作在K/Ka频带上的紧凑的共口径双频双圆极化波导天线阵列

摘要: 在本文中&#xff0c;一种紧凑的共口径双频双圆极化天线阵列被提出在K/Ka频段的全双工卫星通信中来实现高增益和宽带宽。所设计的天线阵列可以同时在20GHz频带实现右旋圆极化辐射同时在30GHz频带实现左旋圆极化辐射。此阵列包括圆极化波导天线单元和全公司馈网。脊频谱极…

CTFHub-SSRF过关攻略

第一题&#xff0c;内网访问 一&#xff0c;打开web/ssrf/内网访问 二&#xff0c;进入页面什么都没有查看一下上一步给的参数 三&#xff0c;输入http://127.0.0.1/flag.php回车显示flag 四&#xff0c;然后复制提交&#xff08;恭喜通关&#xff09; 第二题&#xff0c;伪协…

Glide生命周期监听原理以及简单应用利用空Fragment代理Activity

Glide关于生命周期监听的原理解析以及简单应用 文章目录 Glide关于生命周期监听的原理解析以及简单应用1.Glide生命周期监听原理1.1 从Glide初始化开始分析1.2 原理总结 2.简单应用2.1 应用场景1-主题切换之昼夜模式变化监听2.2 应用场景2--SDK打开特定应用或Activity 3.总结 相…

docker的部署及基本用法

目录​​​​​​​ 1 docker 介绍 1.1 什么是docker&#xff1f; 1.2 docker在企业中的应用场景 1.3 docker与虚拟化的对比 1.4 docker的优势 1.5 容器工作方式 2 部署docker 2.1 配置软件仓库 2.2 docker 安装 2.3 配置docker 镜像加速器 2.4 启动服务 2.5 激活内核网络选项…

ctfhub-web-SSRF通关攻略

一、内网访问 1.打开ctfhub给的环境地址 2.观察题目 发现让我们访问127.0.0.1下的flag.php 在地址栏后面有一个url参数 ?urlhttp://127.0.0.1/flag.php 提交即可 二、伪协议读取文件 1.打开ctfhub给的环境 2.观察题目 发现让我们读取flag.php文件 读取文件用到的协议是…

2024最值得购买的耳机?开放式耳机测评

在2024年&#xff0c;多款开放式耳机在市场上备受关注&#xff0c;它们各具特色&#xff0c;满足了不同消费者的需求。今天甜心根据当前市场情况和用户反馈&#xff0c;为大家推荐几款最值得购买的开放式耳机&#xff1a; 虹觅HOLME Fit2 虹觅HOLME Fit2是一款集颜值、舒适度、…

WireShark网络分析~环境搭建

一、虚拟网络设备搭建 &#xff08;一&#xff09;eNSP介绍 网络由网络设备和计算机构成&#xff0c;eNSP是模拟网络拓扑关系的软件。 &#xff08;二&#xff09;eNSP下载 华为官网&#xff1a;https://forum.huawei.com/enterprise/zh/thread/blog/580934378039689216 &am…

2k1000LA 调试4G

问题&#xff1a; 其实算不上 调试&#xff0c; 之前本来4G是好的&#xff0c;但是 我调试了触摸之后&#xff0c;发现4G用不了了。 其实主要是 pppd 这个命令找不到。 首先来看 为什么 找不到 pppd 这个命令。 再跟目录使用 find 命令&#xff0c;能够找到这个命令&#…

python可视化-密度图

1、加载数据 import pandas as pd import numpy as np from sklearn.datasets import load_iris import warnings# 禁用所有警告信息 warnings.filterwarnings(ignore)# 加载数据 iris load_iris() iris iris.keys() df pd.DataFrame(iris.data, columnsiris.feature_names)…

【JS】localeCompare实现中文排序

如何对两个中文进行字典顺序排序&#xff0c;如’本’拼音首字母’b’&#xff0c;‘初’拼音首字母’c’&#xff0c;所以’本’<‘初’。 JS默认根据编码顺序排序 使用localeCompare即可&#xff0c;如 ‘本’ < ‘初’ 则返回负数 使用方法 referenceStr.localeComp…

HR招聘面试人才测评工具,mbti职业性格测试

MBTI职业性格测试是国际最为流行的职业人格评估工具&#xff0c;作为一种对个性的判断和分析&#xff0c;是一个理论模型&#xff0c;从纷繁复杂的个性特征中&#xff0c;归纳提炼出4个关键要素——动力、信息收集、决策方式、生活方式&#xff0c;进行分析判断&#xff0c;从而…

万邑通信息科技笔试题库:北森测评言语数字图形真题答题要求及真题分享

万邑通&#xff08;上海&#xff09;信息科技股份有限公司是一家提供跨境电商整体供应链解决方案的企业。我们专注于为全球客户提供跨境售后物流服务&#xff0c;通过供应链管理与互联网技术相结合&#xff0c;有效降低库存成本&#xff0c;提升库存周转率和资金回报率。我们的…

【应用开发】解决正点原子I.MX6ull应用编程zlib移植问题

问题描述 在正点原子应用开发移植zlib库的时候&#xff0c;文档中有这样一段描述&#xff0c;先删除开发板中的zlib库&#xff0c;然后再拷贝zlib库 这就会导致在使用scp命令拷贝编译好的zlib库的时候报错没有zlib.so.1&#xff0c;如下图所示&#xff1a; 解决方法 千万不…