基于springboot websocket和okhttp实现消息中转

news2025/2/24 20:59:59

1、业务介绍

消息源服务的消息不能直接推给用户侧,用户与中间服务建立websocket连接,中间服务再与源服务建立websocket连接,源服务的消息推给中间服务,中间服务再将消息推送给用户。流程如下图:
在这里插入图片描述
此例中我们定义中间服务A的端口为8082,消息源头服务B的端口为8081,方便阅读下面代码。
说明:此例子只实现了中间服务的转发,连接的关闭等其他逻辑并没有完善,如需要请自行完善;

2、中间服务实现

中间服务即为上图的中间服务A,由于中间服务既要发送(发给用户端)消息,又要接收(从消息源服务B接收)消息,故服务A分为服务端与客户端。
服务A的websocket服务端我们使用springboot websocket实现,客户端使用okhttp实现;会话缓存暂使用内存缓存(实际项目中可置于其他缓存中)
中间服务所需依赖为:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>4.2.2</version>
</dependency>

缓存类:

public class WSCache {

    //存储客户端session信息, {会话id:ws_session}
    public static Map<String, Session> clients = new ConcurrentHashMap<>();

    //存储把不同用户的客户端session信息集合 {userId, [会话id1,会话id2,会话id3,会话id4]}
    public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();
}

自定义消息类:

@Accessors(chain = true)
@Data
public class MsgInfo {

    private String massage;

    //为userId,用于从缓存中获取对应用户的websocket session
    private String userKey;
}

2.1 中间服务A的客户端:

客户端也可以使用springboot websocket,当下我们选择使用okhttp实现。

@Slf4j
public class CommonWSClient extends WebSocketListener {

    /**
     * websocket连接建立
     *
     * @param webSocket
     * @param response
     */
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        super.onOpen(webSocket, response);
		log.info("客户端连接建立:{}", response.body().string());
    }

    /**
     * 收到消息
     * @param webSocket
     * @param text
     */
    @Override
    public void onMessage(WebSocket webSocket, String text) {
        super.onMessage(webSocket, text);
        log.info("okhttp receive=>{}", text);
        //todo 收到源(8081)的消息,取到对应userId的消息,并将消息通过本地server发送给用户
        ObjectMapper mapper = new ObjectMapper();
        try {
            MsgInfo msgInfo = mapper.readValue(text, MsgInfo.class);
            Set<String> strings = WSCache.connection.get(msgInfo.getUserKey());
            if(!CollectionUtils.isEmpty(strings)){
                for (String sid : strings) {
                    Session session = WSCache.clients.get(sid);
                    session.getBasicRemote().sendText(msgInfo.getMassage());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            //throw new RuntimeException(e);
        }
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        super.onMessage(webSocket, bytes);
    }

    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        super.onClosing(webSocket, code, reason);
        log.info("okhttp socket closing.");
    }

    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        super.onClosed(webSocket, code, reason);
        log.info("okhttp socket closed.");
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        super.onFailure(webSocket, t, response);
        if (response == null) {
            log.error("okhttp onFailure, response is null.");
            return;
        }
        try {
            log.error("okhttp onFailure, code: {}, errmsg: {}", response.code(), response.body().string());
        } catch (IOException e) {
            log.warn("okhttp onFailure failed, error: {}", e.getMessage());
        }
    }

}

2.2 中间服务A的服务端:

websocket服务:

@Slf4j
@Component
@ServerEndpoint("/notice/{userId}")
public class WebSocketServer {
    
    //会话id
    private String sid = null;

    //建立连接的用户id
    private String userId;

    /**
     * @description: 当与用户端连接成功时,执行该方法
     * @PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解
     **/
    @OnOpen
    public String onOpen(Session session, @PathParam("userId") String userId){
        this.sid = UUID.randomUUID().toString();
        this.userId = userId;
        WSCache.clients.put(this.sid,session);
        //判断该用户是否存在会话信息,不存在则添加
        Set<String> clientSet = WSCache.connection.get(userId);
        if (CollectionUtils.isEmpty(clientSet)){
            clientSet = new HashSet<>();
            clientSet.add(this.sid);
        }else {
            clientSet.add(this.sid);
        }
        WSCache.connection.put(userId,clientSet);
        log.info("用户{}与本地(8082)server建立连接", this.userId);
        
        //todo 本地client与源server(8081)连接
        Request requestRemote = new Request.Builder()
                .url("ws://127.0.0.1:8081/api/notice/" + userId)
                .build();
        OkHttpClient webSocketClientRemote = new OkHttpClient.Builder()
                .build();
        WebSocket localClientRemote = webSocketClientRemote.newWebSocket(requestRemote, new CommonWSClient());
        log.info("本地server创建本地client,且本地client与远程(8082)server连接成功");
        
        return userId + "与本地server连接";
    }

    /**
     * @description: 当连接失败时,执行该方法
     **/
    @OnClose
    public void onClose(){
        WSCache.clients.remove(this.sid);
        System.out.println(this.sid+"连接断开");
    }

    /**
     * @description: 当收到client发送的消息时,执行该方法
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("-----------收到来自用户:" + this.userId + "的信息   " + message);
    }


    /**
     * @description: 当连接发生错误时,执行该方法
     **/
    @OnError
    public void onError(Throwable error){
        System.out.println("error--------系统错误");
        error.printStackTrace();
    }
}

websocket配置类:

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

3、消息源服务

消息源服务B只需要websocket服务用来发送消息即可,其实现与中间服务A的服务端相同。
服务:

@Slf4j
@Component
@ServerEndpoint("/notice/{userId}")
public class WebSocketServer {
    //存储客户端session信息, {会话id:ws_session}
    public static Map<String, Session> clients = new ConcurrentHashMap<>();

    //存储把不同用户的客户端session信息集合 {userId, [会话id1,会话id2,会话id3,会话id4]}
    public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();

    //会话id
    private String sid = null;

    //建立连接的用户id
    private String userId;

    /**
     * @description: 当与客户端的websocket连接成功时,执行该方法
     * @PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解
     **/
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId){
        log.info("onOpen-->session.getRequestParameterMap():{}", session.getRequestParameterMap());
        this.sid = UUID.randomUUID().toString();
        this.userId = userId;
        clients.put(this.sid,session);
        //判断该用户是否存在会话信息,不存在则添加
        Set<String> clientSet = connection.get(userId);
        if (clientSet == null){
            clientSet = new HashSet<>();
            connection.put(userId,clientSet);
        }
        clientSet.add(this.sid);
        System.out.println(this.userId + "用户建立连接," + this.sid+"连接开启!");
    }

    /**
     * @description: 当连接失败时,执行该方法
     **/
    @OnClose
    public void onClose(){
        clients.remove(this.sid);
        System.out.println(this.sid+"连接断开");
    }

    /**
     * @description: 当收到客户端发送的消息时,执行该方法
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("-----------收到来自用户:" + this.userId + "的信息   " + message);
        //自定义消息实体
        MsgInfo msgInfo = new MsgInfo()
                .setUserKey(this.userId)
                .setMassage("服务端-" + System.currentTimeMillis() + ":已收到用户" +
                        this.userId + "的信息: " + message);
        sendMessageByUserId(this.userId,  msgInfo);
    }


    /**
     * @description: 当连接发生错误时,执行该方法
     **/
    @OnError
    public void onError(Throwable error){
        System.out.println("error--------系统错误");
        error.printStackTrace();
    }

    /**
     * @description: 通过userId向用户发送信息
     * 该类定义成静态可以配合定时任务实现定时推送
     **/
    public static void sendMessageByUserId(String userId, MsgInfo msgInfo){
        if (!StringUtils.isEmpty(userId)) {
            Set<String> clientSet = connection.get(userId);
            //用户是否存在客户端连接
            if (Objects.nonNull(clientSet)) {
                Iterator<String> iterator = clientSet.iterator();
                while (iterator.hasNext()) {
                    String sid = iterator.next();
                    Session session = clients.get(sid);
                    //向每个会话发送消息
                    if (Objects.nonNull(session)){
                        try {
                            //同步发送数据,需要等上一个sendText发送完成才执行下一个发送
                            ObjectMapper mapper = new ObjectMapper();
                            session.getBasicRemote().sendText(mapper.writeValueAsString(msgInfo));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    @Scheduled(cron = "0/10 * * * * ?")
    public void testSendMessageByCron(){
        log.info("-----------模拟消息开始发送--------------");
        //模拟两个用户100和200
        MsgInfo msg100 = new MsgInfo()
                .setUserKey("100")
                        .setMassage("这是8081发给用户100的消息" + System.currentTimeMillis());
        sendMessageByUserId("100", msg100);
        MsgInfo msg200 = new MsgInfo()
                .setUserKey("200")
                .setMassage("这是8081发给用户200的消息" + System.currentTimeMillis());
        sendMessageByUserId("200", msg200);
    }
}

4、测试

我们使用: wss在线测试工具进行测试;
1、 打开两个该工具窗口,分别模拟用户100和用户200,这两个用户都连接中间服务A(端口8082的服务);
用户100
用户200
2、分别启动消息源服务B和中间服务A
此时在服务B控制台我们可以看到:
在这里插入图片描述
我们模拟的消息发送已经在给用户100和用户200发送,因为我们的用户100和用户200均没有与中间服务A建立连接,故此时测试界面看不到消息;
当我们在用户100的模拟界面点击“开启连接”后,可以在右侧看到发给用户100的模拟消息:
在这里插入图片描述

之后我们再打开用户200的连接:
在这里插入图片描述

好了,到这里就结束了,有任何问题请积极指出,此例子只是个例子,并未经受任何生产的测试,欢迎讨论沟通:)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1854576.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux应急响应——知攻善防应急靶场-Linux(1)

文章目录 查看history历史指令查看开机自启动项异常连接和端口异常进程定时任务异常服务日志分析账户排查总结 靶场出处是知攻善防 Linux应急响应靶机 1 前景需要&#xff1a; 小王急匆匆地找到小张&#xff0c;小王说"李哥&#xff0c;我dev服务器被黑了",快救救我&…

【React】ref

概述 使用 ref 引用值 – React 中文文档 希望组件“记住”某些信息&#xff0c;但又不想让这些信息更新时 触发新的渲染 时&#xff0c;可以使用 ref 。 也就是说 ref 对象 包裹的值 React 追踪不到的&#xff0c;他像是用来存储组件信息的秘密“口袋”。 与 state 相同的是…

一、系统学习微服务遇到的问题集合

1、启动了nacos服务&#xff0c;没有在注册列表 应该是版本问题 Alibaba-nacos版本 nacos-文档 Spring Cloud Alibaba-中文 Spring-Cloud-Alibaba-英文 Spring-Cloud-Gateway 写的很好的一篇文章 在Spring initial上面配置 start.aliyun.com 重新下载 < 2、 No Feign…

美团携手HarmonyOS SDK,开启便捷生活新篇章

华为开发者大会&#xff08;HDC 2024&#xff09;于6月21日在东莞松山湖拉开序幕&#xff0c;通过一系列精彩纷呈的主题演讲、峰会、专题论坛和互动体验&#xff0c;为开发者们带来了一场知识与技术的盛宴。6月23日&#xff0c;《HarmonyOS开放能力&#xff0c;使能应用原生易用…

上位机图像处理和嵌入式模块部署(mcu和swd接口)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 最近学习mcu的时候&#xff0c;接触了不少调试器&#xff0c;这里面有daplink、st-link v2、j-link v9。虽然模块的形状可能不太一样&#xff0c;但…

程序人生:关于RHCE红帽认证这件事

花了两个月备考红帽&#xff0c;最终终于双满分通过。 关于考试 RHCE红帽认证总共需要考两门&#xff1a;RHCSA、RHCE。 RHCSA主要是考察基本的Linux操作&#xff1a;用户、权限、空间扩容、yum、容器等内容。 RHCE主要是考察ansible playbook 代码的开发。 通过考试没有别…

【web1】标签,css,js

文章目录 1.标签&#xff1a;input1.1 html&#xff1a;HTML&#xff08;用于创建网页结构&#xff09;&#xff0c;CSS&#xff08;对页面进行美化&#xff09;&#xff0c;JavaScript&#xff08;用于与用户交互&#xff09;1.2 文本标签&#xff1a;字体属性1.3 a标签&#…

DIVE INTO DEEP LEARNING 50-55

文章目录 50. semantic segmentation50.1 Basic concepts50.2 Major application 51. Transposed convolution51.1 Basic concepts51.2 Major role51.3 Implementation steps and application areas51.4 Transposed convolution51.5 Transposed convolution is a type of convo…

卧槽,6。套死你猴子,Tomcat访问html页面显示源码?

卧槽&#xff0c;6。Tomcat访问html页面显示源码&#xff1f; 元凶text/explain //踩坑&#xff01;&#xff01;&#xff01;不能用 servletResponse.setContentType("text/explain&#xff0c;否则访问html会看到源码&#xff0c;而不是渲染页面; charsetUTF-8"…

IDEA各种实体类运行爆红,不运行就没事

1.问题描述 如图所示&#xff0c;后端项目的import的各种entity爆红&#xff0c;点击也有导入包的提示&#xff0c;且这种报红几乎遍布了整个工程项目 2.我的解决方案 清空缓存&#xff0c;然后把target文件删掉&#xff0c;重新跑 3.小结 idea项目有时候就是一个核弹&…

【科普】半导体制造过程的步骤、技术、流程

在这篇文章中&#xff0c;我们将学习基本的半导体制造过程。为了将晶圆转化为半导体芯片&#xff0c;它需要经历一系列复杂的制造过程&#xff0c;包括氧化、光刻、刻蚀、沉积、离子注入、金属布线、电气检测和封装等。 基本的半导体制造过程 1.晶圆&#xff08;Wafer&#xf…

免费一年SSL证书申请——建议收藏

免费一年SSL证书申请——建议收藏 获取免费一年期SSL证书其实挺简单的 准备你的网站&#xff1a; 确保你的网站已经有了域名&#xff0c;而且这个域名已经指向你的服务器。还要检查你的服务器支持HTTPS&#xff0c;也就是443端口要打开&#xff0c;这是HTTPS默认用的。 验证域…

23.并发

目录 一、一些概念二、进程和线程2.1 概念2.2 多线程导致的问题2.3 使用spawn创建新线程2.4 线程与move闭包 三、消息传递3.1 概念3.2 创建通道3.3 示例3.4 其它测试 四、共享状态并发4.1 互斥器4.2 Mutex的API4.3 多线程共享Mutex1&#xff09;在多线程之间共享值&#xff1b;…

ARC学习(3)基本编程模型认识(三)

笔者来介绍arc的编程模型的中断流程和异常流程 1、中断介绍 主要介绍一下中断进入的流程&#xff0c;包括需要配置的寄存器等信息。 中断号&#xff1a;16-255&#xff0c;总共240个中断。触发类型&#xff1a;脉冲或者电平触发中断优先级&#xff1a;16个&#xff0c;0最大&…

在linux系统中使用docker、mysql实例

systemctl 是一个命令行工具&#xff0c;用于控制和管理基于 systemd 的 Linux 发行版中的系统和服务。 启动服务 &#xff1a;使用 systemctl start [service-name] 开始一个服务。 如启动docker&#xff1a;systemctl start docker 停止服务 &#xff1a;使用 systemctl st…

【python】python葡萄酒国家分布情况数据分析pyecharts可视化(源码+数据集+论文)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

力扣SQL50 超过5名学生的课

Problem: 596. 超过5名学生的课 Code select class from courses group by class having count(distinct student) > 5;

FFmpeg源码:ff_ctz / ff_ctz_c函数分析

一、ff_ctz函数的作用 ff_ctz定义在FFmpeg源码目录的libavutil/intmath.h 下&#xff1a; #ifndef ff_ctz #define ff_ctz ff_ctz_c /*** Trailing zero bit count.** param v input value. If v is 0, the result is undefined.* return the number of trailing 0-bits*/…

如何使用AI工具进行写作

随着AI人工智能技术的飞速发展&#xff0c;AI工具已经逐渐成为学术和专业写作的得力助手。AI工具不仅可以帮助我们提高写作效率&#xff0c;还能在内容创作上提供灵感和支持。在本文中&#xff0c;小编将和大家分享如何利用AI工具提高写作效率和质量&#xff0c;并确保文章的原…

基于CDMA的多用户水下无线光通信(3)——解相关多用户检测

继续上一篇博文&#xff0c;本文将介绍基于解相关的多用户检测算法。解相关检测器的优点是因不需要估计各个用户的接收信号幅值而具有抗远近效应的能力。常规的解相关检测器有运算量大和实时性差的缺点&#xff0c;本文针对异步CDMA的MAI主要来自干扰用户的相邻三个比特周期的特…