RocketMq 顺序消费、分区消息、延迟发送消息、Topic、tag分类 实战 (消费者) (三)

news2025/1/22 19:54:27

消费端配置
如下所示:是消费者的配置类,有以下几点需要注意的地方
1、是TargetMessageListener这个监听类(下文会把这个监听类的具体代码贴出来),需要把这个监听类订阅。
2、rocketMqDcProperties.getTargetProperties()这个方法里面有相关的配置信息(这里面有绑定消费者是那个组,因为一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致),具体代码见下文
3、subscription.setTopic(rocketMqDcProperties.getOrderTopic()) 是绑定Topic,这个Topic跟上篇文章生产者的Topic是一致的,这样就能保证消费者能准确消费到生产者发送的消息
4、subscription.setExpression(MqTagEnum.target.name()); 这个是一个消息的过滤,也是一个对消息的具体分类。详细用法请参考链接:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/message-filtering?spm=a2c4g.11186623.0.i38#concept-2047069
综上所述:
这段代码主要是用于配置一个 OrderConsumerBean 对象,设置其属性和订阅关系

@Configuration
public class TargetConsumerClient {

    @Autowired
    private RocketMqDcProperties rocketMqDcProperties;

    @Autowired
    private TargetMessageListener messageListener;

    @ConditionalOnProperty(name = "rocket.mq.dc.enabled", havingValue = "true", matchIfMissing = true)
    @Bean(initMethod = "start", destroyMethod = "shutdown",name = {"buildTargetConsumer"})
    public OrderConsumerBean buildTargetConsumer() {
        OrderConsumerBean orderConsumerBean = new OrderConsumerBean();
        //配置文件
        orderConsumerBean.setProperties(rocketMqDcProperties.getTargetProperties());
        //订阅关系
        Map<Subscription, MessageOrderListener> subscriptionTable = new HashMap<Subscription, MessageOrderListener>();
        Subscription subscription = new Subscription();
        subscription.setTopic(rocketMqDcProperties.getOrderTopic());
        subscription.setExpression(MqTagEnum.target.name());
        subscriptionTable.put(subscription, messageListener);
        orderConsumerBean.setSubscriptionTable(subscriptionTable);
        return orderConsumerBean;
    }
}
	public Properties getTargetProperties() {
        return this.getProperties(this.targetGroupId);
    }
    private Properties getProperties(String groupId) {
        Properties properties = new Properties();
        properties.setProperty("AccessKey", this.accessKey);
        properties.setProperty("SecretKey", this.secretKey);
        properties.setProperty("NAMESRV_ADDR", this.nameSrvAddr);
        properties.setProperty("GROUP_ID", groupId);
        properties.setProperty("ConsumeThreadNums", this.getConsumeThreadNums().toString());
        properties.setProperty("maxReconsumeTimes", this.getMaxReconsumeTimes().toString());
        properties.setProperty("consumeTimeout", this.getConsumeTimeout().toString());
        properties.setProperty("suspendTimeMillis", this.getSuspendTimeMillis().toString());
        return properties;
    }    

消费端代码
如下所示:TargetMessageListener 实现了MessageOrderListener接口,并如上文所示,其和OrderConsumerBean也绑定了订阅关系。

@Slf4j
@Component
public class TargetMessageListener implements MessageOrderListener {

    @Autowired
    private MqMessageRecordDao mqMessageRecordDao;

    @Autowired
    private TargetService targetServiceImpl;

    @Override
    public OrderAction consume(final Message message, final ConsumeOrderContext context) {

        log.info("MQ消息消费者监听消息内容:{}", message);

        MqMessageRecordMo mqMessageRecordMo = new MqMessageRecordMo();
        try {
            String body = new String(message.getBody());
            String tag = message.getTag();
            mqMessageRecordMo.setMsgId(message.getMsgID());
            mqMessageRecordMo.setOrderTopic(message.getTopic());
            mqMessageRecordMo.setProducerIp(message.getBornHost());
            mqMessageRecordMo.setTag(tag);
            //mqMessageRecordMo.setMessageKey(message.getKey());
            mqMessageRecordMo.setShardingKey(message.getShardingKey());
            mqMessageRecordMo.setBodyJson(body)
            mqMessageRecordMo.setProducerTime(LocalDateLUtils.timestampToDatetime(message.getBornTimestamp()));
            mqMessageRecordMo.setCreateTime(LocalDateTime.now());
            mqMessageRecordMo.setPMsgId(message.getUserProperties("pMsgId"));

            log.info("MQ消费者消息消费成功,解析并处理相应的业务逻辑, tag = {},key = {},messageId = {}", tag, message.getShardingKey(),message.getMsgID());

            DataBaseEnum dataBase = DataBaseEnum.getEnum(message.getShardingKey());
            targetServiceImpl.process(message.getMsgID(),dataBase,body);

            log.info("MQ消息体消费监听解析结果:{}", body);
            mqMessageRecordMo.setIsSuccess(true);
            return OrderAction.Success;
        } catch (Exception e) {
            //消费失败,挂起当前队列
            // 存储错误消息,重试,记录日志
            /*log.error("target MQ消费者消息监听消息业务逻辑处理失败:",e);
            mqMessageRecordMo.setIsSuccess(false);
            mqMessageRecordMo.setErrorMsg(e.getMessage());
            return OrderAction.Success;
        } finally {
            mqMessageRecordDao.save(mqMessageRecordMo);
        }
    }
}            

在这里想讲下顺序消息
顺序消息
顺序消息可以保证消息的消费顺序和发送的顺序一致,即先发送的先消费,后发送的后消费,常用于金融证券、电商业务等对消息指令顺序有严格要求的场景。本文介绍云消息队列 RocketMQ 版顺序消息的概念、适用场景、实现原理以及使用过程中的注意事项。
什么是顺序消息
顺序消息是云消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
分区顺序消息
对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用场景
适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
示例
用户注册需要发送验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
阿里巴巴集团内部电商系统均使用分区顺序消息,既保证业务的顺序,同时又能保证业务的高性能。
全局顺序消息
对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
适用场景
适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
示例
在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。
说明
全局顺序消息实际上是一种特殊的分区顺序消息,即Topic中只有一个分区,因此全局顺序和分区顺序的实现原理相同。因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高
如何实现顺序消息
在这里插入图片描述
在云消息队列 RocketMQ 版中,消息的顺序需要由以下三个阶段保证:
消息发送
如上图所示,A1、B1、A2、A3、B2、B3是订单A和订单B的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单A的消息发送和消费都按照A1、A2、A3的顺序。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时云消息队列 RocketMQ 版支持将Sharding Key相同(例如同一订单号)的消息序路由到一个队列中。
云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照同一生产者发送消息的时序。不同生产者、不同线程并发产生的消息,云消息队列 RocketMQ 版服务端无法判定消息的先后顺序。
消息存储
如上图所示,顺序消息的Topic中,每个逻辑队列对应一个物理队列,当消息按照顺序发送到Topic中的逻辑队列时,每个分区的消息将按照同样的顺序存储到对应的物理队列中。
消息消费
云消息队列 RocketMQ 版按照存储的顺序将消息投递给Consumer,Consumer收到消息后也不对消息顺序做任何处理,按照接收到的顺序进行消费。
Consumer消费消息时,同一Sharding Key的消息使用单线程消费,保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致
注意事项
a、同一个Group ID只对应一种类型的Topic,即不同时用于顺序消息和无序消息的收发。
b、对于全局顺序消息,建议消息不要有阻塞。同时运行多个实例,是为了防止工作实例意外退出而导致业务中断。当工作实例退出时,其他实例可以立即接手工作,不会导致业务中断,实际工作的只会有一个实例。
c、云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致

顺序消息常见问题
a、同一条消息是否可以既是顺序消息,又是定时消息和事务消息?
不可以。顺序消息、定时消息、事务消息是不同的消息类型,三者是互斥关系,不能叠加在一起使用。

b、顺序消息支持哪些地域?
支持云消息队列 RocketMQ 版所有公共云地域和金融云地域。

c、为什么全局顺序消息性能一般?
全局顺序消息是严格按照FIFO的消息阻塞原则,即上一条消息没有被成功消费,那么下一条消息会一直被存储到Topic队列中。如果想提高全局顺序消息的TPS,可以升级实例配置,同时消息客户端应用尽量减少处理本地业务逻辑的耗时。

d、顺序消息支持哪种消息发送方式?
顺序消息只支持可靠同步发送方式,不支持异步发送方式,否则将无法严格保证顺序。

e、顺序消息是否支持集群消费和广播消费?
顺序消息暂时仅支持集群消费模式,不支持广播消费模式。
以上文档链接来源:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/ordered-messages?spm=a2c4g.11186623.0.0.34b428e5LL1Jlh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1532548.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

爬虫基础:HTTP基本原理

爬虫基础&#xff1a;HTTP基本原理 前言HTTP基本原理URI 和 URLHTTP 和 HTTPSHTTP 请求过程请求与响应HTTP请求HTTP响应请求与响应的交互过程 HTTP 2.0二进制传输多路复用Header压缩服务器端提前响应内容安全 前言 了解 HTTP的基本原理&#xff0c;了解从往测览器中输人 URL到获…

微服务day04(上)-- RabbitMQ学习与入门

1.初识MQ 1.1.同步和异步通讯 微服务间通讯有同步和异步两种方式&#xff1a; 同步通讯&#xff1a;就像打电话&#xff0c;需要实时响应。 异步通讯&#xff1a;就像发邮件&#xff0c;不需要马上回复。 两种方式各有优劣&#xff0c;打电话可以立即得到响应&#xff0c;但…

Tech Talks技术讲座中文培训-报名学习LPWAN、Matter、蓝牙和Wi-Fi最新开发技能!

Silicon Labs&#xff08;亦称“芯科科技”&#xff09;主办新一轮2024年“亚太区Tech Talks在线技术讲座”即将在5月9日至8月8日&#xff08;中文系列场次&#xff09;&#xff0c;以及4月24日至8月7日&#xff08;英文系列场次&#xff09;正式展开&#xff0c;现正热烈报名中…

spring boot学习第十四篇:使用AOP编程

一、基本介绍 1&#xff0c;什么是 AOP &#xff08;1&#xff09;AOP 为 Aspect Oriented Programming 的缩写&#xff0c;意为&#xff1a;面向切面编程&#xff0c;通过预编译方式和运行期动态代理实现程序功能的统一维护的一种技术。 &#xff08;2&#xff09;利用 AOP…

排水管网信息化平台:科技赋能,助力城市水环境管理升级

排水管网承担着城市污水、雨水的收集与排出的双重任务&#xff0c;是城市重要的基础设施。城市化率的不断提高&#xff0c;对城市基础设施的性能也提出了考验。 排水管网存在窨井监测设备不足、管段淤积、无序监管、污水超标排放等问题突出&#xff0c;导致部分污水直排受纳水…

Springboot笔记-05

1.Springboot的热部署 spring为开发者提供了一个名为spring-boot-devtools的模块来使Spring Boot应用支持热部署&#xff0c;提高开发者的开发效率&#xff0c;无需手动重启Spring Boot应用。 在pom文件加入依赖 <dependency> <groupId>org.springframework.boot…

像uniapp image标签一样对图片进行缩放和裁剪

像uniapp image标签一样对图片进行缩放和裁剪 0 前言提示1 实现1.1 不保持纵横比缩放图片&#xff0c;使图片的宽高完全拉伸至填满 image 元素1.2 保持纵横比缩放图片&#xff0c;使图片的长边能完全显示出来。也就是说&#xff0c;可以完整地将图片显示出来。1.3 保持纵横比缩…

NCV7428D15R2G中文资料PDF数据手册参数引脚图图片价格概述参数芯片特性原理

产品概述&#xff1a; NCV7428 是一款系统基础芯片 (SBC)&#xff0c;集成了汽车电子控制单元 (ECU) 中常见的功能。NCV7428 为应用微控制器和其他负载提供低电压电源并对其进行监控&#xff0c;包括了一个 LIN 收发器。 产品特性&#xff1a; 控制逻辑3.3 V或5 V VOUT电源&…

JavaScript 使用 Promise 实现 sleep 休眠

以下为代码实现&#xff0c;该代码实现了每隔1秒打印一次当前时间&#xff0c;总共打印5次的功能 for(let i 1; i < 5; i){console.log(new Date().toString())await new Promise(resolve>setTimeout(resolve,1000)) }实现休眠的核心代码为: await new Promise(resolv…

挖掘网络宝藏:利用Scala和Fetch库下载Facebook网页内容

介绍 在数据驱动的世界里&#xff0c;网络爬虫技术是获取和分析网络信息的重要工具。本文将探讨如何使用Scala语言和Fetch库来下载Facebook网页内容。我们还将讨论如何通过代理IP技术绕过网络限制&#xff0c;以爬虫代理服务为例。 技术分析 Scala是一种多范式编程语言&…

在windows上安装Jenkins

jenkins安装 下载jenkins 官网&#xff1a;Jenkins download and deployment 官方文档说明&#xff1a;Jenkins User Documentation 安装jenkins1.点击下载好的安装包&#xff0c;点击Next 2.选择一个安装路径 如果系统是windows家庭版打不开策略就创建一个txt文件&#xff0c…

node.js常用的命令

Node.js 是一个用于执行 JavaScript 代码的运行时环境。以下命令是 Node.js 开发中常用的命令&#xff0c;可以帮助你进行包管理、项目配置和代码执行等操作。 node -v&#xff1a;检查 Node.js 的版本。npm -v&#xff1a;检查 npm&#xff08;Node.js 包管理器&#xff09;的…

企业工商年报注册注销商标注册异常处理小程序开源版开发

企业工商年报注册注销商标注册异常处理小程序开源版开发 1、独立业务模型包括&#xff1a;企业工商年报、企业工商登记注册、企业注销登记、企业异常处理。 2、通用业务模型适合各种业务&#xff0c;比如&#xff1a;商标注册代理、财务会计服务、企业版权登记登。 当然&…

2024 用CleanMyMac X为您的MAC清理提速吧

CleanMyMac X 是由 MacPaw 公司开发的一款针对 macOS 操作系统的电脑清理工具。它可以帮助用户清理电脑中的垃圾文件、卸载不需要的软件、优化电脑性能等。它的界面简洁明了&#xff0c;操作简单易懂&#xff0c;非常适合普通用户使用。 链接: https://pan.baidu.com/s/1_TFnrI…

Zookeeper(六)Zokeeper 使用场景案例

目录 一 数据发布/订阅1.1 配置变更1.2 代码实现1.3 启动测试 二 负载均衡2.1 实现2.2 代码2.3 启动测试 三 分布式ID3.1 代码实现3.2 效果 四 服务器集群监控五 分布式锁2.1 排他锁2.2 共享锁 官网&#xff1a;Apache ZooKeeper 一 数据发布/订阅 数据发布/订阅(Publish/Sub…

怎样修改grafana的Loading picture和加载的文本

登录装了grafana的linux机器 command “sudo vi /usr/share/grafana/public/views/index.html”&#xff0c;编辑配置文件。 找到.preloader__logo更改background-image. 这里可以是个url也可以是个路径。 如果想要更改加载的文字.可以更改 的内容 改完:wq保存以后退出&…

从键盘到屏幕:C语言中输入输出探秘

在编程中&#xff0c;输入和输出是我们与计算机交流的关键。无论是键盘输入还是屏幕输出&#xff0c;它们贯穿了我们每一行代码的编写。本文将带你深入探索C语言中输入输出的精彩世界&#xff0c;解锁其中的奥秘&#xff0c;助你轻松驾驭键盘和屏幕&#xff01;&#xff08;最后…

C++ List底层实现

文章目录 前言成员变量成员函数迭代器self& operator()前置self operator(int)后置self operator--()前置--self operator--(int)后置--bool operator!(const self & tmp)判断是否相等T* operator*() 解引用操作 list()初始化iterator begin()iterator end()const_iter…

年度告警分类统计

1、打开前端Vue项目kongguan_web&#xff0c;完成前端src/components/echart/YearWarningChart.vue页面设计 在YearWarningChart.vue页面添加div设计 <template><div class"home"><div style"margin: 0px auto;height: 100%"><div …

seleniumUI自动化实例(CSDN发布文章)

1.CSDN登陆成功后&#xff0c;点击发布 源码&#xff1a; #点击首页中的发布按钮 CSDNconf.driver.find_element(By.LINK_TEXT,"发布").click() time.sleep(15) 2.输入标题 #输入文章标题&#xff0c;标题格式“selenium UI自动化测试实例今天的日期” CSDNconf.d…