系列十一(实战)、发送 接收带标签的消息(Java操作RocketMQ)

news2025/2/26 3:06:27

一、发送 & 接收带标签的消息

1.1、概述

        消息的种类纷繁复杂,不同的业务场景需要不同的消息,基于此RocketMQ提供了消息过滤功能,通过Tag或者Key进行区分,本章介绍Tag,我们再往一个Topic里面发送消息的时候,根据业务逻辑可能需要区分,例如带有tagA的消息被A消费,带有TagB的消息被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,这时我们也需要通过过滤才能区别对待。

        其实这种场景在生活中也很常见,例如大家每天都使用的微信公众号,当关注的博主在公众号发布完消息后,你只会收到自己自己感兴趣的那部分。

1.2、订阅关系一致性

        订阅关系一致性是消息过滤中对【消费者组名-Topic-Tag】的一些要求,如果不能正确的配置,将会出现消费消息紊乱,甚至消息丢失的问题。关于订阅关系一致性问题,请参考

订阅关系一致文档,这里不再赘述。

1.3、Demo07MQTestApp 

/**
 * @Author : 一叶浮萍归大海
 * @Date: 2023/12/25 13:03
 * @Description: 发送 & 接收带标签的消息
 */
@Slf4j
public class Demo07MQTestApp {


    /**
     * 发送带标签的消息
     */
    @Test
    public void demo7Producer() throws Exception {
        // 1、创建一个生产者
        DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");

        // 2、连接NameServer
        producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);

        // 3、启动
        producer.start();

        // 4、创建消息
        String[] tags = new String[]{"NBA", "run", "star","car","mobile","tourism"};
        for (int i = 1; i <= 6; i++) {
            String tag = tags[i % tags.length];
            String content = "";
            switch (tag) {
                case "NBA":
                    content = "this is a message about NBA,消息编号[" + i + "]";
                    break;
                case "run":
                    content = "this is a message about run,消息编号[" + i + "]";
                    break;
                case "star":
                    content = "this is a message about star,消息编号[" + i + "]";
                    break;
                case "mobile":
                    content = "this is a message about mobile,消息编号[" + i + "]";
                    break;
                case "tourism":
                    content = "this is a message about tourism,消息编号[" + i + "]";
                    break;
                default:
                    content = "this is a message about foods,消息编号[" + i + "]";
                    break;
            }
            Message message = new Message("tag-topic",tag,content.getBytes(StandardCharsets.UTF_8));
            // 5、发送消息
            producer.send(message);
            log.info("【demo7Producer】发送消息成功,消息内容:{}",content);
        }

        // 关闭producer
        producer.shutdown();
    }

    /**
     * 接收带标签的消息(Push方式)
     */
    @Test
    public void demo7PushConsumer1() throws Exception {
        // 1、创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupA");
        // 2、连接NameServer
        consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 3、订阅消息,*表示订阅该主题所有的消息
        consumer.subscribe("tag-topic", "NBA");
        // 4、设置监听器(采用异步回调方式,一直监听)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    log.info("我是消费者【demo7PushConsumer1】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));
                }
                /**
                 * 返回值:消费消息成功与否
                 *      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队
                 *      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者
                 */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5、启动
        consumer.start();
        log.info("【demo7PushConsumer1】启动成功,正在等待接收消息...");

        // 6、挂起当前JVM
        System.in.read();
    }

    @Test
    public void demo7PushConsumer2() throws Exception {
        // 1、创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupB");
        // 2、连接NameServer
        consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 3、订阅消息,*表示订阅该主题所有的消息
        consumer.subscribe("tag-topic", "NBA || star || mobile");
        // 4、设置监听器(采用异步回调方式,一直监听)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    log.info("我是消费者【demo7PushConsumer2】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));
                }
                /**
                 * 返回值:消费消息成功与否
                 *      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队
                 *      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者
                 */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5、启动
        consumer.start();
        log.info("【demo7PushConsumer2】启动成功,正在等待接收消息...");

        // 6、挂起当前JVM
        System.in.read();
    }

}

1.4、测试

        先后运行demo7PushConsumer1、demo7PushConsumer1和demo7Producer,观察控制台日志输出信息。

1.5、Topic和Tag如何选择

        不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:

(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;

(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;

(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;

(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;

        总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1334364.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL定时备份实现

一、备份数据库 –all-databases 备份所有数据库 /opt/mysqlcopy/all_$(date “%Y-%m-%d %H:%M:%S”).sql 备份地址 docker exec -it 容器名称 sh -c "mysqldump -u root -ppassword --all-databases > /opt/mysqlcopy/all_$(date "%Y-%m-%d %H:%M:%S").sq…

SSRF中Redis的利用

目录 1. SSRF 1.1 什么是SSRF 1.2 漏洞成因 1.3 可能会存在SSRF的地方 1.4 SSRF分类 1.5 验证方法 1.6 利用方式 1.7 可以利用的协议 1.8 SSRF过滤绕过 2. SSRF攻击Redis 2.1 环境搭建 2.2 漏洞复现(通过ssrf利用redis写入webshell) 2.2.1 想要写入webshell的两个…

ECMAScript 的未来:预测 JavaScript 创新的下一个浪潮

以下是简单概括关于JavaScript知识点以及一些目前比较流行的比如&#xff1a;es6 想要系统学习&#xff1a; 大家有关于JavaScript知识点不知道可以去 &#x1f389;博客主页&#xff1a;阿猫的故乡 &#x1f389;系列专栏&#xff1a;JavaScript专题栏 &#x1f389;ajax专栏&…

ChatGPT/GPT4+AI绘图+论文写作+编程结合到底有多强大?带你详细了解

ChatGPT在论文写作与编程方面具备强大的能力。无论是进行代码生成、错误调试还是解决编程难题&#xff0c;ChatGPT都能为您提供实用且高质量的建议和指导&#xff0c;提高编程效率和准确性。此外&#xff0c;ChatGPT是一位出色的合作伙伴&#xff0c;可以为您提供论文写作的支持…

Tofu5m目标识别跟踪模块 跟踪模块

Tofu5m 是高性价比目标识别跟踪模块&#xff0c;支持可见光视频或红外网络视频的输入&#xff0c;支持视频下的多类型物体检测、识别、跟踪等功能。 产品支持视频编码、设备管理、目标检测、深度学习识别、跟踪等功能&#xff0c;提供多机版与触控版管理软件&#xff0c;为二次…

智慧零售技术探秘:关键技术与开源资源,助力智能化零售革新

智慧零售是一种基于先进技术的零售业态&#xff0c;通过整合物联网、大数据分析、人工智能等技术&#xff0c;实现零售过程的智能化管理并提升消费者体验。 实现智慧零售的关键技术包括商品的自动识别与分类、商品的自动结算等等。 为了实现商品的自动识别与分类&#xff0c;…

echarts属性名

theme {// 全图默认背景// backgroundColor: rgba(0,0,0,0),// 默认色板color: [#ff7f50,#87cefa,#da70d6,#32cd32,#6495ed,#ff69b4,#ba55d3,#cd5c5c,#ffa500,#40e0d0,#1e90ff,#ff6347,#7b68ee,#00fa9a,#ffd700,#6699FF,#ff6666,#3cb371,#b8860b,#30e0e0],// 图表标题title: {…

DshanMCU-R128s2硬件设计指南

硬件设计指南 原理图设计 硬件系统框图 R128是一颗专为“音视频解码”而打造的全新高集成度 SoC&#xff0c;主要应用于智能物联和专用语音交互处理解决方案。 单片集成 MCURISCVDSPCODECWIFI/BTPMU&#xff0c;提供生态配套成熟、完善的用于系统、应用和网络连接开发的高效…

Web 3.0 是什么

第 1 章 明晰Web 3.0 从本章开始,就进入了本书的第一篇章,入门Web3.0,在第一篇章中将会让读者对Web3.0有一个整体的认知,为学习后面的章节打下基础。 在本章中,主要介绍的是Web的发展历史,包涵Web1.0、Web2.0、Web3.0的发展过程,以及资本为什么需要入场Web3.0、Web3.0…

新玩法!如何在 PieCloudDB Database 中“种”一棵圣诞树?

随着圣诞节的到来&#xff0c; 很多城市也都张灯结彩&#xff0c; 处处充满了节日气息。 圣诞节当然离不开圣诞树啦&#xff01; 和家人一起挂上圣诞装饰&#xff0c; 树下放上互相准备的小礼物&#xff0c; 小小的仪式感&#xff0c; 充满了浪漫与温馨。 今天&#xff…

Python 高级(二):使用 webbrowser 控制浏览器

大家好&#xff0c;我是水滴~~ 本文将介绍 webbrowser 模块的详细使用方法&#xff0c;文章中包含大量的示例代码&#xff0c;希望能够帮助新手同学快速入门。 《Python入门核心技术》专栏总目录・点这里 文章目录 前言一、导入webbrowser模块二、打开网页&#xff08;open&am…

项目联系 Spring Boot + flowable 快速实现工作流

总览 使用flowable自带的flowable-ui制作流程图 使用springboot开发流程使用的接口完成流程的业务功能 基于 Spring Boot MyBatis Plus Vue & Element 实现的后台管理系统 用户小程序&#xff0c;支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信…

DN-DETR调试记录

先前的DN-DETR模型都是在服务器上运行的&#xff0c;后来在本地运行时出现了一些小问题&#xff0c;这篇博文则主要介绍DN-DETR模型在本地运行时所需要做的配置。 运行环境 首先DN-DETR的运行环境与DINO一致&#xff0c;这里就不再赘述了。 博主使用的本地配置是I7-13700H406…

2024 年 10大 AI 趋势

2025 年&#xff0c;全球人工智能市场预计将达到惊人的 1906.1 亿美元&#xff0c;年复合增长率高达 36.62%。 人工智能软件正在迅速改变我们的世界&#xff0c;而且这种趋势在未来几年只会加速。 我们分析了未来有望彻底改变 2024 年的 10 个AI趋势。从生成式人工智能的兴起到…

TensorFlow入门和案例分析

一、什么是TensorFlow 在这里&#xff0c;引入TensorFlow中文社区首页中的两段描述。 关于 TensorFlow TensorFlow™ 是一个采用数据流图&#xff08;data flow graphs&#xff09;&#xff0c;用于数值计算的开源软件库。节点&#xff08;Nodes&#xff09;在图中表示数学操作…

免费的苹果清理软件2024新版mac清理大师CleanMyMac官版下载

免费的苹果清理软件2024新版mac清理大师CleanMyMac官版下载 作为一款专业的mac电脑系统管家&#xff0c;CleanMymac X一直致力于更加智能、便捷地全方位维护我们的电脑&#xff0c;它囊括了多种系统工具&#xff0c;包括电脑智能体检、扫描系统垃圾、移除恶意软件、清理个人隐…

电路设计(6)——彩灯控制器的multism仿真

1.功能设计 使用两个运算放大器、两个计数器芯片&#xff0c;实现了彩灯的循环移位控制。 整体原理图如下所示&#xff1a; 运行效果截图如下&#xff1a; 小灯分为两组&#xff0c;一组十个&#xff0c;在脉冲的驱动下&#xff0c;轮流发光&#xff01; 2.设计思路 两个运放…

2023年山东省职业院校技能大赛高职组“软件测试”赛项竞赛任务书

2023年山东省职业院校技能大赛高职组 “软件测试”赛项竞赛任务书 目录 2023年山东省职业院校技能大赛高职组 “软件测试”赛项竞赛任务书 竞赛概述 竞赛时间 本次竞赛时间共为8小时&#xff0c;参赛选手自行安排任务进度&#xff0c;休息、饮水、如厕等不设专门用时&#…

前端常用的Vscode插件

前端常用的Vscode插件&#x1f516; 文章目录 前端常用的Vscode插件&#x1f516;1. Chinese (Simplified) (简体中文) Language Pack for Visual Studio Code -- Vscode中文插件2. Code Runner -- 快速运⾏调试代码3. Live Server -- 实时重新加载本地开发服务器4. Image prev…

【并发编程篇】源码分析,手动创建线程池

文章目录 &#x1f6f8;前言&#x1f339;Executors的三大方法 &#x1f354;简述线程池&#x1f386;手动创建线程池⭐源码分析✨代码实现&#xff0c;手动创建线程池&#x1f388;CallerRunsPolicy()&#x1f388;AbortPolicy()&#x1f388;DiscardPolicy()&#x1f388;Dis…