【stomp实战】搭建一套websocket推送平台

news2025/1/19 11:11:07

前面几个小节我们已经学习了stomp协议,了解了stomp客户端的用法,并且搭建了一个小的后台demo,前端页面通过html页面与后端服务建立WebSocket连接。发送消息给后端服务。后端服务将消息内容原样送回。通过这个demo我们学习了前端stomp客户端的用法,同时也学到了如何通过spring-boot来搭建一个websocket平台。
本节我们将继续深入,搭建一套可用于生产的websocket平台。

基本介绍

websocket连接推送服务包含两个服务,websocket-connector和websocket-gateway。

在这里插入图片描述
架构如上图
websocket-connector

  • 作为和客户端建立websocket连接的服务,负责消息的接收和推送

websocket-gateway

  • 作为后台服务,提供http接口给其他微服务,其他微服务可以通过http接口发送消息给指定的用户

使用说明

通过下面的步骤来进行调试

  1. 分别启动项目websocket-connector和websocket-gateway
  2. 访问下面接口,获取某个用户的token
    下面示例是获取用户1001的token
curl -X POST -H  "Accept:*/*" -H  "Content-Type:application/json" -d "{\"userId\":\"1001\"}" "http://localhost:8081/api/ws/token/get"
  1. 打开websocket-connector调试页面http://localhost:8080/index.html
    将上一个接口获取到的token作为参数,与服务器建立连接
    在这里插入图片描述

  2. 通过页面的send按钮,发送一条消息给服务器,同时服务器会将此消息回复给前端页面。参考上图

  3. 通过websocket-gateway的接口,发送用户单播消息

curl -X POST -H  "Accept:*/*" -H  "Content-Type:application/json" -d "{\"appCode\":\"test2\",\"messageData\":{\"body\":\"single message\",\"headers\":{}},\"topic\":\"/user/topic/single/hello\",\"userIds\":[1001]}" "http://localhost:8081/api/ws/message/single/send"

在这里插入图片描述
在这里插入图片描述

前端页面可以收到该消息的推送
6.通过websocket-gateway的接口,发送广播消息

curl -X POST -H  "Accept:*/*" -H  "Content-Type:application/json" -d "{\"appCode\":\"test1\",\"messageData\":{\"body\":\"hello board message1\",\"headers\":{}},\"topic\":\"/topic/boardCast/hello\"}" "http://localhost:8081/api/ws/message/boardCast/send"

在这里插入图片描述
在这里插入图片描述

主要代码分析

前端代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>STOMP</title>
</head>
<body onload="disconnect()">
<h1 id="tip">消息发布订阅测试页</h1>
请输入token:<input type="text" id="token" placeholder=""> <br>
<button onclick="connect()" id="connect">connect</button>
<button onclick="disconnect()" id="disconnect">disconnect</button>

<p>输入消息: <span id="msg"></span></p>
<input type="text" id="content" placeholder=""> <br>
<button onclick="send()">send</button>

<ul id="ul">
    回答消息<p id="answer"></p>
    单播消息<p id="single"></p>
    广播消息<p id="board"></p>
</ul>
<script src="sockjs.min.js"></script>
<script src="stomp.min.js"></script>
<script>
    var stompClient = null;
    var endpoint = "/ws";

    //断开连接
    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
        }
        setConnect(false);
        console.log("disconnected");
    }
    //建立连接
    function connect() {
        var token = document.getElementById("token").value;
        if (token === '' || token === undefined) {
            alert("请输入token");
            return;
        }
        //连接请求头里面,设置好我们提前获取好的token
        var headers = {
            token: token
        };
        var url = endpoint;
        var socket = new SockJS(url);
        stompClient = Stomp.over(socket);
        //建立连接
        stompClient.connect(headers, function (msg) {
            setConnect(true);
            console.log("connected:" + msg);
            //订阅了三个topic
            //订阅用户消息topic1
            stompClient.subscribe("/user/topic/answer", function (response) {
                createElement("answer", response.body);
            });
            //订阅用户消息topic2
            stompClient.subscribe("/user/topic/single/hello", function (response) {
                createElement("single", response.body);
            });
            //订阅广播消息topic
            stompClient.subscribe("/topic/boardCast/hello", function (response) {
                createElement("board", response.body);
            });
        });
    }
    //主动发送消息给服务器,对应的后端topic为/app/echo
    function send() {
        var value = document.getElementById("content").value;
        var msg = {
            msgType: 1,
            content: value
        };
        stompClient.send("/app/echo", {}, JSON.stringify(msg));
    }

    function createElement(eleId, msg) {
        var singleP = document.getElementById(eleId);
        var p = document.createElement("p");
        p.style.wordWrap = "break-word";
        p.appendChild(document.createTextNode(msg));
        singleP.appendChild(p);
    }

    function setConnect(connected) {
        document.getElementById("connect").disabled = connected;
        document.getElementById("disconnect").disabled = !connected;
    }
</script>
</body>
</html>

websocket-connector端的代码

会话的鉴权及连接建立

@Slf4j
@Component
public class WebSocketInboundInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (accessor == null) {
            return message;
        }

        //建立连接
        if (Objects.equals(accessor.getCommand(), StompCommand.CONNECT)) {
            connect(message, accessor);
        }
        return message;
    }

    /**
     * 建立会话
     *
     * @param message
     * @param accessor
     */
    private void connect(Message<?> message, StompHeaderAccessor accessor) {
        String token = accessor.getFirstNativeHeader(WsConstants.TOKEN_HEADER);
        if (StringUtils.isEmpty(token)) {
            throw new MessageDeliveryException("token missing!");
        }
        String userId = TokenUtil.parseToken(token);
        if (StringUtils.isEmpty(userId)) {
            throw new MessageDeliveryException("userId missing!");
        }
        String simpleSessionId = (String) message.getHeaders().get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);

        UserSession userSession = new UserSession();
        userSession.setSimpleSessionId(simpleSessionId);
        userSession.setUserId(userId);
        userSession.setCreateTime(LocalDateTime.now());
        //关联用户的会话。通过msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); 此方法,可以发送给用户消息
        accessor.setUser(new UserSessionPrincipal(userSession));
    }
}

如何接收客户端发送的消息

@Slf4j
@Controller
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class StompController {
    private final SimpMessageSendingOperations msgOperations;
    private final SimpUserRegistry simpUserRegistry;

    /**
     * 回音消息,将用户发来的消息内容加上 Echo 前缀后推送回客户端
     */
    @MessageMapping("/echo")
    public void echo(Principal principal, Msg msg) {
        String username = principal.getName();
        msg.setContent("Echo: " + msg.getContent());
        msgOperations.convertAndSendToUser(username, "/topic/answer", msg);
        int userCount = simpUserRegistry.getUserCount();
        int sessionCount = simpUserRegistry.getUser(username).getSessions().size();
        log.info("当前本系统总在线人数: {}, 当前用户: {}, 该用户的客户端连接数: {}", userCount, username, sessionCount);
    }
}

消费rabbitMQ推送的单播和广播消息

@Component
@Slf4j
public class MessageReceiveConsumer {

    private final Gson gson;
    private final ReceivedMessageHandler receivedMessageHandler;

    public MessageReceiveConsumer(Gson gson, ReceivedMessageHandler receivedMessageHandler) {
        this.gson = gson;
        this.receivedMessageHandler = receivedMessageHandler;
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(),
            exchange = @Exchange(value = WsConstants.SINGLE_EXCHANGE, type = ExchangeTypes.FANOUT)))
    public void handleSingleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        SingleMessage singleMessage = gson.fromJson(body, SingleMessage.class);
        receivedMessageHandler.handleSingleMessage(singleMessage);
        channel.basicAck(tag, false);
    }


    @RabbitListener(bindings = @QueueBinding(value = @Queue(),
            exchange = @Exchange(value = WsConstants.BOARD_CAST_EXCHANGE, type = ExchangeTypes.FANOUT)))
    public void handleBoardCastMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        BoardCastMessage boardCastMessage = gson.fromJson(body, BoardCastMessage.class);
        receivedMessageHandler.handleBoardCastMessage(boardCastMessage);
        channel.basicAck(tag, false);
    }
}

建立了两个exchange分别来接收消息。这里 @QueueBinding(value = @Queue(),通过这种方式建立的队列,队列名是spring取的一个随机名称,如下图所示
在这里插入图片描述

调用客户端工具类,发送消息给客户端

@Slf4j
@Component
public class ReceivedMessageHandler {
    private final WsMessageClient wsMessageClient;

    public ReceivedMessageHandler(WsMessageClient wsMessageClient) {
        this.wsMessageClient = wsMessageClient;
    }

    public void handleSingleMessage(SingleMessage singleMessage) {
        wsMessageClient.sendToUsers(singleMessage.getUserIds(), singleMessage);
    }


    public void handleBoardCastMessage(BoardCastMessage boardCastMessage) {
        wsMessageClient.boardCast(boardCastMessage);
    }
}

websocket-gateway 申请token接口

通过该接口,生成用户的jwtToken,在客户端建立连接时需要此token,不然无法建立连接

public class TokenController {
    @PostMapping("/get")
    public String getToken(@RequestBody @Validated TokenRequest tokenRequest) {
        return TokenUtil.generateToken(tokenRequest.getUserId());
    }
}

websocket-gateway 发送消息的接口

websocket-gateway暴露发送消息的接口给业务服务

public class MessageController {
    private final MessageSendService messageSendService;

    public MessageController(MessageSendService messageSendService) {
        this.messageSendService = messageSendService;
    }

    @PostMapping("/single/send")
    public Boolean singleSend(@RequestBody SingleMessage singleMessage) {
        return messageSendService.singleSend(singleMessage);
    }

    @PostMapping("/boardCast/send")
    public Boolean boardCastSend(@RequestBody BoardCastMessage boardCastMessage) {
        return messageSendService.boardCastSend(boardCastMessage);
    }

}

通过该http接口,最终会调用rabbitmq,发送消息给websocket-connector服务

项目地址

更多项目代码直接看一下源码吧
https://gitee.com/syk1234/websocket-services

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1622487.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【leetcode面试经典150题】71. 对称二叉树(C++)

【leetcode面试经典150题】专栏系列将为准备暑期实习生以及秋招的同学们提高在面试时的经典面试算法题的思路和想法。本专栏将以一题多解和精简算法思路为主&#xff0c;题解使用C语言。&#xff08;若有使用其他语言的同学也可了解题解思路&#xff0c;本质上语法内容一致&…

科技改变视听4K 120HZ高刷新率的投影、电视、电影终有用武之地

早在1888年&#xff0c;法国生理学家埃蒂安朱尔马莱就发明了一套盒式摄像机&#xff0c;能以120帧/s的速度在一条纸膜上曝光照片&#xff0c;但是当时没有相匹配的放映设备。而马莱的另一套拍摄设备是60帧/s的规格&#xff0c;并且图像质量非常好。 受此启发&#xff0c;雷诺的…

Linux上部署Jupyter notebook

安装jupyter notebook pip install notebook #或者 conda install notebook配置 jupyter notebook --generate-config## The IP address the notebook server will listen on. # Default: localhost # 设置可以访问的ip, 默认是localhost, 将其改为 * c.NotebookApp.ip *#…

AutoGPT-Forge使用教程,自行构建agent智能体

本博客给出AutoGPT-forge四个教程的翻译与理解&#xff0c;使用GPT4翻译&#xff0c; 参考官方教程https://aiedge.medium.com/autogpt-forge-a-comprehensive-guide-to-your-first-steps-a1dfdf46e3b4 使用AutoGPT Github代码日期2024/4/22&#xff1b; 博客开始编辑日期20…

java和python刷题的一些语法规则总结(未完成)

语法总结 Java篇1、代码补全2、编程题中常用头文件3、编程题常用的内置方法4、模版 Python篇1、2、编程题中常用的头文件3、编程题中常用的内置方法4、伪代码模版 去哪练习&#xff1f; 1、LeetCode上有个面试模拟 2、牛客公司真题&#xff08;ACM模式&#xff09; ⚠️ 笔试均…

Android Gradle查看依赖库

1.gradle :app:dependencies 输出列表展示了所有configuration下的依赖树&#xff0c;依赖关系明显&#xff0c;层次清晰。 2.日志太长可以写入本地文件gradle :app:dependencies > D:/log.txt 3.gradlew processReleaseManifest --stacktrace 跟踪具体报错文件 注…

解决Android studio更换sdk地址后flutter项目显示no device selected

问题描述 因为之前sdk的路径在c盘上,经常在更新或下在sdk后c盘饱满,于是就更换了sdk的路径,更换sdk路径后就导致flutter项目在选择设备的时候出现no device selected 找不到设备,但是在device Manager可以看到物理设备或者是虚拟设备。如下图所示。 问题分析 导致这个问题…

python 报错:ImportError: cannot import name ‘kaiser‘ from ‘scipy.signal‘

python 报错&#xff1a;ImportError: cannot import name kaiser from scipy.signal 介绍第一步&#xff1a;第二步&#xff1a;最终结果&#xff1a; 介绍 这个错误表明在导入 scipy.signal 模块时出现了问题&#xff0c;因为无法找到 kaiser 函数。可能的原因是 scipy 库没有…

C语言 | Leetcode C语言题解之第47题全排列II

题目&#xff1a; 题解&#xff1a; int* vis;void backtrack(int* nums, int numSize, int** ans, int* ansSize, int idx, int* perm) {if (idx numSize) {int* tmp malloc(sizeof(int) * numSize);memcpy(tmp, perm, sizeof(int) * numSize);ans[(*ansSize)] tmp;return…

什么样的汽车制造供应商管理平台 可以既高效又安全?

汽车制造供应商管理是汽车制造商最基础的工作项&#xff0c;因为在汽车制造环节&#xff0c;与供应商间存在着必不可少又高频的业务往来&#xff0c;而在汽车制造供应商之间&#xff0c;文件往来是确保业务顺利进行、沟通协作和质量控制的重要环节。这些文件往来涵盖了多个方面…

linux的“>”和“>>”

在Linux中&#xff0c;>和>>都是用于文件重定向的操作符&#xff0c;它们用于将命令的输出发送到文件中。 > 用于创建一个新文件或覆盖现有文件的内容。当你执行一个如 command > file.txt 的命令时&#xff0c;如果 file.txt 文件存在&#xff0c;它的内容将被…

黄金回收价格和国际金价差多少?

在探讨黄金回收价格与国际金价的关系时&#xff0c;了解黄金的基础知识至关重要。黄金作为一种贵金属&#xff0c;其价值不仅仅在于它本身的物质属性&#xff0c;还包括它的纯度、类型以及市场需求等多种因素。这些特性决定了黄金的价值&#xff0c;并影响着黄金回收价格。 大家…

使用 Gradio 的“热重载”模式快速开发 AI 应用

在这篇文章中&#xff0c;我将展示如何利用 Gradio 的热重载模式快速构建一个功能齐全的 AI 应用。但在进入正题之前&#xff0c;让我们先了解一下什么是重载模式以及 Gradio 为什么要采用自定义的自动重载逻辑。如果你已熟悉 Gradio 并急于开始构建&#xff0c;请直接跳转到第…

LLM 安全 | 大语言模型应用安全入门

一、背景 2023年以来&#xff0c;LLM 变成了相当炙手可热的话题&#xff0c;以 ChatGPT 为代表的 LLM 的出现&#xff0c;让人们看到了无限的可能性。ChatGPT能写作&#xff0c;能翻译&#xff0c;能创作诗歌和故事&#xff0c;甚至能一定程度上做一些高度专业化的工作&#x…

20240309web前端_第四次作业_完成随机点名程序

要求 一、结合抽奖案例完成随机点名程序&#xff0c;要求如下: 1.点击点名按钮&#xff0c;名字界面随机显示&#xff0c;按钮文字由点名变为停止 2.再次点击点名按钮&#xff0c;显示当前被点名学生姓名&#xff0c;按钮文字由停止变为点名 3.样式请参考css及html自由发挥完成…

短视频矩阵系统源码====3年技术公司源头开发商交付

短视频矩阵系统#源头技术打磨 哈尔滨爆火带动了一波“北上热潮”&#xff0c;各地文旅坐不住了&#xff0c;兄弟们开“卷”&#xff01;这波互卷浪潮中&#xff0c;河南率先出圈。如今&#xff0c;河南文旅账号粉丝已经突破200w&#xff01; 01 矩阵打法&#xff0c;很难不火…

车载视频监控:守护行车安全,助力企业管理

随着科技的不断发展&#xff0c;车载视频监控设备已经成为现代车辆安全监控的重要工具。它不仅可以实时记录车辆行驶过程中的情况&#xff0c;为交通事故的调查提供证据&#xff0c;还可以帮助企业和个人实现安全监控&#xff0c;保障人员和财产安全。本文将从车载视频监控的定…

生物安全柜检测与验证标准指南及验证设备选型建议

为什么生物安全柜要检测和验证 包括生物安全柜行业标准、国家实验室生物安全通用要求、微生物和生物医学实验室安全通则等都规定了生物安全柜除了出厂检测外&#xff0c;在安装、移动、及使用一定时间后须做风速、高效过滤器完整性、防漏等十几项检测。这是因为&#xff0c;对…

黑龙江—等保测评三级安全设计思路

需求分析 6.1、 系统现状 6.2、 现有措施 目前&#xff0c;信息系统已经采取了下述的安全措施&#xff1a; 1、在物理层面上&#xff0c; 2、在网络层面上&#xff0c; 3、在系统层面上&#xff0c; 4、在应用层面上&#xff0c; 5、在管理层面上&#xff0c; 6.…

文件摆渡:安全、高效的摆渡系统助力提升效率

很多组织和企业都会通过网络隔离的方式来保护内部的数据&#xff0c;网络隔离可以是物理隔离&#xff0c;也可以是逻辑隔离&#xff0c;如使用防火墙、VPN、DMZ等技术手段来实现&#xff0c;隔离之后还会去寻找文件摆渡方式&#xff0c;来保障日常的业务和经营需求。 进行网络隔…