spring boot 集成rocketMq + 基本使用

news2025/1/12 0:56:19

1. RocketMq基本概念

1. NameServer
每个NameServer结点之间是相互独立,彼此没有任何信息交互
启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,
相当于一个路由控制中心。主要是用来保存topic路由信息,管理Broker
2. Broker
消息存储和中转角色,负责存储和转发消息
在启动时会向NameServer进行注册并且定时发送心跳包。心跳包中包含当前 Broker 信息
以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。
3. topic : 一个消息的集合的名字
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。
4. 生产者
生产者发送消息。启动时先从 NameServer 集群中的其中一台拉取到路由表,缓存到本地,
并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,
轮询从队列列表中选择一个队列(默认轮询)
5. 消费者
消费者跟其中一台NameServer建立连接,获取当前订阅Topic存在哪些Broker上,
然后直接跟Broker建立连接通道,然后开始消费消息

2. maven 引入starter

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

3.yml配置

3.1 生产者yml 配置

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: my-group
    # 发送消息超时时间
    send-message-timeout: 5000
    # 发送消息失败重试次数
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

3.2 消费者yml 配置

rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    topic: topic_test
    group: consumer_my-group

4.生产者发送消息

4.1 一般消息

@Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     *  一般消息
     * Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。
     * 使用 Tag 可以实现对 Topic 中的消息进行过滤。
     * **/
    @GetMapping("/send")
    public String send(){
        rocketMQTemplate.convertAndSend("topic_test", "Hello, World!");
        rocketMQTemplate.convertAndSend("topic_test:tagB","Hello, World222--tagB");
        return "rocketMq普通消息发送完成";
    }

4.2 顺序消息

/** 支持消费者按照发送消息的先后顺序获取消息 */
    @GetMapping("/send/orderly")
    public String sendOrder(){
        //发送顺序消息,参数:topic,消息,hashkey,相同hashkey发送至同一个队列
        rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 1).build(),"queue");
        rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 2).build(),"queue");
        return "rocketMq顺序-消息发送成功";
    }

4.3 同步消息

@GetMapping("/send/sync")
    public String sendMsg() {
        String message = "我是同步消息:" + LocalDateTime.now();
        SendResult result = rocketMQTemplate.syncSend("topic_test:tagA", MessageBuilder.withPayload(message).build());
        log.info("同步-消息发送成功:" + LocalDateTime.now());
        return "rocketMq 同步-消息发送成功:" + result.getSendStatus();
    }

4.4 异步消息

/** 发送异步消息 */
    @GetMapping("/send/async")
    public String asyncSendMsg(){
        String message = "我是异步消息:" + LocalDateTime.now();
        rocketMQTemplate.asyncSend("topic_test:tagA",message,new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送成功 (后执行),SendStatus = {}",sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("发送失败 (后执行)");
            }
        });
        return "rocketMq 异步-消息发送成功:" + LocalDateTime.now();
    }

 4.5 单向消息:一般用来发送日志等不重要的消息

@GetMapping("/send/oneWay")
    public String sendOneWayMessage() {
        String message =  "我是单向消息:"+LocalDateTime.now();
        this.rocketMQTemplate.sendOneWay("topic_test:tagA", message);
        log.info("单向发送消息完成:message = {}", message);
        return "rocketMq 单向-消息发送成功:" + LocalDateTime.now();
    }

 

4.6 延时消息

/** 延时消息 */
    @GetMapping("/sendDelay")
    public String sendDelay(){
        String message = "我是延时消息:" + LocalDateTime.now();
        // 第四个参数为延时级别,分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2h
        rocketMQTemplate.syncSend("topic_test:tagC", MessageBuilder.withPayload(message).build(), 3000, 2);
        return "rocketMq延时-消息发送成功";
    }

4.7 事务消息

4.7.1 事务消息发送代码

/** 事务消息 */
    @GetMapping("/send/transaction/{id}")
    public void sendTransactionMessage(@PathVariable("id") Integer id){
        //发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等
        //参数一:topic;参数二:消息
        // 事务id
        String[] tags = {"tagA", "tagB", "tagC"};
        int i = id%3;

        String transactionId = UUID.randomUUID().toString();
        String message = "我是事务消息:" + LocalDateTime.now();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topic_test:" + tags[i]
                , MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build(),
                // 给本地事务的参数
                2);
        //发送状态
        String sendStatus = result.getSendStatus().name();
        //本地事务执行状态
        String localState = result.getLocalTransactionState().name();
        log.info("发送状态:"+sendStatus+";本地事务执行状态"+localState);


    }

4.7.2 继承 RocketMQLocalTransactionListener

@Slf4j
@RocketMQTransactionListener
public class MyTransactionListener implements RocketMQLocalTransactionListener {


    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {

        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("执行本地事务 ,transactionId is {}, orderId is {}",transactionId, message.getHeaders().get("rocketmq_TAGS"));
        try{
            //模拟网络波动
            Thread.sleep(3000);
            /***
             * 首先发送一个半消息(half message),这个消息不会立即投递给消费者;然后执行本地事务(比如数据库操作)。
             * 根据本地事务的执行结果,决定是提交(commit)还是回滚(rollback)这个消息。
             * 如果本地事务成功,消息会被提交并发送给消费者;
             * 如果失败,消息会被回滚,消费者不会接收到这个消息
             */

        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        // 执行本地事务
        String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
        if (StringUtils.equals("tagA", tag)){
            //这里只讲TAGA消息提交,状态为可执行
            return RocketMQLocalTransactionState.COMMIT;
        }else if (StringUtils.equals("tagB", tag)) {
            return RocketMQLocalTransactionState.ROLLBACK;
        } else if (StringUtils.equals("tagC",tag)) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        log.info("事务提交,消息正常处理: " + LocalDateTime.now());
        //执行成功,可以提交事务
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info(transactionId + ",消息回查"+ LocalDateTime.now());
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

tagA、tagB、tagC 三种事务消息,只有Commit的才能发送到broker 

 

 5. 消费端

/**
 * topic指定消费的主题,consumerGroup指定消费组,
 * 一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
 *  2.实现RocketMQListener接口
 *  如果想拿到消息的其他参数可以写成MessageExt
 *  selectorExpression = "tagA || tagB" 指定tag 的消费
 */
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}")
public class RocketMqConsumer implements RocketMQListener<String>{

    @Override
    public void onMessage(String s) {
        log.info("topic_test: 所有的收到消息:"+s);
    }

}

6.广播消费模式

生产端是一样的,但是消费端需要增加一个参数

messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{

    @Override
    public void onMessage(String s) {
        log.info("consumer2---topic_test: 所有的收到消息:"+s);
    }

}

// 第2个消费者类,他们都是一样的代码,
//为了表示广播,就是一个消息,会被这两个消费者消费

@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{

    @Override
    public void onMessage(String s) {
        log.info("consumer1--topic_test: 所有的收到消息:"+s);
    }

}

7.其他

RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1596074.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python实验4

一、实验目的 掌握和使用进程池掌握和使用多线程掌握和使用互斥锁掌握协程的使用 二、实验内容 使用进程池计算区间内素数个数使用多线程计算区间内合数个数互斥锁的设计和使用异步状态转换器 三、实验环境 在Educoder平台进行实验 四、实验要求 根据每个实训的每个关卡要…

springCloudAlibaba集成seata实战(分布式事物详解)

一、分布式事务 1. 事务介绍 1.1 基础概念 事务&#xff1a;保证我们多个数据库操作的原子性&#xff0c;多个操作要么都成功要么都不成功 事务ACID原则 A&#xff08;Atomic&#xff09;原子性&#xff1a;构成事务的所有操作&#xff0c;要么都执行完成&#xff0c;要么全部…

使用冒泡排序模拟实现qsort函数

目录 冒泡排序qsort函数的使用1.使用qsort函数排序整型数据2.使用qsort函数排序结构数据 冒泡排序模拟实现qsort函数今日题目1. 字符串旋转结果2.杨氏矩阵3.猜凶手4.杨辉三角 总结 冒泡排序 冒泡排序的核心思想是:两两相邻的元素进行比较 代码如下: //⽅法1 void bubble_so…

ros-param添加参数控制rviz显示掉帧问题

在ros中有一套参数系统可以直接写到launch文件中&#xff0c;这样非常方便&#xff0c;不需要编译就能直接用&#xff0c;这对于c来说非常的有好&#xff0c;这里记录一下如何使用。 主要步骤如下&#xff1a; 首先初始化节点 使用nodehandle进行管理&#xff0c;然后通过param…

停车场道闸系统的实施流程有哪些安装注意事项?

随着城市交通压力的不断增加&#xff0c;停车场道闸系统的安装与优化成为了提升城市交通效率的关键环节。不同类型的停车场&#xff0c;如社区、园区、公共交通站点以及商业综合体等&#xff0c;都有其独特的运营特点和用户需求。因此&#xff0c;了解并掌握停车场道闸系统安装…

腾讯客户端开发实习一面

听说腾讯25年5000offer&#xff0c;我就去了...投完简历&#xff0c;当天晚上做完测评&#xff0c;第二天下午打电话约了第三天面试&#xff0c;额流程很快&#xff0c;快到第三天就寄了... 写在这里做个记录&#xff0c;也可以给学习学妹们经验&#xff0c;文末也有大厂面经合…

VSCode中vue的packag.json报错:unable to load schema from‘ http://json.schema‘...问题解决

package.json有这个报错&#xff0c;类似于这种问题一般是网络连接有问题&#xff0c;无法加载重启一下就好。 但是如果是没有网络或者云桌面等环境不能连接外网&#xff0c;就在设置中把这个设置一下&#xff0c;这样就不报错了&#xff0c;根据需要选择处理。

element问题总结之el-table使用fixed固定列后滚动条滑动到底部或者最右侧的时候错位问题

el-table使用fixed固定列后滚动条滑动到底部或者最右侧的时候错位 效果图前言解决方案纵向滑动滚动条滑动到底部的错位解决横向滚动条滑动到最右侧的错位解决 效果图 前言 在使用el-table固定行的时候移动滚动条会发现移动到底部或者移动到最右侧的时候会出现表头和内容错位或…

CentOS7使用Docker搭建Joplin Server并实现多端同步与公网使用本地笔记

文章目录 1. 安装Docker2. 自建Joplin服务器3. 搭建Joplin Sever4. 安装cpolar内网穿透5. 创建远程连接的固定公网地址 Joplin 是一个开源的笔记工具&#xff0c;拥有 Windows/macOS/Linux/iOS/Android/Terminal 版本的客户端。多端同步功能是笔记工具最重要的功能&#xff0c;…

python怎么输出小数

先将整型转换成float型&#xff0c;再进行计算&#xff0c;结果就有小数了。 >>> a 10 >>> b 4 >>> c a/b >>> a,b,c (10, 4, 2) >>> a float(a) >>> d a/b >>> a,b,d (10.0, 4, 2.5) >>> 注意&…

ES6-2:Iterator、Proxy、Promise、生成器函数...

11-Iterator迭代器 打印出的是里面的内容&#xff0c;如果是for in打印出来的是索引&#xff0c;of不能遍历对象Symbol.iterator是js内置的&#xff0c;可以访问直接对象arr[Symbol.iterator]&#xff0c;()调用对象非线性一般不能迭代 后两个是伪数组&#xff0c;但是是真迭…

QT、ffmpeg视频监控分屏

1、支持分屏&#xff08;4&#xff0c;6&#xff0c;8&#xff0c;9&#xff0c;13&#xff0c;16&#xff0c;25&#xff0c;32&#xff0c;64&#xff09;切换 2、支持拖拽效果 3、支持播放mp4&#xff0c;rtmp等 4、本人亲测支持播放32路&#xff0c;64路没做测试 5、支持读…

12.文件浏览器

子程序参数的使用 1.可空的用法&#xff1b;表示这个参数不写也行。 2.如何使用递归 3.需要注意的事 递归的子程序必须有个退出的条件 注意区分递归和循环&#xff0c;不要混用 流程&#xff1a; 1.插入按钮&#xff0c;输入输出调试文本&#xff08;“按钮被单击”&…

Windows本地部署Ollama+qwen本地大语言模型Web交互界面并实现公网访问

文章目录 前言1. 运行Ollama2. 安装Open WebUI2.1 在Windows系统安装Docker2.2 使用Docker部署Open WebUI 3. 安装内网穿透工具4. 创建固定公网地址 前言 本文主要介绍如何在Windows系统快速部署Ollama开源大语言模型运行工具&#xff0c;并安装Open WebUI结合cpolar内网穿透软…

三次握手与四次挥手到底是怎么回事?

三次握手和四次挥手是TCP/IP协议中建立和断开连接的关键步骤&#xff0c;它们是保证可靠通信的重要机制。这里将探讨这两个概念&#xff0c;并解释它们背后的原理。 三次握手 三次握手用于建立TCP连接&#xff0c;它由客户端和服务器之间发送的三个报文组成&#xff1a; 第一次…

竞赛 基于Django与深度学习的股票预测系统

文章目录 0 前言1 课题背景2 实现效果3 Django框架4 数据整理5 模型准备和训练6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于Django与深度学习的股票预测系统 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff…

怎么使用JMeter进行性能测试?

一、简介 JMeter是Apache软件基金会下的一款开源的性能测试工具&#xff0c;完全由Java开发。它专注于对我们应用程序进行负载测试和性能测量&#xff0c;最初设计用于web应用程序&#xff0c;现在已经扩展到其他测试功能&#xff0c;比如&#xff1a;FTP、Database和LDAP等。…

【题目】【信息安全管理与评估】2022年国赛高职组“信息安全管理与评估”赛项样题5

【题目】【信息安全管理与评估】2022年国赛高职组“信息安全管理与评估”赛项样题5 第一阶段竞赛项目试题 本文件为信息安全管理与评估项目竞赛-第一阶段试题&#xff0c;第一阶段内容包括&#xff1a;网络平台搭建与设备安全防护。 本次比赛时间为180分钟。 介绍 竞赛阶段…

Github 2024-04-09 Python开源项目日报 Top10

根据Github Trendings的统计,今日(2024-04-09统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目10Vue项目1JavaScript项目1系统设计指南 创建周期:2507 天开发语言:Python协议类型:OtherStar数量:241693 个Fork数量:42010 次…

广西有多少家建筑模板企业?

作为一个建筑大省,广西地区建筑模板企业数量可谓不少。这些企业规模大小不一,生产能力和产品质量参差不齐。然而,在这些企业中,有一家脱颖而出,备受业内推崇,那就是贵港市能强优品木业有限公司。 能强优品木业有限公司是广西知名的建筑模版生产厂家,拥有25年的丰富生产经验。公…