010 rocketmq批量消息

news2025/3/3 4:58:50

文章目录

  • 批量消息
  • BatchProducer.java
  • BatchConsumer.java

批量消息

批量发送可以提⾼发送性能,但有⼀定的限制:
topic 相同
waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)
不支持延时发送
⼀批消息的大小不能⼤于 4M(DefaultMQProducer.maxMessageSize)
大小限制需要特殊注意,因为消息是动态的,不注意的话就可能超限,就会报错:
计算消息的大小
= (topic + body + (key + value) * N) * 吞吐量

int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
 tmpSize += entry.getKey().length() + entry.getValue().length();
}

BatchProducer.java

package com.example.rocketmq.demo.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        String topic = "TopicTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));


        //then you could split the large list into small ones:
        ListSplitter splitter = new ListSplitter(messages);

        while (splitter.hasNext()) {
            try {
                List<Message>  listItem = splitter.next();
                SendResult sendResult = producer.send(listItem);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                //handle the error
            }
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
                Message message = messages.get(nextIndex);


                //计算消息的大小 = (topic + body + (key + value) * N) * 吞吐量

                int tmpSize = message.getTopic().length() + message.getBody().length;
                //属性值的添加
                Map<String, String> properties = message.getProperties();
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    //+key + value
                    tmpSize += entry.getKey().length() + entry.getValue().length();
                }




                tmpSize = tmpSize + 20; //for log overhead
                if (tmpSize > SIZE_LIMIT) {
                    //it is unexpected that single message exceeds the SIZE_LIMIT
                    //here just let it go, otherwise it will block the splitting process
                    if (nextIndex - currIndex == 0) {
                        //if the next sublist has no element, add this one and then break, otherwise just break
                        nextIndex++;
                    }
                    break;
                }
                if (tmpSize + totalSize > SIZE_LIMIT) {
                    break;
                } else {
                    totalSize += tmpSize;
                }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

BatchConsumer.java

package com.example.rocketmq.demo.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class BatchConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);


                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2308753.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Qt】MVC设计模式

目录 一、搭建MVC框架 二、创建数据库连接单例类SingleDB 三、数据库业务操作类model设计 四、control层&#xff0c;关于model管理类设计 五、view层即为窗口UI类 一、搭建MVC框架 里面的bin、lib、database文件夹以及sqlite3.h与工程后缀为.pro文件的配置与上次发的文章…

ARM 处理器平台 eMMC Flash 存储磨损测试示例

By Toradex秦海 1). 简介 目前工业嵌入式 ARM 平台最常用的存储器件就是 eMMC Nand Flash 存储&#xff0c;而由于工业设备一般生命周期都比较长&#xff0c;eMMC 存储器件的磨损寿命对于整个设备来说至关重要&#xff0c;因此本文就基于 NXP i.MX8M Mini ARM 处理器平台演示…

本地部署DeepSeek-R1(Dify发件邮箱、找回密码、空间名称修改)

Dify配置发件邮箱 DIfy默认邮箱配置为空&#xff0c;在邀请团队成员注册时是不会发送邀请链接的&#xff0c;只能通过手动复制生成的注册链接发送给对应的人去注册设置密码。 这样很麻烦&#xff0c;并且在找回密码时也接收不了邮件&#xff0c;无法重置密码。 找到本地部署…

EasyRTC:支持任意平台设备的嵌入式WebRTC实时音视频通信SDK解决方案

随着互联网技术的飞速发展&#xff0c;实时音视频通信已成为各行各业数字化转型的核心需求之一。无论是远程办公、在线教育、智慧医疗&#xff0c;还是智能安防、直播互动&#xff0c;用户对低延迟、高可靠、跨平台的音视频通信需求日益增长。 一、WebRTC与WebP2P&#xff1a;实…

数据库数据恢复—SQL Server附加数据库报错“错误 823”怎么办?

SQL Server数据库附加数据库过程中比较常见的报错是“错误 823”&#xff0c;附加数据库失败。 如果数据库有备份则只需还原备份即可。但是如果没有备份&#xff0c;备份时间太久&#xff0c;或者其他原因导致备份不可用&#xff0c;那么就需要通过专业手段对数据库进行数据恢复…

HTMLS基本结构及标签

HTML5是目前制作网页的核心技术&#xff0c;有叫超文本标记语言。 基本结构 声明部分位于文档的最前面&#xff0c;用于向浏览器说明当前文档使用HTML标准规范。 根部标签位于声明部分后&#xff0c;用于告知浏览器这是一个HTML文档。< html>表示文档开始&#xff0c;&l…

IDEA集成DeepSeek,通过离线安装解决无法安装Proxy AI插件问题

文章目录 引言一、安装Proxy AI1.1 在线安装Proxy AI1.2 离线安装Proxy AI 二、Proxy AI中配置DeepSeek2.1 配置本地部署的DeepSeek&#xff08;Ollama方式&#xff09;2.2 通过第三方服务商提供的API进行配置 三、效果测试 引言 许多开发者尝试通过安装Proxy AI等插件将AI能力…

phpstudy安装教程dvwa靶场搭建教程

GitHub - digininja/DVWA: Damn Vulnerable Web Application (DVWA) Dvwa下载地址 Windows版phpstudy下载 - 小皮面板(phpstudy) 小皮下载地址 1选择windows 版本&#xff0c;点击立即下载 下载完成&#xff0c;进行解压&#xff0c;注意不要有中文路径 点击.exe文件进行安装…

【linux】详谈 环境变量

目录 一、基本概念 二、常见的环境变量 取消环境变量 三、获取环境变量 通过代码获取环境变量 环境变量的特性 1. getenv函数:获取指定的环境变量 2. environ获取环境变量 四、本地变量 五、定义环境变量的方法 临时定义&#xff08;仅对当前会话有效&#xff09; 永…

【Linux高级IO】多路转接(poll epoll)

目录 1. poll 2. epoll 2.1 epoll_ctl 2.2 epoll_wait 2.3 epoll原理 2.4 epoll的工作模式 2.5 epoll的惊群效应 使用建议 总结 1. poll poll也是实现 I/O 多路复用的系统调用&#xff0c;可以解决select等待fd上限的问题&#xff0c;将输入输出参数分离&#xff0c;不需要…

供应链管理系统--升鲜宝门店收银系统功能解析,登录、主界面、会员 UI 设计图(一)

供应链管理系统--升鲜宝门店收银系统功能解析&#xff0c;登录、主界面 会员 UI 设计图&#xff08;一&#xff09;

【Linux系统编程】基础IO--磁盘文件

目录 前言 磁盘的机械构成 盘片介绍 盘片与磁头 数据的存储&#xff08;硬件&#xff09; 磁盘的物理存储 逻辑结构&#xff1a;磁道/柱面、扇面、扇区 磁盘I/O的基本单位与扇区的存储密度 CHS定位法&#xff1a;数据的查找 磁盘的逻辑存储 扇区的抽象结构(数据…

C# .NET Core HttpClient 和 HttpWebRequest 使用

HttpWebRequest 这是.NET创建者最初开发用于使用HTTP请求的标准类。HttpWebRequest是老版本.net下常用的&#xff0c;较为底层且复杂&#xff0c;访问速度及并发也不甚理想&#xff0c;但是使用HttpWebRequest可以让开发者控制请求/响应流程的各个方面&#xff0c;如 timeouts,…

[3/11]C#性能优化-实现 IDisposable 接口-每个细节都有示例代码

[3]C#性能优化-实现 IDisposable 接口-每个细节都有示例代码 前言 在C#开发中&#xff0c;性能优化是提升系统响应速度和资源利用率的关键环节。 当然&#xff0c;同样是所有程序的关键环节。 通过遵循下述建议&#xff0c;可以有效地减少不必要的对象创建&#xff0c;从而减…

1.C语言初识

C语言初识 C语言初识基础知识hello world数据类型变量、常量变量命名变量分类变量的使用变量的作用域 常量字符字符串转义字符 选择语句循环语句 函数&#xff1b;数组函数数组数组下标 操作符操作符算术操作符移位操作符、位操作符赋值操作符单目操作符关系操作符逻辑操作符条…

软件测试中的BUG

文章目录 软件测试的生命周期BugBug 的概念描述 Bug 的要素案例Bug 级别Bug 的生命周期与开发产生争执怎么办&#xff1f;【高频面试题】先检查自身&#xff0c;Bug 是否描述的不清楚站在用户角度考虑并抛出问题Bug 的定级要有理有据提⾼自身技术和业务水平&#xff0c;做到不仅…

TinyEngine v2.2版本发布:支持页面嵌套路由,提升多层级路由管理能力开发分支调整

2025年春节假期已过&#xff0c;大家都带着慢慢的活力回到了工作岗位。为了让大家在新的一年继续感受到 Tiny Engine 的成长与变化&#xff0c;我们很高兴地宣布&#xff1a;TinyEngine v2.2版本正式发布&#xff01;本次更新带来了重要的功能增强------页面支持嵌套路由&#…

Web自动化之Selenium添加网站Cookies实现免登录

在使用Selenium进行Web自动化时&#xff0c;添加网站Cookies是实现免登录的一种高效方法。通过模拟浏览器行为&#xff0c;我们可以将已登录状态的Cookies存储起来&#xff0c;并在下次自动化测试或爬虫任务中直接加载这些Cookies&#xff0c;从而跳过登录步骤。 Cookies简介 …

Storm实时流式计算系统(全解)——中

storm编程的基本概念-topo-spout-bolt 例如下&#xff1a; storm 编程接口-spout的结构及组件实现 storm编程案例-spout组件-实现 这是我的第一个组件&#xff08;spout组件继承BaseRichSput&#xff09;所有重写内部的三个方法&#xff0c;用于接收数据&#xff08;这里数据是…

让deepseek更专业的提示词教程

一、明确需求和目标 在使用DeepSeek之前&#xff0c;首先要明确你的需求和目标。例如&#xff0c;你是要生成一篇学术论文的摘要&#xff0c;还是一个商业文案的大纲&#xff0c;亦或是一段技术分析。明确的目标可以帮助你更有针对性地编写提示词。 二、使用专业术语和结构化…