Websocket集群解决方案以及实战(附图文源码)

news2025/1/24 14:51:30

最近在项目中在做一个消息推送的功能,比如客户下单之后通知给给对应的客户发送系统通知,这种消息推送需要使用到全双工的websocket推送消息。

所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的http是因为http只能由客户端主动发起请求,服务接收后返回消息。websocket建立起连接之后,客户端和服务端都能主动向对方发送消息。

websocket在单机模式下进行消息的发送和接收:
在这里插入图片描述

用户A和用户B和web服务器建立连接之后,用户A发送一条消息到服务器,服务器再推送给用户B,在单机系统上所有的用户都和同一个服务器建立连接,所有的session都存储在同一个服务器中。

单个服务器是无法支撑几万人同时连接同一个服务器,需要使用到分布式或者集群将请求连接负载均衡到到不同的服务下。消息的发送方和接收方在同一个服务器,这就和单体服务器类似,能成功接收到消息:

在这里插入图片描述
但负载均衡使用轮询的算法,无法保证消息发送方和接收方处于同一个服务器,当发送方和接收方不是在同一个服务器时,接收方是无法接受到消息的:

在这里插入图片描述

websocket集群问题解决思路
客户端和服务端每次建立连接时候,会创建有状态的会话session,服务器的保存维持连接的session。客户端每次只能和集群服务器其中的一个服务器连接,后续也是和该服务器进行数据传输。

要解决集群的问题,应该考虑session共享的问题,客户端成功连接服务器之后,其他服务器也知道客户端连接成功。

方案一:session 共享(不可行)
和websocket类似的http是如何解决集群问题的?解决方案之一就是共享session,客户端登录服务端之后,将session信息存储在Redis数据库中,连接其他服务器时,从Redis获取session,实际就是将session信息存储在Redis中,实现redis的共享。

session可以被共享的前提是可以被序列化,而websocket的session是无法被序列化的,http的session记录的是请求的数据,而websocket的session对应的是连接,连接到不同的服务器,session也不同,无法被序列化。

方案二:ip hash(不可行)
http不使用session共享,就可以使用Nginx负载均衡的ip hash算法,客户端每次都是请求同一个服务器,客户端的session都保存在服务器上,而后续请求都是请求该服务器,都能获取到session,就不存在分布式session问题了。

websocket相对http来说,可以由服务端主动推动消息给客户端,如果接收消息的服务端和发送消息消息的服务端不是同一个服务端,发送消息的服务端无法找到接收消息对应的session,即两个session不处于同一个服务端,也就无法推送消息。如下图所示:

在这里插入图片描述

解决问题的方法是将所有消息的发送方和接收方都处于同一个服务器下,而消息发送方和接收方都是不确定的,显然是无法实现的。

方案三:广播模式
将消息的发送方和接收方都处于同一个服务器下才能发送消息,那么可以转换一下思路,可以将消息以消息广播的方式通知给所有的服务器,可以使用消息中间件发布订阅模式,消息脱离了服务器的限制,通过发送到中间件,再发送给订阅的服务器,类似广播一样,只要订阅了消息,都能接收到消息的通知:
在这里插入图片描述
发布者发布消息到消息中间件,消息中间件再将发送给所有订阅者:
在这里插入图片描述

广播模式的实现
搭建单机 websocket
参考以前写的websocket单机搭建 文章,先搭建单机websocket实现消息的推送。

  1. 添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 创建 ServerEndpointExporter 的 bean 实例
    ServerEndpointExporter 的 bean 实例自动注册 @ServerEndpoint 注解声明的 websocket endpoint,使用springboot自带tomcat启动需要该配置,使用独立 tomcat 则不需要该配置。
@Configuration
public class WebSocketConfig {
    //tomcat启动无需该配置
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
  1. 创建服务端点 ServerEndpoint 和 客户端端
    服务端点
@Component
@ServerEndpoint(value = "/message")
@Slf4j
public class WebSocket {

 private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();

 private Session session;

 @OnOpen
 public void onOpen(Session session) throws SocketException {
  this.session = session;
  webSocketSet.put(this.session.getId(),this);

  log.info("【websocket】有新的连接,总数:{}",webSocketSet.size());
 }

 @OnClose
 public void onClose(){
  String id = this.session.getId();
  if (id != null){
   webSocketSet.remove(id);
   log.info("【websocket】连接断开:总数:{}",webSocketSet.size());
  }
 }

 @OnMessage
 public void onMessage(String message){
  if (!message.equals("ping")){
   log.info("【wesocket】收到客户端发送的消息,message={}",message);
   sendMessage(message);
  }
 }

 /**
  * 发送消息
  * @param message
  * @return
  */
 public void sendMessage(String message){
  for (WebSocket webSocket : webSocketSet.values()) {
   webSocket.session.getAsyncRemote().sendText(message);
  }
  log.info("【wesocket】发送消息,message={}", message);

 }

}

客户端点

<div>
    <input type="text" name="message" id="message">
    <button id="sendBtn">发送</button>
</div>
<div style="width:100px;height: 500px;" id="content">
</div>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script>
<script type="text/javascript">
    var ws = new WebSocket("ws://127.0.0.1:8080/message");
    ws.onopen = function(evt) {
        console.log("Connection open ...");
    };

    ws.onmessage = function(evt) {
        console.log( "Received Message: " + evt.data);
        var p = $("<p>"+evt.data+"</p>")
        $("#content").prepend(p);
        $("#message").val("");
    };

    ws.onclose = function(evt) {
        console.log("Connection closed.");
    };

    $("#sendBtn").click(function(){
        var aa = $("#message").val();
        ws.send(aa);
    })

</script>

服务端和客户端中的OnOpen、onclose、onmessage都是一一对应的。

服务启动后,客户端ws.onopen调用服务端的@OnOpen注解的方法,储存客户端的session信息,握手建立连接。
客户端调用ws.send发送消息,对应服务端的@OnMessage注解下面的方法接收消息。
服务端调用session.getAsyncRemote().sendText发送消息,对应的客户端ws.onmessage接收消息。
添加 controller

@GetMapping({"","index.html"})
public ModelAndView index() {
 ModelAndView view = new ModelAndView("index");
 return view;
}

效果展示
打开两个客户端,其中的一个客户端发送消息,另一个客户端也能接收到消息。

在这里插入图片描述

添加 RabbitMQ 中间件
这里使用比较常用的RabbitMQ作为消息中间件,而RabbitMQ支持发布订阅模式:

添加消息订阅
交换机使用扇形交换机,消息分发给每一条绑定该交换机的队列。以服务器所在的IP + 端口作为唯一标识作为队列的命名,启动一个服务,使用队列绑定交换机,实现消息的订阅:

@Configuration
public class RabbitConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
    }

    @Bean
    public Queue psQueue() throws SocketException {
        // ip + 端口 为队列名 
        String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();
        return new Queue("ps_" + ip);
    }

    @Bean
    public Binding routingFirstBinding() throws SocketException {
        return BindingBuilder.bind(psQueue()).to(fanoutExchange());
    }
}

修改服务端点 ServerEndpoint
在WebSocket添加消息的接收方法,@RabbitListener 接收消息,队列名称使用常量命名,动态队列名称使用 #{name},其中的name是Queue的bean 名称:

@RabbitListener(queues= "#{psQueue.name}")
public void pubsubQueueFirst(String message) {
  System.out.println(message);
  sendMessage(message);
}

然后再调用sendMessage方法发送给所在连接的客户端。

修改消息发送
在WebSocket类的onMessage方法将消息发送改成RabbitMQ方式发送:

@OnMessage
public void onMessage(String message){
  if (!message.equals("ping")){
    log.info("【wesocket】收到客户端发送的消息,message={}",message);
    //sendMessage(message);
    if (rabbitTemplate == null) {
      rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate");
    }
    rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message);
  }
}

消息通知流程如下所示:

在这里插入图片描述
启动两个实例,模拟集群环境
打开idea的Edit Configurations:
在这里插入图片描述
点击左上角的COPY,然后添加端口server.port=8081:
在这里插入图片描述
启动两个服务,端口分别是8080和8081。在启动8081端口的服务,将前端连接端口改成8081:

var ws = new WebSocket("ws://127.0.0.1:8081/message");

效果展示

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1033343.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何借用敏捷实现IT对数字化转型支持 | 2023佛山敏捷之旅成功举办

9月17日&#xff0c;2023年佛山之旅暨DevOps Meetup在佛山圆满落幕。本次大会以助力大湾区金融和互联网企业敏捷DevOps实施和效能提升为主题&#xff0c;吸引了150余位来自各地的金融和互联网企业相关从业人员齐聚一堂&#xff0c;共同探讨行业最佳实践、最新发展趋势以及最新应…

flask_apscheduler实现定时推送飞书消息

需求场景&#xff1a; 实现一个flask服务&#xff0c;通过接口控制一个定时任务任务&#xff08;对酒店订房情况进行检查&#xff09;的开启和停止。要求定时任务完成后&#xff0c;可以通过飞书机器人推送任务完成的消息。 展现效果&#xff1a; 启动定时任务 关闭定时任务…

聊聊wireshark的进阶使用功能 | 京东云技术团队

1. 前言 emmm&#xff0c;说起网络知识学习肯定离不来wireshark工具&#xff0c;这个工具能够帮助我们快速地定位网络问题以及帮助正在学习网络协议这块的知识的同学验证理论与实际的一大利器&#xff0c;平时更多的只是停留在初步的使用阶段。也是利用部门内部的网络兴趣小组…

Webpack使用plugin插件自动在打包目录生成html文件

我们使用html-webpack-plugin插件可以自动在打包代码目录生成html文件 使用步骤&#xff1a; 一、安装依赖 在控制台中输入如下代码&#xff1a; npm i -D html-webpack-plugin 二、在webpack.config.js中配置插件 const HTMLPlugin require("html-webpack-plugin&q…

Activiz 9.2 for Linux Crack

Activiz 9.2 在 C#、.Net 和 Unity 软件中为您的 3D 内容释放可视化工具包的强大功能。 ActiViz 允许您轻松地将 3D 可视化集成到您的应用程序中。 ActiViz 功能 用 C# 封装的 3D 可视化软件系统 允许在 .NET 环境中快速开发可投入生产的交互式3D 应用程序 支持窗口演示基础 (…

VS2015没有“Win32控制台应用程序”模块

发现问题 成功安装VS2015专业版之后&#xff08;安装期间遇到“安装包缺失或损坏的问题”&#xff0c;参考安装VS2015时提示“安装包丢失或损坏”成功解决&#xff09;&#xff0c;由于它没有在桌面创建快捷方式&#xff0c;于是我在“开始”处找到与VS2015有关的图标&#xf…

设置github的默认分支

设置github的默认分支 更换默认分支默认分支的作用 更换默认分支 之前默认的分支想main, 现在想更换默认的分支 点击main, 可以看到有两个分支: main和gpuVersion, 可以看到这里默认main分支为default 如果想设置gpuVersion作为default,可以点击View all branches, 进入下一个…

【校招VIP】前端计算机网络之HTTP和HTTPS

考点介绍&#xff1a; 为了解决HTTP协议的缺陷&#xff0c;需要使用另一种协议&#xff1a;安全套接字层超文本传输协议HTTPS&#xff0c;为了数据传输的安全&#xff0c;HTTPS在HTTP的基础上加入了SSL/TLS协议&#xff0c;SSL/TLS依靠证书来验证服务器的身份&#xff0c;并为浏…

流媒体及直播相关知识

文章目录 前言一、流媒体1、基本概念2、流式传输3、流媒体技术原理4、流媒体传输模式5、H.264 流媒体传输系统框架 二、直播1、直播中使用的流媒体协议2、直播的模块划分3、视频直播流程①、推流到服务器②、服务器流分发 前言 本文主要讲解流媒体及其直播相关知识&#xff0c…

Linux 线程属性相关函数

pthread_attr_t就是对应线程的属性 /*#include <pthread.h>int pthread_attr_init(pthread_attr_t *attr);初始化线程属性变量int pthread_attr_destroy(pthread_attr_t *attr);释放线程属性资源int pthread_attr_getdetachstate(const pthread_attr_t *attr, int *deta…

无人机“长坡”上,谁是滚出“厚雪球”的长期主义者?

“股神”巴菲特&#xff0c;曾提出过“长坡厚雪”的理论&#xff1a; 人生就像滚雪球&#xff0c;重要的是发现很湿的雪和很长的坡。 运用到企业经营上&#xff0c;“长坡”指的是企业所布局的领域发展潜力足、空间大&#xff1b;而“湿雪”&#xff0c;指的是企业竞争力强、…

Flowable主要子流程介绍

1. 内嵌子流程 &#xff08;1&#xff09;说明 内嵌子流程又叫嵌入式子流程&#xff0c;它是一个可以包含其它活动、分支、事件&#xff0c;等的活动。我们通常意义上说的子流程通常就是指的内嵌子流程&#xff0c;它表现为将一个流程&#xff08;子流程&#xff09;定…

发送实时音频数据到udp服务

由于浏览器不能直接连接udp服务&#xff0c;所以需要搭建一个websocket服务做中转&#xff0c;让websocket服务连接udp服务 1、vue开发获取实时音频数据并按4096分包后添加rtp协议头发送到websocket服务&#xff08;连接websocket自行编写连接到127.0.0.1:8889&#xff09; da…

代码随想录算法训练营 动态规划part06

一、完全背包 卡哥的总结&#xff0c;还挺全代码随想录 (programmercarl.com) 二、零钱兑换 II 518. 零钱兑换 II - 力扣&#xff08;LeetCode&#xff09; 被选物品之间不需要满足特定关系&#xff0c;只需要选择物品&#xff0c;以达到「全局最优」或者「特定状态」即可。 …

uni-app, 实现 scroll-view 自动滚动到底部,并控制触发频率

实现思路 通过 scroll-view 组件的 scroll-top 属性可以设置容器竖向滚动条位置 属性名Valuescroll-y允许纵向滚动scroll-top设置竖向滚动条位置 想要实现 scroll-view 滚动到底部&#xff0c;只需要让 scroll-top scroll-view 内容高度 - scroll-view 容器本身高度&#…

vuejs - - - - - 使用code编辑器codemirror

使用code编辑器codemirror 0. 效果图1. 依赖安装2. 组件封装3. 组件使用 0. 效果图 列表实现参考: 列表实现代码 1. 依赖安装 npm install codemirror codemirror-editor-vue3 jsonlint-mod 2. 组件封装 code-mirror-editor.vue <template><VueCodeMirrorclas…

蓝桥杯 题库 简单 每日十题 day8

01 扫雷 题目描述 在一个n行列的方格图上有一些位置有地雷&#xff0c;另外一些位置为空。 请为每个空位置标一个整数&#xff0c;表示周围八个相邻的方格中有多少个地雷。 输入描述 输入的第一行包含两个整数n&#xff0c;m。 第2行到第n1行每行包含m个整数&#xff0c;相邻整…

Smart UI Web 16.0.1 WebComponents htmlelements Crack

Javascript Web 组件库 Smart UI Web 组件库是您构建令人惊叹的 Web 应用程序所需的唯一套件。它包含 70 多个快速且专业设计的 UI 组件&#xff0c;可在单个包中实现美观且始终现代的 Web 应用程序。 具有高级功能的即用型Javascript 组件。只需几行代码即可使用数据网格、甘特…

Mybatis分页框架-PageHelper

Mybatis分页框架-PageHelper 一、PageHelper基础使用1.引入jar包2.配置conifg3.测试使用 二、PageHelper的多种用法1.使用PageHelper.startPage传入对象2.不使用PageHelper.startPage,而使用PageHelper.offsetPage3.使用Lambda进行分页4.不使用PageHelper直接分页5.想要使用分页…

代码随想录算法训练营 动态规划part17

一、回文子串 647. 回文子串 - 力扣&#xff08;LeetCode&#xff09; class Solution {public int countSubstrings(String s) {boolean[][] dp new boolean[s.length()][s.length()];int ans 0;for (int j 0; j < s.length(); j) {for (int i 0; i < j; i) {if …