四、RocketMQ发送普通消息、批量消息和延迟消息

news2025/1/11 1:39:41

Producer发送普通消息的方式

1.同步发送消息

同步消息代表发送端发送消息到broker之后,等待消息发送结果后,再次发送消息
在这里插入图片描述

实现步骤

  1. 创建生产端,声明在哪个生产组
  2. 注册NameServer地址
  3. 构建Message实体,指定topic、tag、body
  4. 启动生产端
  5. 发送消息
@Test
public void syncSend() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
    // 1.创建生产端,声明在哪个生产组
    DefaultMQProducer producer = new DefaultMQProducer("test_group");

    // 2.注册NameServer地址
    producer.setNamesrvAddr(NAME_SERVER_ADDR);

    // 3.构建Message实体,指定topic、tag、body
    Message message = new Message("test", "hello world".getBytes());

    // 4.启动生产端
    producer.start();

    // 5.发送消息
    SendResult sendResult = producer.send(message);
    System.out.println(sendResult.getSendStatus());
}

2.异步发送消息

异步消息代表发送端发送完消息后,会直接返回,但是可以注册一个回调函数,当broker将消息落盘后,回调这个回调函数
在这里插入图片描述

实现步骤

  1. 创建生产端,声明在哪个生产组
  2. 注册NameServer地址
  3. 构建Message实体,指定topic、tag、body
  4. 启动生产端
  5. 发送消息,并且实现SendCallback接口

注:这里必须等待异步返回,否则消费者无法消费成功

@Test
public void asyncSend() throws  RemotingException, InterruptedException, MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("test_group");
    producer.setNamesrvAddr(NAME_SERVER_ADDR);
    Message message = new Message("test", "tag-a","hello world".getBytes());
    producer.start();
    CountDownLatch countDownLatch = new CountDownLatch(1);
    // 发送消息,并且实现SendCallback接口
    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            countDownLatch.countDown();
            System.out.println("发送成功:" + sendResult.getSendStatus());
        }

        @Override
        public void onException(Throwable e) {
            countDownLatch.countDown();
            System.out.println("发送失败:" + e);
        }
    });
    countDownLatch.await();
}

3、发送单向消息

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短

在这里插入图片描述

实现步骤

  1. 创建生产端,声明在哪个生产组
  2. 注册NameServer地址
  3. 构建Message实体,指定topic、tag、body
  4. 启动生产端
  5. 发送单向消息
@Test
public void sendOneWay() throws  RemotingException, InterruptedException, MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("test_group");
    producer.setNamesrvAddr(NAME_SERVER_ADDR);
    Message message = new Message("test","tag-a", "hello world".getBytes());
    producer.start();
    producer.sendOneway(message);
}

Producer发送批量消息

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

@Test
public void sendBatch() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
    producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);

    // 构造批量消息
    List<Message> list = new ArrayList<>();
    list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world0".getBytes(Charset.defaultCharset())));
    list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world1".getBytes(Charset.defaultCharset())));
    list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world2".getBytes(Charset.defaultCharset())));
    producer.start();

    // 发送批量消息
    producer.send(list);
    producer.shutdown();
}

**注:**需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。

Producer发送延迟消息

Producer想要发送延迟消息,只要设置Message的DelayTimeLevel属性大于0即可。

RocketMQ无法随意设置延迟消息的延迟时间,只能根据延迟级别进行

延迟级别和延迟时间的对应关系

延迟级别延迟时间延迟级别延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h
@Test
public void sendDelay() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
    producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);
    producer.start();

    Message message = new Message(RocketMQConfig.TEST_TOPIC, "hello world".getBytes(Charset.defaultCharset()));
    // 设置延迟级别
    message.setDelayTimeLevel(3);

    // 发送批量消息
    SendResult sendResult = producer.send(message);
    System.out.println(sendResult.getSendStatus());
    producer.shutdown();
}

延迟消息的原理

延迟消息并不会直接发送到指定的topic,而是发送到一个延迟消息对应的topic中

当延迟消息的时间到达后,在将消息发送到指定的topic中

延迟消息投递的流程

  1. producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别

  2. broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别的delayLevel-1

  3. mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列

  4. 根据消费偏移量offset从commitLog中解析出对应消息

  5. 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递

  6. 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1091175.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文阅读之《Learn to see in the dark》

Learning to See in the Dark-CVPR2018 Chen ChenUIUC&#xff08;伊利诺伊大学厄巴纳-香槟分校&#xff09; Qifeng Chen, Jia Xu, Vladlen Koltun Intel Labs(英特尔研究院) 文章链接&#xff1a;https://arxiv.org/pdf/1805.01934.pdfhttps://arxiv.org/pdf/1805.01934.p…

使用hugging face开源库accelerate进行多GPU训练(单机多卡)时,在保存模型结构的时候出现的问题

目录 问题描述问题分析问题解决 问题描述 我在保存模型结构的时候&#xff0c;先获取模型参数&#xff0c;然后再保存&#xff0c;代码如下&#xff1a; 图示代码是在训练主循环中的&#xff1a; 这种情况下会出现报错&#xff1a; nboundLocalError: UnboundLocalErrorloc…

计算机毕业设计选什么题目好?springboot 医院门诊在线预约挂号系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

《C++ Primer》练习9.52:使用栈实现四则运算

栈可以用来使用四则运算&#xff0c;是一个稍微有点复杂的题目&#xff0c;去学习了一下用栈实现四则运算的原理&#xff0c;用C实现了一下。首先要把常见的中缀表达式改成后缀表达式&#xff0c;然后通过后缀表达式计算&#xff0c;具体的原理可以参考这位博主的文章&#xff…

本、硕、博区别真的辣么大吗?

61&#xff1a; 发际线已经说明了一切…… Super Mario&#xff1a; 小学&#xff0c;老师告诉学生&#xff1a;“森林里有只老虎&#xff0c;已经被我关在笼子里&#xff0c;我会带你去那个地方&#xff0c;然后给你一把猎枪&#xff0c;告诉你猎枪怎么用&#xff0c;并开枪…

RFID拓展的相关问答

基于&#xff1a; Research Reading: Smart Parking Applications Using RFID Technology-CSDN博客这篇文章总结了无线射频识别&#xff08;RFID&#xff09;技术在自动化中的应用及其在停车场管理系统中的解决方案。文章提到&#xff0c;RFID技术在自动化中可以降低交易成本&…

4. redis排名系统之C++实战操作对比MySQL

一、MySQL实现方法 假设我们要设计一款排名系统&#xff0c;那必然要涉及到两大类数据&#xff1a;武器数据和非武器的通用数据&#xff0c;它他通常有一个共用的属性&#xff1a;那就是主键唯一的&#xff0c;例如玩家的数字编号&#xff0c;通常在MySQL中是自增的无符号整数…

【牛客面试必刷TOP101】Day9.BM37 二叉搜索树的最近公共祖先和BM42 用两个栈实现队列

作者简介&#xff1a;大家好&#xff0c;我是未央&#xff1b; 博客首页&#xff1a;未央.303 系列专栏&#xff1a;牛客面试必刷TOP101 每日一句&#xff1a;人的一生&#xff0c;可以有所作为的时机只有一次&#xff0c;那就是现在&#xff01;&#xff01;&#xff01;&…

pycharm安装汉化包失败解决方法

在pycharm -setting-plugins-搜索“Chinese”进入此界面&#xff1a; 点击install&#xff0c;在安装时出现&#xff1a;Plugin "Chinese (Simplified) Language Pack / 中文语言包" was not installed: Invalid filename returned by a server 解决方法&#xff1a…

boot分页

List<ElectricDispatchTodoPO> todoList electricDispatchTodoService.queryTodlList(vo, sysStaffVO);// 计算总记录数int total todoList.size();// 如果总记录数大于0PageInfo<ElectricDispatchTodoPO> pageInfo new PageInfo<>();if (total > 0) {…

手把手教你分析IIS日志——IP访问次数,URI访问统计等

配置IIS网站的日志 下载日志分析工具 https://gitee.com/tangdd369098655/open-network-disk 解压打开 选择文件 指定分析规则&#xff08;还可以自己写规则哦~~&#xff09; 运行规则进行分析 今天就写到这里啦~ 小伙伴们&#xff0c;(&#xffe3;ω&#xffe3;(&#x…

Win10 环境下 VS2022 暴力编译PP-OCRv4

1 环境准备 下载PaddleOCR PaddleOCR C 部署代码位于 PaddleOCR\deploy\cpp_infer目录下 复制cpp_infer目录下include和src到项目目录下paddle_inference paddle_inference opencv 这里使用已经安装好的opencv4.5.5下载dirent-master.zip 下载dirent-master.zip, 解压并复制d…

MyLife - Docker安装Consul

Docker安装Consul 个人觉得像consul之类的基础设施在线上环境直接物理机安装使用可能会好些。但是在开发测试环境用docker容器还是比较方便的。这里学习下docker安装consul使用。 1. Consul 镜像库地址 Consul 镜像库地址&#xff1a;https://hub.docker.com/r/hashicorp/consu…

CleanMyMacX4.12.3最新免费版mac电脑管家

当我们收到一台崭新的mac电脑&#xff0c;第一步肯定是找到一款帮助我们管理电脑运行的“电脑管家”&#xff0c;监控内存运行、智能清理系统垃圾、清理Mac大文件旧文件、消除恶意软件、快速卸载更新软件、隐私保护、监控系统运行状况等。基本在上mac电脑防护一款CleanMyMac就够…

C/C++陷阱——变量名和函数名的冲突问题

C语言/C陷阱——变量名和函数名的冲突问题 先来看这两串代码&#xff1a; 代码一&#xff1a; #include <stdio.h> #include <stdlib.h>int rand 1;int main() {printf("%d\n", rand);return 0; }代码二&#xff1a; #include <stdio.h> #inc…

芯片设计:一颗芯片到底是如何诞生的(上)

目录 芯片设计整体流程 小故事&#xff1a;苹果的芯片设计路 需求分析 架构设计 逻辑设计 前端设计与验证 逻辑综合 DFT&#xff08;可测试性设计&#xff09; 物理实现 小结 芯片设计整体流程 一颗芯片&#xff0c;是如何诞生的呢&#xff1f;其实一颗芯片项目就是…

vue引入jQuery

配置 下载 npm install jquery --save在build的webpack.base.conf中 var webpackrequire("webpack")在module.exports中: plugins: [ //   new webpack.optimize.CommonsChunkPlugin(common.js),new webpack.ProvidePlugin({jQuery: "jquery",$: &quo…

三十五、【进阶】MySQL性能查看

1、基础语法 show global status like Com_______; 2、实际操作 &#xff08;1&#xff09;查看当前数据库sql语句的执行频率 show global status like Com_______; &#xff08;2&#xff09; 执行依次select语句 &#xff08;3&#xff09; 再次查看当前数据库sql语句的执…

GBJ2510-ASEMI电源控制柜专用GBJ2510

编辑&#xff1a;ll GBJ2510-ASEMI电源控制柜专用GBJ2510 型号&#xff1a;GBJ2510 品牌&#xff1a;ASEMI 封装&#xff1a;GBJ-4 恢复时间&#xff1a;&#xff1e;50ns 正向电流&#xff1a;25A 反向耐压&#xff1a;1000V 芯片个数&#xff1a;4 引脚数量&#xf…