WebSocket:基于 Spring Cloud 配置注解实现 WebSocket 集群方案

news2024/11/19 9:32:13

上一篇:WebSocket 的具体介绍与内部执行原理

文章目录

  • 介绍
  • 用法
    • 抽象思路
    • 转发思路
    • 连接流程
      • 获取服务实例信息
      • 连接区分和管理
    • 消息发送
      • 连接选择
        • 给指定用户发送消息
        • 给指定路径发送消息
  • 结束
  • 源码地址
  • 声明

介绍

  • WebSocket大家应该是再熟悉不过了,如果是单体应用确实不会有什么问题,但是当我们的项目使用微服务架构时,就可能会存在问题

  • 比如服务A有两个实例A1A2,前端的WebSocket客户端C通过网关的负载均衡连到了A1,这个时候当A2触发消息发送的逻辑,需要将某个消息发送给所有的客户端时,C就接受不到消息

  • 这个时候我们很快就能想到一种最简单的解决方案,就是把A2的消息转发给A1A1再把消息发送给C,这样C就能收到A2发送的消息了

基于这个思路,我实现了一个库,一个配置注解搞定一切

用法

接下来让我们看看这个库的用法

首先我们需要在启动类上添加一个注解@EnableWebSocketLoadBalanceConcept

@EnableWebSocketLoadBalanceConcept
@EnableDiscoveryClient
@SpringBootApplication
public class AServiceApplication {
 public static void main(String[] args) {
 SpringApplication.run(AServiceApplication.class, args);
 }
}

接着我们在需要发送消息的地方注入WebSocketLoadBalanceConcept就可以愉快的跨实例发消息啦

@RestController
@RequestMapping("/ws")
public class WsController {
 @Autowired
 private WebSocketLoadBalanceConcept concept;
 @RequestMapping("/send")
 public void send(@RequestParam String msg) {
 concept.send(msg);
 }
}
  • 是不是很简单,有没有觉得比自己集成单体应用的WebSocket还要简单!当你的同事还在头疼要实现手动转发时你已经通过一个配置注解实现了功能并开始泡茶喝。你的同事肯定对你刮目相看啊(又能开始摸鱼了)不知道大家看了之后是不是对具体实现已经有了一些思路呢

  • 接下来我就来讲讲这个库的实现流程

抽象思路

  • 其实我之前有专门针对WebSocket实现过类似功能的模块,只是当时的一些场景都是基于项目定死的,所以相对来说实现比较简单,但是过于定制化不好扩展

  • 有一天在和我的一个前同事聊天的过程中得知,他们在考虑让设备和服务直连,并且服务要部署成多实例

  • 设备和服务直连无非就是通过TCP这种长连接来实现,可以使用缓存来保存连接和服务地址的映射关系来实现点对点转发的功能需求

  • 听到这里,是不是感觉似曾相识?当时就有一道光穿过我的脑瓜子,真相只有一个!这不就和WebSocket在集群模式下的问题一样么

  • 于是我从原来针对WebSocket的思考,变成了对各种长连接的思考,最终我将这个问题抽象成了:长连接的集群方案
    而不管是WebSocket还是TCP都是长连接的一种具体实现,所以我们可以抽象一个顶级接口Connection,然后实现WebSocketConnection或者是TCPConnection。其实从抽象的角度来说不仅仅是长连接,短连接也在我们的抽象范围之内,只不过类似HTTP等协议并不存在上述的问题,但是并不妨碍你实现一个HTTPConnection用于转发消息,所以大家不要被先入为主的思维束缚住了

转发思路

  • 之前讲到,这个库的主要思路就是将消息转发给其他的服务实例来达到一个单播或广播的效果,所以消息转发的设计就非常重要了。首先消息转发需要凭借一些支持数据交互的技术手段,比如HTTPMQTCPWebSocket。说到这里…大家是不是…你TM原来自己就能搞定啊(掀桌)长连接不就是用来交互数据的吗,所以完全可以自给自足啊,于是就有一个精妙的想法在我脑子里形成:

    • 如果每个服务实例都把自己作为一个客户端,连接到其他服务上呢?
    • WebSocket的场景下,我们将当前服务实例作为一个WebSocket客户端去连接其他服务实例的WebSocket服务端
    • TCP的场景下,我们将当前服务实例作为一个TCP的客户端去连接其他服务实例的TCP服务端
    • 这样其他服务实例就可以把消息发到这些伪装的客户端上,当服务实例上伪装的客户端接收到消息之后就可以再转发给自己管理的真正的客户端
  • 所以我们首先需要先让服务实例之间相互连接上

连接流程

让我们来看看互相建立连接是怎么设计的

我定义了一个ConnectionSubscriber的接口,大家可以理解为我们的服务实例要去订阅监听其他服务发送的消息

同时提供了默认实现,就是基于自身的协议进行连接和消息的发送

当然也能够灵活的支持其他方式,只需要自定义一个ConnectionSubscriber就可以了,如果使用MQ的方式就可以实现一个MQConnectionSubscriber或者使用HTTP就可以实现一个HTTPConnectionSubscriber

只不过使用自身的协议就可以不用依赖其他的库或是中间件了,当然如果你对消息的丢失率有比较严格的要求也可以使用MQ作为消息转发的中介,而以我之前参与过的项目来说,一般普通的WebSocket场景基本上还是能忍受一定的丢失率的

获取服务实例信息

那么我们怎么知道要去连接哪些实例呢

我定义了一个ConnectionServerManager的接口用来管理服务信息

当然我们完全可以自己实现一个,比如通过配置文件来配置服务实例信息

不过我们有更方便的方式,那就是依赖Spring Cloud的服务发现组件了,不管是Eureka还是Nacos还是其他的注册中心相当于都支持了,这就是抽象的魅力啊

我们可以通过DiscoveryClient#getInstances(Registration.getServiceId())来获得所有的实例,排除掉自身就是需要连接的服务实例了

当我们的服务实例连接上其他的服务实例之后,发送一个自身实例信息的消息过去,其他的服务实例接收到对应的消息之后反过来连接我们的服务实例,保证一定的连接及时性,这样双方的连接就搭建起来了,可以互相转发消息了

同时我还添加了心跳检测和自动重连,当一段时间没有收到心跳回复后就会断开连接,并且每隔一段时间就会重新查询一遍实例信息,如果发现存在某个服务实例没有对应的连接,就会重新进行连接,这样就能在某些偶尔网络不好的情况下有一定的容错

到目前为止,我们基本的框架已经建立了,当我们启动服务之后,服务间就会自动建立连接

连接区分和管理

基于上述的思路,我们肯定需要区分真实的客户端和用来转发的客户端

于是我就把这些连接做了一个分类

类别说明
Client普通的连接
Subscriber服务实例伪装的连接,用于接受需要转发的消息
Observable服务实例伪装的连接,用于发送需要转发的消息

然后对于这些连接进行一个统一的管理

通过连接工厂ConnectionFactory我们可以将任意的连接适配成Connection对象,并实现各种连接间的消息转发

每个连接都会配置一个MessageEncoderMessageDecoder用于消息的编码和解码,而且不同类别的连接对应的编码器和解码器肯定是不一样的,比如转发的消息和发给真实客户端的消息很大程度上都是有区别的,所以额外定义了一个MessageCodecAdapter用来适配不同类型的编解码器,也能让大家在自定义时方便管理

消息发送

现在当我们发送某条消息之后,消息就会被转发到其他的服务实例,所有的客户端就都能收到了

不对啊,在有些情况下我们不想让所有客户端都收到啊,能不能我们想让谁收到就让谁收到啊

真麻烦,来,我把所有的连接都给你,你自己选吧

连接选择

我们需要在消息发送时确定发送给哪些连接

于是我就定义了一个连接选择器ConnectionSelector

每次要发送消息的时候,我都会匹配一个连接选择器,然后通过选择器来获得需要发送消息的连接,而我们可以通过自定义连接选择器来实现我们消息的精准发送

这里其实就是我为什么会取名WebSocketLoadBalanceConcept的原因,为什么要叫LoadBalance

Ribbon通过IRule来选择一个Server

我通过ConnectionSelector来选择一个Connection集合

是不是有异曲同工之妙

继续来说自定义选择器

准备工作:

  • 我们的Connection有一个metadata字段用于存放自定义属性

  • 我们的Message有一个headers字段用于存放消息头

给指定用户发送消息

  • 很多场景下我们需要给指定的用户发送消息

  • 首先当客户端连接上来时,可以通过参数或者主动发送一个消息将userId发给服务端,然后服务端将得到的userId存在Connectionmetadata

  • 接着我们给需要发送的Message添加一个header,将对应的userId作为消息头

  • 这样我们就可以自定义一个连接选择器通过判断Message是否包含userId消息头来作为匹配的条件,当Messageheaders中存在userId时,对Connection中的metadata进行userId的匹配来筛选需要发送消息的连接

  • 由于userId是唯一的,当我们自身服务连上来的客户端中已经匹配到就不需要再转发了,如果没有匹配到就通过其他服务实例的客户端进行消息转发

  • 库中已经实现了对应的UserSelectorUserMessage,可以使用配置开启并通过在连接路径上添加userId参数来标记用户

  • 当然我们也可以借用缓存来精确的判断需不需要转发或者是需要转发给哪几个服务,把userId和服务的instanceId等一些具有唯一性的数据缓存在Redis中,当给用户发送消息时,从Redis中获得用户对应的服务实例的instanceId或是具有唯一性的数据,如果经过匹配就是当前服务就可以直接下发,如果是其他服务就转发给那个对应的服务就行了

给指定路径发送消息

  • 还有一种场景也比较常见就是类似主题订阅,如订阅设备状态更新的数据,就要给每一个对应路径的连接发送消息了

  • 我们可以使用不同的路径来表示不同主题,然后自定义一个连接选择器来匹配连接的路径和消息头中指定的路径

  • 当然库中也已经实现了对应的PathSelectorPathMessage,可以通过配置开启

结束

  • 最后请允许我发表一点对于抽象的拙见

  • 抽象其实就和 “道生一,一生二,二生三,三生万物” 一样,根据你的顶级接口(也就是核心功能)不断的向外展开,你的顶级接口就是道(狭义的来讲)

  • 以这个库为例,ConnectionLoadBalanceConcept就是这个库的道,他的核心功能就是发送消息,至于怎么发,发给谁,不确定,像是一个混沌的状态

  • 那么什么是一,二,三呢,我们发送消息需要载体于是就有了ConnectionMessage,我们需要对Connection进行管理于是就有了ConnectionRepository, 我们需要转发消息于是就有了ConnectionSubscriber等等

  • 而万物就像是具体的实现,是能落实的,基于Spring Cloud服务发现的连接管理器DiscoveryConnectionServerManager,基于路径的连接选择器PathSelector,基于ReactiveWebSocket连接ReactiveWebSocketConnection

源码地址

  • https://github.com/Linyuzai/concept/wiki/Concept-WebSocket-LoadBalance

声明

  • 原文地址:https://juejin.cn/post/7105931939407740965#comment

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/681355.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Debezium系列之:发布Debezium 2.3.0.Final

Debezium系列之:发布Debezium 2.3.0.Final 一、重大变化1.PostgreSQL / MySQL 安全连接更改2.JDBC 存储编码更改 二、新功能和改进1.Debezium Server支持K8s2.新的通知子系统3.新的可扩展信号子系统4.JMX 信号和通知集成5.新的 JDBC 存储子系统6.PostgreSQL 流式传输…

优化伊通河漂流旅行方案的模型——JLU数学学院2020级数学模型期末大作业

文章目录 题目描述背景介绍模型假设问题一的模型决策树模型游客安全最大化与旅行次数最大化模型统筹考虑游客安全与旅行次数的模型模型对比 第二问的模型每天下水的脚踏游船与电动游船的比率的敏感性分析全是电动游船的情形全是脚踏游船的情形每天下水的脚踏游船与电动游船成比…

【深度学习笔记】神经网络概述

本专栏是网易云课堂人工智能课程《神经网络与深度学习》的学习笔记,视频由网易云课堂与 deeplearning.ai 联合出品,主讲人是吴恩达 Andrew Ng 教授。刚兴趣的网友可以观看网易云课堂的视频进行深入学习,视频的链接如下:https://mo…

前端Vue自定义等宽标签栏标题栏选项卡

前端Vue自定义等宽标签栏标题栏选项卡&#xff0c; 下载完整代码请访问uni-app插件市场地址&#xff1a;https://ext.dcloud.net.cn/plugin?id13170 效果图如下&#xff1a; # cc-chooseTab #### 使用方法 使用方法 <!-- tabArr:标签数组 current&#xff1a;当前选择序…

Linux 实用操作技巧一

文章目录 Linux 实用操作技巧前言查找当前目录下所有 .gz 结尾的文件查找当前目录超过30天没有修改过且文件大于10M的.gz文件。将software 目录下大于 100k 的文件移动至 /tmp下 时间戳快速转换动态查看日志&#xff0c;并且停止获取内存、CPU、磁盘、IO等信息获取 公网 ip总结…

关于 SpringBoot 日志文件的知识

目录 日志有什么用&#xff1f; 日志怎么用&#xff1f; 自定义日志打印 在程序中得到日志对象 使用日志对象打印日志 日志格式 日志级别的分类与使用 日志级别设置 日志持久化 日志有什么用&#xff1f; 日志对于我们来说&#xff0c;最主要的用途就是排除和定位问题…

Java设计模式之行为型-模板方法模式

目录 一、概念 二、角色设计 三、代码实现 四、总结 一、概念 定义一个操作中的算法骨架&#xff0c;而将算法的一些步骤延迟到子类当中&#xff0c;使得子类可以不改变该算法结构的情况下重定义该算法的特定步骤&#xff0c;即在一个抽象类中公开定义了执行某一方法的模板…

一种新颖的智能优化算法—飞蛾扑火优化(MFO)算法

飞蛾扑火优化(Moth-Flame Optimization,MFO)算法是Mirjalili于2015年提出的一种新型智能优化算法&#xff0c;其灵感来源于一种特殊的导航机制—横向定位导航,实现了勘探与开发的较好平衡以获得全局优化性能。MFO算法具有并行优化能力强&#xff0c;全局性优且不易落入局部极值…

VS安装中报“应用程序无法启动,因为应用程序的并行配置不正确”的解决办法

1.问题描述 安装应用程序的时候&#xff0c;提示“应用程序无法启动&#xff0c;因为应用程序的并行配置不正确”。 2.解决过程 方法一 开启服务 开始→ 运行&#xff08;输入services.msc或者服务&#xff09;→确定后打开服务&#xff1b;找到Windows Modules Installer服务…

Advanced Installer使用指南

PC打包软件有很多 我只推荐一个advanced Installer完全傻瓜式操作&#xff0c;直接点就行了。innoSetUp需要学习它的脚本语言&#xff0c;学习成本太高了&#xff0c;而且网上的学习资料也很少。其它东西 增加依赖 我的程序需要dotNet5.0.13的运行时环境。 但是在AI上面没有…

chatgpt赋能python:Python撤销和回退的完全指南:从基础到高级

Python撤销和回退的完全指南&#xff1a;从基础到高级 Python是一种强大的编程语言&#xff0c;但即使在最好的情况下&#xff0c;错误也会出现。在此时&#xff0c;撤销错误和回退操作会变得非常重要。本指南将介绍Python中的撤销和回退操作&#xff0c;从基础操作到高级操作…

SparkSQL之AstBuilder

Spark SQL是基于ANTLR实现的&#xff0c;前文中有关于ANTLR的介绍文章《ANTLR实战》和《设计模式之访问者模式》&#xff0c;这篇文章主要介绍的内容是AstBuilder类。 Catalyst中提供了直接面向用户的ParseInterface接口&#xff0c;该接口中包含了对SQL语句、Expression表达式…

DDD(领域驱动设计) 核心概念浅析

文章目录 DDD(领域驱动设计) 核心概念浅析前言贫血模型什么是贫血模型贫血模型的优点贫血模型的缺点 充血模型充血模型的优点充血模型的缺点 DP 概念抽象接口简单概念简单概念流程&#xff1a;实现 统一语言和模型价值DP 和 Entity 的区别 Aggregate&#xff08;聚合&#xff0…

HackTheBox - 学院【CPTS】复习1 - PASSWORD ATTACKS

前言 有一个月时间没发文章了&#xff0c;我在6月11号进入htb学院学习CPTS&#xff0c;在扎实的THM基础的加持下&#xff0c;我学的非常顺利&#xff0c;其实大部分内容都相当于复习&#xff0c;而学到的内容只是一些可能不太常见、又或者非常细节的小技巧&#xff0c;这也是非…

vscode 出现 No such file or directory 的解决办法(python tkinter)

问题 Traceback (most recent call last): File “e:\Github\Python-GUI\PyQt-Fluent-Widgets\examples\navigation\demo.py”, line 202, in w Window() File “e:\Github\Python-GUI\PyQt-Fluent-Widgets\examples\navigation\demo.py”, line 95, in init self.initWindo…

ThreadPoolExecutor源码剖析

ThreadPoolExecutor源码涉及到的内容比较多&#xff0c;需要一点点的去啃和查看… ThreadPoolExecutor的核心属性 ThreadPoolExecutor的核心属性主要就是CTL。基于CTL获取到线程池的状态以及工作线程个数。 ctl是一个int类型的整数&#xff0c;內部基于AtomicInteger&#xff0…

再谈StringBuilder为什么线程不安全以及带来的问题

1 缘起 比较有意思的是&#xff0c;学习锁消除的过程中&#xff0c;有人讲到StringBuffer在方法内构建&#xff0c;不会被其他方法引用时&#xff0c;StringBuffer的锁会被消除&#xff0c; 于是&#xff0c;顺便看了一下同源的StringBuidler为什么线程不安全&#xff0c;以及…

【无标题】TP-LINK XDR5470 WiFi6路由器 简单开箱评测

TL-XDR5470易展版AX5400双频WiFi6路由器 简单开箱测评&#xff0c;上次买的XDR6078覆盖不够&#xff0c;还是得每层再买一个&#xff0c;所以又买了个TL-XDR5470&#xff0c;支持易展mesh。 上次买的XDR6078没有外置FEM功放芯片&#xff0c;所以信号差了一点&#xff0c;得加2…

PE系统盘制作

目录 前言 制作PE盘的步骤如下 前言 PE盘是一个轻量级的系统&#xff0c;类似于Windows系统。当您的计算机无法进入Windows系统时&#xff0c;您可以通过启动PE盘来访问一个独立的操作系统&#xff0c;从而执行各种任务&#xff0c;例如拷贝重要文件或进行系统安装。PE盘通常…

win10查看端口是否被占用,被哪一个程序占用(图文)

window系统中有时候我们会出现需要的端口号被占用&#xff0c;但不知道具体是哪个程序占用的。这时我们需要找到使用此端口的程序。 方法如下&#xff1a; 1&#xff09;以管理员身份打开命令提示符窗口&#xff08;开始-运行&#xff09;。 2&#xff09;使用命令查看端口使…