关于springboot对接chatglm3-6b大模型的尝试

news2024/11/16 9:18:08

之前我们通过阿里提供的cloud ai对接了通义千问。cloud ai对接通义千问
那么接下来我们尝试一些别的模型看一下,其实这个文章主要是表达一种对接方式,其他的都大同小异。都可以依此方法进行处理。

一、明确模型参数

本次我们对接的理论支持来自于阿里云提供的文档。阿里云大3-6b模型文档
我们看到他其实支持多种调用方式,包括sdk和http,我本人是不喜欢sdk的,因为会有冲突或者版本之类的问题,不如直接调用三方,把问题都扔到三方侧。所以我们这里来展示一下使用http的调用方式。
而且大模型的chat一般都是流式的,非流式的没啥技术含量而且效果很low。所以我们直接参考这部分内容即可,
在这里插入图片描述
我们看到他们的服务端其实是支持SSE的推流方式的,具体SSE是啥可以自行百度。
而流式和非流式的区别就在于请求参数的设置。如果你配置了,那大模型端就会给你按照流式响应。
在这里插入图片描述
在有了以上理论支持之后,我们就来测试一下。

二、代码接入

我们看到他的示例请求参数为:

curl --location 'https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation' \
--header 'Authorization: Bearer <YOUR-DASHSCOPE-API-KEY>' \  # 这里写你的appkey
--header 'Content-Type: application/json' \
--header 'X-DashScope-SSE: enable' \   # 开启流式
--data '{
    "model": "chatglm3-6b", # 模型名字
    "input":{
        "messages":[      
       
            {
                "role": "user",
                "content": "你好,请介绍一下故宫"
            }
        ]
    },
    "parameters": {
        "result_format": "message"
    }
}'

所以我们可以找到关键点就在以上三处,至于如何申请appkey,可以参考官方。
那么我们接下来就使用okhttp这种支持事件响应的来对接流式的输出。

1、编写返回内容反序列化类

首先我们先来处理返回格式,我决定用一个java类来接受,具体你觉得不灵活可以直接用Json,怎么弄都行。
我们来看一下官网的响应示例格式。

{"output":{"choices":[{"message":{"content":"\n 故宫是中国北京市中心的一座明清两代的皇宫,现已成为博物馆。故宫是中国最具代表性的古建筑之一,也是世界文化遗产之一,以其丰富的文化遗产和精美的建筑艺术而闻名于世界。故宫占地面积达72万平方米,拥有9000多间房屋和70多座建筑,由大小湖泊、宫殿、花园和殿堂组成,是中国古代宫殿建筑之精华。","role":"assistant"},"finish_reason":"stop"}]},"usage":{"total_tokens":105,"input_tokens":24,"output_tokens":81},"request_id":"9d970376-4ba3-98b8-8387-f95702280341"}

我们看到他是个字符串,然后在流式的最后一句他的finish_reason的值是stop,这时候我们就可以结束推流。
OK,我们就来接收一下。

import lombok.Data;

@Data
public class Chatglm36bResponse {
    private Output output;
    private Usage usage;
    private String requestId;

    @Data
    public static class Output {
        private Choice[] choices;

        @Data
        public static class Choice {
            private Message message;
            private String finishReason;


            @Data
            public static class Message {
                private String content;
                private String role;
            }
        }
    }

    @Data
    public static class Usage {
        private int totalTokens;
        private int inputTokens;
        private int outputTokens;
    }
}

2、编写event事件监听器


import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;

import java.io.IOException;

@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class ChatEventSourceListener extends EventSourceListener {

    private String clientId;

    @Override
    public void onOpen(EventSource eventSource, Response response) {
        log.info("ChatEventSourceListener onOpen invoke");
        super.onOpen(eventSource, response);
    }

    @Override
    public void onEvent(EventSource eventSource, String id, String type, String data) {
        log.info("ChatEventSourceListener onEvent invoke");
        Chatglm36bResponse chatglm36bResponse = JSONObject.parseObject(data, Chatglm36bResponse.class);
        Chatglm36bResponse.Output output = chatglm36bResponse.getOutput();
        Chatglm36bResponse.Output.Choice[] choices = output.getChoices();
        for (Chatglm36bResponse.Output.Choice choice : choices) {
            String finishReason = choice.getFinishReason();
            String content = choice.getMessage().getContent();
            log.info("ChatEventSourceListener onEvent finishReason is:{},content is:{}", finishReason, content);

            try {
            	// 给前端推流,前端有组件可以接收这种流。
                SseEmitterUtils.sendMsg(clientId, content);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            // 结束了,取消事件,并且结束SSE推流
            if ("stop".equals(finishReason)) {
                eventSource.cancel();
                SseEmitterUtils.completeDelay(clientId);
            }
        }
        super.onEvent(eventSource, id, type, data);
    }

    @Override
    public void onClosed(EventSource eventSource) {
        log.info("ChatEventSourceListener onClosed invoke ******");
        super.onClosed(eventSource);
    }

    @Override
    public void onFailure(EventSource eventSource, Throwable t, Response response) {
        super.onFailure(eventSource, t, response);
        String message = response.message();
        response.close();
        log.info("ChatEventSourceListener onFailure invoke ****** Throwable is:{},res is {}", t.getMessage(),message);
    }
}

我们在每一类事件里面都做了相应的处理。
与之配套的是一个SSE的工具类。

package com.yxy.springbootdemo.utils.sse;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;

public class SseEmitterUtils {

    private static final Logger logger = LoggerFactory.getLogger(SseEmitterUtils.class);

    private static final ThreadPoolExecutor ssePool =  new ThreadPoolExecutor(
                20,
                200,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                runnable -> new Thread(runnable, "sse-sendMsg-pool"),
                new ThreadPoolExecutor.AbortPolicy()
    );

    // SSE连接关闭延迟时间
    private static final Integer EMITTER_COMPLETE_DELAY_MILLISECONDS = 5000;

    // SSE连接初始化超时时间
    private static final Long EMITTER_TIME_OUT_MILLISECONDS = 600_000L;

    // 缓存 SSE连接
    private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();

    /**
     * 获取 SSE连接 默认超时时间EMITTER_TIME_OUT_MILLISECONDS 毫秒
     *
     * @param clientId 客户端 ID
     * @return 连接对象
     */
    public static SseEmitter getConnection(String clientId) {
       return getConnection(clientId,EMITTER_TIME_OUT_MILLISECONDS);
    }

    /**
     * 获取 SSE连接
     *
     * @param clientId 客户端 ID
     * @param timeout  连接超时时间,单位毫秒
     * @return 连接对象
     */
    public static SseEmitter getConnection(String clientId,Long timeout) {
        final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
        if (Objects.nonNull(sseEmitter)) {
            return sseEmitter;
        } else {
            final SseEmitter emitter = new SseEmitter(timeout);

            // 初始化emitter回调
            initSseEmitter(emitter, clientId);

            // 连接建立后,将连接放入缓存
            SSE_CACHE.put(clientId, emitter);
            logger.info("[SseEmitter] 连接已建立,clientId = {}", clientId);
            return emitter;
        }
    }

    /**
     * 关闭指定的流连接
     *
     * @param clientId 客户端 ID
     */
    public static void closeConnection(String clientId) {
        final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
        logger.info("[流式响应-停止生成] 收到客户端关闭连接指令,Emitter is {},clientId = {}", null == sseEmitter ? "NOT-Exist" : "Exist", clientId);
        if (Objects.nonNull(sseEmitter)) {
            SSE_CACHE.remove(clientId);
            sseEmitter.complete();
        }
        try {
            TimeUnit.MILLISECONDS.sleep(EMITTER_COMPLETE_DELAY_MILLISECONDS);
        } catch (InterruptedException ex) {
            logger.error("流式响应异常", ex);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 推送消息
     *
     * @param clientId 客户端 ID
     * @param msg      消息
     * @return 连接是否存在
     * @throws IOException IO异常
     */
    public static boolean sendMsg(String clientId, String msg) throws IOException {
        final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
        if (Objects.nonNull(sseEmitter)) {
            try {
                sseEmitter.send(msg);
            } catch (Exception e) {
                logger.error("[流式响应-停止生成] ");
                return true;
            }
            return false;
        } else {
            return true;
        }
    }

    /**
     * 异步推送消息 TODO 目前未实现提供回调
     *
     * @param clientId 客户端 ID
     * @param msg      消息
     * @return 连接是否存在
     * @throws IOException IO异常
     */
    public static boolean sendMsgAsync(String clientId, String msg){
        final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
        if (Objects.nonNull(sseEmitter)) {
            try {
                ssePool.submit(()->{
                    try {
                        sseEmitter.send(msg);
                    } catch (IOException e) {
                        logger.error("[流式响应-停止生成] ");
                    }
                });
            } catch (Exception e) {
                logger.error("[流式响应-停止生成] ");
                return true;
            }
            return false;
        } else {
            return true;
        }
    }

    /**
     * 立即关闭SseEmitter,可能存在推流不完全的情况,谨慎使用
     *
     * @param clientId
     */
    public static void complete(String clientId) {
        completeDelay(clientId,0);
    }

    /**
     * 延迟关闭 SseEmitter,延迟一定时长时为了尽量保证最后一次推送数据被前端完整接收
     *
     * @param clientId 客户端ID
     */
    public static void completeDelay(String clientId) {
        completeDelay(clientId,EMITTER_COMPLETE_DELAY_MILLISECONDS);
    }

    /**
     * 延迟关闭 SseEmitter,延迟指定时长时为了尽量保证最后一次推送数据被前端完整接收
     *
     * @param clientId 客户端ID
     */
    public static void completeDelay(String clientId,Integer delayMilliSeconds) {
        final SseEmitter sseEmitter = SSE_CACHE.get(clientId);
        if (Objects.nonNull(sseEmitter)) {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMilliSeconds);
                sseEmitter.complete();
            } catch (InterruptedException ex) {
                logger.error("流式响应异常", ex);
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * 初始化 SSE连接 设置一些属性和回调之类的
     *
     * @param emitter 连接对象
     * @param clientId 客户端 ID
     */
    private static void initSseEmitter(SseEmitter emitter, String clientId){
        // 设置SSE的超时回调
        emitter.onTimeout(() -> {
            logger.info("[SseEmitter] 连接已超时,正准备关闭,clientId = {}", clientId);
            SSE_CACHE.remove(clientId);
        });

        // 设置SSE的结束回调
        emitter.onCompletion(() -> {
            logger.info("[SseEmitter] 连接已释放,clientId = {}", clientId);
            SSE_CACHE.remove(clientId);
        });

        // 设置SSE的异常回调
        emitter.onError(throwable -> {
            logger.error("[SseEmitter] 连接已异常,正准备关闭,clientId = {}", clientId);
            SSE_CACHE.remove(clientId);
        });
    }
}

3、编写调用接口


import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.concurrent.CompletableFuture;

@RestController
@RequestMapping("/chat")
public class StreamChatController {

    @PostMapping("/send")
    public SseEmitter sendMessage(@RequestParam String username, @RequestParam String message) {

        SseEmitter sseEmitter = SseEmitterUtils.getConnection(username);

        CompletableFuture.runAsync(()->send(username,message));

        return sseEmitter;
    }

    public void send(String username,String message){
        OkHttpClient client = new OkHttpClient();
        JSONObject inputJson = new JSONObject();
        JSONArray messagesArray = new JSONArray();
        JSONObject systemMessage = new JSONObject();
        systemMessage.put("role", "system");
        systemMessage.put("content", "You are a helpful assistant.");
        messagesArray.add(systemMessage);

        JSONObject userMessage = new JSONObject();
        userMessage.put("role", "user");
        userMessage.put("content", message);
        messagesArray.add(userMessage);

        inputJson.put("messages", messagesArray);

        JSONObject payloadJson = new JSONObject();
        payloadJson.put("model", "chatglm3-6b");
        payloadJson.put("input", inputJson);

        JSONObject parametersJson = new JSONObject();
        parametersJson.put("result_format", "message");
        payloadJson.put("parameters", parametersJson);

        String json = payloadJson.toString();

        RequestBody body = RequestBody.create(MediaType.parse("application/json"),json);

        Request request = new Request.Builder()
                .url("https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation")
                .post(body)
                .addHeader("Authorization", "Bearer 你得API-KEY")
                .addHeader("Content-Type", "application/json")
                .addHeader("X-DashScope-SSE", "enable")
                .build();

        // 创建事件监听器
        EventSourceListener eventSourceListener = new ChatEventSourceListener(username);

        EventSource.Factory factory = EventSources.createFactory(client);
        // 创建事件
        EventSource eventSource = factory.newEventSource(request, eventSourceListener);
        // 与服务器建立连接
        eventSource.request();
    }
}

4、编写前端

我这个有点粗糙,实际效果比这好的多。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE Chat</title>
</head>
<body>
<h1>YXY-Chat</h1>

<div id="chat-messages"></div>
<form id="message-form">
    <input type="text" id="message-input" placeholder="输入消息">
    <button type="submit">发送</button>
</form>

<script>
    const chatMessages = document.getElementById('chat-messages');
    const messageForm = document.getElementById('message-form');
    const messageInput = document.getElementById('message-input');

    // 连接到聊天室
    const connectToChat = () => {
        const username = prompt('Enter your username:');
        const eventSource = new EventSource(`/chat/connect?username=${encodeURIComponent(username)}`);

        // 接收来自服务器的消息
        eventSource.onmessage = function(event) {
            const message = event.data;
            displayMessage(message);
        };

        // 处理连接错误
        eventSource.onerror = function(event) {
            console.error('EventSource error:', event);
            eventSource.close();
        };

        // 提交消息表单
        messageForm.addEventListener('submit', function(event) {
            event.preventDefault();
            const message = messageInput.value.trim();
            if (message !== '') {
                sendMessage(username, message);
                messageInput.value = '';
            }
        });
    };

    // 发送消息到服务器
    const sendMessage = (username, message) => {
        fetch(`/chat/send?username=${encodeURIComponent(username)}&message=${encodeURIComponent(message)}`, {
            method: 'POST'
        })
        .catch(error => console.error('Error sending message:', error));
    };

    // 在界面上显示消息
    const displayMessage = (message) => {
        const messageElement = document.createElement('div');
        messageElement.textContent = message;
        chatMessages.appendChild(messageElement);
    };
    // 发起连接
    connectToChat();

</script>
</body>
</html>

5、发起调用

在这里插入图片描述
我们看到其实是成功了,但是前端没有把流数据渲染上去,我不太懂前端,后面改一改试试。

三、总结

我们这只是其中一种模型的对接,其实别的也都差不多,都是基于流可以用http来操作,你可以在你的项目中建立一个AI中台,来对接各种模型,给别的服务提供调用。只是需要看明白每种模型的参数。
而且我们目前只是简单的实现,还存在很多问题,比如okhttp客户端没有做池化,每次都是new出来的。
CompletableFuture的异步调用没有指定线程池,还是共用的默认池,这样会导致可能被别的业务影响。
等等细节问题,我们这里先不做处理,后面如果真的要用,可以着手细节处的优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2087443.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

模型 DFEAS营销法

系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。需求触发&#xff0c;精准营销转化。 1 DFEAS营销法的应用 1.1 个性化健身应用的市场拓展策略 随着健康意识的增强&#xff0c;个性化健身应用市场迅速发展。一款名为“FitMyLife”的个性化健身应用…

FancyVideo

一、模型介绍 合成运动丰富且时间一致的视频仍然是人工智能领域的一项挑战&#xff0c;尤其是在处理较长的持续时间时。现有的文本到视频 (T2V) 模型通常采用空间交叉注意进行文本控制&#xff0c;等效地指导不同帧的生成而无需特定于帧的文本指导。因此&#xff0c;模型理解提…

经典算法之链表篇(二)

目录 一&#xff1a;重排链表&#xff08;LeetCode.143&#xff09; 二&#xff1a;删除链表的节点&#xff08;LCR 136. 删除链表的节点&#xff09; 三&#xff1a;K个一组反转链表&#xff08;LeetCode.25&#xff09; 有关经典算法链表的第一篇内容&#xff0c;可以查看我…

在线考试系统源码功能分析

在线考试系统源码的功能分析涵盖了多个关键方面&#xff0c;以确保系统能够满足教育机构和个人的需求。以下是一些常见的功能分析&#xff1a; 权限控制&#xff1a;系统通常支持多个角色&#xff0c;如教师、管理员和学生&#xff0c;并使用JWT等技术进行用户身份的合法性校验…

Leetcode JAVA刷刷站(101)对称二叉树

一、题目概述 二、思路方向 在Java中&#xff0c;要检查一个二叉树是否是轴对称的&#xff08;也称为镜像对称的&#xff09;&#xff0c;你可以通过递归地比较树的左子树和右子树是否镜像对称来实现。轴对称的二叉树意味着树的左子树和右子树关于根节点对称&#xff0c;即左子…

微信小程序:手机联调同一个网段无法找到本地接口

我们在开发微信小程序的时候&#xff0c;一般会启动本地服务器进行API连调&#xff0c;不过模拟器上面往往一些问题及细节发现不了&#xff0c;需要真机调试&#xff0c;结果调试的时候发现&#xff0c;不能访问到 localhost或者本机IP&#xff0c;也就访问不到本地接口&#x…

【HarmonyOS NEXT开发】鸿蒙开发环境准备,ArkTS基础语法入门

文章目录 鸿蒙开发环境准备&#xff0c;ArkTS基础语法入门大纲简介DevEco Studio简介运行环境要求 安装与配置开发工具下载Harmony OS 和 OpenHarmony 的区别Previewer汉化插件的配置 ArkTS基础快速入门1. 解释说明2. 变量与常量3. 变量命名规则4. 数组5. 函数定义函数调用函数…

Mini型LoRa DTU远距离无线传输“小体积大作为”

Mini型LoRa DTU&#xff08;数据传输单元&#xff09;CL61M凭借其小巧的体积、低功耗、远距离通信和高可靠性等特点&#xff0c;在远距离无线传输领域展现出了巨大的应用潜力。使RS485/232串口终端设备能够轻松实现十公里的远距离无线通信&#xff0c;适用于多种复杂环境&#…

(三)Kafka离线安装 - ZooKeeper开机自启

手动启动方式 一般通过指令手动来启动zookeeper的方法是&#xff0c;先进入到zookeeper的安装目录下的bin目录&#xff0c;然后执行启动指令。 cd /usr/local/zookeeper/zookeeper-3.8.4/bin/zkServer.sh start 停止指令 zkServer.sh stop 查看状态 zkServer.sh status 上…

如何在知行之桥上通过业务单号查找原始报文?

在知行之桥中接收或发送的数据通常是EDI原始报文&#xff0c;知行之桥会对EDI原始报文进行格式转换&#xff0c;以方便用户后端系统的处理。因此&#xff0c;一般情况下&#xff0c;用户看到的都是转换后的数据结构&#xff0c;例如Json、XML或Excel等&#xff0c;无需直接查看…

window上部署kafka3.6.1,并配置sasl认证

1 安装kafka 第一步安装kafka,并能成功启动&#xff0c;可参考文章Windows下安装Kafka3-CSDN博客 2 修改kafka的配置文件 server.properties是kafka的主要配置文件&#xff0c;里面有很多参数可以调整。 主要修改如下 listenersSASL_PLAINTEXT://127.0.0.1:9092 sasl.enable…

基于tkinter实现学生管理系统(四)

学生信息管理系统-修改学生 代码实现 在上一节中的class StudentManagerApp中添加如下方法&#xff1a; # 修改学生信息def modify_student(self):selection self.tree.selection()if not selection:messagebox.showwarning("警告", "请选择要修改的学员"…

04:创建PADS Logic软件逻辑库

1. 打开自带的库文件 2.保留common库&#xff0c;移除其他库文件 3.新建库 5点击封装工具栏 6选择2D线 7添加端点 8点击保存 9打开查看

SQLi-LABS靶场46-50通过攻略

less-46 1.判断注入点 ?sort1 页面出现报错 2.判断闭合方式 ?sort1 -- 3.查询数据库 因为页面有报错 所以使用报错注入 ?sort1 and updatexml(1,concat(1,database()),1)-- 4.查询数据库的所有表 ?sort1 and updatexml(1,concat(1,(select group_concat(table_name)…

【功能自动化】使用HTMLTestRunner生成测试报告

配置环境&#xff1a; 1.部署webtours网站 2.user.txt 3.HTMLTestRunner.py """ A TestRunner for use with the Python unit testing framework. It generates a HTML report to show the result at a glance.The simplest way to use this is to invoke it…

【Go高性能】测试(单元测试、基准测试)

Go测试 一、分类1. 单元测试2. 基准测试 二、基准测试1. 介绍2. 基准测试基本原则3. 使用testing包构建基准测试3.1 执行基准测试3.2 基准测试工作原理3.3 改进基准测试的准确性3.3.1 -benchtime3.3.2 -count3.3.3 -cpu 4. 使用benchstat工具比较基准测试(可跳过&#xff09;4.…

Leetcode 第 408 场周赛题解

Leetcode 第 408 场周赛题解 Leetcode 第 408 场周赛题解题目1&#xff1a;3232. 判断是否可以赢得数字游戏思路代码复杂度分析 题目2&#xff1a;3233. 统计不是特殊数字的数字数量思路代码复杂度分析 题目3&#xff1a;3234. 统计 1 显著的字符串的数量思路代码复杂度分析 题…

Pycharm can‘t open file ‘D:\\Program‘: [Errno 2] No such file or directory

问题描述 Pycharm 使用Python 3.11.9 版本调试代码报错&#xff1a; 解决方案 1、WindowsR&#xff0c;调起CMD&#xff08;PowerShell不行&#xff09;&#xff0c;执行以下指令&#xff1a; mklink /J "D:\PyCharm" "D:\Program Files\JetBrains\PyCharm 2…

react学习之useState和useEffect

useState useState 可以使函数组件像类组件一样拥有 state&#xff0c;函数组件通过 useState 可以让组件重新渲染&#xff0c;更新视图。 实际使用 setstate()中回调函数的返回值将会成为新的state值回调函数执行时&#xff0c; React会将最新的state值作为参数传递 const A…