RocketMQ批量消息

news2024/11/15 8:38:03

RocketMQ消息发送基本示例(推送消费者)-CSDN博客

RocketMQ消费者主动拉取消息示例-CSDN博客

RocketMQ顺序消息-CSDN博客

RocketMQ广播消息-CSDN博客

RocketMQ延时消息-CSDN博客

批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去,原先的都是一次发一条.批量消息的好处是减少网络IO,提高吞吐量.

批量消息的使用限制:

消息大小不能超过4M,虽然源码注释不能超过1M,实际使用不超过4M即可.平衡整体性能,建议保持1M

相同的Topic

相同的waitStoreMsgOK

不能是延迟,不能是事务消息

注意:::::::批量发送主要是为了优化生产者发送消息的效率,但在消费者端,消息仍然是逐条处理的。

package com.example.rocketmqdemo.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * @author hrui
 * @date 2024/8/1 12:16
 */
public class BatchProducer {

    public static void main(String[] args) {
        //创建一个DefaultMQProducer实例,指定生产者组名为"group1"
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");//生产者组和消费者组是不同概念  不需要相同

        //设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");

        try {
            //启动生产者实例
            producer.start();

            //批量发送用一个Message数组或集合
            List<Message> messages = new ArrayList<>();

            //发送10条消息
            for (int i = 0; i < 2; i++) {
                //创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号
                Message message = new Message("Batch", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));
                messages.add(message);
            }
            //批量发送消息
            SendResult sendResult = producer.send(messages);

            //打印消息发送结果
            System.out.println("批量消息发送成功:返回---->" + sendResult);
        } catch (Exception e) {
            //捕获并打印异常信息
            e.printStackTrace();
        } finally {
            //关闭生产者实例,释放资源
            producer.shutdown();
        }
    }
}

package com.example.rocketmqdemo.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author hrui
 * @date 2024/8/1 12:20
 */
public class BathConsumer {
    public static void main(String[] args) {
        //创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"
        //采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");

        //设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
        consumer.setNamesrvAddr("xxx.xxx.xxx:9876");

        try {
            //订阅主题"Topic1",过滤标签为"*",表示接收所有消息
            consumer.subscribe("Batch", "*");

            //设置消息监听器,处理接收到的消息
            //可以传入两种类型的监听器:
            //1. MessageListenerOrderly(顺序消费):保证消息按顺序处理
            //2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序
            consumer.setMessageListener(new MessageListenerConcurrently() {


                //consumeMessage方法用于处理接收到的消息列表
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                    System.out.println(Thread.currentThread().getName());
                    for (int i=0;i<list.size();i++){
                        System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的
                    }

                    //返回消费状态,CONSUME_SUCCESS表示消息消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            //启动消费者实例,开始接收消息
            consumer.start();
        } catch (Exception e) {
            //捕获并打印异常信息
            e.printStackTrace();
        }
    }
}

如果List<Message> 超过4M

如果超过4M  可以用分批次处理  可以自定义实现迭代器去实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1966419.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot四川旅游攻略分享互动平台-计算机毕业设计源码70222

摘 要 本研究基于Spring Boot框架开发了一款高效、可靠的四川旅游攻略分享互动平台。该系统主要面向管理员、普通用户和商家用户&#xff0c;涵盖了多个功能模块&#xff0c;包括旅游景点、旅游攻略、景点订单、酒店订单、酒店信息等。通过对系统需求的分析和设计&#xff0c;…

从数据规划到产品运营,拆解数据资产产品化的6大路径

数据资源入表对于企业数据资产的估值影响并不大&#xff0c;要想提升数据资产的整体价值&#xff0c;将数据资产进行产品化是更有效的途径之一。 那么&#xff0c;数据资产产品化的具体路径是怎样的&#xff1f; 在由WakeData惟客数据联合星光数智推出的直播栏目《星光对话》…

打破自闭症束缚:儿童康复案例揭秘

在自闭症的阴霾下&#xff0c;孩子们仿佛被困在一个无形的牢笼中&#xff0c;与外界的世界隔绝。然而&#xff0c;通过不懈的努力和科学的康复方法&#xff0c;许多孩子正在逐渐打破这一束缚&#xff0c;走向充满希望的未来。让我们一同走进几个令人鼓舞的儿童康复案例&#xf…

如何通过阿里云服务器部署hexo博客(超详细)

&#x1f44f;大家好&#xff01;我是和风coding&#xff0c;希望我的文章能给你带来帮助&#xff01; &#x1f525;如果感觉博主的文章还不错的话&#xff0c;请&#x1f44d;三连支持&#x1f44d;一下博主哦 &#x1f4dd;点击 我的主页 还可以看到和风的其他内容噢&#x…

零基础入门转录组数据分析——机器学习算法之boruta(筛选特征基因)

零基础入门转录组数据分析——机器学习算法之boruta&#xff08;筛选特征基因&#xff09; 目录 零基础入门转录组数据分析——机器学习算法之boruta&#xff08;筛选特征基因&#xff09;1. boruta基础知识2. boruta&#xff08;Rstudio&#xff09;——代码实操2. 1 数据处理…

[Docker][Docker Volume]详细讲解

目录 1.什么是存储卷&#xff1f;2.为什么需要存储卷&#xff1f;1.数据丢失问题2.性能问题3.宿主机和容器互访不方便4.容器和容器共享不方便 3.存储卷分类1.volume docker 管理卷2.bind mount 绑定数据卷3.tmpfs mount 临时数据卷 5.管理卷 Volume1.创建卷1.-v 参数2.--mount …

《Milvus Cloud向量数据库指南》——向量数据库性价比大比拼:谁才是性能之王?

在分析这份向量数据库(Vector Databases)的性价比排名表格时,我们需要从多个维度深入探讨,包括但不限于硬件配置、价格/性能比(QP$,即每百万次查询所花费的价格)、数据集大小、查询类型(无标量过滤、低标量过滤、高标量过滤)以及不同服务提供商之间的比较。以下是一个…

微波治疗仪,美容仪,爆脂仪电源板

分享一下爆脂仪&#xff0c;美容仪&#xff0c;微波治疗仪电源板&#xff0c;高压输出为-2000v&#xff0c;驱动电流最大100mA&#xff0c;匹配磁控管功率输出100w

KubeBlocks v0.9 解读|最高可管理 10K 实例的 InstanceSet 是什么?

实例&#xff08;Instance&#xff09;是 KubeBlocks 中的基本单元&#xff0c;它由一个 Pod 和若干其它辅助对象组成。为了容易理解&#xff0c;你可以先把它简化为一个 Pod&#xff0c;下文中将统一使用实例这个名字。 InstanceSet 是一个通用 Workload API&#xff0c;负责…

python-进度条和计时器

from tqdm import tqdm import time# 设置任务的总步骤数 total_steps 100# 使用tqdm创建进度条 with tqdm(totaltotal_steps, unitstep) as pbar:# 开始计时start_time time.time()# 模拟任务步骤for i in range(total_steps):# 模拟每一步的工作负载time.sleep(0.1) # 假设…

C语言:自定义类型进阶(结构体、联合体、枚举)

自定义类型&#xff08;结构体、联合体、枚举&#xff09; 一、结构体&#xff08;一&#xff09;结构体的内存对齐1、结构体内存对齐规则&#xff08;1&#xff09;引子&#xff08;2&#xff09;offsetof 宏函数&#xff08;3&#xff09;内存对齐原理&#xff08;4&#xff…

【HM】DevEco Studio提供Hot Reload(热重载)能力确实好用!帮助开发者更快速进行调试。但要注意以下几点?

​​ 仅支持开发者在真机上运行/调试运行&#xff1a; 1、运行时&#xff0c;选择带H标识的entry&#xff1b; 2、运行设备选择已连接的真机&#xff1b; 3、运行中修改了文件后&#xff0c;点击H标识&#xff0c;在真机即刻可见效果。 注意约束条件&#xff1a; 只支持真…

硬盘信息,电脑硬盘查看工具

硬盘信息&#xff0c;电脑硬盘查看工具 硬盘信息&#xff0c;电脑硬盘查看工具

FPGA开发——状态机的使用

一、概述 我们在使用FPGA进行开发的过程当中&#xff0c;实现一个东西用得最多的实现方法就是状态机的实现方法&#xff0c;用一句话总结就是万物皆可状态机&#xff0c;这和我们在学习Linux时常说的在Linux中万物都是文件差不多&#xff0c;这里就主要就是突出状态机的应用范…

Moretl 文件同步工具 1.1.0.3

永久免费: 前往Gitee最新版本 更新内容 Winform全部切换到.Net Framework 4.0. 更符合大部分的自动化设备. Web提供.Net Framework 4.0的运行时环境安装包 Web打开时,若当前IP为设备,直接显示设备信息 介绍 用途: 定时全量或增量 采集工控机,办公电脑文件以及日志.(SCADA,I…

Apache JMeter是一款纯java编写负载功能测试和性能测试开源工具软件

​​ jmeter性能测试 1. Jmeter简介 Apache JMeter是一款纯java编写负载功能测试和性能测试开源工具软件。相比Loadrunner而言&#xff0c;JMeter小巧轻便且免费&#xff0c;逐渐成为了主流的性能测试工具&#xff0c;是每个测试人员都必须要掌握的工具之一。 本文为JMeter…

进口不锈钢309S螺栓的应用优势

进口不锈钢309S螺栓因其优异的性能和广泛的应用范围而在许多行业中备受青睐。309S不锈钢是一种含硫的易切削不锈钢&#xff0c;具有良好的耐高温和耐腐蚀性能&#xff0c;使其成为高温环境下理想的选择。下面我们就来详细探讨一下进口不锈钢309S螺栓的应用优势。 一、309S不锈钢…

请你谈谈:spring拦截器的应用-preHandle postHandle afterCompletion执行顺序问题的讨论?

首先我们&#xff0c;给出一个demo来看下拦截器方法执行顺序&#xff1a; import org.springframework.stereotype.Component; import org.springframework.web.servlet.HandlerInterceptor; import org.springframework.web.servlet.ModelAndView;import javax.servlet.http.…

第一百八十五节 Java XML教程 - Java DOM简介

Java XML教程 - Java DOM简介 DOM是标准的树结构&#xff0c;其中每个节点包含来自XML结构的一个组件。 XML文档中两种最常见的节点类型是元素节点和文本节点。 使用Java DOM API&#xff0c;我们可以创建节点&#xff0c;删除节点&#xff0c;更改其内容&#xff0c;并遍历节…

TypeScript 定义不同的类型(详细示例)

还是大剑师兰特&#xff1a;曾是美国某知名大学计算机专业研究生&#xff0c;现为航空航海领域高级前端工程师&#xff1b;CSDN知名博主&#xff0c;GIS领域优质创作者&#xff0c;深耕openlayers、leaflet、mapbox、cesium&#xff0c;canvas&#xff0c;webgl&#xff0c;ech…