【RocketMQ】RocketMQ标签、过滤及消息重复消费

news2024/11/28 10:50:01

【RocketMQ】RocketMQ标签、过滤及消息重复消费

文章目录

  • 【RocketMQ】RocketMQ标签、过滤及消息重复消费
    • 1. 标签(Tag)
      • 1.1 示例
    • 2. 键(Keys)
      • 2.1 示例
    • 3. 消息重复消费
      • 3.1 示例

参考文档: 官方文档

1. 标签(Tag)

Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。

注:

  • Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
  • Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。

image-20230601163927542

什么时候应该用Topic,什么时候该用Tag?

可以从以下几个方面进行判断:

  • 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
  • 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
  • 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
  • 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。

总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。


1.1 示例

生产者发送含tag的消息到broker:

@Test
public void tagProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    //tag为vip1
    Message message = new Message("tagTopic", "vip1", "我是vip1".getBytes());
    //tag为vip2
    Message message2 = new Message("tagTopic", "vip2", "我是vip2".getBytes());
    producer.send(message);
    producer.send(message2);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者1接收 tag为vip1 的消息:

@Test
public void tagConsumer1() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    //tag为vip1
    consumer.subscribe("tagTopic", "vip1");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("我是VIP1的消费者,我正在消费消息:" + new String(list.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

消费者2接收 tag为vip1和vip2 的消息:

@Test
public void tagConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("tagTopic", "vip1 || vip2");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("我是VIP1和VIP2的消费者,我正在消费消息:" + new String(list.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

2. 键(Keys)

RocketMQ 每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题。 Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

注:msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。


2.1 示例

生产者发送一条 tag为key1 ,keys为key 的消息:

@Test
public void keyProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    String key = UUID.randomUUID().toString();
    System.out.println(key);
    Message message = new Message("keyTopic", "key1", key, "我是key1".getBytes());
    producer.send(message);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者接收消息,得到keys:

@Test
public void keyConsumer1() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("keyTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            System.out.println("我是VIP1的消费者,我正在消费消息:" + new String(messageExt.getBody()));
            System.out.println(messageExt.getMsgId());
            System.out.println("我们业务的标识:" + messageExt.getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

3. 消息重复消费

消息为什么会重复消费?

  1. 生产者重复投递消息
  2. 消费者扩容导致重平衡

如何解决:

  1. 数据库去重表,保证幂等
  2. 数据库唯一索引约束
  3. redis setnx
  4. 布隆过滤器

3.1 示例

生产者发送两条相同的消息:

@org.junit.jupiter.api.Test
public void repeatProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();

    String key = UUID.randomUUID().toString();
    System.out.println(key);
    //发送两条相同的消息,制造重复消费场景
    Message m1 = new Message("repeatTopic", null, key, "扣减库存".getBytes());
    Message m1Repeat = new Message("repeatTopic", null, key, "扣减库存".getBytes());
    producer.send(m1);
    producer.send(m1Repeat);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者解决重复消费消息:

@org.junit.jupiter.api.Test
void repeatConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("repeatTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 先拿key
            MessageExt messageExt = msgs.get(0);
            String keys = messageExt.getKeys();
            // 原生方式操作
            Connection connection = null;
            try {
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&useSSL=false", "root", "123456");
            } catch (SQLException e) {
                e.printStackTrace();
            }
            PreparedStatement statement = null;

            try {
                // 插入数据库 因为我们 key做了唯一索引
                statement = connection.prepareStatement("insert into order_log(`type`, `order_sn`, `user`) values (1,'" + keys + "','123')");
            } catch (SQLException e) {
                e.printStackTrace();
            }

            try {
                // 新增 要么成功 要么报错   修改 要么成功,要么返回0 要么报错
                statement.executeUpdate();
            } catch (SQLException e) {
                System.out.println("executeUpdate");
                if (e instanceof SQLIntegrityConstraintViolationException) {
                    // 唯一索引冲突异常
                    // 说明消息来过了
                    System.out.println("该消息来过了");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                e.printStackTrace();
            }

            // 处理业务逻辑
            // 如果业务报错 则删除掉这个去重表记录 delete order_log where order_sn = keys;
            System.out.println(new String(messageExt.getBody()));
            System.out.println(keys);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

消费者接收到消息,直接插入数据库,由数据库根据唯一索引约束对key进行判断,如果成功插入,则继续执行业务逻辑;如果失败,则直接返回。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/597827.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue学习2

文章目录 引入vue的分析render修改脚手架的默认配置修改步骤 refpropsmixin局部&#xff1a;全局总结 插件&#xff08;install&#xff09;总结 scoped案例总结浏览器本地存储保存读取删除清空 组件自定义事件绑定传递数据的两种方式解绑坑this使用原生的总结 全局事件总线消息…

攻防世界 mfw(Git源码泄露与命令执行漏洞)

目录 Git 源码泄露&#xff1a; 1、strpos() 函数 2、assert()函数 3、file_exists() 函数 4、die() 函数 代码审计&#xff1a; 命令执行漏洞&#xff1a; 打开链接 在About里发现网站是使用Git、PHP、Bootstrap搭建的 使用dirsearch扫一下 从结果可以看出确实存在.git …

在Centos7.9中安装postgresql15最新版本_参考官网说明安装---PostgreSQL工作笔记002

现在我要实现利用nifi同步,postgresql中的增量数据,也就是如果postgresql中出现增删改数据的时候,数据要自动同步到我们远程的mysql数据库中. 又难到我了...首先:去安装postgresql在centos7.9中,之所以在centos7.9中又安装了一遍,因为,我的大体思路,是利用postgresql的逻辑复制…

搭建flask后端和微信小程序前端

目录 一、准备工作 &#xff08;1&#xff09;我的前端代码 &#xff08;2&#xff09;我的后端代码 &#xff08;3&#xff09;后端运行成功的截图 &#xff08;4&#xff09;前端运行成功的截图 &#xff08;5&#xff09;整体运行成功的截图 二、部署后端 &#xff08…

【C++】什么是函数模板/类模板?

文章目录 一、函数模板1.什么是函数模板&#xff1f;2.函数模板格式3.函数模板原理4.函数模板实例化&#xff08;1&#xff09;隐式实例化&#xff08;2&#xff09;显示实例化 二.类模板1.类模板定义格式2.类模板的实例化 总结 一、函数模板 1.什么是函数模板&#xff1f; 函…

VBA之正则表达式(42)-- 提取代码中变量名称

实例需求&#xff1a;待处理代码段如下所示&#xff0c;现在需要提取其中的变量名称。 Public pFactor As Integer Sub TestCode() Dim reg As New RegExp, a As Workbook Dim ms As VBScript_RegExp_55.MatchCollection Dim m As VBScript_RegExp_55.Match Dim i, j Dim x1, y…

记一次udp服务性能优化经历

目录 概述磁盘io网络io减少重复计算减少内存复制减少互斥锁 概述 手上有个go项目&#xff0c;接收udp信息&#xff08;主要是syslog和snmp trap&#xff09;并查询设备信息&#xff0c;将信息结构化&#xff08;设备ip名称&#xff0c;匹配了什么规则之类的&#xff09;后发送…

生态系统NPP及碳源、碳汇模拟(土地利用变化、未来气候变化、空间动态模拟)

前言 由于全球变暖、大气中温室气体浓度逐年增加等问题的出现&#xff0c;“双碳”行动特别是碳中和已经在世界范围形成广泛影响。碳中和可以从碳排放&#xff08;碳源&#xff09;和碳固定&#xff08;碳汇&#xff09;这两个侧面来理解。陆地生态系统在全球碳循环过程中有着…

综述:图像分割

综述 图像分割(segmentation、cut)指的是将数字图像划分成多个图像子区域的过程。 在实际场景中具有诸多重要应用 在广义的图像分割中&#xff0c;传统方法和深度方法对于分割有不同的定义。 传统方法&#xff1a;对于图像进行区域划分&#xff0c;核心问题在于&#xff1a;区…

饿了么太狠:面个高级Java,抖这多硬活、狠活(饿了么面试真题)

前言&#xff1a; 在40岁老架构师尼恩的&#xff08;50&#xff09;读者社群中&#xff0c;经常有小伙伴需要面试饿了么、 头条、美团、阿里、京东等大厂。有很多的小伙伴&#xff0c;完成了人生的逆袭&#xff0c;拿到了高端的offer。 最近一个6年经验的小伙伴&#xff0c;年…

linux进程间通信(共享内存)

共享内存&#xff0c;顾名思义就是允许两个不相关的进程访问同一个逻辑内存&#xff0c;共享内存是两个正在运行的进 程之间共享和传递数据的一种非常有效的方式。不同进程之间共享的内存通常为同一段物理内存。进程可 以将同一段物理内存连接到他们自己的地址空间中&#xff0…

win10 编译 openssl

环境:系统win10 编译器:VS2015 准备: 一、openssl下载 官网&#xff1a;www.openssl.org 安装Perl,安装NASM. cmd下运行perl --version得出下面信息就说明安装perl安装成功. nasm --version得出下面信息,说明nasm安装成功 我以vs2015为例: 打开这个终端,之所以打开这个是因…

适合每一个对高光谱技术感兴趣,并想用python进行实践的人

总结了高光谱遥感技术领域的基础原理与核心概念&#xff0c;采用编程语言复现经典数据处理和应用方法&#xff0c;追踪了最新的技术突破&#xff0c;在消化理解、触类旁通之后&#xff0c;用即使是遥感“小白”也容易接受的方式分享给你。 高光谱遥感学习的第一季&#xff1a;提…

HTTP的缓存机制是什么?

HTTP 缓存机制是一种在 Web 开发中常用的技术&#xff0c;它旨在提高性能和减少网络流量。通过缓存&#xff0c;可以避免不必要的网络请求&#xff0c;减少服务器负载&#xff0c;并加快页面加载速度。下面是关于 HTTP 缓存机制的详细介绍。 HTTP 缓存机制的基本原理是将 Web …

LNMP网站框架搭建(yum方式)

目录 一、Nginx的yum安装 1&#xff09;搭建nginx相关的yum源 2&#xff09;刷新yum仓库&#xff0c;安装启动nginx服务 二、mysql的 yum 安装 1&#xff09;卸载一切与mysql有关的包 2&#xff09;wget mysql相关的yum源 附加&#xff1a;第二种方式&#xff08;与上…

单卡轻松打造 ChatGPT 竞争者“原驼”,QLoRA 革新大语言模型微调技术

出品人&#xff1a;Towhee 技术团队 作者&#xff1a;顾梦佳 由 OpenAI 推出的聊天机器人ChatGPT 爆火&#xff0c;带动 AI 受到了前所未有的关注。随之市面上也涌现出了各类开源的大语言模型&#xff08;LLM&#xff09;&#xff0c;其中 LLaMA “羊驼系列”最受关注、最具潜力…

Vue实现订单确认界面禁止浏览器返回操作导致重复提交订单的问题

哈喽 大家好啊 最近遇到一个问题&#xff0c;就是在提交订单成功后的页面&#xff0c;然后用户去浏览器返回&#xff0c;就导致又提交了一次 然后就想到了如果提交成功页面&#xff0c;就阻止浏览器返回操作 主要实现如下&#xff1a; 1.在mounted的钩子函数&#xff1a; 2.…

每日一练 | 华为认证真题练习Day50

1、SWA和SWB的MAC地址表中&#xff0c;MAC地址、VLAN、端口对应关系正确的有&#xff1f;&#xff08;多选&#xff09; 2、PPP帧格式中的Flag字段的取值为&#xff1f; A. 0xFF B. 0x7E C. 0xEF D. 0x8E 3、ICMP报文不包含端口号&#xff0c;所以无法使用NAPT。 A. 对 B…

[ Term ] 你真的了解 UTC 时间吗?它和 GMT 时间的区别是什么?

什么是 GMT 和 UTC&#xff0c;他们之间的区别是什么&#xff1f; GMT&#xff08;Greenwich Mean Time&#xff09;和UTC&#xff08;Coordinated Universal Time&#xff09;是两个不同的时间标准&#xff0c;但它们非常相似并且通常被混淆使用&#xff0c;那他们之间的区别在…

后端从入门到转岗,如何转型

一、技术介绍 我学习的技术很多&#xff1a; 例如&#xff1a;JAVA JS C# python vue mysql Oracle 等等。 学习也是从入门开始 &#xff0c;入口是JAVA 二、学习前的准备工作 学习前需要准备什么呢&#xff1f; 我觉得学习前最需要准备的是下定决心吃苦&#xff…