🎉🎉欢迎光临,终于等到你啦🎉🎉
🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀
🌟持续更新的专栏《Spring 狂野之旅:从入门到入魔》 🚀
本专栏带你从Spring入门到入魔
这是苏泽的个人主页可以看到我其他的内容哦👇👇
努力的苏泽http://suzee.blog.csdn.net/
本文给大家带来的是SpringBoot整合WebSocket 实现一个简单的聊天功能 然后再进阶到语音的聊天 视频聊天
目录
在视频聊天的基础上 还要再实现 美颜、心跳检查掉线、掉帧优化。掉线重连等企业级业务需求
一、WebSocket概述:编辑
实现步骤
首先引入依赖
设置拦截器 自定义报错
这是我做的自定义类型 可以根据自己的修改
拦截器配置
拦截器实现
websocket服务实现
在视频聊天的基础上 还要再实现 美颜、心跳检查掉线、掉帧优化。掉线重连等企业级业务需求
一、WebSocket概述:
WebSocket是基于TCP协议的一种网络协议,它实现了浏览器与服务器全双工通信,支持客户端和服务端之间相互发送信息。在有WebSocket之前,如果服务端数据发生了改变,客户端想知道的话,只能采用定时轮询的方式去服务端获取,这种方式很大程度上增大了服务器端的压力,有了WebSocket之后,如果服务端数据发生改变,可以立即通知客户端,客户端就不用轮询去换取,降低了服务器的压力。目前主流的浏览器都已经支持WebSocket协议了。
WebSocket使用ws和wss作资源标志符,它们两个类似于http和https,wss是使用TSL的ws。主要有4个事件:
- onopen 创建连接时触发
- onclose 连接断开时触发
- onmessage 接收到信息时触发
- onerror 通讯异常时触发
实现步骤
首先引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<!-- websocket -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<!-- fastjson -->
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
设置拦截器 自定义报错
@Slf4j
@RestControllerAdvice
public class WebExceptionAdvice {
@ExceptionHandler(RuntimeException.class)
public ResponseEntity<Result> handleRuntimeException(HttpServletRequest request, RuntimeException e) {
log.error(e.toString(), e);
Result result = Result.fail(e.getMessage());
HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR;//500
if (e instanceof UnAuthorException) {
//这个是拦截器报错才设置的状态码
status = HttpStatus.UNAUTHORIZED;//401
}
ResponseEntity<Result> resultResponseEntity = new ResponseEntity<>(result, status);
log.error(resultResponseEntity.toString());
return resultResponseEntity;
}
}
这是我做的自定义类型 可以根据自己的修改
public class UnAuthorException extends RuntimeException {
public UnAuthorException(String message) {
super(message);
}
}
拦截器配置
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
//添加拦截器 InterceptorRegistry registry 拦截器的注册器 excludePathPatterns排除不需要的拦截的路径
// 只要跟登录无关就不需要拦截 拦截器的作用只是校验登录状态
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/index/**",
"/user/wechat/login",
"/user/zfb/login",
//...这里自己去设置 不想被拦截的页面 剩下的就是被拦截的
).order(1);
// order是设置先后
// 刷新token的拦截器
registry.addInterceptor(new RefreshTokeninterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
}
}
拦截器实现
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.判断是否需要拦截(ThreadLocal中是否有用户)
if (UserHolder.getUser() == null&&ListenerHolder.getListener()==null) {
System.out.println("拦截器报错啦!!!");
//response.getHeader("erro");
throw new UnAuthorException("用户未登录");
}
return true;
}
}
/*/**
*@author suze
*@date 2023-10-25
*@time 15:23
**/
public class RefreshTokeninterceptor implements HandlerInterceptor {
//而MvcConfig中使用了 LoginInterceptor 所以我们要去到MvcConfig进行注入
private StringRedisTemplate stringRedisTemplate;
//因为这个类不是spring boot构建的,而是手动创建的类,所以依赖注入不能用注解来注入,要我们手动使用构造函数来注入这个依赖
public RefreshTokeninterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String token2=request.getHeader("token2");
String ListenerKey = LOGIN_LISTENER_KEY + token2;
//这里的倾听者信息是在倾听者登录的函数里面把倾听者信息录入进去
String LisStr = stringRedisTemplate.opsForValue().get(ListenerKey);
if(LisStr== null || LisStr.isEmpty()){
System.err.println("倾听者token为空");
}
else {
Listener listener = JSON.parseObject(LisStr, Listener.class);
ListenerHolder.saveListener(listener);
stringRedisTemplate.expire(ListenerKey,15, TimeUnit.MINUTES);
return true;
}
//获取请求头中的token 在前端代码中详见authorization
String token = request.getHeader("token");
if(StrUtil.isBlank(token)){//判断是否为空
System.err.println("token为空");
return true;
}
// 基于token获取Redis用户
String key =LOGIN_USER_KEY+token;
String userstr = stringRedisTemplate.opsForValue().get(key);
//System.err.println("基于token获取Redis用户:"+userstr);
//判断用户是否存在 不存在的话就查询是否是倾听者的情况
if(userstr== null || userstr.isEmpty()){
System.err.println("用户为空");
return true;
}
// 将查询到的user的json字符串转化为user对象
User user = JSON.parseObject(userstr, User.class);
//存在 保存用户信息到TheadLocal
UserHolder.saveUser(user);
System.out.println("保存用户"+user.getOpenId()+"信息到TheadLocal了");
//刷新token有效期
stringRedisTemplate.expire(key,15, TimeUnit.MINUTES);
//放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//移除用户
UserHolder.removeUser();
ListenerHolder.removeListener();
}
}
根据自己需求 删掉一些我这边业务的部分 不删也行 也能用 就是有点慢
websocket服务实现
@ServerEndpoint(value = "/imserver/{userId}")
@Component
public class WebSocketServer {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
/**
* 记录当前在线连接数
*/
public static final Map<String, Session> sessionMap = new ConcurrentHashMap<>();
//public static final Map<String, Session> UserMap = new ConcurrentHashMap<>();这里没有需要知道对方名字的需求 所以不需要加 需要再加
/**
* <<<<<<< HEAD
* 设置为静态的 公用一个消息map ConcurrentMap为线程安全的map HashMap不安全
*/
//这里的messageMap存的是某用户已经离线 他离线后收到的消息的集合 所以这里的key是接收者的key
private static ConcurrentMap<String, List<String>> messageMap = new ConcurrentHashMap<>();
/**
*
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
sessionMap.put(userId, session);
// stringRedisTemplate.opsForList().
log.info("有新用户加入,userId={}, 当前在线人数为:{}", userId, sessionMap.size());
JSONObject result = new JSONObject();
JSONArray array = new JSONArray();
result.set("users", array);
for (Object key : sessionMap.keySet()) {
JSONObject jsonObject = new JSONObject();
jsonObject.set("userId", key);
// {"userId": "aysgduiehfiuew", "userId": "admin"}
array.add(jsonObject);
}
//这里得到的是该用户的历史记录map userMessage
List<String> userMessage = messageMap.get(userId);
//载入历史记录 这个过程相当于重新把消息发给自己
if (userMessage!=null) {
for (int i = userMessage.size() - 1; i >= 0; i--) {
String message = userMessage.get(i);
//这里的session的作用是告诉sendMessage发给谁 这里是要加载自己错过的历史消息
// 所以是把历史记录发给自己 所以toSession填的是自己的session
this.sendMessage(message, session);
// Thread.sleep(10000);
}
messageMap.remove(userId);
}
// {"users": [{"userId": "zhang"},{ "userId": "admin"}]}
sendAllMessage(JSONUtil.toJsonStr(result)); // 后台发送消息给所有的客户端
}
/**
* 服务端发送消息给客户端
*/
private void sendMessage(String message, Session toSession) {
try {
log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
toSession.getBasicRemote().sendText(message);
//String from = JSONUtil.parseObj(message).getStr("from");
if (!messageMap.get(toSession.getId()).isEmpty()) {
List<String> list = messageMap.get(toSession.getId());
log.info("有待发送的消息,继续存储");
list.add(message);
//toSession是被发送者的id
messageMap.put(toSession.getId(), list);
return;
} else {
List<String> list = new ArrayList<>();
//该用户发的离线消息的集合
list.add(message);
messageMap.put(toSession.getId(), list);
log.info("用户不在线保存信息");
return;
}
} catch (Exception e) {
log.error("服务端发送消息给客户端失败", e);
}
// {"users": [{"userId": "zhang"},{ "userId": "admin"}]}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session, @PathParam("userId") String userId) {
sessionMap.remove(userId);
log.info("有一连接关闭,移除username={}的用户session, 当前在线人数为:{}", userId, sessionMap.size());
}
/**
* 收到客户端消息后调用的方法
* 后台收到客户端发送过来的消息
* onMessage 是一个消息的中转站
* 接受 浏览器端 socket.send 发送过来的 json数据
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session, @PathParam("userId") String userId) {
log.info("服务端收到用户username={}的消息:{}", userId, message);
JSONObject obj = JSONUtil.parseObj(message);
String toUserId = obj.getStr("to"); // to表示发送给哪个用户,比如 admin
String text = obj.getStr("text"); // 发送的消息文本 hello
//建立一个数组 把每一次的都装进去 然后下面
//TODO 这里要写 一个缓存历史记录的方法来处理 除了test123 是用于心跳的 就不用缓存
if(!toUserId.equals("test123")){
Session toSession = sessionMap.get(toUserId); // 根据 to userId来获取 session,再通过session发送消息文本
if (toSession != null) {
// 服务器端 再把消息组装一下,组装后的消息包含发送人和发送的文本内容
// {"from": "zhang", "text": "hello"}
JSONObject jsonObject = new JSONObject();
jsonObject.set("from", userId); // from 是 zhang
jsonObject.set("text", text); // text 同上面的text
this.sendMessage(jsonObject.toString(), toSession);
log.info("发送给用户username={},消息:{}", toUserId, jsonObject.toString());
} else {
log.info("发送失败,未找到用户username={}的session", toUserId);
}
}
}
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 服务端发送消息给所有客户端
*/
private void sendAllMessage(String message) {
try {
for (Session session : sessionMap.values()) {
log.info("服务端给客户端[{}]发送消息{}", session.getId(), message);
session.getBasicRemote().sendText(message);
}
} catch (Exception e) {
log.error("服务端发送消息给客户端失败", e);
}
}
}