Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

news2025/1/11 8:06:33

1.引入RocketMQ依赖:首先,在pom.xml文件中添加RocketMQ的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.配置RocketMQ连接信息:在application.propertiesapplication.yml中配置RocketMQ的连接信息,包括Name Server地址等:

spring:
  application:
    name: ${sn.publish}
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: ${rocket-mq.name-server}
        bindings:
          output:
            producer:
              group: testSocket
              sync: true
      bindings:
        output:
          destination: test-topic
          content-type: application/json

3.消息发布组件

@Component
public class MqSourceComponent {
    @Resource
    Source source;

    public void publishNotify(SampleNotifyDTO notify) {
        source.output().send(MessageBuilder.withPayload(notify).build());
    }
}

4.消息发布控制器

@RestController
@Api(tags = "rocketmq")
public class MqController {
    @Resource
    MqSourceComponent mq;

    @ApiOperation(value = "测试发布消息")
    @PostMapping("test-publish")
    public JsonVO<String> testSend(SampleNotifyDTO notify) {
        mq.publishNotify(notify);
        return JsonVO.success("消息已发送");
    }
}

项目结构:

接下来是websocket模块的搭建

1. 依赖添加

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.application.yml配置文件

server:
  port: ${sp.ws}
spring:
  application:
    name: ${sn.ws}
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: ${rocket-mq.name-server}
      bindings:
        input:
          destination: test-topic
          content-type: application/json
          group: testSocket

3.将应用程序绑定到消息代理

@EnableBinding(Sink.class): 这是Spring Cloud Stream的注解,它用于将应用程序绑定到消息代理(如Kafka、RabbitMQ等)。Sink.class是Spring Cloud Stream提供的预定义输入通道,允许你接收消息。通过这个注解,你的应用程序可以监听消息通道,并定义消息处理逻辑。

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class WsApplication {

    public static void main(String[] args) {
        SpringApplication.run(WsApplication.class, args);
    }

}

4.消息订阅组件

监听消息通道中的消息,一旦有消息到达,就会触发listenNotify方法,该方法负责处理消息并通过chat服务发送响应。

@Component
@Slf4j
public class MqListenComponent {
    @Resource
    ChatService chat;

    @StreamListener(Sink.INPUT)
    public void listenNotify(SampleNotifyDTO notify) {
        log.info(notify.toString());
        chat.sendMessage(notify.getClientId(), notify);
    }
}

5.消息通知服务

package com.zeroone.star.ws.service;

import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;


@Component
@ServerEndpoint("/chat")
public class ChatService {
    /**
     * 连接会话池
     */
    private static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session) throws IOException {
        // 判断客户端对象是否存在
        if (SESSION_POOL.containsKey(session.getQueryString())) {
            CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID冲突,连接拒绝");
            session.getUserProperties().put("reason", closeReason);
            session.close();
            return;
        }
        // 将客户端对象存储到会话池
        SESSION_POOL.put(session.getQueryString(), session);
        System.out.println("客户端(" + session.getQueryString() + "):开启了连接");
    }

    @OnMessage
    public String onMessage(String msg, Session session) throws IOException {
        // 解析消息 ==> ID::消息内容
        String[] msgArr = msg.split("::", 2);
        // 处理群发消息,ID==all表示群发
        if ("all".equalsIgnoreCase(msgArr[0])) {
            for (Session one : SESSION_POOL.values()) {
                // 排除自己
                if (one == session) {
                    continue;
                }
                // 发送消息
                one.getBasicRemote().sendText(msgArr[1]);
            }
        }
        // 指定发送
        else {
            // 获取接收方
            Session target = SESSION_POOL.get(msgArr[0]);
            if (target != null) {
                target.getBasicRemote().sendText(msgArr[1]);
            }
        }
        return session.getQueryString() + ":消息发送成功";
    }

    @OnClose
    public void onClose(Session session) {
        // 连接拒绝关闭会话
        Object reason = session.getUserProperties().get("reason");
        if (reason instanceof CloseReason) {
            CloseReason creason = (CloseReason) reason;
            if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) {
                System.out.println("拒绝客户(" + session.getQueryString() + "):关闭连接");
                return;
            }
        }
        // 从会话池中移除会话
        SESSION_POOL.remove(session.getQueryString());
        System.out.println("客户端(" + session.getQueryString() + "):关闭连接");
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        System.out.println("客户端(" + session.getQueryString() + ")错误信息:" + throwable.getMessage());
    }

    @SneakyThrows
    public void sendMessage(String id, Object message) {
        // 群发
        if ("all".equalsIgnoreCase(id)) {
            for (Session one : SESSION_POOL.values()) {
                // 发送消息
                one.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
            }
        }
        // 指定发送
        else {
            // 获取接收方
            Session target = SESSION_POOL.get(id);
            if (target != null) {
                target.getBasicRemote().sendText(JSONUtil.toJsonStr(message));
            }
        }
    }
}

项目结构:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1143413.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM虚拟机:运行时数据区详解

本文重点 我们前面已经将类的加载过程进行了全面的了解和学习,按照如下所示的JVM架架构图,接下来我们应该学习运行时数据区了。 运行时数据区 如上图所示,灰色的标识线程私有,基本不存在垃圾回收。而非灰色的是线程共享的,存在垃圾回收。 PC计数器 每个线程都有一个程序…

【java】建筑施工一体化智慧工地信息管理系统源码

智慧工地系统是一种利用人工智能和物联网技术来监测和管理建筑工地的系统。它可以通过感知设备、数据处理和分析、智能控制等技术手段&#xff0c;实现对工地施工、设备状态、人员安全等方面的实时监控和管理。 一、智慧工地让工程施工智能化 1、内容全面&#xff0c;多维度数…

简化geojson策略

1、删除无用的属性&#xff0c;也就是字段&#xff0c;在shp的时候就给删了 用arcgis等等软件都可以做到 2、简化坐标的小数位数 &#xff08;1&#xff09;网上推荐的办法&#xff0c;俺不会Python… github.com/perrygeo/geojson-precision &#xff08;2&#xff09;曲线…

【C++项目】高并发内存池第五讲内存回收释放过程介绍

内存回收 1.ThreadCache2.CentralCache3.PageCache 1.ThreadCache void ThreadCache::Deallocate(void* ptr, size_t size) {assert(ptr);assert(size < MAX_BYTES);//计算在哪号桶中&#xff0c;然后插入进去size_t index SizeClass::Index(size);_freeLists[index].Push…

HCIP笔记——数据链路层协议

网络类型 根据二层&#xff08;数据链路层&#xff09;所使用的协议来进行区分。 MA——多点接入网络 BMA——广播型多点接入网络——以太网 NBMA——非广播型多点接入网络 P2P——点到点的网络 以太网协议 MAC地址——区分和标识不同的设备 以太网中独有的一种地址——MAC地址…

基于华为云 IoT 物联网平台实现家居环境实时监控

01 智能家居环境监测 智能家居环境监测采用 Ruff 开发板作为主控&#xff0c;串口线连接温湿度传感器 DHT11 和空气质量传感器 SDS011&#xff0c;每5分钟采集一次数据&#xff0c;通过 MQTT 协议发送到华为云 IoT 物联网平台&#xff0c;并基于数据分析服务实时计算出整个家庭…

大数据Doris(十三):创建用户和创建数据库并赋予权限

文章目录 创建用户和创建数据库并赋予权限 一、创建用户

Python算法练习 10.28

leetcode 700 二叉搜索树中的搜索 给定二叉搜索树&#xff08;BST&#xff09;的根节点 root 和一个整数值 val。 你需要在 BST 中找到节点值等于 val 的节点。 返回以该节点为根的子树。 如果节点不存在&#xff0c;则返回 null 。 示例 1: 输入&#xff1a;root [4,2,7,1,…

springboot web项目中 Set-Cookie 失败 办法

1. 背景 目前有个项目 线上环境 使用spring session管理的登录 项目中有两个接口 一个用来登录的 登录成功后会设置cookie 后续请求就会使用该cookie &#xff08;cookie的键值就是session Id 和 登录后的信息 例如菜单&#xff0c;权限等&#xff09; 一个用来检查是否登录…

听GPT 讲Rust源代码--library/std(5)

File: rust/library/std/src/sys/unsupported/time.rs 在Rust源代码中&#xff0c;rust/library/std/src/sys/unsupported/time.rs文件的作用是提供对于时间的支持&#xff0c;特别是在不支持的操作系统上。 该文件中包含了两个结构体定义&#xff0c;分别是Instant和SystemTim…

confluence

confluence PS&#xff1a;此文档全部由docker部署 1.准备挂载目录 1.给mysql创建data volume卷 [rooti-1-17 ~]# docker volume create confluence-mysql 2.给mysql创建配置文件 [rooti-1-17 ~]# mkdir -p /u01/confluence/mysql/ [rooti-1-17 ~]# cd /u01/confluence/my…

linux--

一、crond 任务调度 1、原理示意图 2、crontab 进行定时任务的设置 2.1. 概述 任务调度&#xff0c;是指系统在某个时间执行的特定的命令或程序。任务调度分类&#xff1a; 系统工作: 有些重要的工作必须周而复始地执行。如病毒扫描等 个别用户工作:个别用户可能希望执行某些…

springboot心理咨询管理系统

springboot心理咨询管理系统&#xff0c;java心理咨询管理系统&#xff0c;心理咨询管理系统 运行环境&#xff1a; JAVA版本&#xff1a;JDK1.8 IDE类型&#xff1a;IDEA、Eclipse都可运行 数据库类型&#xff1a;MySql&#xff08;8.x版本都可&#xff09; 硬件环境&#xf…

基于Qt串口Serial Port配置纯代码实现(桌面和嵌入式平台)

## Serial Port Qt 提供了串口类,可以直接对串口访问。我们可以直接使用 Qt 的串口类编程即可,十分方便。Qt 串口类不仅在 Windows 能用,还能在 Linux 下用,虽然串口编程不是什么新鲜事儿,既然 Qt 提供了这方面的接口,我们就充分利用起来,这将会使我们的开发十分方便!…

力扣刷题 day57:10-27

1.将数组划分成相等数对 给你一个整数数组 nums &#xff0c;它包含 2 * n 个整数。 你需要将 nums 划分成 n 个数对&#xff0c;满足&#xff1a; 每个元素 只属于一个 数对。 同一数对中的元素 相等 。 如果可以将 nums 划分成 n 个数对&#xff0c;请你返回 true &#x…

Java面试八股文之暑假合集

八股文暑假合集 基础篇二分查找 java基础篇7月12号面向对象和面向过程的区别重载和重写String 7月13号自动装箱和拆箱静态方法构造方法成员变量和局部变量对象引用和对象实例返回值 与equals(重要)hashcode()和equals()HashMap 7月16号线程&#xff0c;进程和程序final关键字的…

fastadmin分类下拉(多级分类)使用教程

效果图1&#xff1a; 在后台分类管理中&#xff0c;添加需要的分类数据 效果图2&#xff1a; 在后台添加页面&#xff0c;点击下拉即可出现分类多级下拉数据 以上就是效果图。 分类下拉实现步骤&#xff1a; 1.更改控制器 找到需要修改的控制器&#xff0c;修改公共方法 _i…

matlab simulink PMSM_SVPWM PI转速控制

1、内容简介 略 8-可以交流、咨询、答疑 2、内容说明 略PMSM_SVPWM PI转速控制 PMSM SVPWM PI转速控制 3、仿真分析 4、参考论文 略

推理还是背诵?通过反事实任务探索语言模型的能力和局限性

推理还是背诵&#xff1f;通过反事实任务探索语言模型的能力和局限性 摘要1 引言2 反事实任务2.1 反事实理解检测 3 任务3.1 算术3.2 编程3.3 基本的句法推理3.4 带有一阶逻辑的自然语言推理3.5 空间推理3.6 绘图3.7 音乐3.8 国际象棋 结果5 分析5.1 反事实条件的“普遍性”5.2…

线程池的理解

线程池 线程池本质上是一种池化技术&#xff0c;而池化技术是一种资源复用的思想&#xff0c;比较常见的有连接池、内存池、对象池。 而线程池里面复用的是线程资源&#xff0c;它的核心设计目标&#xff0c;有两个&#xff1a; 减少线程的频繁创建和销毁带来的性能开销&#x…