WebSocket 对接参考
话不多说,直接上代码。
WebSocket
package com.student.config;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @Description:
* @Author: hwk
* @Date: 2024-07-17 17:46
* @Version: 1.0
**/
@Slf4j
@Component
@ServerEndpoint("/websocket/{userId}")
public class WebSocket {
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* gpt密钥
*/
private static final String key = "";
/**
* 请求地址
*/
private static final String url = "";
/**
* 用户ID
*/
private String userId;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
//虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
// 注:底下WebSocket是当前类名
private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
// 用来存在线连接用户信息
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>();
/**
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
/**
* 链接关闭调用的方法
*/
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message
*/
@OnMessage
public void onMessage(String message) {
log.info("【websocket消息】收到客户端消息:" + message);
JSONObject jsonObject = new JSONObject();
JSONArray objects = new JSONArray();
JSONObject messages = new JSONObject();
messages.put("role", "user");
messages.put("content", message);
objects.add(messages);
jsonObject.put("model", "gpt-3.5-turbo");
jsonObject.put("messages", objects);
jsonObject.put("max_tokens", 1024);
jsonObject.put("temperature", 0);
jsonObject.put("stream", true);
Map<String, String> heads = new HashMap<>();
heads.put("Content-Type", "application/json");
heads.put("Accept", "application/json");
heads.put("Authorization", "Bearer "+key);
WebClient webClient = WebClient.create();
Flux<String> stringFlux = webClient.post()
.uri(url)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.header("Authorization", "Bearer " + key)
.accept(MediaType.TEXT_EVENT_STREAM)
.bodyValue(jsonObject)
.retrieve()
.bodyToFlux(String.class);
stringFlux.subscribe(s -> {
if (!Objects.equals(s, "[DONE]")) {
JSONObject parsed = JSONObject.parseObject(s);
JSONArray choices = parsed.getJSONArray("choices");
if (!choices.isEmpty()) {
JSONObject dataJson = JSONObject.parseObject(choices.get(0).toString());
String content = dataJson.getJSONObject("delta").getString("content");
if (StringUtils.hasLength(content)) {
try {
content = content.replaceAll("\n", "<br>");
content = content.replace(" ", "");
log.info(content);
if (sessionPool != null) {
sessionPool.get(userId).getBasicRemote().sendText(content);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
}
/**
* 发送错误时的处理
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 此为广播消息
*
* @param message
*/
public void sendAllMessage(String message) {
log.info("【websocket消息】广播消息:" + message);
for (WebSocket webSocket : webSockets) {
try {
if (webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 此为单点消息
*
* @param userId
* @param message
*/
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 此为单点消息(多人)
*
* @param userIds
* @param message
*/
public void sendMoreMessage(String[] userIds, String message) {
for (String userId : userIds) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
WebSocketConfig
package com.student.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for WebSocket support.
 *
 * @author hwk
 * @version 1.0 (2024-07-17 17:44)
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean, which automatically registers the
     * WebSocket endpoints declared with the {@code @ServerEndpoint} annotation.
     *
     * @return the exporter bean
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
HTML 代码
<!DOCTYPE html>
<!-- Minimal test page: sends the input text over a WebSocket and appends every
     received chunk to the message paragraph. -->
<html>
<head>
    <meta charset="utf-8" />
    <title>ChatGPT</title>
    <script src="marked.min.js"></script>
    <link rel="stylesheet" type="text/css" href="index.css">
    <style>
        .normal-text {
            color: black;
        }
        .rich-text {
            color: blue;
            font-weight: bold;
        }
    </style>
</head>
<body>
    <h2>ChatGPT</h2>
    <div id="message-container">
        <p id="message"></p>
    </div>
    <div id="footer">
        <input id="text" class="my-input" type="text" />
        <button onclick="send()">发送</button>
    </div>
    <div id="footer1">
        <br />
        <button onclick="closeWebSocket()">关闭WebSocket连接</button>
        <button onclick="openWebSocket()">建立WebSocket连接</button>
    </div>
    <script>
        // Configure marked's highlighter only when both libraries are actually loaded:
        // this page does not include highlight.js, and a failed marked.min.js download
        // must not abort the rest of the script.
        if (typeof marked !== 'undefined' && typeof hljs !== 'undefined') {
            marked.setOptions({
                highlight: function (code, lang) {
                    return hljs.highlightAuto(code).value;
                }
            });
        }

        // Endpoint URL; the trailing path segment is the user id.
        var WS_URL = "ws://127.0.0.1:8056/websocket/1";
        var websocket = null;

        // Open a connection and attach ALL callbacks. Used for the initial connection
        // and for the "connect" button, so a reconnected socket still displays
        // messages and errors.
        function openWebSocket() {
            if (!('WebSocket' in window)) {
                alert('当前浏览器不支持WebSocket');
                return;
            }
            // Do not open a second socket while one is open or still connecting.
            if (websocket !== null &&
                (websocket.readyState === WebSocket.OPEN ||
                 websocket.readyState === WebSocket.CONNECTING)) {
                return;
            }
            websocket = new WebSocket(WS_URL);
            // Called when a connection error occurs.
            websocket.onerror = function () {
                console.log("WebSocket连接发生错误");
                setMessageInnerHTML("WebSocket连接发生错误");
            };
            // Called when the connection has been established.
            websocket.onopen = function () {
                console.log("WebSocket连接成功");
            };
            // Called for every message received from the server.
            websocket.onmessage = function (event) {
                if (event.data) {
                    setMessageInnerHTML(event.data);
                }
                console.log(event.data);
            };
            // Called when the connection has been closed.
            websocket.onclose = function () {
                console.log("WebSocket连接关闭");
            };
        }

        // Close the connection, if there is one.
        function closeWebSocket() {
            if (websocket !== null) {
                websocket.close();
            }
        }

        // Send the content of the text input to the server.
        function send() {
            var message = document.getElementById('text').value;
            if (!message) {
                return;
            }
            if (websocket === null || websocket.readyState !== WebSocket.OPEN) {
                // Calling send() on a socket that is not open would throw.
                console.log("WebSocket未连接,无法发送消息");
                setMessageInnerHTML("WebSocket未连接,无法发送消息");
                return;
            }
            websocket.send(message);
        }

        // Append a message to the page. Markdown rendering through marked is
        // currently not applied; chunks are appended as-is.
        // NOTE(review): the payload is inserted as HTML (the server sends <br> for line
        // breaks), so any markup in the model output is interpreted by the browser.
        function setMessageInnerHTML(innerHTML) {
            console.log(innerHTML);
            document.getElementById('message').innerHTML += innerHTML;
        }

        // Connect on page load when the browser supports WebSocket.
        if ('WebSocket' in window) {
            console.log("浏览器支持WebSocket");
        }
        openWebSocket();

        // If the window is closed while the connection is still open, the server side
        // throws an exception, so close the connection explicitly before unloading.
        window.onbeforeunload = function () {
            closeWebSocket();
        };
    </script>
</body>
</html>
效果