今天学学消息队列RocketMQ:消息类型

news2024/12/27 10:54:25

        RocketMQ支持的消息类型有三种:普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。

普通消息

        普通消息是一种无序消息,消息分布在各个MessageQueue当中,以保证效率为第一使命。这种消息适用于对顺序没有要求的基础消费需求。这里的Topic和MessageQueue是多对多关系。

// 生产者
public static void main(String[] args) throws MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    producer.setNamesrvAddr("172.16.200.38:9876");
    producer.setInstanceName("producer");
    producer.start();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Message msg = new Message("Topic-test", "testTag"
                    , (new Date() + " RocketMQ test msg " + i).getBytes());
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult.getMsgId());
            System.out.println(sendResult.getMessageQueue());
            System.out.println(sendResult.getSendStatus());
            System.out.println(sendResult.getOffsetMsgId());
            System.out.println(sendResult.getQueueOffset());
            System.out.println();
            System.out.println("============================");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.shutdown();
}

// 消费者
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    consumer.setNamesrvAddr("172.16.200.38:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("Topic-test", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消费者
    consumer.start();
}

顺序消息

        当我开始对消息的时序性有要求的时候,普通消息就无法满足我们的需求了。当我们要求顺序消费的时候,我们的Topic就不能采用多对多的形式,存放在多个MessageQueue中,而是需要牺牲一部分性能,存放在一个MessageQueue中。

// 生产者
public static void main(String[] args) throws MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    producer.setNamesrvAddr("172.16.200.38:9876");
    producer.setInstanceName("producer");
    producer.start();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Message msg = new Message("Topic-test", "testTag"
                    , (new Date() + " RocketMQ test msg " + i).getBytes());
            SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> mqs.get(0), null);

            System.out.println(sendResult.getMsgId());
            System.out.println(sendResult.getMessageQueue());
            System.out.println(sendResult.getSendStatus());
            System.out.println(sendResult.getOffsetMsgId());
            System.out.println(sendResult.getQueueOffset());
            System.out.println("RocketMQ test msg " + i);
            System.out.println("============================");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.shutdown();
}

// 消费者
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    consumer.setNamesrvAddr("172.16.200.38:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("Topic-test", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 跟普通消费不同的地方,这里采用了顺序消费方法。
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    //启动消费者
    consumer.start();
}

 延时消息

        通过指定的通知时间间隔,让消息不会立刻被消费者收到。

// 生产者
public static void main(String[] args) throws MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    producer.setNamesrvAddr("172.16.200.38:9876");
    producer.setInstanceName("producer");
    producer.start();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Message msg = new Message("Topic-test", "testTag"
                    , (new Date() + " RocketMQ test msg " + i).getBytes());
            // 指定延时等级
            // DelayTimeLevel对应的延时时间在服务端定义:rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
            // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            msg.setDelayTimeLevel(4);
            // 按毫秒延时
            msg.setDelayTimeMs(1L);
            // 按秒延时
            msg.setDelayTimeSec(1);
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult.getMsgId());
            System.out.println(sendResult.getMessageQueue());
            System.out.println(sendResult.getSendStatus());
            System.out.println(sendResult.getOffsetMsgId());
            System.out.println(sendResult.getQueueOffset());
            System.out.println();
            System.out.println("============================");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.shutdown();
}

// 消费者
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    consumer.setNamesrvAddr("172.16.200.38:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("Topic-test", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消费者
    consumer.start();
}

30秒的延迟

事务消息

        事务消息是RocketMQ提供的一种高级的消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息的生命周期

        (1)初始化:半事务消息被生产者构建并完成初始化,待发送到RocketMQ服务端的状态。

        (2)事务待提交:半事务消息被发送到服务端,和普通消息不同,半事务消息并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

        (3)消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

        (4)提交事务待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见。

        (5)消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一段时间后没有收到消费者的响应,服务端会对消息进行重试处理。

        (6)消费完成:消费者完成了消费动作,并向服务端提交了消费结果,服务端标记当前消息已经被处理。

        (7)消息删除:服务端按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

// 生产者
public static void main(String[] args) throws MQClientException {
    TransactionMQProducer producer = new TransactionMQProducer("rmq-group");
    TransactionListener listener = new TransactionListenerImpl();
    producer.setNamesrvAddr("172.16.200.38:9876");
    producer.setInstanceName("producer");
    producer.setTransactionListener(listener);
    producer.start();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Message msg = new Message("Topic-test", "testTag"
                    , (new Date() + " RocketMQ test msg " + i).getBytes());
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.println(new Date());
            System.out.println(sendResult.getMsgId());
            System.out.println(sendResult.getMessageQueue());
            System.out.println(sendResult.getSendStatus());
            System.out.println(sendResult.getOffsetMsgId());
            System.out.println(sendResult.getQueueOffset());
            System.out.println();
            System.out.println("============================");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.shutdown();
}

// 消费者
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    consumer.setNamesrvAddr("172.16.200.38:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("Topic-test", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消费者
    consumer.start();
}

 控制台打印

// 生产者控制台打印
Wed Jul 26 17:55:45 CST 2023 RocketMQ test msg 0 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:45 CST 2023
7F00000154F073D16E938497DE420000
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
93

============================
Wed Jul 26 17:55:46 CST 2023 RocketMQ test msg 1 : executeLocalTransaction:未知状态
Wed Jul 26 17:55:46 CST 2023
7F00000154F073D16E938497E2340001
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
94

============================
Wed Jul 26 17:55:47 CST 2023 RocketMQ test msg 2 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:47 CST 2023
7F00000154F073D16E938497E6230002
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
95

============================
Wed Jul 26 17:55:48 CST 2023 RocketMQ test msg 3 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:48 CST 2023
7F00000154F073D16E938497EA180003
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
96

============================
Wed Jul 26 17:55:49 CST 2023 RocketMQ test msg 4 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:49 CST 2023
7F00000154F073D16E938497EE0B0004
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
97

============================
Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:50 CST 2023
7F00000154F073D16E938497F1F70005
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
98

============================
Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:51 CST 2023
7F00000154F073D16E938497F5EC0006
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
99

============================
Wed Jul 26 17:55:52 CST 2023 RocketMQ test msg 7 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:52 CST 2023
7F00000154F073D16E938497F9E50007
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
100

============================
Wed Jul 26 17:55:53 CST 2023 RocketMQ test msg 8 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:53 CST 2023
7F00000154F073D16E938497FDD30008
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
101

============================
Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:54 CST 2023
7F00000154F073D16E93849801C90009
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
102

============================


// 消费者控制台打印
Receive New Messages: Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 At Wed Jul 26 17:55:50 CST 2023 
Receive New Messages: Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 At Wed Jul 26 17:55:51 CST 2023 
Receive New Messages: Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 At Wed Jul 26 17:55:54 CST 2023 

这里消费者只收到了执行事务成功的5,6,9。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/799152.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一、Spring源码-ApplicationContext

Spring源码篇-ApplicationContext 一、ApplicationContext ApplicationContext到底是什么&#xff1f;字面含义是应用的上下文。这块我们需要看看ApplicationContext的具体的结构。 通过ApplicationContext实现的相关接口来分析&#xff0c;ApplicationContext接口在具备BeanF…

【正规方程对波士顿房价数据集进行预测】

数据准备 我们首先需要加载波士顿房价数据集。该数据集包含房屋特征信息和对应的房价标签。 import pandas as pd import numpy as npdata_url "http://lib.stat.cmu.edu/datasets/boston" raw_df pd.read_csv(data_url, sep"\s", skiprows22, headerN…

初代AIGC明星独角兽,停摆在大模型元年

唏嘘&#xff01;AIGC方兴未艾&#xff0c;但国内AIGC领域的昔日龙头独角兽&#xff0c;正站在风雨飘摇的悬崖边。 影谱科技&#xff0c;初代目AIGC明星公司&#xff0c;被爆已经面临经营不善、运营停摆的窘境。 这家成立于2009年的AI影像公司&#xff0c;一直专注大文娱产业…

JAVA线上问题排查降龙十八掌

现场问题一般有以下几种问题 CPU,磁盘&#xff0c;内存&#xff0c;GC问题&#xff0c;网络 同时例如jstack、jmap等工具也是不囿于一个方面的问题的&#xff0c;基本上出问题就是df、free、top 三连&#xff0c;然后依次jstack、jmap伺候&#xff0c;具体问题具体分析即可。 …

Vue3解决:Mockjs 引入后并访问 404(Not Found) 的页面报错问题

1、问题描述&#xff1a; 其一、报错为&#xff1a; GET http://localhost:5173/list 404 (Not Found) ncaught (in promise) AxiosError {message: Request failed with status code 404, name: AxiosError, code: ERR_BAD_REQUEST, config: {…}, request: XMLHttpRequest,…

【C语言初阶】指针篇—上

目录 1. 指针是什么&#xff1f;2. 指针和指针类型2.1 指针-整数2.2 指针的解引用 3. 野指针3.1 野指针成因1. 指针未初始化2. 指针越界访问3. 指针指向的空间释放 3.2 如何规避野指针 1. 指针是什么&#xff1f; 指针是什么&#xff1f; 指针理解的2个要点&#xff1a; > 1…

DAY14_FilterListenerAjaxAxiosJsonfastjson综合案例-axios和html交互

目录 1 Filter1.1 Filter概述1.2 Filter快速入门1.2.1 开发步骤1.2.2 代码演示 1.3 Filter执行流程1.4 Filter拦截路径配置1.5 过滤器链1.5.1 概述1.5.2 代码演示1.5.3 问题 1.6 案例1.6.1 需求1.6.2 分析1.6.3 代码实现1.6.3.1 创建Filter1.6.3.2 编写逻辑代码1.6.3.3 测试并抛…

计算机组成原理问答7

外围设备 1. I/O设备 输入设备(鼠标、键盘)、输出设备(显示器、打印机)、外存设备(光盘、硬盘) 2. I/O接口 又称I/O控制器、设备控制器,负责协调主机与外部设备之间的数据传输。 I/O控制方式 3. 程序查询方式 中断 关中断作用:实现原子操作。屏蔽可屏蔽的中断。…

备战秋招 | 笔试强训16

目录 一、选择题 二、编程题 三、选择题题解 四、编程题题解 一、选择题 1、下列一段 C 代码的输出结果是&#xff08;&#xff09; #include <iostream> class Base { public:int Bar(char x){return (int)(x);}virtual int Bar(int x){return (2 * x);} }; clas…

如何启用路由器dhcp?快解析如何内网穿透?

一、什么是DHCP&#xff1f; 动态主机设置协议&#xff08;DHCP&#xff09;是一种使网络管理员能够集中管理和自动分配 IP 网络地址的通信协议。在网络中&#xff0c;每个联网设备都需要分配独有的 IP 地址。并当有新计算机移到网络中的其它位置时&#xff0c;能自动收到新的…

【阅读笔记】一种暗通道优先的快速自动白平衡算法

解决问题: 自动白平衡算法中存在白色区域检测错误导致白平衡失效的问题,作者提出了一种基于暗通道优先的白平衡算法。 算法思想: 图像中白色区域或者高饱和度区域的光线透射率较低,根据以上特性利用暗通道法计算图像中白色区域。 算法概述: 作者使用何凯明提出的基于暗…

MLP-Mixer:面向视觉的全mlp架构

文章目录 MLP-Mixer: An all-MLP Architecture for Vision摘要本文方法代码实验结果 MLP-Mixer: An all-MLP Architecture for Vision 摘要 卷积神经网络(cnn)是计算机视觉的首选模型。 最近&#xff0c;基于注意力的网络&#xff0c;如VIT&#xff0c;也变得流行起来。在本文…

四种刷题模式的爱刷题无后端无数据库刷题应用网站H5源码

四种刷题模式的爱刷题无后端无数据库刷题应用网站H5源码。提供了简单轻量化的部署方式和详细的四种刷题模式教程。该应用使用JSON作为题库的存储方式&#xff0c;层次清晰、结构简单易懂。 配套的word模板和模板到JSON转换工具可供使用&#xff0c;方便将题库从word格式转换为…

Python显示循环代码的进度条

目录 1. tqdm库 2. alive_progress库 3. progressbar库 1. tqdm库 tqdm是一个快速&#xff0c;可扩展的Python进度条&#xff0c;可以在Python长循环中添加一个进度提示信息 import time from tqdm import trangefor i in trange(100):# do somethingtime.sleep(0.5) 2. a…

Mysql原理篇--第一章 1条语句的执行

Mysql原理系列篇 第一章 1条语句的执行 文章目录 Mysql原理系列篇 第一章 1条语句的执行前言&#xff1a;1 连接mysql 服务端:1.1 通信模型&#xff1a;1.2 通信连接数&#xff1a; 2 sql 语句的到达存储引擎的流程:3 数据从存储引擎返回的流程&#xff1a;4 Buffer_pool 脏页的…

【云原生】Docker容器资源限制(CPU/内存/磁盘)

目录 ​编辑 1.限制容器对内存的使用 2.限制容器对CPU的使用 3.block IO权重 4.实现容器的底层技术 1.cgroup 1.查看容器的ID 2.在文件中查找 2.namespace 1.Mount 2.UTS 3.IPC 4.PID 5.Network 6.User 1.限制容器对内存的使用 ⼀个 docker host 上会运⾏若⼲容…

【Linux下6818开发板(ARM)】SecureCRT串口和交叉编译工具(巨细版!)

(꒪ꇴ꒪ ),hello我是祐言博客主页&#xff1a;C语言基础,Linux基础,软件配置领域博主&#x1f30d;快上&#x1f698;&#xff0c;一起学习&#xff01;送给读者的一句鸡汤&#x1f914;&#xff1a;集中起来的意志可以击穿顽石!作者水平很有限&#xff0c;如果发现错误&#x…

策略:一致性行动原则,力出一孔

策略&#xff1a;一致性行动的原则 策略有很多种解释 经常跟战略混淆 趣讲大白话&#xff1a;就是指导方针 【趣讲信息科技238期】 **************************** 教员的游击战“16字口诀”很经典 敌进我退&#xff0c;敌退我进&#xff0c; 敌疲我打&#xff0c;敌驻我扰 曾国…

小城市当程序员好不好?

在职业发展中&#xff0c;小城市和大城市都有各自的机会和挑战。在大城市&#xff0c;C#的应用比例可能相对较低&#xff0c;学习C可能有一定的难度&#xff0c;而学习Java最好有人指导。在小城市&#xff0c;机会相对较少&#xff0c;跳槽的选择也有限。然而&#xff0c;小城市…

想转嵌入式或工控上位机,哪个前途更光明?

上位机开发的需求目前很大&#xff0c;根据BOSS上的数据&#xff0c;C#和WPF的需求较多。作为WPF开发者&#xff0c;薪资水平可以有较大的变动&#xff0c;主要取决于经验&#xff0c;20,000到30,000元的薪资并不难达到。工作环境因工种而异&#xff0c;不可避免地需要进行现场…