1.基本概念
WebSocket 支持双方通信即服务端可以主动推送给用户端,用户端也可以主动推送消息给服务器。前端必须进行协议升级为 WebSocket
名称 值 Upgrade websocket
2. 后端代码
package com.koshi.websocket.server;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@Slf4j
@ServerEndpoint("/api/pushMessage/{userId}")
public class WebSocketServer {
/**静态变量,用来记录当前在线连接数*/
private static final AtomicInteger onlineCount = new AtomicInteger(0);
/**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/
private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
private Session session;
/**接收userId*/
private String userId;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
// 加入map中
webSocketMap.put(userId, this);
} else {
// 加入map中
webSocketMap.put(userId, this);
// 在线数加1
onlineCount.incrementAndGet();
}
System.out.println("用户连接:" + userId + ",当前在线人数为:" + onlineCount);
sendMessage("连接成功");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
// 在线人数减1
onlineCount.decrementAndGet();
}
log.info("用户退出:" + userId + ",当前在线人数为:" + onlineCount);
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("用户消息:" + userId + ",报文:" + message);
// 解析发送的报文
JSONObject jsonObject = JSON.parseObject(message);
// 获取需要转发的用户id
String toUserId = jsonObject.getString("toUserId");
// 传送给对应toUserId用户的websocket
if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) {
webSocketMap.get(toUserId).sendMessage(message);
log.info("消息发送成功");
} else {
log.error("请求的userId:" + toUserId + "不在该服务器上");
}
}
/**
* 发生异常调用方法
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) {
this.session.getAsyncRemote().sendText(message);
}
/**
*发送自定义消息
**/
public static void sendInfo(String message, String userId) {
log.info("发送消息到:" + userId + ",报文:" + message);
if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
webSocketMap.get(userId).sendMessage(message);
} else {
log.error("用户" + userId + ",不在线!");
}
}
}
3. 进行测试
进行连接
接着发送数据用户 1 给用户 2
用户 2 接收情况
4. 总结以及场景延申
4.1 WebSocket 是什么东西
WebSocket 是一种网络传输协议,可在单个 TCP 连接上进行全双工通信。
4.2 如何使用?
- 先导入 webSocket 的包
- 然后配置 webSocket 的配置类
- 开始配置 WebSocket 的参数,首先要存放当前在线的用户人数,这里推荐使用 AtomicInteger, 原子性操作,可以防止并发出现的问题;里面还有 ConcureentMap, 这个是线程线程安全的 Map,使用的版本是大于 1.8 的,底层是 Synchronized+node 节点实现的,保证了线程安全。还有 userId, 确定接收用户的 id;Session 是用户会话保存。
- 首先发送协议。ws:…. xx: 请求,然后我有一个 @OnOpen 的注解,连接的时候,会自动走他这个语句,主键是 userId, 值是 session,然后调用 session 的异步调用确定发送参数,提示用户连接成功
- 接着我用户发送消息,我通过你的 message 参数里面提取我要的字段,进行处理,然后异步推送即可
4.3 相关业务场景拷打
WebSocket 实现服务端接收移动端定位在网页前端显示_服务端获取客户端定位 - CSDN 博客
业务类似于这个