通过okhttp调用SSE流式接口,并将消息返回给客户端

news2024/11/25 4:19:56

通过一个完整的java示例来演示如何通过okhttp来调用远程的sse流式接口
背景:我们有一个智能AI的聊天界面,需要调用三方厂商的大模型chat接口,返回答案(因为AI去理解并检索你的问题的时候这个是比较耗时的,这个时候客户端需要同步的在等待最终结果),所以我们的方案是通过流的方式把结果陆续的返回给客户端,这样能极大的提高用户的体验

1.引入相关依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>4.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp-sse</artifactId>
            <version>4.2.0</version>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt</artifactId>
            <version>0.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>

2. controller

package com.demo.controller;
import com.alibaba.fastjson.JSON;
import com.demo.listener.SSEListener;
import com.demo.params.req.ChatGlmDto;
import com.demo.utils.ApiTokenUtil;
import com.demo.utils.ExecuteSSEUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletResponse;

@RestController
@Slf4j
public class APITestController {

    private static final String API_KEY = "xxxx";
    
    private static final String URL = "xxx";


    @PostMapping(value = "/sse-invoke", produces = "text/event-stream;charset=UTF-8")
    public void sse(@RequestBody ChatGlmDto chatGlmDto, HttpServletResponse rp) {

        try {
            String token = ApiTokenUtil.generateClientToken(API_KEY);
            SSEListener sseListener = new SSEListener(chatGlmDto, rp);
            ExecuteSSEUtil.executeSSE(URL, token, sseListener, JSON.toJSONString(chatGlmDto));

        } catch (Exception e) {
            log.error("请求SSE错误处理", e);

        }

    }
}

3. 监听器

监听器里的事件可以自己定义,然后自己去实现自己相关的业务逻辑,onEvent主要用来接收消息

package com.demo.listener;
import com.alibaba.fastjson.JSON;
import com.demo.params.req.ChatGlmDto;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;

import javax.servlet.http.HttpServletResponse;
import java.util.concurrent.CountDownLatch;

@Slf4j
@Data
public class SSEListener extends EventSourceListener {

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    private ChatGlmDto chatGlmDto;

    private HttpServletResponse rp;

    private StringBuffer output = new StringBuffer();

    public SSEListener(ChatGlmDto chatGlmDto, HttpServletResponse response) {
        this.chatGlmDto = chatGlmDto;
        this.rp = response;
    }

    /**
     * {@inheritDoc}
     * 建立sse连接
     */
    @Override
    public void onOpen(final EventSource eventSource, final Response
            response) {
        if (rp != null) {
            rp.setContentType("text/event-stream");
            rp.setCharacterEncoding("UTF-8");
            rp.setStatus(200);
            log.info("建立sse连接..." + JSON.toJSONString(chatGlmDto));
        } else {
            log.info("客户端非sse推送" + JSON.toJSONString(chatGlmDto));
        }
    }

    /**
     * 事件
     *
     * @param eventSource
     * @param id
     * @param type
     * @param data
     */
    @Override
    public void onEvent(EventSource eventSource, String id, String type, String data) {
        try {
            output.append(data);
            if ("finish".equals(type)) {
                log.info("请求结束{} {}", chatGlmDto.getMessageId(), output.toString());
            }
            if ("error".equals(type)) {
                log.info("{}: {}source {}", chatGlmDto.getMessageId(), data, JSON.toJSONString(chatGlmDto));
            }
            if (rp != null) {
                if ("\n".equals(data)) {
                    rp.getWriter().write("event:" + type + "\n");
                    rp.getWriter().write("id:" + chatGlmDto.getMessageId() + "\n");
                    rp.getWriter().write("data:\n\n");
                    rp.getWriter().flush();
                } else {
                    String[] dataArr = data.split("\\n");
                    for (int i = 0; i < dataArr.length; i++) {
                        if (i == 0) {
                            rp.getWriter().write("event:" + type + "\n");
                            rp.getWriter().write("id:" + chatGlmDto.getMessageId() + "\n");
                        }
                        if (i == dataArr.length - 1) {
                            rp.getWriter().write("data:" + dataArr[i] + "\n\n");
                            rp.getWriter().flush();
                        } else {
                            rp.getWriter().write("data:" + dataArr[i] + "\n");
                            rp.getWriter().flush();
                        }
                    }
                }

            }
        } catch (Exception e) {
            log.error("消息错误[" + JSON.toJSONString(chatGlmDto) + "]", e);
            countDownLatch.countDown();
            throw new RuntimeException(e);
        }

    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void onClosed(final EventSource eventSource) {
        log.info("sse连接关闭:{}", chatGlmDto.getMessageId());
        log.info("结果输出:{}" + output.toString());
        countDownLatch.countDown();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void onFailure(final EventSource eventSource, final Throwable t, final Response response) {
        log.error("使用事件源时出现异常... [响应:{}]...", chatGlmDto.getMessageId());
        countDownLatch.countDown();
    }

    public CountDownLatch getCountDownLatch() {
        return this.countDownLatch;
    }
}

4. 相关工具类

获取token ApiTokenUtil类,这个根据自己的业务需求看是否需要,我这里为了程序能跑起来,就保留了

package com.demo.utils;

import com.alibaba.fastjson.JSON;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ApiTokenUtil {

    public static String generateClientToken(String apikey) {
        String[] apiKeyParts = apikey.split("\\.");
        String api_key = apiKeyParts[0];
        String secret = apiKeyParts[1];

        Map<String, Object> header = new HashMap<>();
        header.put("alg", SignatureAlgorithm.HS256);
        header.put("sign_type", "SIGN");
        Map<String, Object> payload = new HashMap<>();
        payload.put("api_key", api_key);
        payload.put("exp", System.currentTimeMillis() + 5 * 600 * 1000);
        payload.put("timestamp", System.currentTimeMillis());
        String token = null;
        try {
            token = Jwts.builder().setHeader(header)
                    .setPayload(JSON.toJSONString(payload))
                    .signWith(SignatureAlgorithm.HS256, secret.getBytes(StandardCharsets.UTF_8))
                    .compact();
        } catch (Exception e) {
            System.out.println();
        }

        return token;
    }
}

ExecuteSSEUtil 类

package com.demo.utils;

import com.demo.listener.SSEListener;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSources;

@Slf4j
public class ExecuteSSEUtil {

    public static void executeSSE(String url, String authToken, SSEListener eventSourceListener, String chatGlm) throws Exception {
        RequestBody formBody = RequestBody.create(chatGlm, MediaType.parse("application/json; charset=utf-8"));
        Request.Builder requestBuilder = new Request.Builder();
        requestBuilder.addHeader("Authorization", authToken);
        Request request = requestBuilder.url(url).post(formBody).build();
        EventSource.Factory factory = EventSources.createFactory(OkHttpUtil.getInstance());
        //创建事件
        factory.newEventSource(request, eventSourceListener);
        eventSourceListener.getCountDownLatch().await();
    }


}

OkHttpUtil 类

package com.demo.utils;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import java.net.Proxy;
import java.util.concurrent.TimeUnit;

public class OkHttpUtil {
    private static OkHttpClient okHttpClient;

    public static ConnectionPool connectionPool = new ConnectionPool(10, 5, TimeUnit.MINUTES);

    public static OkHttpClient getInstance() {
        if (okHttpClient == null) { //加同步安全
            synchronized (OkHttpClient.class) {
                if (okHttpClient == null) { //okhttp可以缓存数据....指定缓存路径
                    okHttpClient = new OkHttpClient.Builder()//构建器
                            .proxy(Proxy.NO_PROXY) //来屏蔽系统代理
                            .connectionPool(connectionPool)
                            .connectTimeout(600, TimeUnit.SECONDS)//连接超时
                            .writeTimeout(600, TimeUnit.SECONDS)//写入超时
                            .readTimeout(600, TimeUnit.SECONDS)//读取超时
                            .build();
                    okHttpClient.dispatcher().setMaxRequestsPerHost(200);
                    okHttpClient.dispatcher().setMaxRequests(200);
                }
            }
        }
        return okHttpClient;
    }
}

ChatGlmDto 请求实体类

package com.demo.params.req;

import lombok.Data;

/**
 * Created by WeiRan on  2023.03.20 19:19
 */
@Data
public class ChatGlmDto {

    private String messageId;

    private Object prompt;

    private String requestTaskNo;

    private boolean incremental = true;

    private boolean notSensitive = true;
}

5. 接口调用调试

我这里就直接使用curl命令来调用了

curl 'http://localhost:8080/sse-invoke' --data '{"prompt":[{"role":"user","content":"泰山有多高?"}]}' -H 'Content-Type: application/json'

返回结果:
在这里插入图片描述

分割线---------------------------------------------------------------------------------------------------------------------------------

创作不易,三连支持一下吧 👍

最后的最后送大家一句话

白驹过隙,沧海桑田

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1090741.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

10-SRCNN-使用CNN实现超分辨成像

文章目录 utils_dataset.pymodel.pytrain.pyuse.py主要文件 utils_dataset.py 工具文件,主要用来制作dataset,便于加入dataloader,用于实现数据集的加载和并行读取 model.py 主要写入网络(模型) train.py 主要用于训练 use.py 加载训练好的模型,用于测试或使用 utils_dat…

Spring实战 | Spring AOP核心秘笈之葵花宝典

Spring实战系列文章&#xff1a; Spring实战 | Spring IOC不能说的秘密&#xff1f; 国庆中秋特辑系列文章&#xff1a; 国庆中秋特辑&#xff08;八&#xff09;Spring Boot项目如何使用JPA 国庆中秋特辑&#xff08;七&#xff09;Java软件工程师常见20道编程面试题 国庆…

IDEA的常用设置

【1】进入设置&#xff1a; 【2】设置主题&#xff1a; 【3】编辑区的字体变大或者变小&#xff1a; 【4】鼠标悬浮在代码上有提示&#xff1a; 【5】自动导包和优化多余的包&#xff1a; 手动导包&#xff1a;快捷键&#xff1a;altenter 自动导包和优化多余的包&#xf…

计算机网络第2章-HTTP和Web协议(2)

Web和HTTP 一个新型应用即万维网&#xff08;World Wide Web&#xff09;Web。 HTTP概况 Web的应用层协议是超文本传输协议&#xff08;HTPP&#xff09;&#xff0c;它是Web的核心。 HTTP由两个程序实现&#xff1a;一个用户程序和一个服务器程序。 Web页面&#xff08;W…

leetcode-518. 零钱兑换 II

1. 题目 链接: 零钱兑换II 2. 解决方案1 #include <stdio.h> #include <stdlib.h>int change(int amount, int* coins, int coinsSize){int dp[amount1];//确定dp大小memset(dp, 0, sizeof(int) * (amount1));dp[0] 1;//初始化为0for(int i 0 ; i < coins…

LED电子屏幕可以通过什么方式进行人屏互动

传统的LED大屏幕以单向传播的形式面向观众&#xff0c;不仅被动&#xff0c;而且逐渐缺乏动感和创新。随着LED显示技术的蓬勃发展&#xff0c;现在观众与LED电子大屏幕的方式越来越多。那么现阶段实现LED显示屏人屏互动的主要方式都有哪些呢&#xff1f;带你8分钟了解LED互动地…

KASan介绍

目录 概括介绍 配置说明 单独关闭读或写检查 操作使用 影响及注意事项 结果解读 使用注意 实现原理简介 KASAN原理 malloc原理 内容参考 概括介绍 KernelAddressSANitizer &#xff08;KASAN&#xff09; 是一个动态内存错误检测器。它提供了一个快速而全面的解决方…

D课堂 | 如何设置域名解析?解析记录类型选哪个?

上回&#xff0c;D妹和各位小伙伴们介绍了DNS的作用和原理——《什么是DNS&#xff1f;DNS是怎么运作的&#xff1f;》&#xff0c;相信大家对DNS已经有了一定的认识。 DNS是互联网不可或缺的基础服务&#xff0c;核心作用是将域名翻译成计算机可读取的IP地址&#xff0c;也就是…

VMware搭载linux出现的bugs

---------后续在实际Linux项目复盘过程中有遇到问题(解决办法)会不定时更新.......----------- ques: Linux自带的media目录用于挂载或可移动存储设备已满&#xff08;造成这一原因是由于我多次创建新的虚拟机并在同一虚拟目录下挂载同一镜象导致有些残存文件没有删除干净&…

【OpenCv光流法进行运动目标检测】

opencv系列文章目录 文章目录 opencv系列文章目录前言一、光流法是什么&#xff1f;二、光流法实例1.C的2.C版本3.python版本 总结 前言 随着计算机视觉技术的迅猛发展&#xff0c;运动目标检测在图像处理领域中扮演着至关重要的角色。在现实世界中&#xff0c;我们常常需要追…

JDK21要来了,协程对Java带来什么

目录 前言 协程是什么 多线程有什么问题&#xff1f; 协程的线程模型 Reactor模型 使用协程后 RPC并发 IO阻塞 网络IO 磁盘IO epoll为什么不支持磁盘io&#xff1f; Kotlin与Go的协程 Go 使用 Go的协程调度(GPM模型) Kotlin 使用 Kotlin协程调度 阿里Wisp协程…

Linux程序调试工具使用整理

Linux程序调试工具使用整理 GDB调试入门 GDB是GNU开源组织发布的一个强大的UNIX下的程序调试工具。或许&#xff0c;各位比较喜欢那种图形界面方式的&#xff0c;像VC、BCB等IDE的调试&#xff0c;但如果你是在 UNIX平台下做软件&#xff0c;你会发现GDB这个调试工具有比VC、…

万界星空科技可视化数字大屏应用场景及作用

一、MES系统大屏显示&#xff1a;实时监控生产数据的关键 随着制造业的发展&#xff0c;现代企业越来越依赖于高效的生产管理系统来保证生产效率和质量。其中&#xff0c;MES系统数据大屏显示成为了监控生产数据的关键工具。通过实时监控和显示生产数据&#xff0c;企业能够及…

智能网关在校园能耗监测系统中的应用介绍

安科瑞 崔丽洁 摘要&#xff1a;国家提出了全社会节能减排的战略举措&#xff0c;节约型校园的建设是实现这一举措的重要内容。为了对校园能耗实行量化管理、实时监测&#xff0c;需要建立一个完善的监管体系校园节能监管体系。而节能监管体系的核心是能耗监测平台&#xff0c;…

解决react集成typescript报错:找不到名称“div“之类的错误

现象&#xff1a; 原因&#xff1a;Typescript 不希望在 Typescript 文件中看到 JSX元素。 解决此问题的最简单方法是将文件后缀从 .ts 重命名为 .tsx 。

【学习笔记】DTM分布式事务

分布式事务是什么 本文的分布式事务指的是DTM下的分布式事务。 分布式事务有两类&#xff0c;这里指的是跨数据库、跨服务的分布式事务。 分布式事务指事务的发起者、资源及资源管理器和事务协调者分别位于分布式系统的不同节点之上。 CAP理论 C&#xff08;一致性&#x…

【UVM 验证平台打印时间单位控制】

UVM 验证平台打印时间单位控制 UVM 具有丰富的打印功能&#xff0c;打印信息会包含时间/打印位置等信息&#xff0c;根据打印时间可以方便的在波形上找到错误点。默认打印时间单位时fs&#xff0c;由于单位太小会导致打印信息上的时间信息比较长&#xff0c;不方便查看与查找。…

实现Element Select选择器滚动加载

<template><el-selectpopper-class"more-tag-data"v-model"tagId"filterableplaceholder"请选择"focus"focusTag"><el-optionv-for"(item, index) in taskTagLists":key"index":label"item.n…

软件开发无人天车智能控制系统智能库存管理单元解决方案

天车&#xff08;行吊 起重机&#xff09;智控系统在自动控制的基础上&#xff0c;添加了基于智能控制、数据分析存储等尖端技术研发出的各类算法&#xff0c;赋予天车更“聪明”的任务执行及决策制定能力。智控系统能够根据获取的数据和预设的任务需求&#xff0c;通过智能决策…

ftp发布服务器

ftp工具 发布测试 第一步&#xff1a;下载FileZilla 第二部建立站点 连接成功之后可以看到文件了 项目打包后上传 远程站点里的文件删除 左边本地站点上传。over