RocketMQ基础篇(一)

news2025/1/31 3:11:31

目录

  • 一、发送消息类型
    • 1、同步消息
    • 2、异步消息
    • 3、单向消息
    • 4、顺序消费
    • 5、延迟消费
  • 二、消费模式
    • 1、集群模式
    • 2、广播模式
    • 3、消费模式扩展
    • 4、如何配置
  • 三、其他用法
    • 1、事务消息
    • 2、过滤消息
      • 1)Tag过滤
      • 2)SQL方式过滤

源码放到了GitHub仓库上,地址 https://github.com/shengwanping/SpringBoot-RocketMQ/tree/dev_01

一、发送消息类型

1、同步消息

发送同步消息是指producer向 broker发送消息,执行API时同步等待,直到broker服务器返回发送结果

// 可以使用RocketMQTemplate类下面的syncSend方法
SendResult sendResult = rocketMQTemplate.syncSend("topic_001", "Hello RocketMQ 同步消息");
System.out.println(sendResult);

2、异步消息

指producer向broker发送消息时异步执行,不会影响后面逻辑。而异步里面会调用一个回调方法,来处理消息发送成功或失败的逻辑

// 可以使用RocketMQTemplate类下面的asyncSend方法
rocketMQTemplate.asyncSend("topic_001", "Hello RocketMQ 异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步消息 发送成功!");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步消息 发送失败!");
            }
        });

3、单向消息

是指producer向 broker发送消息,执行API时直接返回,不等待broker 服务器的响应

// 可以使用RocketMQTemplate类下面的sendOneWay方法
rocketMQTemplate.sendOneWay("topic_001", "Hello RocketMQ 单项消息");

4、顺序消费

就是让消费者按照生产者发送消息的顺序去消费。

应用场景:比如电商系统需要实现,订单创建、支付、完成顺序的流程。RocketMQ默认是并发消费,没有顺序的。需要顺序消费需要通过如下配置:

首先消费者@RocketMQMessageListener注解,consumeMode 设置为ConsumeMode.ORDERLY

@RocketMQMessageListener(topic = "topic_001",
        consumerGroup = "${rocketmq.consumer.group}",
        consumeMode = ConsumeMode.ORDERLY)

然后生产者调用含有Orderly的方法:

	//                                topic          消息               队列
	rocketMQTemplate.syncSendOrderly("topic_001", "1001顺序消费-创建", "1001");
    rocketMQTemplate.syncSendOrderly("topic_001", "1001顺序消费-支付", "1001");
    rocketMQTemplate.syncSendOrderly("topic_001", "1001顺序消费-完成", "1001");

    rocketMQTemplate.syncSendOrderly("topic_001", "1002顺序消费-创建", "1002");
    rocketMQTemplate.syncSendOrderly("topic_001", "1002顺序消费-支付", "1002");
    rocketMQTemplate.syncSendOrderly("topic_001", "1002顺序消费-完成", "1002");

在消费者打印结果:

1001顺序消费-创建
1001顺序消费-消费
1001顺序消费-完成
1002顺序消费-创建
1002顺序消费-消费
1002顺序消费-完成

5、延迟消费

就是生产者设定延迟时间,时间到了消费者才能去消费

应用场景:一种比较常见的场要就是在电商系统中,订单创建后,会有一个等待用户支付的时间窗口,一般为30分钟,30分钟后customer会收到这条订单满息,然后程序去订单表中检查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式去处理了,

RocketMQ不能自定义延迟时间,有特定等级如下
延迟等级 0 不延迟,1 延时1s,2 延时5s,3 延时10s,4 延时 30s,以此类推。。。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

// 延迟方法                         topic           消息      默认3秒,没有发送消息会抛出异常   延迟等级
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel)
// 演示
rocketMQTemplate.syncSend("topic_001", MessageBuilder.withPayload("延迟消费5秒").build(), 3000, 2);
rocketMQTemplate.syncSend("topic_001", MessageBuilder.withPayload("延迟消费30秒").build(), 3000, 4);

二、消费模式

RocketMQ的消费者有两种消费模式:BROADCASTING广播模式,CLUSTERING集群模式,默认集群消费模式。

1、集群模式

理解:

如果这个消费者组都是集群模式,那么这个消费者组会去平分这个topic下面的消息,且一条消息只能被一个消费者消费

举个例子:

生产者给topic_1发送了10条消息,消费topic_1的这个消费者组有2个消费者,那么这两个消费者就会平分这10条消息,每个消费者5条消息。
但是经测试有时也会一个消费者6条,另一个消费者4条。(这个问题笔者占时也不清楚,待解答)

2、广播模式

理解:

如果这个消费者组都是广播模式,那么这个消费者组中的每个消费者都会去执行这个topic下面所有的消息,相当于一条消息会被执行多次

举个例子:

生产者给topic_2发送了10条消息,消费topic_2的这个消费者组有2个消费者,那么这两个消费者都会去消费这10条消息

3、消费模式扩展

生产者给Topic_1推送了10条消息,然后同时有两个消费者组对Topic_1进行消费。

消费者组1 中有两个消费者,分别是消费者A和消费者B,消费者A是集群模式,消费者B是广播模式。
消费者组2 中有一个消费者,是消费者C,消费者C是广播模式。

这个时候是怎么消费的呢?

答案是:消费者C会全量消费10条消息,消费者B也会全量消费10条消息,而消费者A只会消费一半消息(可能4条、5条、6条)

解释:由上可以看出,消费多少消息是由消费者的消费模式决定的。因为B、C都是广播模式,所以会消费这个Topic下面所有消息,而A是集群模式,他只会消费到的消息是
消费消息数量 = Topic中消息总数/消费者组中消费者数量

4、如何配置

在消费者 @RocketMQMessageListener注解中配置messageModel 参数,(没有设置默认集群模式)

设置为MessageModel.CLUSTERING,则是集群模式
设置为MessageModel.BROADCASTING,则是广播模式

如下:

@RocketMQMessageListener(topic = "topic_001",
 						consumerGroup = "${rocketmq.consumer.group}", 
 						messageModel = MessageModel.CLUSTERING)

三、其他用法

1、事务消息

Half(Prepare) Message——半消息(预处理消息)

半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。

Message Status Check——消息状态回查

由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit或Rollback)。可以看出, Message Status Check主要用来解决分布式事务中的超时问题。

在这里插入图片描述
1.应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ。
2. prepare消息发送成功后,应用模块执行数据库事务(本地事务)。
3. 根据数据库事务执行的结果,再返回Commit或Rollback给MQ。
4. 如果是Commit, MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息。
5. 第3步的执行结果如果没响应,或是超时的,启动定时任务回查事务状态(最多重试15次,超过了默认丢弃此消息) ,处理结果同第4步。
6. MQ消费的成功机制由MQ自己保证。

生产者发送事务消息:

rocketMQTemplate.sendMessageInTransaction("topic_001", MessageBuilder.withPayload("事务消息").build(), null);
// rocketmq事务消息  配置类
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 执行本地事务,如果是COMMIT则消息发送成功,如果是ROLLBACK则直接丢弃消息,如果是UNKNOWN则调用checkLocalTransaction()
        try {
            System.out.println("executeLocalTransaction");
        }catch (Exception e){
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // 检查本地事务(最多调用15次,如果全部失败则ROLLBACK丢弃消息)
        System.out.println("checkLocalTransaction");
        return RocketMQLocalTransactionState.COMMIT;
    }
}

2、过滤消息

在消费端进行消息消费的时候,我们根据业务需求,可以对消息进行过滤处理需要的消息
尤其是广播模式下,消息过滤经常使用
RocketMQ提供了TAG和SQL表达式两种消息过滤方式

1)Tag过滤

生产者需要在Topic后面加上 冒号 + TAG
消费者需要配置 selectorType = SelectorType.TAGselectorExpression

消费端配置如下:

@Component
// 指定topic 和 消费者组
@RocketMQMessageListener(topic = "topic_001",
        consumerGroup = "${rocketmq.consumer.group}",
        selectorType = SelectorType.TAG, selectorExpression = "TAG1 || TAG2")
public class ConsumerMode implements RocketMQListener<String> { // 继承RocketMQListener接口
    @Override
    public void onMessage(String s) {
        System.out.println("收到的消息是:"+s);
    }
}

生产端发送消息如下:

	rocketMQTemplate.convertAndSend("topic_001"+":"+"TAG1", MessageBuilder.withPayload("TAG1消息").build());
	rocketMQTemplate.convertAndSend("topic_001"+":"+"TAG2", MessageBuilder.withPayload("TAG2消息").build());
	rocketMQTemplate.convertAndSend("topic_001"+":"+"TAG3", MessageBuilder.withPayload("TAG3消息").build());

消费端打印:

收到的消息是:{"payload":"TAG1消息","headers":{"id":"3df5f1a5-cbb2-fac5-e95b-489f29bc4a77","timestamp":1678204919985}}
收到的消息是:{"payload":"TAG2消息","headers":{"id":"1112d1cf-e1a9-bc2c-b38d-59a667196385","timestamp":1678204920260}}

2)SQL方式过滤

SQL表达式方式可以根据发送消息时输入的属性进行一些计算。
RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。

数字比较,如>,>=, <,<=,
BETWEEN, =;
字符比较,如:=,<>,IN;
IS NULL or IS NOT NULL;
逻辑运算符:AND, OR, NOT;

常量类型:
数值,如:123,3.1415;
字符,如: ‘abc’,必须使用单引号;
NULL,特殊常量
Boolean, TRUE or FALSE;

首先要在broker配置文件里面加入支持,否则会报错

1、rocketmq-4.4.0\conf\broker.conf 加入enablePropertyFilter = true
2、重启broker 并指定 配置文件 mqbroker.cmd -n localhost:9876 autoCreateToopicEnable=true -c ../conf/broker.conf

完成上面两步操作接口用sql方式过滤消息

生产者:

		Message msg1 = MessageBuilder.withPayload("rocketmq过滤消息测试01").build();
        Map<String, Object> headers = new HashMap<>();
        headers.put("name", "xiao ming");
        headers.put("a", 2) ;
        rocketMQTemplate.convertAndSend("topic_001", msg1, headers);

        Message msg2 = MessageBuilder.withPayload("rocketmq过滤消息测试02").build();
        Map<String, Object> headers1 = new HashMap<>();
        headers1.put("name", "xiao hua");
        headers1.put("a", 7) ;
        rocketMQTemplate.convertAndSend("topic_001", msg2, headers1);

消费者: 主要是selectorType = SelectorType.SQL92, selectorExpression = "name = 'xiao ming' and a < 5")

@Component
// 指定topic 和 消费者组
@RocketMQMessageListener(topic = "topic_001",
        consumerGroup = "${rocketmq.consumer.group}",
//        selectorType = SelectorType.TAG, selectorExpression = "TAG1 || TAG2")
        selectorType = SelectorType.SQL92, selectorExpression = "name = 'xiao ming' and a < 5")
public class ConsumerMode implements RocketMQListener<String> { // 继承RocketMQListener接口
    @Override
    public void onMessage(String s) {
        System.out.println("收到的消息是:"+s);
    }
}

消费者打印:
根据过滤条件只打印了第一条消息

收到的消息是:{"payload":"rocketmq过滤消息测试01","headers":{"id":"da3ac866-4140-b440-2f4c-ecb5ce4d9965","timestamp":1678206096192}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/396585.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HyperLPR3车牌识别-Android-SDK光速部署与使用

简介HyperLPR在2023年初已经更新到了v3的版本&#xff0c;该版本与先前的版本一样都是用于识别中文车牌的开源图像算法项目&#xff0c;最新的版本的源码可从github中提取&#xff1a;https://github.com/szad670401/HyperLPRHyperLPR-Android-SDK for JitPackHyperLPR3的官方源…

Prim算法和Kruskal算法到底哪个好?

Prim和Kruskal有啥区别&#xff1f;到底哪个好&#xff1f; 今天做了一道最小生成树的题&#xff0c;发现了一点猫腻&#xff01; 题目在这里 &#xff1a; 《修路问题1》 文章目录Prim和Kruskal有啥区别&#xff1f;到底哪个好&#xff1f;先说结论PrimKruskal修路问题1——…

不好!有敌情,遭到XSS攻击【网络安全篇】

XSS&#xff1a;当一个目标的站点&#xff0c;被我们用户去访问&#xff0c;在渲染HTMl的过程中&#xff0c;出现了没有预期到的脚本指令&#xff0c;然后就会执行攻击者用各种方法注入并执行的恶意脚本&#xff0c;这个时候就会产生XSS。 涉及方&#xff1a; 用户&#xff0…

Linux端安装MySQL并实现远程连接Navicat

文章目录Linux端安装MySQL&#xff08;centos版本&#xff09;Linux端安装MySQL&#xff08;centos版本&#xff09; 1、先将MySQL需要的四个rpm安装包上传上去&#xff0c;这里可以使用Xftp软件或者是通过window端使用ftp文件传输方式上传到Linux端&#xff0c;这里选择Xftp来…

基于JavaWeb学生选课系统开发与设计(附源码资料)

文章目录1. 适用人群2. 你将收获3.项目简介4.技术实现5.运行部分截图5.1.管理员模块5.2.教师模块5.3.学生模块1. 适用人群 本课程主要是针对计算机专业相关正在做毕业设计或者是需要实战项目的Java开发学习者。 2. 你将收获 提供&#xff1a;项目源码、项目文档、数据库脚本…

远程办公18年,把一个开源工具变成了价值 75亿美元的跨国企业

把自己的兴趣做成了一份事业&#xff0c;把一个开源工具发展成为一家价值75亿美元的跨国企业&#xff0c;而且还是那种员工做梦都想进入的公司&#xff0c;真正实现了功成名就&#xff0c;这或许是所有程序员的梦想吧。 先来看看这家公司的福利&#xff1a; 员工拥有没有限制的…

git快速入门(1)

1 git的下载与安装1&#xff09;下载git安装包下载路径&#xff1a;https://git-scm.com/我的操作系统是window&#xff0c;64位的&#xff0c;我下载的Git-2.33.0-64-bit.exe&#xff0c;从官网下载或者从网址下载链接&#xff1a;链接地址&#xff1a;https://pan.baidu.com/…

【MySQL】P8 多表查询(2) - 连接查询 联合查询

连接查询以及联合查询多表查询概述连接查询内连接隐式内连接显式内连接外连接左外连接右外连接自连接联合查询多表查询概述 建表语句见上一篇博文&#xff1a;https://blog.csdn.net/weixin_43098506/article/details/129402302 e.g.e.g.e.g. select * from emp, dept where e…

深入分析@Configuration源码

文章目录一、源码时序图1. 注册ConfigurationClassPostProcessor流程源码时序图2. 注册ConfigurationAnnotationConfig流程源码时序图3. 实例化流程源码时序图二、源码解析1. 注册ConfigurationClassPostProcessor流程源码解析&#xff08;1&#xff09;运行案例程序启动类Conf…

Python安装、断点调试

一、安装Python方法 1.1 在Microsoft Store微软商店中搜索Python安装&#xff08;推荐&#xff09; 或直接在cmd中Python运行 已经安装了就显示版本号&#xff0c; 如果没有安装过&#xff0c;会直接跳到微软商店 1.2 在python官网中找最新版下载安装 二、VSCODE中运行与断点…

容易混淆的嵌入式(Embedded)术语

因为做嵌入式开发工作虽然跳不出电子行业&#xff0c;但还是能接触到跨度较大的不同行当&#xff0c;身处不同的圈子。诸如医疗&#xff0c;银行&#xff0c;车载&#xff0c;工业&#xff1b;亦或者手机&#xff0c;PC&#xff0c;专用芯片&#xff1b;甚至可能横跨系统开发、…

Vue常见的事件修饰符

前言 vue一共给我们准备了6个事件修饰符&#xff0c;前三个比较常用&#xff0c;后三个少见&#xff0c;这里着重讲下前三个 1.prevent:阻止默认事件(常用) 2. stop:阻止事件冒泡(常用) 3. once:事件只触发一次(常用) 4.captrue:使用事件的捕捉模式(不常用) 5.self:只有event…

案例10---对生产环境的敬畏--生产环境

一&#xff1a;背景介绍 1&#xff1a;上午9:23&#xff0c;老师没有进行上课&#xff0c;但是却又很多的在线人员&#xff0c;并且在线人员的时间也不正确&#xff0c;用户反映问题。 2&#xff1a;开发人员发现用户上课情况异常。 3&#xff1a;10点整&#xff0c;询问项目…

Notepad++ 下载与安装教程

文章目录Notepad 下载与安装教程Notepad 简介一&#xff0c;Notepad 下载二&#xff0c;Notepad 安装Notepad 下载与安装教程 Notepad 简介 Notepad是程序员必备的文本编辑器&#xff0c;Notepad中文版小巧高效&#xff0c;支持27种编程语言&#xff0c;通吃C,C ,Java ,C#, XM…

Android Execution failed for task ‘:app:mergeDebugJavaResource

错误提示 FAILURE: Build failed with an exception.* What went wrong: Execution failed for task :app:mergeDebugJavaResource. > A failure occurred while executing com.android.build.gradle.internal.tasks.MergeJavaResWorkAction> 2 files found with path k…

不写代码、年薪百万,带你玩赚ChatGPT提示工程-提示应用程序

文章目录前言一、数据生成二、PAL (Program-Aided Language Models): Code as Reasoning总结前言 随着ChatGPT的大火&#xff0c;提示工程在大模型中的重要性不言而喻&#xff0c;本文参考国外Prompt Engineering Guide完成国内中文版本的《提示工程指南》&#xff0c;希望能够…

一文读懂倒排序索引涉及的核心概念

基础概念相信对于第一次接触Elasticsearch的同学来说&#xff0c;最难理解的概念就是倒排序索引&#xff08;也叫反向索引&#xff09;&#xff0c;因为这个概念跟我们之前在传统关系型数据库中的索引概念是完全不同的&#xff01;在这里我就重点给大家介绍一下倒排序索引&…

DOTA双功能螯合剂127985-74-4,p-SCN-Bn-DOTA,实验室科研试剂

p-SCN-Bn-DOTA产品描述&#xff1a;p-SCN-Bn-DOTA用于标记多肽的双功能螯合剂&#xff0c;同时螯合放射性核素和连接单克隆抗体。DOTA 的全名是 1,4,7,10-Tetraazacyclododecane-1,4,7,10-tetraacetic acid&#xff0c;中文名称为 1,4,7,10-四氮杂环十二烷-四乙酸&#xff0c;其…

Linux中断操作

一、thread_irq在内核中&#xff0c; 除了可以通过request_irq() 、 devm_request_irq()申请中断以外&#xff0c; 还可以通过以下二个函数申请( 它们比request_irq和devm_request_irq多了一个参数thread_fn)。 用这两个API申请中断的时候&#xff0c; 内核会为相应的中断号分配…

steam海外道具搬运,2个月变现1.6万,真的假的?

这几年的环境&#xff0c;让我这个身负房贷的房奴&#xff0c;实在是喘不过来气&#xff01; 也是无意间在朋友圈看到&#xff0c;之前突然裸辞的同事&#xff0c;不知道干什么发了财&#xff0c;竟然自己开了公司&#xff01; 几经询问才知道&#xff0c;他就是利用steam海外…