仅此一招,再无消息乱序的烦恼

news2025/1/23 6:03:55

1. 概览

RocketMQ 早已提供了一组最佳实践,但工作在一线的伙伴却很少知道,项目中的各种随性代码经常导致消息错乱问题,严重影响业务的准确性。为了保障最佳实践的落地,降低一线伙伴的使用成本,统一 MQ 使用规范,需要对其进行抽象和封装…

1.1. 背景

RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。

在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称,简单示例如下:

// 计算 destination
protected String createDestination(String topic, String tag) {
    if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
        return topic + ":" + tag;
    }else {
        return topic;
    }
}
// 发送信息
String destination = createDestination(topic, tag);
SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);

tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

但,在消费消息时,就变的没那么方便了,简单示例如下:

@Service
@RocketMQMessageListener(
    topic = "consumer-test-topic-1",
        consumerGroup ="user-message-consumer-1",
        selectorExpression = "*",
        consumeMode = ConsumeMode.ORDERLY
)
@Slf4j
public class RocketBasedUserMessageConsumer extends UserMessageConsumer
    implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String tag = message.getTags();
        byte[] body = message.getBody();
        log.info("handle msg body {}", new String(body));
        switch (tag){
            case "UserCreatedEvent":
                UserEvents.UserCreatedEvent createdEvent = JSON.parseObject(body, UserEvents.UserCreatedEvent.class);
                handle(createdEvent);
                return;
            case "UserEnableEvent":
                UserEvents.UserEnableEvent enableEvent = JSON.parseObject(body, UserEvents.UserEnableEvent.class);
                handle(enableEvent);
                return;
            case "UserDisableEvent":
                UserEvents.UserDisableEvent disableEvent = JSON.parseObject(body, UserEvents.UserDisableEvent.class);
                handle(disableEvent);
                return;
            case "UserDeletedEvent":
                UserEvents.UserDeletedEvent deletedEvent = JSON.parseObject(body, UserEvents.UserDeletedEvent.class);
                handle(deletedEvent);
                return;
        }
    }
}

该方法有几个问题:

  1. tag 维护成本较高,RocketMQMessageListener 设置 selectorExpression 为 *,将拉取全部数据,增加通讯成本;如果使用 tag1 || tag2 方式,每次调整都需要对代码和配置进行更新,特别容易遗漏;
  2. 充斥大量模板代码,比如 case 分支,反序列化,调用业务方法等;
  3. API 具有侵入性,开发是需要关心 RocketMQ API,存在一定学习成本;

1.2. 目标

提供一种面向业务场景的,灵活进行业务扩展的模式,具有以下特征:

  1. Tag 和代码保持一致,不需要多处配置,新增逻辑自动完成 Tag 注册;
  2. 消除模板方法,类中只保留核心业务方法,框架完成 方法分发、消息反序列化等操作;
  3. 代码零侵入,仅使用注解,无需了解 RocketMQ API;

2. 快速入门

框架依赖
rocketmq-spring-boot-starter 完成消息发送和回收。

2.1. 环境准备

2.1.1. 增加依赖

首先,增加 rocketmq 相关依赖。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

然后,增加 lego starter。

<dependency>
    <groupId>com.geekhalo.lego</groupId>
    <artifactId>lego-starter</artifactId>
    <version>0.1.13-tag_based_dispatcher_message_consumer-SNAPSHOT</version>
</dependency>

2.1.2. 增加配置

在 application.yml 文件中增加 rocketmq 配置。

rocketmq:
  name-server: http://127.0.0.1:9876
  producer:
    group: rocket-demo

2.2. 定义消费者

定义消费者,只需:

  1. 在 Bean 上增加 @TagBasedDispatcherMessageConsumer 注解,并指定 topic 和 consumer
  2. 在 Bean 的方法上添加 @HandleTag 注解,并指定监听的 tag

示例如下:

@TagBasedDispatcherMessageConsumer(
        topic = "consumer-test-topic",
        consumer = "user-message-consumer"
)
public class UserMessageConsumer {
    private final Map<Long, List<UserEvents.UserEvent>> events = Maps.newHashMap();
    public void clean(){
        this.events.clear();;
    }
    public List<UserEvents.UserEvent> getUserEvents(Long userId){
        return this.events.get(userId);
    }
    @HandleTag("UserCreatedEvent")
    public void handle(UserEvents.UserCreatedEvent userCreatedEvent){
        List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userCreatedEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userCreatedEvent);
    }
    @HandleTag("UserEnableEvent")
    public void handle(UserEvents.UserEnableEvent userEnableEvent){
        List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userEnableEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userEnableEvent);
    }
    @HandleTag("UserDisableEvent")
    public void handle(UserEvents.UserDisableEvent userDisableEvent){
        List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDisableEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userDisableEvent);
    }
    @HandleTag("UserDeletedEvent")
    public void handle(UserEvents.UserDeletedEvent userDeletedEvent){
        List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDeletedEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userDeletedEvent);
    }
}

2.3. 测试

编写测试用例如下:

@SpringBootTest(classes = DemoApplication.class)
@Slf4j
class UserMessageConsumerTest {
    @Autowired
    private UserMessageConsumer userMessageConsumer;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private List<Long> userIds;
    @BeforeEach
    void setUp() throws InterruptedException {
        this.userMessageConsumer.clean();
        this.userIds = new ArrayList<>();
        for (int i = 0; i< 100; i++){
            userIds.add(10000L + i);
        }
        this.userIds.forEach(userId -> sendMessage(userId));
        TimeUnit.SECONDS.sleep(3);
    }
    private void sendMessage(Long userId) {
        String topic = "consumer-test-topic";
        {
            String tag = "UserCreatedEvent";
            UserEvents.UserCreatedEvent userCreatedEvent = new UserEvents.UserCreatedEvent();
            userCreatedEvent.setUserId(userId);
            userCreatedEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userCreatedEvent);
        }
        {
            String tag = "UserEnableEvent";
            UserEvents.UserEnableEvent userEnableEvent = new UserEvents.UserEnableEvent();
            userEnableEvent.setUserId(userId);
            userEnableEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userEnableEvent);
        }
        {
            String tag = "UserDisableEvent";
            UserEvents.UserDisableEvent userDisableEvent = new UserEvents.UserDisableEvent();
            userDisableEvent.setUserId(userId);
            userDisableEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userDisableEvent);
        }
        {
            String tag = "UserDeletedEvent";
            UserEvents.UserDeletedEvent userDeletedEvent = new UserEvents.UserDeletedEvent();
            userDeletedEvent.setUserId(userId);
            userDeletedEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userDeletedEvent);
        }
    }
    private void sendOrderlyMessage(String topic, String tag, UserEvents.UserEvent event) {
        String shardingKey = String.valueOf(event.getUserId());
        String json = JSON.toJSONString(event);
        Message<String> msg = MessageBuilder
                .withPayload(json)
                .build();
        String destination = createDestination(topic, tag);
        SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);
        log.info("Send result is {} for msg", sendResult, msg);
    }
    protected String createDestination(String topic, String tag) {
        if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
            return topic + ":" + tag;
        }else {
            return topic;
        }
    }
    @AfterEach
    void tearDown() {
    }
    @Test
    void getUserEvents() {
        this.userIds.forEach(userId ->{
            List<UserEvents.UserEvent> userEvents = this.userMessageConsumer.getUserEvents(userId);
            Assertions.assertEquals(4, userEvents.size());
            Assertions.assertTrue(userEvents.get(0) instanceof UserEvents.UserCreatedEvent);
            Assertions.assertTrue(userEvents.get(1) instanceof UserEvents.UserEnableEvent);
            Assertions.assertTrue(userEvents.get(2) instanceof UserEvents.UserDisableEvent);
            Assertions.assertTrue(userEvents.get(3) instanceof UserEvents.UserDeletedEvent);
        });
    }
}

启动时,可以看到如下日志:

TagBasedDispatcherConsumerContainer : success to subscribe  http://127.0.0.1:9876, topic consumer-test-topic, tag UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent, group user-message-consumer

从日志上可以看出,框架以组 group user-message-consumer 创建 Consumer,并订阅 consumer-test-topic 的 UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent 等 Tag,初始化流程符合预期。

测试逻辑比较简单,逻辑如下:

  1. 创建 100 个用户
  2. 每个用户创建并依次发布领域事件,UserCreatedEvent、UserEnableEvent、UserDisableEvent、UserDeletedEvent
  3. 消费发送完成后,停顿 3 秒
  4. 依次检测每个用户收到的消息,并对顺序进行检测

观察日志,可以看到发送和消费日志交替出现:

UserMessageConsumerTest        : Send result is SendResult [sendStatus=SEND_OK, msgId=2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD, offsetMsgId=C0A8010A00002A9F00000000056077FB, messageQueue=MessageQueue [topic=consumer-test-topic, brokerName=bogon, queueId=2], queueOffset=1121] for msg
TagBasedDispatcherConsumerContainer : consume 2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FC cost: 0 ms

用例通过,运行结果符合预期。

3. 设计&扩展

3.1. 初始化流程

image

 

框架初始化流程如下:

  1. TagBasedDispatcherConsumerContainerRegistry 实现 Spring 的 BeanPostProcessor 接口,依次对托管 bean 进行处理;
  2. 如果 Bean 上存在 @TagBasedDispatcherMessageConsumer 注解,便会提取配置信息,构建 TagBasedDispatcherConsumerContainer 实例
  3. TagBasedDispatcherConsumerContainer 收集方法上的 @HandleTag 注解,结合 @TagBasedDispatcherMessageConsumer 上的 topic、consumer 等信息构建 DefaultMQPushConsumer 并完成 topic 和 tag 的订阅
  4. TagBasedDispatcherConsumerContainer 内部会构建 tag 与 method 的映射关系,以对指定tag进行处理;

3.2. 运行流程

 

image
运行流程如下:

  1. 消息发送者将消息发送至 MQ;
  2. MQ 将消息发送至 Consumer;
  3. Consumer 收到消息后,根据 tag 对消息进行分发;
  4. 处理器对消息进行反序列化,获取调用参数,然后调用方法执行业务逻辑;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/14621.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AF488 NHS,AF488 活性酯,Alexa Fluor488 NHS,水溶性小分子绿色荧光标记染料

AF488 NHS通过引入两个磺酸根离子&#xff0c;AF488的水溶性大大增强&#xff0c;荧光强度增加&#xff0c;pH稳定性&#xff0c;光稳定性也提高&#xff0c;但是它的激发和发射谱图基本保持不变。不像荧光素类染料&#xff0c;AF488的荧光在较宽的pH范围内(4 – 10)保持不变。…

ATF源码篇(八):docs文件夹-Components组件(7)固件配置框架

7、固件配置框架 fconf/索引 本文档概述了固件配置框架 7.1 固件配置框架是什么&#xff1f; 1 介绍 固件配置框架&#xff08;|FCONF|&#xff09;是平台特定数据的抽象层&#xff0c;允许查询“属性”并检索值&#xff0c;而请求实体不知道使用什么后备存储来保存数据。 …

Java接口(Interface)

文章目录接口语法注意事项和细节实现接口VS.继承类接口的多态特性小练习usb插槽就是现实中的接口。 你可以把手机,相机,u盘都插在usb插槽上,而不用担心那个插槽是专门插哪个的,原因是做usb插槽的厂家和做各种设备的厂家都遵守了统一的规定包括尺寸&#xff0c;排线等等。 首先创…

ISP-Gamma

参考:https://blog.csdn.net/lxy201700/article/details/24929013 http://www.cambridgeincolour.com/tutorials/gamma-correction.htm 1. 什么是Gamma Gamma是一种指数曲线&#xff0c;显示器用这个指数曲线来调整真实输出到显示屏幕上的颜色值&#xff0c;以此更好的适应人…

卷?这份Java后端架构指南首次公开就摘星百万,肝完直接60K+

最近和各位小伙伴儿私下聊的比较多&#xff0c;各个阶段的朋友都有&#xff1b;因为大环境的内卷&#xff0c;导致大家在求学、求职、提升自己的各个方面都多多少少有些迷茫焦虑&#xff1b; 这些其实是一个非常普遍且正常的现象&#xff0c;会焦虑的人&#xff0c;往往都是对…

大学生简单个人静态HTML网页设计作品 HTML+CSS制作我的家乡杭州 DIV布局个人介绍网页模板代码 DW学生个人网站制作成品下载 HTML5期末大作业

常见网页设计作业题材有 个人、 美食、 公司、 学校、 旅游、 电商、 宠物、 电器、 茶叶、 家居、 酒店、 舞蹈、 动漫、 服装、 体育、 化妆品、 物流、 环保、 书籍、 婚纱、 游戏、 节日、 戒烟、 电影、 摄影、 文化、 家乡、 鲜花、 礼品、 汽车、 其他等网页设计题目, A…

蓝牙学习一(简介)

1.简介 蓝牙分为经典蓝牙&#xff08;BT-Bluetooth&#xff09;和低功耗蓝牙&#xff08;BLE-Bluetooth Low Energy&#xff09;&#xff0c;本次主要学习BLE。 BLE分了很多个版本&#xff0c;现在用的比较多的就是4.2和5.X。那4.2到5.0之间有哪些升级呢&#xff1f;首先&#…

多肽标签X-press,DLYDDDDK

X-press Tag Peptide 是一种N-端前导肽&#xff0c;Anti-Xpress 抗体能够识别 Xpress 表位&#xff0c;因此&#xff0c;该多肽可用来纯化 X-press Tag 融合蛋白。X-press Tag Peptide is a tag peptide used for protein purification. X-press Tag is also an N-terminal lea…

【附源码】计算机毕业设计JAVA商院足球赛事管理

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis Maven Vue 等等组成&#xff0c;B/…

CAPL语言编译的那些事

CAPL是类似于C语言的面向过程语言,这是众所周知的。C或C++代码在执行前需要编译成机器语言,也就是二进制语言,如此能够更快速运行。CAPL程序也是一样的,需要编译后执行 在CAPL Browser编辑器下,Home -> Compile/Compile All,Compile编译当前打开的CAPL文件,Compile A…

基于51单片机的可调节占空比四种三种波形发生器proteus仿真

简介&#xff1a; 该系统显示器为LCD1602&#xff0c;可实时显示波形的参数情况可显示四种波形&#xff0c;分别是正弦波 三角波方波以及锯齿波该系统可以通过按键调节波形的占空比波形输出通过仿真软件的示波器可以查看得到波形发生器的核心芯片是利用DAC0832产生运放LM324经…

Jenkins部署的Windows爬虫机如何配置

文章目录一 安装软件1. Python爬虫必备安装包2. Visual Studio Code3. Git3.1. 备选 - OneDrive4. Java5. 向日葵二 配置Chrome1. 查看Chrome版本2. 下载ChromeDriver3. 解压放入Python的Scripts文件夹有时候, 一台Windows只是用来部署一些任务, 例如爬虫任务. 这个时候需要简单…

【人见人爱报错系列】GIt常见问题解决大全

前言 在使用的github\gitlab各种hub的过程中&#xff0c;会遇到各种各样的小问题&#xff0c;这些会给程序员们带来五光十色的烦恼&#xff0c;本文总结使用git的各种问题并持续更新。 一、Git用户名邮箱设置 使用git过程中&#xff0c;会切换不同项目但是发现提交人都是一样…

M1 芯片 MacBook 结合 MAMP 集成环境配置 PHP 环境变量

MacOS Catalina 版本之后 shell 改为使用 zsh 。 可以使用 echo $SHELL 命令查看。 配置文件分为系统级&#xff08;所有用户生效&#xff09;和用户级&#xff08;当前登录用户生效&#xff09;&#xff0c;可以自行了解&#xff0c;一般不经常切换用户的话&#xff0c;用户…

【Java多数据源实现教程】实现动态数据源、多数据源切换方式

前言 本文为 【Java多数据源实现教程】 相关知识&#xff0c;由于自己最近在做导师的项目的时候需要使用这种技术&#xff0c;于是自学了相关技术原理与实现&#xff0c;并将其整理如下&#xff0c;具体包含&#xff1a;多数据源的典型使用场景&#xff08;包含业务复杂场景、读…

JavaWeb:JavaWeb技术架构演进

Java Web&#xff0c;是用 Java 技术来解决相关web互联网领域的技术栈。web 包括&#xff1a;web 服务端和 web 客户端两部分。Java 在客户端的应用有 Java Applet&#xff0c;不过使用得很少&#xff0c;Java 在服务器端的应用非常的丰富&#xff0c;比如 Servlet&#xff0c;…

【MySQL数据库笔记 - 进阶篇】(四)视图/存储过程/触发器

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 &#x1f4da;专栏地址&#xff1a;暂定 &#x1f4dd;视频地址&#xff1a;黑马程序员 MySQL数据库入门到精通 &#x1f4e3;专栏定位&#xff1a;这个专栏我将会整理 B 站黑马程序员的 MySQL…

将多张图片制作gif

如何将多张图片制作gif&#xff1f;gif其实也是一种比较常见的图片格式&#xff0c;不过gif和其它图片有很大区别&#xff0c;gif是一种动态图片&#xff0c;相信很多小伙伴都知道。我们每天几乎也会看到或者使用到gif动图&#xff0c;有些gif动图是由几张静态的普通图片合并而…

深入理解java虚拟机-1.自动内存管理

文章目录1、自动内存管理1.1 Java内存区域与内存溢出异常1.1.1 运行时数据区域程序计数器程序计数器为什么是私有的?java虚拟机栈本地方法栈虚拟机栈和本地方法栈为什么是私有的?Java堆创建的对象一定会放在堆中吗&#xff1f;方法区运行时常量池直接内存1.1.2 HotSpot虚拟机…

渗透测试之信息收集

信息收集1.域名信息收集1.1.whois查询1.1.1.whois解释1.1.2.whois收集  1.1.2.1.在线网站查询  1.1.2.2.工具查询1.2.反查1.3.备案信息查询1.3.1.备案信息收集1.4.子域名收集1.4.1.子域名解释1.4.2.子域名收集  1.4.2.1.在线网站收集  1.4.2.2.工具收集  1.4.2.3.Goo…