📞 文章简介:WebSocket实时通知Demo
💡 创作目的:因为公司正在从零搭建CRM,其中有一个需求是系统通知管理,老板发布通知给员工。简单的用数据库实现感觉缺少一些实时性,不是那么生动。于是想到了使用WebSocket建立通讯,让系统中在线的员工可以实时接收到系统通知。借此学习一下WebSocket,📝 每日一言:学习如一粒种子,只有努力播种才会有收获。
☀️ 今日天气:2022-11-19 多云 满是灰色的🤫
文章目录
- WebSocket核心代码
- WebSocket核心业务类
- 消息实体
- webSocket 配置bean
- 关于session 、socket池的管理
- 多例注入单例获取bean的方法 (感谢大佬 😍)
- 总结
效果演示🌈😁
注意:因为是个实现效果的小demo,所以用了若依开源框架快速集成。也借此机会使用一下若依其他生态项目。gif有些掉帧😮
WebSocket核心代码
话不多说直接上代码
WebSocket核心业务类
WebSocket.java
//springboot 组建注解
@Component
//核心socket路径注解
@ServerEndpoint("/websocket/{userId}")
@CrossOrigin
@Lazy
//此注解相当于设置访问URL
public class WebSocket {
//注入session对象用来读取发送消息
private Session session;
//因为涉及到发布短信接收短信还有一些短信的状态在数据库所以引入了messageService
//这里遇到一个问题,就是socket每次连接会创建多个实例 但是引入了注入到Spring中的bean
//普通的 @Autowired 注解注入不进来,所以额外写了一个读取Bean的类下面会有
ICclMessageService cclMessageService = (ICclMessageService) SpringContext.getBean("cclMessageServiceImpl");
//初始化socket实例,建立连接
//userId 这个参数自定义,根据路径上面的值而定,可以根据业务修改或者多参传入
@OnOpen
public void onOpen(Session session,@PathParam("userId")Integer userId) {
this.session = session;
//将连接添加到sockets池
//CurPool是自定义的一个socket池,将他们集中管理起来
CurPool.webSockets.put(userId,this);
System.out.println("【websocket消息】有新的连接,总数为:"+CurPool.webSockets.size());
//下面是业务逻辑,每次连接会查询没有发送的通知,并接收通知
LambdaQueryWrapper<CclMessage> lq =new LambdaQueryWrapper<>();
lq.eq(CclMessage::getSysUserId,userId);
lq.eq(CclMessage::getSendStatus,0);
//查询离线的时候发送过来的通知
List<CclMessage> list = cclMessageService.list(lq);
if (list!=null){
//进行遍历发送
List<String> msgList =new ArrayList<>();
for (CclMessage cclMessage : list) {
String mes = JSON.toJSONString(cclMessage);
msgList.add(mes);
cclMessage.setSendStatus(1);
}
//调用发送多条消息方法,进行通知
sendAllMessageByUserId(msgList,userId);
//发送状态改变
cclMessageService.updateBatchById(list);
}
}
//连接关闭走的方法
@OnClose
public void onClose() {
// 断开连接删除用户删除session
Integer userId = Integer.parseInt(this.session.getRequestParameterMap().get("userId").get(0));
//从socket池中移除实例
CurPool.webSockets.remove(userId);
System.out.println("【websocket消息】连接断开,总数为:"+CurPool.webSockets.size());
}
//发送消息实例(处理发送消息类型,进行发送)
@OnMessage
public void onMessage(String message) {
MessageVo messageVo = JSON.parseObject(message, MessageVo.class);
System.out.println("现在存活个数"+CurPool.webSockets.size());
//发送消息
String message1 = JSON.toJSONString(messageVo);
String[] userIds = messageVo.getUserIds().split(",");
Integer[] userIdsInt = new Integer[userIds.length==0?1:userIds.length];
int count =0;
for (String userId : userIds) {
userIdsInt[count] = Integer.valueOf(userId);
count++;
}
sendByUserIdMessage(message1,userIdsInt);
}
// 指定多人通知 (具体发送消息方法)
public void sendByUserIdMessage(String message,Integer[] userIds) {
//遍历发送的用户id,给他们发送消息
for (Integer userId : userIds) {
//在socket池中获取实例
WebSocket webSocket = CurPool.webSockets.get(userId);
//如果为null则说明不在线
if (webSocket == null){
System.out.println(userId+"需要稍后通知!!!");
}else {
try {
//发送消息给指定用户
webSocket.session.getAsyncRemote().sendText(message);
System.out.println("用户名为"+userId+"已经发送完:"+message);
//修改数据库中发送状态
LambdaQueryWrapper<CclMessage> lq =new LambdaQueryWrapper<CclMessage>();
lq.eq(CclMessage::getSysUserId,userId);
cclMessageService.update(new CclMessage().setSendStatus(1),lq);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
//发送多条消息给个人
public void sendAllMessageByUserId(List<String> messageList,Integer userId ){
WebSocket webSocket = CurPool.webSockets.get(userId);
if (webSocket != null){
for (String msg : messageList) {
webSocket.session.getAsyncRemote().sendText(msg);
System.out.println(msg+"消息已经发送完");
try {
Thread.sleep(500);
}catch (Exception e){
System.out.println("延迟失败");
e.printStackTrace();
}
}
}
}
}
上面的是主要的WebSocket业务处理代码,因为写的小demo,想着实现功能,逻辑可能过于不注重性能,勿喷!
重点
- @ServerEndpoint(“/websocket/{userId}”) 这是定义ws连接路径,根据实际业务自定义参数传递及名称
- @OnOpen 连接进来以后初始化session实例的方法,主要走一些实例存储等逻辑
- @OnClose 连接关闭后需要进行的业务逻辑,比如说在池中删除实例、界面对应响应等等等
- @OnMessage 做一些发送消息,接收消息的业务逻辑
消息实体
MessageVo.java
public class MessageVo {
//标题
private String title;
//内容
private String content;
//用户id
private String userIds;
}
get、set、toString 我就省略了太占地方了
webSocket 配置bean
WebSocketConfig.java
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
将WebSocket注入到bean
关于session 、socket池的管理
CurPool.java
/**
* 统一管理session、websocket
*/
public class CurPool {
public static Map<Integer, WebSocket> webSockets = new ConcurrentHashMap<>();
// list 里面第一个存sessionId,第二个存session
public static Map<Integer, List<Object>> sessionPool = new ConcurrentHashMap<>();
}
主要存储session跟socket实例管理,可以通过池的管理进行范围的区分,后期可以扩展到以组为维度,并非个人。实现群发,创建部门群等操作。
多例注入单例获取bean的方法 (感谢大佬 😍)
@Component
public class SpringContext implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(SpringContext.class);
private static ApplicationContext applicationContext;
private static DefaultListableBeanFactory beanFactory;
public static <T> T getBean(String name, Class<T> clazz) {
return applicationContext.getBean(name, clazz);
}
public static Object getBean(String name) {
return applicationContext.getBean(name);
}
public static DefaultListableBeanFactory getBeanFactory() {
return beanFactory;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContext.applicationContext = applicationContext;
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) applicationContext ;
beanFactory = (DefaultListableBeanFactory) configurableApplicationContext
.getBeanFactory();
}
public static void printBeanDefinitionNames() {
String[] beanDefinitionNames = applicationContext
.getBeanDefinitionNames();
for (int i = 0; i < beanDefinitionNames.length; i++) {
logger.info(beanDefinitionNames[i]);
}
}
public static XmlBeanDefinitionReader getXmlBeanDefinitionReader() {
return new XmlBeanDefinitionReader((BeanDefinitionRegistry) beanFactory);
}
/**
* 动态加载bean
* @param fileName
* @throws BeanDefinitionStoreException
* @throws IOException
*/
public static void loadBean(String fileName)
throws BeanDefinitionStoreException, IOException {
XmlBeanDefinitionReader beanDefinitionReader = getXmlBeanDefinitionReader();
beanDefinitionReader.setResourceLoader(applicationContext);
beanDefinitionReader.setEntityResolver(new ResourceEntityResolver(
applicationContext));
beanDefinitionReader.loadBeanDefinitions(applicationContext
.getResources(fileName));
}
public static void loadBean(String fileName, String propertyHolderBeanName)
throws BeanDefinitionStoreException, IOException {
XmlBeanDefinitionReader beanDefinitionReader = getXmlBeanDefinitionReader();
beanDefinitionReader.setResourceLoader(applicationContext);
beanDefinitionReader.setEntityResolver(new ResourceEntityResolver(
applicationContext));
beanDefinitionReader.loadBeanDefinitions(applicationContext
.getResources(fileName));
if (propertyHolderBeanName != null) {
postProcessBeanFactory(propertyHolderBeanName);
}
}
public static void postProcessBeanFactory(String propertyHolderBeanName)
throws BeanDefinitionStoreException, IOException {
BeanFactoryPostProcessor bfpp = (BeanFactoryPostProcessor) SpringContext
.getBean(propertyHolderBeanName);
bfpp.postProcessBeanFactory(SpringContext.getBeanFactory());
}
}
哈哈,这是遇到问题的时候去解决在网上大佬发布的,小弟属实佩服!!!
总结
工作以后不像原来系统化的学习,盲目去学,而是抓住工作中的需求。根据工作的需求找到合理的解决方案,在解决问题的同时升华自己,从中不断吸取知识。这强于系统化的学习很多倍。
🌈欢迎大佬们阅读,也希望可以有更好的想法弹出。学习如一粒种子,只有努力播种才会有收获。