spring boot 集成科大讯飞星火认知大模型

news2024/11/24 12:26:12

一、安装依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.13</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.example</groupId>
    <artifactId>xunfeigpt</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>xunfeigpt</name>
    <description>xunfeigpt</description>
    <properties>
        <java.version>1.8</java.version>
        <netty.verson>4.1.45.Final</netty.verson>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>4.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.verson}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

二、配置文件
在科大讯飞官网申请key

server:
  port: 1644


xf:
  config:
    hostUrl: https://spark-api.xf-yun.com/v3.5/chat
    appId: xxxxxxx
    apiSecret: xxxxxxxx
    apiKey: xxxxxxx
    maxResponseTime: 30

三、项目结构
在这里插入图片描述
四、各个模块代码
1、在bean目录下
1)Choices类

package com.example.xunfeigpt.bean;

import lombok.Data;


import java.util.List;

@Data
public class Choices {
    private List<Text> text;
}

2)Header类型

import lombok.Data;

@Data
public class Header {
    private int code;

    private int status;

    private String sid;
}

3)JsonParse类型

import lombok.Data;

@Data
public class JsonParse {
    private Header header;

    private Payload payload;
}

4)NettyGroup类型

package com.example.xunfeigpt.bean;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

public class NettyGroup {
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 存放用户与Chanel的对应信息,用于给指定用户发送消息
     */
    private static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();

    private NettyGroup() {
    }

    /**
     * 获取channel组
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * 获取连接channel map
     */
    public static ConcurrentHashMap<String, Channel> getUserChannelMap() {
        return channelMap;
    }
}

5)Payload类型

import lombok.Data;

@Data
public class Payload {
    private Choices choices;
}

6)ResultBean类型

package com.example.xunfeigpt.bean;

import lombok.*;

/**
 * @Author: ChengLiang
 * @CreateTime: 2023-05-22  11:04
 * @Description: TODO
 * @Version: 1.0
 */
@Getter
@Setter
@ToString(callSuper = true)
@AllArgsConstructor
@NoArgsConstructor
public class ResultBean<T> {
    private String errorCode;

    private String message;

    private T data;

    public ResultBean(T data) {
        this.errorCode = ErrorMessage.SUCCESS.getErrorCode();
        this.message = ErrorMessage.SUCCESS.getMessage();
        this.data = data;
    }

    public ResultBean(ErrorMessage errorMessage, T data) {
        this.errorCode = errorMessage.getErrorCode();
        this.message = errorMessage.getMessage();
        this.data = data;
    }


    public static <T> ResultBean success(T data) {
        ResultBean resultBean = new ResultBean(data);
        return resultBean;
    }

    public static <T> ResultBean fail(T data) {
        ResultBean resultBean = new ResultBean(ErrorMessage.FAIL.getErrorCode(), ErrorMessage.FAIL.getMessage(), data);
        return resultBean;
    }

    public enum ErrorMessage {

        SUCCESS("0", "success"),
        FAIL("001", "fail"),
        NOAUTH("1001", "非法访问");

        private String errorCode;
        private String message;

        ErrorMessage(String errorCode, String message) {
            this.errorCode = errorCode;
            this.message = message;
        }

        public String getErrorCode() {
            return errorCode;
        }

        public void setErrorCode(String errorCode) {
            this.errorCode = errorCode;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }
    }
}

7)RoleContent类型

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RoleContent {
    public static final String ROLE_USER = "user";

    public static final String ROLE_ASSISTANT = "assistant";

    private String role;

    private String content;

    public static RoleContent createUserRoleContent(String content) {
        return new RoleContent(ROLE_USER, content);
    }

    public static RoleContent createAssistantRoleContent(String content) {
        return new RoleContent(ROLE_ASSISTANT, content);
    }
}

8)Text类型

import lombok.Data;

@Data
public class Text {
    private String role;

    private String content;
}

2、在config目录下新建XFConfig类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties("xf.config")
public class XFConfig {
    private String appId;

    private String apiSecret;

    private String apiKey;

    private String hostUrl;

    private Integer maxResponseTime;
}

3、在listener目录下
1)新建XFWebClient类

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.example.xunfeigpt.bean.RoleContent;

import com.example.xunfeigpt.config.XFConfig;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;

@Slf4j
@Component
public class XFWebClient {
    @Autowired
    private XFConfig xfConfig;

    /**
     * 发送消息
     *
     * @param uid       每个用户的id,用于区分不同用户
     * @param questions 发送给大模型的消息,可以包含上下文内容
     * @return 获取websocket连接,以便于我们在获取完整大模型回复后手动关闭连接
     */
    /**
     * @description: 发送请求至大模型方法
     * @author: ChengLiang
     * @date: 2023/10/19 16:27
     * @param: [用户id, 请求内容, 返回结果监听器listener]
     * @return: okhttp3.WebSocket
     **/
    public WebSocket sendMsg(String uid, List<RoleContent> questions, WebSocketListener listener) {
        // 获取鉴权url
        String authUrl = null;
        try {
            authUrl = getAuthUrl(xfConfig.getHostUrl(), xfConfig.getApiKey(), xfConfig.getApiSecret());
        } catch (Exception e) {
            log.error("鉴权失败:{}", e);
            return null;
        }
        // 鉴权方法生成失败,直接返回 null
        OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
        // 将 https/http 连接替换为 ws/wss 连接
        String url = authUrl.replace("http://", "ws://").replace("https://", "wss://");
        Request request = new Request.Builder().url(url).build();
        // 建立 wss 连接
        WebSocket webSocket = okHttpClient.newWebSocket(request, listener);
        // 组装请求参数
        JSONObject requestDTO = createRequestParams(uid, questions);
        // 发送请求
        webSocket.send(JSONObject.toJSONString(requestDTO));
        return webSocket;
    }


    /**
     * @description: 鉴权方法
     * @author: ChengLiang
     * @date: 2023/10/19 16:25
     * @param: [讯飞大模型请求地址, apiKey, apiSecret]
     * @return: java.lang.String
     **/
    public static String getAuthUrl(String hostUrl, String apiKey, String apiSecret) throws Exception {
        URL url = new URL(hostUrl);
        // 时间
        SimpleDateFormat format = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US);
        format.setTimeZone(TimeZone.getTimeZone("GMT"));
        String date = format.format(new Date());
        // 拼接
        String preStr = "host: " + url.getHost() + "\n" +
                "date: " + date + "\n" +
                "GET " + url.getPath() + " HTTP/1.1";
        // SHA256加密
        Mac mac = Mac.getInstance("hmacsha256");
        SecretKeySpec spec = new SecretKeySpec(apiSecret.getBytes(StandardCharsets.UTF_8), "hmacsha256");
        mac.init(spec);

        byte[] hexDigits = mac.doFinal(preStr.getBytes(StandardCharsets.UTF_8));
        // Base64加密
        String sha = Base64.getEncoder().encodeToString(hexDigits);
        // 拼接
        String authorization = String.format("api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"", apiKey, "hmac-sha256", "host date request-line", sha);
        // 拼接地址
        HttpUrl httpUrl = Objects.requireNonNull(HttpUrl.parse("https://" + url.getHost() + url.getPath())).newBuilder().//
                addQueryParameter("authorization", Base64.getEncoder().encodeToString(authorization.getBytes(StandardCharsets.UTF_8))).//
                addQueryParameter("date", date).//
                addQueryParameter("host", url.getHost()).//
                build();

        return httpUrl.toString();
    }

    /**
     * @description: 请求参数组装方法
     * @author: ChengLiang
     * @date: 2023/10/19 16:26
     * @param: [用户id, 请求内容]
     * @return: com.alibaba.fastjson.JSONObject
     **/
    public JSONObject createRequestParams(String uid, List<RoleContent> questions) {
        JSONObject requestJson = new JSONObject();
        // header参数
        JSONObject header = new JSONObject();
        header.put("app_id", xfConfig.getAppId());
        header.put("uid", uid);
        // parameter参数
        JSONObject parameter = new JSONObject();
        JSONObject chat = new JSONObject();
        chat.put("domain", "generalv2");
        chat.put("temperature", 0.5);
        chat.put("max_tokens", 4096);
        parameter.put("chat", chat);
        // payload参数
        JSONObject payload = new JSONObject();
        JSONObject message = new JSONObject();
        JSONArray jsonArray = new JSONArray();
        jsonArray.addAll(questions);

        message.put("text", jsonArray);
        payload.put("message", message);
        requestJson.put("header", header);
        requestJson.put("parameter", parameter);
        requestJson.put("payload", payload);
        return requestJson;
    }
}

2)新建XFWebSocketListener类

package com.example.xunfeigpt.listener;

import com.alibaba.fastjson.JSON;

import com.example.xunfeigpt.bean.JsonParse;
import com.example.xunfeigpt.bean.Text;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;

import java.io.IOException;
import java.util.*;

@Slf4j
public class XFWebSocketListener extends WebSocketListener {
    //断开websocket标志位
    private boolean wsCloseFlag = false;

    //语句组装buffer,将大模型返回结果全部接收,在组装成一句话返回
    private StringBuilder answer = new StringBuilder();

    public String getAnswer() {
        return answer.toString();
    }

    public boolean isWsCloseFlag() {
        return wsCloseFlag;
    }

    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        super.onOpen(webSocket, response);
        log.info("大模型服务器连接成功!");
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
        super.onMessage(webSocket, text);
        JsonParse myJsonParse = JSON.parseObject(text, JsonParse.class);
        log.info("myJsonParse:{}", JSON.toJSONString(myJsonParse));
        if (myJsonParse.getHeader().getCode() != 0) {
            log.error("发生错误,错误信息为:{}", JSON.toJSONString(myJsonParse.getHeader()));
            this.answer.append("大模型响应异常,请联系管理员");
            // 关闭连接标识
            wsCloseFlag = true;
            return;
        }
        List<Text> textList = myJsonParse.getPayload().getChoices().getText();
        for (Text temp : textList) {
            log.info("返回结果信息为:【{}】", JSON.toJSONString(temp));
            this.answer.append(temp.getContent());
        }
        log.info("result:{}", this.answer.toString());
        if (myJsonParse.getHeader().getStatus() == 2) {
            wsCloseFlag = true;
            //todo 将问答信息入库进行记录,可自行实现
        }
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        super.onFailure(webSocket, t, response);
        try {
            if (null != response) {
                int code = response.code();
                log.error("onFailure body:{}", response.body().string());
                if (101 != code) {
                    log.error("讯飞星火大模型连接异常");
                }
            }
        } catch (IOException e) {
            log.error("IO异常:{}", e);
        }
    }
}

4、在netty目录下
1)新建NettyServer类

package com.example.xunfeigpt.netty;

import com.example.xunfeigpt.netty.handler.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;

@Slf4j
@Component
public class NettyServer {

    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:62632}")
    private int port;

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    private String webSocketPath;

    @Autowired
    private WebSocketHandler webSocketHandler;

    private EventLoopGroup bossGroup;

    private EventLoopGroup workGroup;

    /**
     * 启动
     *
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
        bootstrap.group(bossGroup, workGroup);
        // 设置NIO类型的channel
        bootstrap.channel(NioServerSocketChannel.class);
        // 设置监听端口
        bootstrap.localAddress(new InetSocketAddress(port));
        // 连接到达时会创建一个通道
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 流水线管理通道中的处理程序(Handler),用来处理业务
                // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // 以块的方式来写的处理器
                ch.pipeline().addLast(new ChunkedWriteHandler());
        /*
        说明:
        1、http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合
        2、这就是为什么,当浏览器发送大量数据时,就会发送多次http请求
         */
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
        /*
        说明:
        1、对应webSocket,它的数据是以帧(frame)的形式传递
        2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
        3、核心功能是将http协议升级为ws协议,保持长连接
        */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                // 自定义的handler,处理业务逻辑
                ch.pipeline().addLast(webSocketHandler);

            }
        });
        // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
        ChannelFuture channelFuture = bootstrap.bind().sync();
        log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
        // 对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
    }

    /**
     * 释放资源
     *
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }

    @PostConstruct()
    public void init() {
        //需要开启一个新的线程来执行netty server 服务器
        new Thread(() -> {
            try {
                start();
                log.info("消息推送线程开启!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

2)在handler目录下新建handler类

package com.example.xunfeigpt.netty.handler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.example.xunfeigpt.bean.NettyGroup;
import com.example.xunfeigpt.bean.ResultBean;
import com.example.xunfeigpt.service.PushService;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Autowired
    private PushService pushService;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded被调用,{}", JSON.toJSONString(ctx));
        //todo 添加校验功能,校验合法后添加到group中

        // 添加到channelGroup 通道组
        NettyGroup.getChannelGroup().add(ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());
        // 获取用户ID,关联channel
        JSONObject jsonObject = JSON.parseObject(msg.text());
        String channelId = jsonObject.getString("uid");

        // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        //String channelId = CharUtil.generateStr(uid);
        NettyGroup.getUserChannelMap().put(channelId, ctx.channel());
        boolean containsKey = NettyGroup.getUserChannelMap().containsKey(channelId);
        //通道已存在,请求信息返回
        if (containsKey) {
            //接收消息格式{"uid":"123456","text":"中华人民共和国成立时间"}
            String text = jsonObject.getString("text");
            //请求大模型服务器,获取结果
            ResultBean resultBean = pushService.pushMessageToXFServer(channelId, text);
            String data = (String) resultBean.getData();
            //推送
            pushService.pushToOne(channelId, JSON.toJSONString(data));
        } else {
            ctx.channel().attr(key).setIfAbsent(channelId);
            log.info("连接通道id:{}", channelId);
            // 回复消息
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(ResultBean.success(channelId))));
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved被调用,{}", JSON.toJSONString(ctx));
        // 删除通道
        NettyGroup.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("通道异常:{}", cause.getMessage());
        // 删除通道
        NettyGroup.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyGroup.getUserChannelMap().remove(userId);
    }
}

5、在service目录下
1)新建PushService接口

import com.example.xunfeigpt.bean.ResultBean;

public interface PushService {
    void pushToOne(String uid, String text);

    void pushToAll(String text);

    ResultBean pushMessageToXFServer(String uid, String text);
}

2)在impl文件夹下新建PushServiceImpl类

package com.example.xunfeigpt.service.impl;

import com.alibaba.fastjson.JSON;

import com.example.xunfeigpt.bean.NettyGroup;
import com.example.xunfeigpt.bean.ResultBean;
import com.example.xunfeigpt.bean.RoleContent;
import com.example.xunfeigpt.config.XFConfig;
import com.example.xunfeigpt.listener.XFWebClient;
import com.example.xunfeigpt.listener.XFWebSocketListener;
import com.example.xunfeigpt.service.PushService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import okhttp3.WebSocket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Service
public class PushServiceImpl implements PushService {
    @Autowired
    private XFConfig xfConfig;

    @Autowired
    private XFWebClient xfWebClient;

    @Override
    public void pushToOne(String uid, String text) {
        if (StringUtils.isEmpty(uid) || StringUtils.isEmpty(text)) {
            log.error("uid或text均不能为空");
            throw new RuntimeException("uid或text均不能为空");
        }
        ConcurrentHashMap<String, Channel> userChannelMap = NettyGroup.getUserChannelMap();
        for (String channelId : userChannelMap.keySet()) {
            if (channelId.equals(uid)) {
                Channel channel = userChannelMap.get(channelId);
                if (channel != null) {
                    ResultBean success = ResultBean.success(text);
                    channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(success)));
                    log.info("信息发送成功:{}", JSON.toJSONString(success));
                } else {
                    log.error("该id对于channelId不存在!");
                }
                return;
            }
        }
        log.error("该用户不存在!");
    }

    @Override
    public void pushToAll(String text) {
        String trim = text.trim();
        ResultBean success = ResultBean.success(trim);
        NettyGroup.getChannelGroup().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(success)));
        log.info("信息推送成功:{}", JSON.toJSONString(success));
    }

    //测试账号只有2个并发,此处只使用一个,若是生产环境允许多个并发,可以采用分布式锁
    @Override
    public synchronized ResultBean pushMessageToXFServer(String uid, String text) {
        RoleContent userRoleContent = RoleContent.createUserRoleContent(text);
        ArrayList<RoleContent> questions = new ArrayList<>();
        questions.add(userRoleContent);
        XFWebSocketListener xfWebSocketListener = new XFWebSocketListener();
        WebSocket webSocket = xfWebClient.sendMsg(uid, questions, xfWebSocketListener);
        if (webSocket == null) {
            log.error("webSocket连接异常");
            ResultBean.fail("请求异常,请联系管理员");
        }
        try {
            int count = 0;
            int maxCount = xfConfig.getMaxResponseTime() * 5;
            while (count <= maxCount) {
                Thread.sleep(200);
                if (xfWebSocketListener.isWsCloseFlag()) {
                    break;
                }
                count++;
            }
            if (count > maxCount) {
                return ResultBean.fail("响应超时,请联系相关人员");
            }
            return ResultBean.success(xfWebSocketListener.getAnswer());
        } catch (Exception e) {
            log.error("请求异常:{}", e);
        } finally {
            webSocket.close(1000, "");
        }
        return ResultBean.success("");
    }
}

6、测试controller

import com.example.xunfeigpt.bean.ResultBean;
import com.example.xunfeigpt.service.PushService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/xfModel")
public class XFMessageController {
    @Autowired
    private PushService pushService;

    @GetMapping("/test")
    public ResultBean test(String uid, String text) {
        if (StringUtils.isEmpty(uid) || StringUtils.isEmpty(text)) {
            log.error("uid或text不能为空");
            return ResultBean.fail("uid或text不能为空");
        }
        return pushService.pushMessageToXFServer(uid, text);
    }
}

7、websocket测试
地址:ws://localhost:62632/webSocket

传参:

["uid":"xxxxxx","text";"水泥的种类有哪些?}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1472871.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot003图书个性化推荐系统的设计与实现(源码+调试+LW)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。今天给大家介绍一篇基于SpringBoot的图书个…

SOLIDWORKS 查找并修复装配体配合错误

我们在SOLIDWORKS 正版软件进行装配体装配时&#xff0c;时常会出现一些报错&#xff0c;例如在配合、装配体特征或被装配体参考引用的零部件和子装配体中。一些常见的错误&#xff0c;如一个零部件的过定义会引发更多其他错误信息&#xff0c;并导致装配体停止解析配合关系。下…

RestTemplate启动问题解决

⭐ 作者简介&#xff1a;码上言 ⭐ 代表教程&#xff1a;Spring Boot vue-element 开发个人博客项目实战教程 ⭐专栏内容&#xff1a;个人博客系统 ⭐我的文档网站&#xff1a;http://xyhwh-nav.cn/ RestTemplate启动问题解决 问题&#xff1a;在SpringCloud架构项目中配…

汽车大灯尾灯划痕裂缝破洞破损掉角崩角等如何修复?根本没必要换车灯换总成,使用无痕修UV树脂胶液即可轻松搞定。

TADHE车灯无痕修复专用UV胶是一种经过处理的UV树脂胶&#xff0c;主要成份是改性丙烯酸UV树脂。应用在车灯的专业无痕修复领域。 车灯修复UV树脂有以下优点&#xff1a; 1. 快速修复&#xff1a;此UV树脂是一种用UV光照射在10秒内固化的材料。 2. 高强度&#xff1a;UV树脂固…

【npm下载包报错:CERT_HAS_EXPIRED,问题解决】

npm下载包报错&#xff1a;CERT_HAS_EXPIRED npm安装依赖的时候出现报错 根据第三行报错的提示得知报错原因是证书已过期 上网一查&#xff0c;原来常用的淘宝镜像早就换新域名了&#xff0c; 之前的镜像域名在2024年1月22日https证书到期了 替换为最新的地址就可以了 npm …

蛋白结构预测模型评价指标

欢迎浏览我的CSND博客&#xff01; Blockbuater_drug …点击进入 文章目录 前言一、蛋白结构预测模型评价指标TM-scorelDDT 二、Alphafold中的评价指标pLDDTpTMPAE 三、AlphaFold-multimer 蛋白结构的评价指标DockQipTM 总结参考资料 前言 本文汇总了AlphaFold和AlphaFold-mul…

线性表——单链表的增删查改(下)

本节继续上节未完成的链表增删查改接口的实现。这是上节的地址:线性表——单链表的增删查改&#xff08;上&#xff09;-CSDN博客 上节实现的接口如下&#xff1a; //申请链表节点函数接口 SLNode* BuySListNode(SLTDataType x); //单链表的打印函数接口 void SListPrint(SLNod…

探索比特币现货 ETF 对加密货币价格的潜在影响

撰文&#xff1a;Sean&#xff0c;Techub News 文章来源Techub News&#xff0c;搜Tehub News下载查看更多Web3资讯。 自美国比特币现货交易所交易基金&#xff08;ETF&#xff09;上市以来&#xff0c;比特币现货 ETF 的相关信息无疑成为了影响比特币价格及加密货币市场走向…

《Docker 简易速速上手小册》第10章 朝着 Docker Swarm 和 Kubernetes 迈进(2024 最新版)

文章目录 10.1 Docker Swarm 基础10.1.1 重点基础知识10.1.2 重点案例&#xff1a;Python Web 应用的 Docker Swarm 部署10.1.3 拓展案例 1&#xff1a;微服务架构的 Docker Swarm 部署10.1.4 拓展案例 2&#xff1a;使用 Docker Swarm 进行持续部署 10.2 Kubernetes 与 Docker…

nginx 从$http_x_forwarded_for 中获取第一个参数

在 Nginx 中&#xff0c;$http_x_forwarded_for 变量通常包含了客户端的原始 IP 地址以及可能经过的代理服务器的 IP 地址列表&#xff0c;这些地址由逗号分隔。如果你想从 $http_x_forwarded_for 中截取第一个参数&#xff08;即最左边的 IP 地址&#xff09;&#xff0c;你可…

C语言中的套娃——函数递归

目录 一、什么是递归 1.1.递归的思想 1.2.递归的限制条件 二、举例体会 2.1.求n的阶乘 2.2.顺序打印整数的每一位 2.3.斐波那契数列 三、递归与迭代 一、什么是递归 在学习C语言的过程中&#xff0c;我们经常会跟递归打交道&#xff0c;什么是递归呢&#xff1f;它其实…

用于自监督视觉预训练的屏蔽特征预测

Masked Feature Prediction for Self-Supervised Visual Pre-Training 一、摘要 提出了用于视频模型自监督预训练的掩模特征预测&#xff08;MaskFeat&#xff09;。首先随机屏蔽输入序列的一部分&#xff0c;然后预测屏蔽区域的特征。研究了五种不同类型的特征&#xff0c;发…

vue3 + TS + vite 搭建中后台管理系统(开箱即用)

[TOC](vue3 TS vite 搭建中后台管理系统) 开箱即用 前言 要成功&#xff0c;先发疯&#xff0c;头脑简单往前冲&#xff01; 三金四银&#xff0c;金九银十&#xff0c;多学知识&#xff0c;也不能埋头苦干&#xff0c;要成功&#xff0c;先发疯&#xff0c;头脑简单往前冲…

Java计划线程池ScheduledThreadPoolExecutor运行流程和源码分析

1. 计划线程池ScheduledThreadPoolExecutor简介 ScheduledThreadPoolExecutor继承自线程池ThreadPoolExecutor&#xff0c;并在其基础上增加了按时间调度执行任务的功能&#xff0c;如果对ThreadPoolExecutor还不是很熟悉&#xff0c;可以阅读一下这篇文章&#xff1a; Java线…

成都东部新区文化旅游体育局莅临国际数字影像产业园参观入驻企业,共促政产交流“零距离”

2月23日&#xff0c;成都东部新区文化旅游体育局投服处处长田东一行莅临国际数字影像产业园考察交流&#xff0c;树莓科技&#xff08;成都&#xff09;集团有限公司副总裁吴晓平、行政运营经理郭宇风、国际数字影像产业园项目负责人万里全程接待。 吴晓平副总带领田东处长一行…

开发知识点-.netC#图形用户界面开发之WPF

C#图形用户界面开发 框架简介WinForms(Windows Forms):WPF(Windows Presentation Foundation):UWP(Universal Windows Platform):MAUI(Multi-platform App UI):选择控件参考文章随笔分类 - WPF入门基础教程系列基于C#语言的GUI开发,主要介绍WPF框架

CAD怎么绘制建筑平面图纸?

CAD沪指图纸很简单&#xff0c;想要绘制一个简单的建筑图纸&#xff0c;该怎么绘制建筑平面图呢&#xff1f;下面我们就来看看详细的教程。 1、首先&#xff0c;运用绘图功能中的直线按照比例尺寸绘制出轴网。轴网绘制我们一般将轴网的颜色选择为红色&#xff0c;轴网的线型选择…

jdk21本地执行flink出现不兼容问题

环境说明&#xff1a;换电脑尝尝鲜&#xff0c;jdk&#xff0c;flink都是最新的&#xff0c;千辛万苦把之前的项目编译通过&#xff0c;跑一下之前的flink项目发现启动失败&#xff0c;啥都不说了上异常 Exception in thread "main" java.lang.IllegalAccessError: …

一次奇怪的事故:机器网络连接打满,导致服务不可用

业务背景 发生事故的业务系统是一个toB业务&#xff0c;业务是服务很多中小企业进行某项公共信息指标查询。系统特点:业务处理相对简单&#xff0c;但是流量大&#xff0c;且对请求响应要求较高&#xff1a; 业务请求峰值qps达50w&#xff0c;平时流量达20w左右。 请求响应时…

18 SpringMVC实战

18 SpringMVC实战 1. 课程介绍2. Spring与Spring MVC环境配置 1. 课程介绍 2. Spring与Spring MVC环境配置