一个注解实现WebSocket集群方案,别提有多优雅了

news2025/1/18 20:16:50

WebSocket大家应该是再熟悉不过了,如果是单体应用确实不会有什么问题,但是当我们的项目使用微服务架构时,就可能会存在问题

比如服务A有两个实例A1A2,前端的WebSocket客户端C通过网关的负载均衡连到了A1,这个时候当A2触发消息发送的逻辑,需要将某个消息发送给所有的客户端时,C就接受不到消息

这个时候我们很快就能想到一种最简单的解决方案,就是把A2的消息转发给A1A1再把消息发送给C,这样C就能收到A2发送的消息了

用法

接下来让我们看看这个库的用法

首先我们需要在启动类上添加一个注解@EnableWebSocketLoadBalanceConcept

@EnableWebSocketLoadBalanceConcept
@EnableDiscoveryClient
@SpringBootApplication
public class AServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(AServiceApplication.class, args);
    }
}

接着我们在需要发送消息的地方注入WebSocketLoadBalanceConcept就可以愉快的跨实例发消息啦

@RestController
@RequestMapping("/ws")
public class WsController {

    @Autowired
    private WebSocketLoadBalanceConcept concept;

    @RequestMapping("/send")
    public void send(@RequestParam String msg) {
        concept.send(msg);
    }
}

是不是很简单,有没有觉得比自己集成单体应用的WebSocket还要简单!

当你的同事还在头疼要实现手动转发时你已经通过一个配置注解实现了功能并开始泡茶喝

你的同事肯定对你刮目相看啊(又能开始摸鱼了)

不知道大家看了之后是不是对具体实现已经有了一些思路呢

接下来我就来讲讲这个库的实现流程

抽象思路

其实我之前有专门针对WebSocket实现过类似功能的模块,只是当时的一些场景都是基于项目定死的,所以相对来说实现比较简单,但是过于定制化不好扩展;关注工众号:码猿技术专栏,回复关键词:1111 获取阿里内部Java性能优化手册!

有一天在和我的一个前同事聊天的过程中得知,他们在考虑让设备和服务直连,并且服务要部署成多实例

设备和服务直连无非就是通过TCP这种长连接来实现,可以使用缓存来保存连接和服务地址的映射关系来实现点对点转发的功能需求

听到这里,是不是感觉似曾相识?当时就有一道光穿过我的脑瓜子,真相只有一个!这不就和WebSocket在集群模式下的问题一样么

于是我从原来针对WebSocket的思考,变成了对各种长连接的思考,最终我将这个问题抽象成了:长连接的集群方案

而不管是WebSocket还是TCP都是长连接的一种具体实现

所以我们可以抽象一个顶级接口Connection,然后实现WebSocketConnection或者是TCPConnection

其实从抽象的角度来说不仅仅是长连接,短连接也在我们的抽象范围之内,只不过类似HTTP等协议并不存在上述的问题,但是并不妨碍你实现一个HTTPConnection用于转发消息,所以大家不要被先入为主的思维束缚住了

转发思路

之前讲到,这个库的主要思路就是将消息转发给其他的服务实例来达到一个单播或广播的效果

所以消息转发的设计就非常重要了

首先消息转发需要凭借一些支持数据交互的技术手段

比如HTTPMQTCPWebSocket

说到这里。。。大家是不是。。。你TM原来自己就能搞定啊(掀桌)

长连接不就是用来交互数据的吗,所以完全可以自给自足啊

于是就有一个精妙的想法在我脑子里形成:

如果每个服务实例都把自己作为一个客户端,连接到其他服务上呢?

WebSocket的场景下,我们将当前服务实例作为一个WebSocket客户端去连接其他服务实例的WebSocket服务端

TCP的场景下,我们将当前服务实例作为一个TCP的客户端去连接其他服务实例的TCP服务端

这样其他服务实例就可以把消息发到这些伪装的客户端上,当服务实例上伪装的客户端接收到消息之后就可以再转发给自己管理的真正的客户端

撒花家人们,自闭(自我闭环)了属于是

所以我们首先需要先让服务实例之间相互连接上

连接流程

让我们来看看互相建立连接是怎么设计的

我定义了一个ConnectionSubscriber的接口,大家可以理解为我们的服务实例要去订阅监听其他服务发送的消息

同时提供了默认实现,就是基于自身的协议进行连接和消息的发送

当然也能够灵活的支持其他方式,只需要自定义一个ConnectionSubscriber就可以了,如果使用MQ的方式就可以实现一个MQConnectionSubscriber或者使用HTTP就可以实现一个HTTPConnectionSubscriber

只不过使用自身的协议就可以不用依赖其他的库或是中间件了,当然如果你对消息的丢失率有比较严格的要求也可以使用MQ作为消息转发的中介,而以我之前参与过的项目来说,一般普通的WebSocket场景基本上还是能忍受一定的丢失率的

获取服务实例信息

那么我们怎么知道要去连接哪些实例呢

我定义了一个ConnectionServerManager的接口用来管理服务信息

当然我们完全可以自己实现一个,比如通过配置文件来配置服务实例信息

不过我们有更方便的方式,那就是依赖Spring Cloud的服务发现组件了,不管是Eureka还是Nacos还是其他的注册中心相当于都支持了,这就是抽象的魅力啊

我们可以通过DiscoveryClient##getInstances(Registration.getServiceId())来获得所有的实例,排除掉自身就是需要连接的服务实例了

当我们的服务实例连接上其他的服务实例之后,发送一个自身实例信息的消息过去,其他的服务实例接收到对应的消息之后反过来连接我们的服务实例,保证一定的连接及时性,这样双方的连接就搭建起来了,可以互相转发消息了

同时我还添加了心跳检测和自动重连,当一段时间没有收到心跳回复后就会断开连接,并且每隔一段时间就会重新查询一遍实例信息,如果发现存在某个服务实例没有对应的连接,就会重新进行连接,这样就能在某些偶尔网络不好的情况下有一定的容错

到目前为止,我们基本的框架已经建立了,当我们启动服务之后,服务间就会自动建立连接

连接区分和管理

基于上述的思路,我们肯定需要区分真实的客户端和用来转发的客户端

于是我就把这些连接做了一个分类

类别说明
Client普通的连接
Subscriber服务实例伪装的连接,用于接受需要转发的消息
Observable服务实例伪装的连接,用于发送需要转发的消息

然后对于这些连接进行一个统一的管理

通过连接工厂ConnectionFactory我们可以将任意的连接适配成Connection对象,并实现各种连接间的消息转发

每个连接都会配置一个MessageEncoderMessageDecoder用于消息的编码和解码,而且不同类别的连接对应的编码器和解码器肯定是不一样的,比如转发的消息和发给真实客户端的消息很大程度上都是有区别的,所以额外定义了一个MessageCodecAdapter用来适配不同类型的编解码器,也能让大家在自定义时方便管理

消息发送

现在当我们发送某条消息之后,消息就会被转发到其他的服务实例,所有的客户端就都能收到了

不对啊,在有些情况下我们不想让所有客户端都收到啊,能不能我们想让谁收到就让谁收到啊

真麻烦,来,我把所有的连接都给你,你自己选吧

连接选择

我们需要在消息发送时确定发送给哪些连接

于是我就定义了一个连接选择器ConnectionSelector

每次要发送消息的时候,我都会匹配一个连接选择器,然后通过选择器来获得需要发送消息的连接,而我们可以通过自定义连接选择器来实现我们消息的精准发送

这里其实就是我为什么会取名WebSocketLoadBalanceConcept的原因,为什么要叫LoadBalance

Ribbon`通过`IRule`来选择一个`Server

我通过ConnectionSelector来选择一个Connection集合

是不是有异曲同工之妙

继续来说自定义选择器

准备工作:

  • 我们的Connection有一个metadata字段用于存放自定义属性

  • 我们的Message有一个headers字段用于存放消息头

给指定用户发送消息

很多场景下我们需要给指定的用户发送消息

首先当客户端连接上来时,可以通过参数或者主动发送一个消息将userId发给服务端,然后服务端将得到的userId存在Connectionmetadata

接着我们给需要发送的Message添加一个header,将对应的userId作为消息头

这样我们就可以自定义一个连接选择器通过判断Message是否包含userId消息头来作为匹配的条件,当Messageheaders中存在userId时,对Connection中的metadata进行userId的匹配来筛选需要发送消息的连接

由于userId是唯一的,当我们自身服务连上来的客户端中已经匹配到就不需要再转发了,如果没有匹配到就通过其他服务实例的客户端进行消息转发

库中已经实现了对应的UserSelectorUserMessage,可以使用配置开启并通过在连接路径上添加userId参数来标记用户

当然我们也可以借用缓存来精确的判断需不需要转发或者是需要转发给哪几个服务,把userId和服务的instanceId等一些具有唯一性的数据缓存在Redis中,当给用户发送消息时,从Redis中获得用户对应的服务实例的instanceId或是具有唯一性的数据,如果经过匹配就是当前服务就可以直接下发,如果是其他服务就转发给那个对应的服务就行了

给指定路径发送消息

还有一种场景也比较常见就是类似主题订阅,如订阅设备状态更新的数据,就要给每一个对应路径的连接发送消息了

我们可以使用不同的路径来表示不同主题,然后自定义一个连接选择器来匹配连接的路径和消息头中指定的路径

当然库中也已经实现了对应的PathSelectorPathMessage,可以通过配置开启

总结

最后请允许我发表一点对于抽象的拙见

抽象其实就和 “道生一,一生二,二生三,三生万物” 一样,根据你的顶级接口(也就是核心功能)不断的向外展开,你的顶级接口就是道(狭义的来讲)

以这个库为例,ConnectionLoadBalanceConcept就是这个库的道,他的核心功能就是发送消息,至于怎么发,发给谁,不确定,像是一个混沌的状态

那么什么是一,二,三呢,我们发送消息需要载体于是就有了ConnectionMessage,我们需要对Connection进行管理于是就有了ConnectionRepository, 我们需要转发消息于是就有了ConnectionSubscriber等等

而万物就像是具体的实现,是能落实的,基于Spring Cloud服务发现的连接管理器DiscoveryConnectionServerManager,基于路径的连接选择器PathSelector,基于ReactiveWebSocket连接ReactiveWebSocketConnection

就像是你创造的世界,不断的衍生出各种各样的规则,这些规则相辅相成,让你的世界平稳的运行

当然你的世界也有可能存在bug,手动狗头

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/414976.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java数据结构】线性表-队列

线性表-队列概念队列的使用队列模拟实现循环队列如何区分空与满双端队列 (Deque)概念 队列:只允许在一端进行插入数据操作,在另一端进行删除数据操作的特殊线性表,队列具有先进先出FIFO(FirstIn First Out) 入队列:进行插入操作的…

文章生成器写出来的原创文章

文章生成机器人 文章生成机器人是一种基于人工智能技术和自然语言处理算法的程序,可以自动地生成高质量、原创的文章。 文章生成机器人的优点如下: 提高工作效率:文章生成机器人能够在较短的时间内自动帮助用户生成大量的文章,提…

GaussDB工作级开发者认证—第三章开发设计建议

一. 数据库对象命名和设计建议 二. 表设计最佳实践 三. SQL查询最佳实践 SQL 最佳实践 - SELECT 避免对大字段执行order by,group by等引起排序的操作避免频繁使用count()获取大表行数慎用通配符字段 “*”避免在select目标列中使用子查询统计表中所有记录数时&…

设计模式之策略模式(C++)

作者:翟天保Steven 版权声明:著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处 一、策略模式是什么? 策略模式是一种行为型的软件设计模式,针对某个行为,在不同的应用场景下&…

win下配置pytorch3d

一、配置好的环境:py 3.9 pytorch 1.8.0 cuda 11.1_cudnn 8_0 pytorch3d 0.6.0 CUB 1.11.0 你可能觉得pytorch3d 0.6.0版本有点低,但是折腾不如先配上用了,以后有需要再说。 (后话:py 3.9 pytorch 1.12.1 cuda …

Log4j日志

log4j日志简介组成Logger 日志记录器Appender 日志目的地(Windows下的路径分隔符)※Layout 日志信息布局layout 指定输出的样式模板?layout.ConversionPattern 指定输出的每项内容及其格式顺序日志信息等级/优先级使用的Log4j的jar包代码示例…

BGP路由控制

实验要求 IP地址分配 给所有路由器按要求配置IP地址,AS之间使用AR编号连接作为IP,例如34.0.0.0/30。 在AS123中启动OSPF 启动BGP,并将24网段的路由宣告 R4 [r4]bgp 400 [r4-bgp]peer 24.0.0.2 as-number 123 [r4-bgp]peer 34.0.0.2 as-num…

Qt关于第三方库介绍

文章目录前言一、获取第三方库二、Makefile是什么?三、将第三方库添加到 Qt 项目中四、mingw和msvc的区别五、安装msvc六、安装mingw七、如何使用不同的编译器前言 本专栏的系统为:windows11 qt版本为:qt6.4.2 提示:以下是本篇文…

【从零开始学Skynet】实战篇《球球大作战》(五):gateway代码设计(上)

1、协议格式 在写代码之前,我们要先了解什么是协议,协议就是 “客户端向服务端发起的登录请求”,那么登录请求是什么样子的呢?这得先从TCP数据流说起,客户端发起的请求,就是一些二进制数据。 (…

OpenCV实例(六)行人检测

OpenCV实例(六)行人检测1.行人检测概述2.行人检测基础实现2.1基本流程2.2实现程序2.3参数优化3.完整行人检测程序作者:Xiou 1.行人检测概述 行人检测是目标检测的一个分支。目标检测的任务是从图像中识别出预定义类型目标,并确定…

【Python】json数据解析

目录 json文件数据解析 爬虫获取王者荣耀英雄信息json数据包并解析 爬虫获取抖音视频json数据包并解析 json文件数据解析 json字符串:通常类似python数据类型中的列表和字典的结合,也可能是单独的列表或者字典格式,通常可以通过json模块的…

亚马逊影响搜索排名的主要因素有哪些,使用测评做排名有哪些要求?

亚马逊产品的排名越高就意味着分配的流量越多而且带来更高的销量。那主要有哪些因素影响产品的排名呢? 1、产品销量 产品销量反映了该产品在同类产品中的销售情况,该数值会在产品Listing中展示,平台会每小时更新一次该排行榜。在平台算法看…

【Linux】线程控制分析:如何获取线程ID?线程如何自动回收?

Linux系统中, 线程是轻量级的进程. 我们已经介绍过了线程的相关概念, 见过了线程再Linux操作系统中的存在形式. 我们知道, 进程有自己相关控制接口, 等待、创建等 而线程作为轻量级的进程, 其实也是有控制接口的. 文章目录线程控制线程的创建与回收演示获取线程idpthread_sel…

用户管理系统-自动化测试

文章目录1. 思维导图编写 Web 自动化测试用例2. 创建测试项目3. 根据思维导图设计用户管理系统自动化测试用例3.1 准备工具类3.2 测试登录页面3.3 测试用户列表页3.4 测试添加用户页3.5 测试修改用户页3.6 未登录状态4. 自动化测试项目总结4.1 自动化测试项目实现步骤4.2 当前项…

图数据库驱动的基础设施运维实操

本文系图技术在大型、复杂基础设施之中 SRE/DevOps 的实践参考,并以 OpenStack 系统之上的图数据库增强的运维案例为例,揭示图数据库、图算法在智能运维上的应用。本文所有示例代码开源。 最近,有些尚未使用过图技术、DevOps/Infra 领域的工程…

除了Java,还可以培训学习哪些IT技术?

除了Java,还可以培训学习哪些IT技术?转行IT学Java似乎已经成为很多人的首选,原因无非是开发技术含量高、开发有前景、开发是一个互联网企业的核心岗位,最重要的是开发薪资待遇高。但其实只单纯因为薪资选择Java的话,小…

Flask数据迁移详细步骤

数据迁移详细步骤: 1. 安装好数据迁移的包 flask-sqlalchemy和flask-migrate Flask模型相关包安装 2. 在exts.py中初始化Migrate和SQLAlchemy 3. 在models中定义好模型 4. 在views.py中一定要导入models模块 from .models import * 5. 配置好数据库(sql…

MYSQL笔记01 数据库概述,SELECT语句,运算符,排序与分页,多表查询

数据库概述 为什么要使用数据库 持久化:把数据保存在可掉电式存储设备中以供之后使用。大多数情况下,特别是企业级应用,数据持久化意味着将内存中的数据保存到硬盘上加以"固化",而持久化的实现过程大多通过各种关系数据…

详解 23 种设计模式(多图 + 代码)

创建型模式 创建型模式的作用就是创建对象,说到创建一个对象,最熟悉的就是 new 一个对象,然后 set 相关属性。但是,在很多场景下,我们需要给客户端提供更加友好的创建对象的方式,尤其是那种我们定义了类&a…

每68个孩子里,就有一个自闭症,“来自星星的孩子”,离我们很近

今年4月2日是世界上第14个“世界自闭症日”。自闭症儿童,又称“星星儿童”,形容他们像遥远的星星一样独自在夜空中闪耀。自闭症并不少见根据世卫组织的调查,世界上每160名儿童中就有一人患有自闭症。根据《中国自闭症(自闭症&…