RocketMQ 发送批量消息、过滤消息和事务消息

news2025/2/27 12:29:24

前面我们知道RocketMQ 发送延时消息与顺序消息,现在我们看下怎么发送批量消息、过滤消息和事务消息。

发送批量消息

限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。

消息的生产者

package com.demo.rocketmq.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

/**
 * 批量发送消息
 *
 * 限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。
 */
public class BatchProducer {

    public static void main(String[] args) throws Exception {
        // 1:实例化消息生产者 Producer,  指定生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("produceGroup");
        // 2:设置NameServer的地址
        producer.setNamesrvAddr("192.168.152.130:9876");
        // 3:启动Producer实例
        producer.start();


        // 4:创建消息
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));

        try {
            SendResult sendResult = producer.send(messages);

            // 5:通过 sendResult 返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
            System.out.println("发送状态:"+ sendResult.getSendStatus() + ", 消息ID" + sendResult.getMsgId());
        } catch (Exception e) {
            e.printStackTrace();
            //处理error
        }

        // 6:如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消息的消费者

package com.demo.rocketmq.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class BatchConsumer {

    public static void main(String[] args) throws Exception {
        // 1. 创建消费者,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");

        // 2. 指定 NameSever 地址
        consumer.setNamesrvAddr("192.168.152.130:9876");

        // 3. 订阅主题 Topic 和 tag
        consumer.subscribe("BatchTest", "TagA");

        // 4. 设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach( messageExt -> System.out.println(new String(messageExt.getBody())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

消息分割

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
       @Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}

 
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}

过滤消息

RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

消息过滤主要通过以下几个关键流程实现:

  • 生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。

  • 消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。

  • 服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。

详细的可以看下官网
这里使用 Tag 过滤,在大多数情况下,TAG是一个简单而有用的设计,其可以来选择想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

发送消息的时候,还是按照正常的方式发送,在消费消息的时候,修改下对应的 tag 表达式就好

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TagConsumer {

    public static void main(String[] args) throws Exception{
        // 1. 创建消费者,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");

        // 2. 指定 NameSever 地址
        consumer.setNamesrvAddr("192.168.152.130:9876");

        // 3. 订阅主题 Topic 和 tag
        consumer.subscribe("TagFilterTopicTest", "TagA || TagB");    // 这块做了修改

        // 4. 设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach( messageExt -> System.out.println(new String(messageExt.getBody())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

事务消息

这里是消息支持事务操作,如果发送消息失败,就可以回滚当前的操作。

整个事务消息的详细交互流程如下图所示:
在这里插入图片描述

生产者

对应的生产者需要添加 事务监听器 ,如果返回的状态值为 LocalTransactionState.UNKNOW; 就会进行回查消息

 producer.setTransactionListener(new TransactionListener() {

    /**
     * 在该方法中执行本地事务
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println(o);

        // 如果是 TagA 就执行提交
        if (StringUtils.equals("TagA", message.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TagB", message.getTags())) {
            // 如果是 TagB 就执行回滚
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if (StringUtils.equals("TagC",  message.getTags())) {
            // 如果是 TagB 就返回 UNKNOW,  返回 UNKNOW 的时候,调用下面的回查方法
            return LocalTransactionState.UNKNOW;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    /**
     * 该方法是 MQ 进行消息事务状态回查
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("回查的消息 Tag:" + messageExt.getTags() + ", 消息内容:" + new String(messageExt.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});

完整代码

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 发送事务消息
 * @author wuq
 * @Time 2023-1-5 17:46
 * @Description
 */
public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        // 1:实例化消息生产者 Producer,  指定生产组名称
        TransactionMQProducer producer = new TransactionMQProducer("produceGroup");
        // 2:设置NameServer的地址
        producer.setNamesrvAddr("192.168.220.129:9876");

        // 3: 添加监听器
        producer.setTransactionListener(new TransactionListener() {

            /**
             * 在该方法中执行本地事务
             * @param message
             * @param o
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println(o);

                // 如果是 TagA 就执行提交
                if (StringUtils.equals("TagA", message.getTags())) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TagB", message.getTags())) {
                    // 如果是 TagB 就执行回滚
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if (StringUtils.equals("TagC",  message.getTags())) {
                    // 如果是 TagB 就返回 UNKNOW,  返回 UNKNOW 的时候,调用下面的回查方法
                    return LocalTransactionState.UNKNOW;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }

            /**
             * 该方法是 MQ 进行消息事务状态回查
             * @param messageExt
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("回查的消息 Tag:" + messageExt.getTags() + ", 消息内容:" + new String(messageExt.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        // 启动生成者
        producer.start();


        String[] tags = {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 3; i++) {
            // 4:创建消息,并指定 Topic,Tag 和 消息体
            Message msg = new Message("TransactionTopic", tags[i], ("RocketMQ Sync Msg " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            // 5:发送消息到一个Broker, 第二个参数:执行本地事务的回调参数
            SendResult sendResult = producer.sendMessageInTransaction(msg, Map.of("callback", "true"));

            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
            System.out.println("发送状态:"+ sendResult.getSendStatus() + ", 消息ID" + sendResult.getMsgId());

            TimeUnit.SECONDS.sleep(3);
        }

        // 6:如果不再发送消息,关闭Producer实例。
//        producer.shutdown();
    }
}

对应的消费者

在测试的时候发现,会出现 tagC 两次被回查的情况,这里可能是需要做幂等控制

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionConsumer {

    public static void main(String[] args) throws Exception {
        // 1. 创建消费者,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");

        // 2. 指定 NameSever 地址
        consumer.setNamesrvAddr("192.168.220.129:9876");

        // 3. 订阅主题 Topic 和 tag
        consumer.subscribe("TransactionTopic", "*");

        // 4. 设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach(messageExt -> {
                    String body = new String(messageExt.getBody());
                    String tags = messageExt.getTags();
                    System.out.println("tag:" + tags + ", body: "+ body);
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/426867.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何合理选择ClickHouse表主键

ClickHouse提供索引和数据存储的复杂机制&#xff0c;能够实现在高负载下仍有优异的读写性能。当创建MergeTree表时需要选择主键&#xff0c;主键影响大多数查询性能。本文介绍主键的工作原理&#xff0c;让我们知道如何选择合适的主键。 设置主键 MergeTree表可以设置主键&am…

香橙派5使用RK3588S内置NPU加速yolov5推理,实时识别数字达到50fps

前言&#xff1a; 香橙派5采用了RK3588S&#xff0c;内置的NPU达到了6Tops的算力&#xff0c;博主这里记录一下自己的踩坑过程&#xff0c;好不容易做出来的不能以后忘记了&#xff08;手动狗头&#xff09;。这里博主还在B站上发布了效果视频&#xff0c;大家感兴趣的话可以看…

TensorFlow 和 Keras 应用开发入门:1~4 全

原文&#xff1a;Beginning Application Development with TensorFlow and Keras 协议&#xff1a;CC BY-NC-SA 4.0 译者&#xff1a;飞龙 本文来自【ApacheCN 深度学习 译文集】&#xff0c;采用译后编辑&#xff08;MTPE&#xff09;流程来尽可能提升效率。 不要担心自己的形…

Java 中的 非并发容器

1.四大类容器 java中容器主要有四大类&#xff0c;如下图所示 2.非并发容器 1) List 类 List 类 不支持并发的有 ArrayList 与 LinkedList ArrayList 底层实现 ArrayList 底层为 数组&#xff0c;由于数组的特性&#xff0c;非常适合用于 查询多&#xff0c;增删改的业务…

【数据结构学习1】数据结构

目录数据结构定义数据结构的构成逻辑结构逻辑结构的类型存储结构数据运算数据类型和抽象数据类型算法定义分析基础时间复杂度分析事前分析估算法 -> 分析算法的执行时间时间复杂度时间复杂度类型简化的算法时间复杂度分析空间复杂度分析数据结构 定义 数据&#xff1a;所有…

工作流调度系统 Azkaban介绍与安装(一)

文章目录前言1、为什么要用工作流调度系统2、常见的工作流调度系统1 集群规划2 配置 MySQL3 配置 Executor Server3.1 修改 azkaban.properties3.2 启动3.3 激活4 配置 Web Server4.1 修改 azkaban.properties4.2 修改azkaban-users.xml文件&#xff0c;添加 atguigu 用户4.3 启…

VM 虚拟机没有网络,无法Ping通

场景&#xff1a; 虚拟机用过&#xff0c;之前一切正常&#xff0c;使用NAT模式联网&#xff0c;配置了静态IP换了路由器&#xff0c;推测是主机IP网段变了无法使用ssh工具连接虚拟机&#xff0c;且相互都ping不通&#xff08;后来经历了主机可以ping通虚拟机&#xff0c;虚拟…

PWM寄存器初始化

本模块主要实现输出频率占空比可调的 PWM 波形功能和输入捕获功能&#xff0c;同时也可作为计数器使用。一、主要特性 1. 16位向上或向下计数器&#xff1b; 2. 支持最多6路PWM通道&#xff1b; 3. 每个通道支持输出比较或边缘对齐PWM模式波形输出&#xff0c;支持设置、清除、…

关于 CSDN-AI 机器人 programmer_ada —— 阿达·洛夫莱斯(Ada Lovelace)

收到早期文章的一条新评论&#xff1a; 文笔和内容稍稍透漏着机器人的风格&#xff0c;打开主页果不其然 看到个人介绍中的巴贝奇的分析机&#xff0c;突然觉得头像很是眼熟。 最近刚读了《人工智能简史》&#xff0c;第4章——从汇编语言到TensorFlow&#xff0c;人工智能的…

使用layui组件库制作进度条

使用layui组件库制作进度条 html代码 <!DOCTYPE html> <html> <head><meta charset"UTF-8"><title>Example</title><!-- 引入 layui 的 CSS 文件 --><link rel"stylesheet" href"https://cdn.staticfil…

Day948.组件化成熟度评估,你的目的地在哪里呢 -系统重构实战

组件化成熟度评估&#xff0c;你的目的地在哪里呢 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于组件化成熟度评估&#xff0c;你的目的地在哪里呢的内容。 一、组件化成熟度模型 组件化成熟度模型可以帮助咱全局去思考当前的现状&#xff0c;并制定更有针对性的…

ChatGPT带火的百万年薪职业究竟是什么?

对话有ChatGPT、画图有Midjourney&#xff0c;哪怕被封号了&#xff0c;国内的文心一言、通义千问也不遑多让。 ChatGPT等生成式AI工具涌现程度堪比“乱花渐欲迷人眼”。 拟一份演讲稿&#xff0c;画一张海报&#xff0c;做一份PPT大纲&#xff0c;生成个图表&#xff0c;敲一…

【数据结构】- 线性表+顺序表(上)

文章目录前言一、线性表二、顺序表2.1概念及结构2.2接口实现2.3具体实现总结前言 所有的失败都是上帝在考验你是否真的热爱 本章是关于数据结构中的顺序表和链表 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、线性表 线性表&#xff08;linear …

Xilinx CDC Constraints(ug903: Chapter6 )

&#xff08;1&#xff09;About CDC Constraints 跨时钟域约束适用于具有不同启动和捕获时钟的定时路径。根据启动和捕获时钟的关系以及在CDC路径上设置的时序异常&#xff0c;有同步CDC和异步CDC。例如&#xff0c;同步时钟之间但被错误路径约束覆盖的CDC路径不被定时…

传统图像处理——颜色迁移

转自知乎&#xff1a;https://zhuanlan.zhihu.com/p/267832794,仅供学习。 利用一张图片的颜色去修改另一张图片的颜色风格。 原理是利用颜色空间的正交化&#xff0c;即更改某个颜色&#xff0c;不会影响到其它属性。这里的色彩迁移的论文则是使用了LAB空间&#xff08;RGB颜…

C语言初阶--连用scanf(以%c读取时)遇到的问题

目录前言总结前言 在我们写程序的过程中&#xff0c;会频繁使用scanf函数&#xff0c;当在一个程序中scanf用多了&#xff0c;会出现输入不了的问题&#xff01;大家有没有想过是什么原因导致的该问题呢&#xff1f;下面我们一起探讨一下吧&#xff01; 遇到问题的例子&#…

Linux环境下安装JDK1.8

目录 一、下载jdk 二、安装准备 三、解压缩包到指定安装目录 四、配置环境变量 五、验证安装结果 一、下载jdk 这部分依然是从Oracle官网下载&#xff0c;下载速度还是很快的。 下载完成后&#xff0c;将该压缩包放到Linux环境下&#xff0c;准备解压安装。 二、安装准备…

Redis性能调优详解

文章目录前言确认是否是Redis真的变慢了&#xff1f;什么是基准性能&#xff1f;具体如何做&#xff1f;使用复杂度过高的命令哪些属于复杂度过高命令--聚合类命令、 大值数据针对这种情况如何解决呢&#xff1f;操作bigkeybigkey耗时原因如何定位出bigKey--bigkeys这里我需要提…

34岁本科男,做了5年功能测试想转行,除了进厂还能干什么?

我的建议是不要给自己设限。任何一个行业只要做到顶尖都是很有作为的&#xff0c;何况是IT行业&#xff0c;本身就比别的行业有优势&#xff0c;如果你现在是功能测试&#xff0c;应该想的是进阶自动化测试或者测试开发 如何在半年时间由功能测试成长为年薪30W的测试开发&#…

【Hydro】常用地下水与溶质运移模拟软件

饱和地下水流和溶质运移常见的模拟软件 常用的求解地下水水流和溶质运移方程的数学方法有两种&#xff1a;有限差分法和有限元法。两者主要的差别在于离散模型区的方法不同。基于不同的数学方法&#xff0c;当前市场上有一些地下水模拟图形用户界面&#xff0c;它们在基本功能…