解决websocket集群的session共享问题

news2025/1/11 22:53:00

在websocket中,服务端主要使用的是session打交道,但是由于session无法实现序列化,不能存储到redis这些中间存储里面,因此这里我们只能把session存储在本地的内存中,那么如果是集群的话,我们如何实现session准确的发送消息呢,其实就是session共享。在websocket中,其实是无法做到session共享的,目前通用的解决方案都是通过消息中间件,实现消息的发布与订阅,也就是每一个服务端实例都订阅某个消息队列的topic,根据对应的sessionid来判断是否在本地存储,如果在本地通过sessionid找到了session,则给客户端发送消息,如果在本地找不到对应的session,那么就直接把这条消息丢弃掉。具体的如下图所示:

 这里的图来自于网上,网上大多都是基于redis做发布与订阅,在真实的环境中,我们一般用kafka或者rocketmq等。根据上面的图示,我们介绍下整个流程:

1、我们同时有A,B,C,D四个websocket服务端,同时订阅消息队列的topic: test8
2、我们发送一条消息a1到消息队列的topic:test8
3、此时A,B,C,D四个websocket服务端都会收到这条消息a1
4、A根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
5、B根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
6、C根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
7、D根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,结果找到有对应的session,此时我们就把这条消息发送给=这个session。
8、客户端就收到了对应的消息。

一、创建一个公共的map,用来存放session

package com.websocket.utils;

import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.Session;

import org.springframework.stereotype.Component;

@Component
public class OnlineSessionCache {

	private ConcurrentHashMap<Integer, Session> onlines = new ConcurrentHashMap<Integer, Session>();

	public void setUserSession(Integer userId, Session session) {
		onlines.put(userId, session);
	}

	public Session getUserSession(Integer userId) {
		return onlines.get(userId);
	}

	public void removeUserSession(Integer userId) {
		onlines.remove(userId);
	}
	
	public ConcurrentHashMap<Integer, Session> getAllSession() {
		return this.onlines;
	}

}

二、在websocket连接和关闭的时候,把session关闭掉

@OnOpen
	public void onOpen(Session session,EndpointConfig config) {

		this.session = session;
		log.info("当前session id : {}  登录进来了", session.getId());
		OnlineCalUtils.addOnlineCount();
		onlineSessionCache.setUserSession(Integer.valueOf(session.getId()), session);
		log.info("存储session了多少个session:{}", onlineSessionCache.getAllSession().size());
		log.info("有新连接加入!当前在线人数为 :{} ", getOnlineCount());
	}
@OnClose
	public void onClose() {
		OnlineCalUtils.subOnlineCount();
		log.info("有一连接关闭!当前在线人数为: {}", getOnlineCount());
		onlineSessionCache.removeUserSession(Integer.valueOf(this.session.getId()));
		log.info("当前session id : {}  退出去了");
	}

三、编写一个接口,用来给指定的用户发送消息

package com.websocket.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.websocket.model.ChatModel;
import com.websocket.producer.RocketProducer;
import com.websocket.utils.ChatModelUtils;

import lombok.extern.slf4j.Slf4j;

@RestController
@Slf4j
public class ChatMsgController {

	@Autowired
	private RocketProducer rocketProducer;
	
	@RequestMapping("/sendToSimpleUser")
	public String sendToSimpleUser(Integer fromUserId,Integer toUserId) {
		
		ChatModel model = ChatModelUtils.createNewChatModel(fromUserId, toUserId, "手动发送消息");
		rocketProducer.sendDirectMessage(model);
		
		return "成功";
	}
	
	
}

这里我们是把消息直接发送给了rocketmq里面,发送者代码如下:

package com.websocket.producer;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;

@Component
public class RocketProducer {

	@Autowired
	private RocketMQTemplate rocketMQTemplate;
	
	public void sendDirectMessage(ChatModel message) {
		String msg = JSON.toJSONString(message);
        rocketMQTemplate.syncSend("test8", msg);
	}
	
}

四、编写消费者,获取mq的消息,并且发送消息给对应的session

package com.websocket.producer;

import javax.websocket.Session;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;
import com.websocket.product.SocketServerProduct;
import com.websocket.utils.OnlineSessionCache;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@RocketMQMessageListener(topic = "test8", consumerGroup = "${chat.group.groupname}")
public class RocketConsumer implements RocketMQListener<String>{

	@Autowired
	private OnlineSessionCache onlineSessionCache;

	@Autowired
	private SocketServerProduct socketServerProduct;
	
	@Value("${chat.group.groupname}")
	private String groupName;
	
	@Override
	public void onMessage(String message) {
		log.info("监听到的topic是:{}  groupname是:{}","test8",groupName);
		ChatModel model = JSON.parseObject(message, ChatModel.class);
		Integer userId = model.getToUserId();
		Session session = onlineSessionCache.getUserSession(userId);
		if (null != session) {
			log.info("找到了对应的session,准备回复消息");
			socketServerProduct.sendMessage(session, model.getMessage());
		}else {
			log.info("没有找到对应的session,准备丢弃");
		}
	}
}

以上就是一个完整的关于websocket服务端集群关于session共享的解决方案。

WebSocket服务端数据推送及心跳机制(Spring Boot + VUE)_websocket心跳机制-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1282004.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库-PostgreSQL学习笔记

目录 PostgreSQL介绍与安装docker安装腾讯云免费领用1个月 数据库操作连接数据库实例创建数据库查看数据库列表使用数据库删除数据库修改数据库属性 表操作字段类型整数浮点数日期与时间类型字符串类型 DDLCREATEDROPALTER DMLINSERTUPDATEDELETE 查询查询所有数据查询部分列指…

YB2412B 600KHz 30V 3A 同步降压稳压器

YB2412B 600KHz 30V 3A 同步降压稳压器 简介&#xff1a; YB2412是一种具有内部功率 MOSFET 的高频、同步、整流、降压开关模式转换器。它提供了一个非常紧凑的解决方案&#xff0c;可以在一个广泛的输入供应范围内实现4a的峰值输出电流&#xff0c; 具有良好的负荷和线路调节能…

基于Java SSM框架实现美好生活九宫格日志网站系统项目【项目源码+论文说明】

基于java的SSM框架实现美好生活九宫格日志网站系统演示 摘要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人…

爱德华|书客|飞利浦护眼台灯好不好用?多方位测评对比爆料!

说到护眼台灯相信大家都不陌生&#xff0c;很多办公族、学生党都会备上一台用于工作、学习。因为长时间的工作或者学习&#xff0c;会明显的感觉到眼睛疲劳和不适。而护眼台灯可以很好的解决这个难题&#xff0c;因为护眼台灯是经过科学的设计和研发的&#xff0c;护眼台灯可以…

ESP32-Web-Server编程-简单的照片浏览器

ESP32-Web-Server编程-简单的照片浏览器 概述 从本节开始我们开始制作一些有趣的多媒体 Web 的示例。 当你希望在网页上展示一些广告、照片&#xff0c;或者你的开发板带摄像头&#xff0c;能够采集一些图片&#xff0c;这时你希望可以通过手头的浏览器查看图片&#xff0c;…

训练lora小模型

训练lora小模型 一&#xff0c;安装部署本地训练环境1&#xff0c;下载源码2&#xff0c;下载模型 二&#xff0c;准备数据1&#xff0c;准备图片2&#xff0c;标注图片 三&#xff0c;修改配置1&#xff0c;修改文件名2&#xff0c;修改配置文件 &#xff0c;install.ps1 四&a…

聚观早报 |智界S7上路;荣耀与中国移动再牵手

【聚观365】12月4日消息 智界S7上路 荣耀与中国移动再牵手 新能源车11月销量成绩 比亚迪11月销量数据 赛力斯汽车11月销量数据 智界S7上路 华为举行智界S7及华为全场景发布会&#xff0c;带来了鸿蒙智行首款轿车智界S7&#xff0c;而其一经发布便在业内引起了关注。而其因…

(一)舒尔特表练习记

舒尔特表练习记 1 练习的开始 我们知道&#xff0c;一段时间中注意力的保持&#xff0c;对于学习效果的影响很大。我想应该不少朋友们有过和我相似的感觉&#xff0c;学着学着&#xff0c;就莫名开始容易走神&#xff1b;一行字看过去&#xff0c;发现自己脑子里什么也没有留…

Python读写XML文件:深入解析与技术实现

目录 一、引言 二、XML文件基础 1、XML文件结构 2、XML文件语法规则 三、Python读取XML文件 1、使用内置库xml.etree.ElementTree 2、使用第三方库lxml 四、Python写入XML文件 1、使用内置库xml.etree.ElementTree 五、注意事项 六、总结 一、引言 XML&#xff08;…

【自然语言处理】【大模型】VeRA:可调参数比LoRA小10倍的低秩微调方法

VeRA&#xff1a;可调参数比LoRA小10倍的低秩微调方法 《VeRA&#xff1a;Vector-based Random Matrix Adaptation》 论文地址&#xff1a;https://arxiv.org/pdf/2310.11454.pdf 相关博客 【自然语言处理】【大模型】VeRA&#xff1a;可调参数比LoRA小10倍的低秩微调方法 【自…

猜数字赢金币

充值金币后开始游戏&#xff0c;猜中奖励10金币退出&#xff0c;不中扣除1金币继续。 (笔记模板由python脚本于2023年12月03日 21:52:23创建&#xff0c;本篇笔记适合熟悉程序函数式编程&#xff0c;熟练掌握基本数据类型的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&…

echarts笔记-GeoJSON河北数据下并裁剪为冀北地图并使用echarts加载

首先找个网站把河北的GeoJSON数据下载下来&#xff0c;我用的是这个&#xff0c;理论上任意一个都可以 DataV.GeoAtlas地理小工具系列 将json数据下载后&#xff0c;进行裁剪&#xff0c;仅保留冀北数据。 如下&#xff0c;我裁剪的数据&#xff1a; {"type": &qu…

中缀表达式构建后缀表达式

中缀表达式构建后缀表达式 文章目录 中缀表达式构建后缀表达式一、构造符号优先关系表二、构造后缀表达式 一、构造符号优先关系表 首先&#xff0c;我们需要知道什么是优先函数。优先函数是一种用于表示算符优先关系的函数&#xff0c;它有两种形式&#xff1a;f 和 g。f(a) …

Python练习题(四)

本文主要是【Python】——Python练习题的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 &#x1f304;每日一句&#xff1a;狠狠沉淀&a…

openmmlab环境搭建及模拟kitti数据集跑pointpillars模型

点云训练—openmmlab环境搭建及模拟kitti数据集跑pointpillars模型 1 环境搭建 在我的 linux 服务器上&#xff0c;基于ubuntu20.04 参见&#xff1a;开始你的第一步 — MMDetection3D 1.3.0 文档 1.1 本地环境已安装anaconda. anaconda的安装参见博文&#xff1a;DS6.1-Y…

NAND Flash和NOR Flash的异同

NAND Flash和NOR Flash是两种常见的闪存类型。 NOR Flash是Intel于1988年首先开发出来的存储技术&#xff0c;改变了原先由EPROM和EEPROM一统天下的局面。 NAND Flash是东芝公司于1989年发布的存储结构&#xff0c;强调降低每比特的成本&#xff0c;更高的性能&#xff0c;并…

自动配置原理

自动配置原理 变更自动配置 视频地址&#xff1a; https://www.bilibili.com/video/BV15b4y1a7yG/?p160&spm_id_frompageDriver&vd_sourcef6debc5a79e3f424f9dde2f13891b158

上海亚商投顾:沪指探底回升 AI应用方向集体爆发

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 三大指数早间震荡调整&#xff0c;深成指盘中跌超1%&#xff0c;午后探底回升全线翻红&#xff0c;北证50指数…

MySQL 预写日志

什么是预写日志机制&#xff1f; 一般情况下&#xff0c;大部分数据库都是将表和索引存储在磁盘文件中。当新增数据时&#xff0c;数据库系统会先写入内存&#xff0c;然后将其写入磁盘上的数据文件。 那为什么不直接写入磁盘嘞&#xff1f;主要是每次新增都直接写入磁盘性能很…

智慧能源:数字孪生压缩空气储能管控平台

压缩空气储能在解决可再生能源不稳定性和提供可靠能源供应方面具有重要的优势。压缩空气储能&#xff0c;是指在电网负荷低谷期将电能用于压缩空气&#xff0c;在电网负荷高峰期释放压缩空气推动汽轮机发电的储能方式。通过提高能量转换效率、增加储能密度、快速启动和调节能力…