RocketMQ 消息传递模型

news2025/1/15 16:31:58

文章目录

  • 0. 前言
  • 1. RocketMQ的消息传递模型
    • 1.1. 同步发送
    • 1.2. 异步发送
    • 1.3. 单向发送
  • 2. RocketMQ的批量发送和消费
    • 2.1 批量发送
    • 2.2 批量消费
    • 2.3 Spring Boot集成RocketMQ官方starter 示例
  • 3. 总结
  • 4. 参考文档
  • 5. 源码地址

在这里插入图片描述

0. 前言

RocketMQ 支持6种消息传递方式,我们本次来聊三种消息传递模型,分别是可靠的同步传输、可靠的异步传输和单向传输。

  1. 可靠的同步传输(Reliable Synchronous Transmission):这是最常见的模型,生产者发送消息后,会等待消费者响应,确认消息已被消费者接收并处理。这种模式虽然可靠,但是由于需要等待确认,所以传输速度相对较慢。

  2. 可靠的异步传输(Reliable Asynchronous Transmission):在这种模型中,生产者发送消息后,不等待消费者的确认,直接返回,继续发送下一条消息。消费者在接收到消息后,会异步地确认消息。这种模式的传输速度较快,但是可能会存在消息丢失的风险。

  3. 单向传输(One-way Transmission):这种模型更加简单,生产者只负责发送消息,不关心消费者是否接收和处理,也不需要任何确认。这种模式通常用于对可靠性要求不高,但对速度要求高的场景,比如日志收集。

1. RocketMQ的消息传递模型

Spring boot 集成很简单
1.配置依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq.version}</version>
</dependency>
  1. application.properties文件中配置RocketMQ的相关属性:
spring.rocketmq.name-server=127.0.0.1:9876
spring.rocketmq.producer.group=producerGroup

1.1. 同步发送

  • 定义:
    RocketMQ同步发送是指生产者发送消息后,会在收到服务器返回确认的应答后才会发送下一条消息。这样发送消息的方式会增加消息发送的耗时,但能够确保消息被服务器成功接收。

  • 适用场景:
    对于一些重要的消息通知、短信通知、短信营销系统等,需要确保消息的准确无误的到达,可以采用RocketMQ的同步发送方式。

  • Springboot 集成使用示例:通过RocketMQTemplate的syncSend方法发送消息。

// 发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;

public void sendMessage() {
    rocketMQTemplate.syncSend("my-topic", "Hello, RocketMQ");
}

  1. 同步发送方式会阻塞当前线程,直到服务器返回响应,因此需要考虑到这种方式可能会影响系统的吞吐量。
  2. RocketMQ的同步发送方式能够保证消息的可靠性,但也需要保证RocketMQ服务器的高可用,防止服务器出现问题导致消息丢失。
  3. 在使用RocketMQ的同时,还需要注意消息的顺序性和消费者的消费能力,避免出现消息堆积的情况。

1.2. 异步发送

  • RocketMQ异步发送是指在发送消息时,不等待消息发送结果的返回,而是通过回调函数来处理消息发送的结果。

  • 适用场景:

    • 需要发送大量消息,并且对消息发送的响应时间要求不高的场景。
    • 需要异步处理消息发送结果的场景,例如发送短信、邮件等通知类消息。

使用RocketMQ的RocketMQTemplate发送消息

@Service
public class MessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 处理发送成功的逻辑
            }

            @Override
            public void onException(Throwable throwable) {
                // 处理发送异常的逻辑
            }
        });
    }
}

在需要发送消息的地方调用MessageProducersendMessage方法:

@RestController
public class MessageController {
    
    @Autowired
    private MessageProducer messageProducer;
    
    @GetMapping("/send")
    public String sendMessage() {
        String topic = "testTopic";
        String message = "Hello RocketMQ!";
        messageProducer.sendMessage(topic, message);
        return "Message sent successfully!";
    }
}
  • 异步发送消息需要通过回调函数来处理发送结果,需要考虑回调函数的执行时间和顺序,以确保消息发送的可靠性。
  • 异步发送消息可能会导致消息发送的顺序不确定,需要在接收端进行相关处理,保证消息的处理顺序。
  • 异步发送消息时,需要注意控制并发量,避免发送过多消息导致系统负载过高。

1.3. 单向发送

  • 定义:单向发送是指消息生产者发送消息后,不等待服务器回执响应,即发送后不关心是否到达broker。这种方式发送消息的过程网络开销最小,速度最快。

  • 适用场景:适用于某些耗时非常短,但是对可靠性要求并不高的场景,比如日志收集。

    • 单向发送方式并不能保证消息一定会被成功消费,因为它并不关心消息是否正确到达broker,所以如果你的业务对消息的可靠性有较高要求,不推荐使用单向发送。
    • 在大流量的情况下,单向发送方式由于其网络开销小,速度快的特点,可以显著提高系统的吞吐量。

然后通过RocketMQTemplatesendOneWay方法来发送单向消息:

 @Service
 public class MyProducer {
     @Autowired
     private RocketMQTemplate rocketMQTemplate;
 
     public void sendMsg(String topic, String msg) {
         // 这里的topic需要和你在RocketMQ中设置的topic相对应
         rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msg).build());
     }
 }

2. RocketMQ的批量发送和消费

2.1 批量发送

批量发送的优点和使用场景:
优点:批量发送可以减少网络开销,提高消息传输的吞吐量,特别是在网络带宽充足的情况下。使用场景:适合大量小消息的发送,例如日志收集,统计数据等。

如何进行批量发送:

List<Message> msgs = new ArrayList<>();
msgs.add(new Message("TopicA", "TagA", "OrderID001", "Hello world 0".getBytes()));
msgs.add(new Message("TopicA", "TagA", "OrderID002", "Hello world 1".getBytes()));
msgs.add(new Message("TopicA", "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(msgs);
} catch (Exception e) {
    e.printStackTrace();
}

2.2 批量消费

批量消费的优点和使用场景:
优点:批量消费可以减少消费者与消息队列的通信次数,提高消费效率。使用场景:适合处理大量小消息的场景,例如日志处理,统计数据等。

如何进行批量消费:
在RocketMQ中,批量消费主要通过设置consumer的consumeMessageBatchMaxSize属性,一次性从队列中拉取多条消息。

consumer.setConsumeMessageBatchMaxSize(10);  //一次消费10条消息

2.3 Spring Boot集成RocketMQ官方starter 示例

批量消费的前提是生产者发送的是批量消息。这个由于RocketMQ的设计,目前的版本中并不支持批量消费单条发送的消息。
这里以Spring Boot集成RocketMQ官方starter为例,首先在pom.xml中添加依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

批量发送示例:

@Autowired
private RocketMQTemplate rocketMQTemplate;

public void sendBatchMessages() {
    List<Message> msgs = new ArrayList<>();
    msgs.add(new Message("TopicA", "TagA", "OrderID001", "Hello world 0".getBytes()));
    msgs.add(new Message("TopicA", "TagA", "OrderID002", "Hello world 1".getBytes()));
    msgs.add(new Message("TopicA", "TagA", "OrderID003", "Hello world 2".getBytes()));
    rocketMQTemplate.syncSend("TopicA",msgs);
}

批量消费示例:

@Service
@RocketMQMessageListener(topic = "TopicA", consumerGroup = "my-consumer_group")
public class BatchConsumer implements RocketMQListener<List<String>> {
    @Override
    public void onMessage(List<String> messages) {
        for (String msg : messages) {
            System.out.println("Receive message: " + msg);
        }
    }
}

3. 总结

同步传输模型(Synchronous)
在同步传输模型中,消息发送方(Producer)发送消息后会一直等待消息被确认(Acknowledgement)后才继续执行后续操作。消息接收方(Consumer)在接收到消息后,会发送确认消息给消息发送方,告知消息已经成功接收。这种模型保证了消息的可靠性,但会造成消息发送方的阻塞。

异步传输模型(Asynchronous)
在异步传输模型中,消息发送方发送消息后不会立即等待确认,而是继续执行后续操作。消息接收方在接收到消息后,会发送确认消息给消息发送方,告知消息已经成功接收。这种模型可以提高消息发送方的吞吐量,但消息的可靠性需要通过设置重试和回调机制来保证。

单向传输模型(Oneway)
在单向传输模型中,消息发送方发送消息后不会等待确认,也不会接收到消息接收方的确认消息。消息发送方无法得知消息是否成功接收,也无法进行重试。这种模型适用于对消息可靠性要求不高,但对发送性能要求较高的场景,如日志记录等。

4. 参考文档

  1. 官方文档链接:https://rocketmq.apache.org/docs/

  2. GitHub链接:https://github.com/apache/rocketmq-spring

5. 源码地址

我的github https://github.com/wangshuai67/icepip-springboot-action-examples

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1008529.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java 基础篇】Java 泛型:类型安全的编程指南

在 Java 编程中&#xff0c;泛型是一项强大的特性&#xff0c;它允许您编写更通用、更安全和更灵活的代码。无论您是初学者还是有经验的 Java 开发人员&#xff0c;了解和掌握泛型都是非常重要的。本篇博客将从基础概念一直深入到高级应用&#xff0c;详细介绍 Java 泛型。 什…

nrf52832蓝牙GAP 通用访问规范

nrf52832蓝牙GAP 通用访问规范 文章目录 nrf52832蓝牙GAP 通用访问规范前言一、蓝牙GAP&#xff08;通用访问配置文件&#xff09;可以设置什么参数&#xff1f;二、使用步骤广播名称修改广播名字长度&#xff1b;全显示和自定义显示中文显示广播名称 蓝牙图标没有图标加入图标…

ArmSom-W3开发板之PCIE的开发指南(一)

1. 简介 RK3588从入门到精通本⽂介绍RK平台配置pcie的方法开发板&#xff1a;ArmSoM-W3 2、PCIE接口概述 PCIe&#xff08;Peripheral Component Interconnect Express&#xff09;是一种用于连接计算机内部组件的高速接口标准。以下是关于PCIe接口的简要介绍&#xff1a; …

【计算机网络】 TCP流量控制——滑动窗口和累积应答

文章目录 累积应答TCP流量控制——滑动窗口 累积应答 我们前面所说的是我们每发送一个包对端就要回一个ack&#xff0c;那么这样效率太慢了&#xff0c;我们这里就有一个累积应答的机制&#xff0c;就是说我们客户端累积发送多个包&#xff0c;然后服务端再统一进行回复。 TCP…

被“多元平等共融”种草——2023谷歌开发者大会参会体验

谷歌开发者大会又称Google I/O Connect&#xff0c;是谷歌公司每年一次举办的开发者年会&#xff0c;谷歌中国的开发者大会按照惯例是在每年9月份的上海世博中心举办&#xff0c;为期两天。这两天中&#xff0c;通过主旨大会和多场连续专题演讲以及现场演示向参会人员展示谷歌产…

【Teams】Teams的组织名称变更

最近在使用Teams的过程中&#xff0c;发现有些企业创建组织的过程中创建了默认的组织&#xff1a;MSFT。如果创建组织的过程中没有修改组织名称&#xff0c;我们就会发现默认的组织名称就是MSFT。如果多个企业没有更改MSFT则可能在切换Teams账户的时候可能不知道目前Teams切换的…

Jetpack Compose基础组件之 — Text

Text的源码参数预览 Composable fun Text(text: String,modifier: Modifier Modifier,color: Color Color.Unspecified,fontSize: TextUnit TextUnit.Unspecified,fontStyle: FontStyle? null,fontWeight: FontWeight? null,fontFamily: FontFamily? null,letterSpac…

YOLO物体检测-系列教程1:YOLOV1整体解读(预选框/置信度/分类任/回归任务/损失函数/公式解析/置信度/非极大值抑制)

&#x1f388;&#x1f388;&#x1f388;YOLO 系列教程 总目录 YOLOV1整体解读 YOLOV2整体解读 YOLOV1提出论文&#xff1a;You Only Look Once: Unified, Real-Time Object Detection 1、物体检测经典方法 two-stage&#xff08;两阶段&#xff09;&#xff1a;Faster-rc…

PMP-项目启动过程组的重要性

一、什么是项目启动过程组 启动过程组包括定义一个新项目或现有项目的一个新阶段&#xff0c;授权开始该项目或阶段的一组过程。启动过程组的目的是&#xff1a;协调相关方期望与项目目的&#xff0c;告知相关方项目范围和目标&#xff0c;并商讨他们对项目及相关阶段的参与将如…

flask查询工具

fist_index.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>电话查询工具</title> </head> <body><table><form action"/search_phone" method"get&…

《PostgreSQL备份与恢复:步骤与最佳实践》

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f405;&#x1f43e;猫头虎建议程序员必备技术栈一览表&#x1f4d6;&#xff1a; &#x1f6e0;️ 全栈技术 Full Stack: &#x1f4da…

将vue项目变成可发布的npm包项目

第一步&#xff1a; 在main.ts 文件的平级上新建一个index.ts文件 &#xff0c;文件中导出你想发布的组件 第二步&#xff1a; 在package.json文件的平级上新建index.js文件 第三步&#xff1a; 修改package.json文件&#xff0c;新增命令 "buildnpm": "vu…

c#设计模式-创建型模式 之 建造者模式

简介&#xff1a; 将一个复杂对象的构建与表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。提供了一种创建对象的最佳方式。一个 Builder 类会一步一步构造最终的对象。该 Builder 类是独立于其他对象的。意图是将一个复杂的构建与其表示相分离&#xff0c;使得同样…

LeetCode_模拟_中等_2596.检查骑士巡视方案

目录 1.题目2.思路3.代码实现&#xff08;Java&#xff09; 1.题目 骑士在一张 n x n 的棋盘上巡视。在有效的巡视方案中&#xff0c;骑士会从棋盘的左上角出发&#xff0c;并且访问棋盘上的每个格子恰好一次 。 给你一个 n x n 的整数矩阵 grid &#xff0c;由范围 [0, n * …

SAP 委外联产品 如何分摊加工费 ?

SAP 委外联产品 如何分摊加工费 &#xff1f; 目前对委外联产品分摊加工费还没好办法&#xff0c;看上去与委外副产品业务是一样的&#xff0c;除了主数据设置多了一些。 委外物料与联产品物料都设置S价&#xff0c;跑物料分类账时根据主数据设置分摊规则将差异分摊到对应的物…

获取Windows 10中的照片(旧版)下载

Windows 10中的新版照片应用&#xff0c;目前发现无法直接打开部分iOS设备上存储的照片。需要使用照片&#xff08;旧版&#xff09;才行。 但目前应用商店中无法直接搜索到照片&#xff08;旧版&#xff09;&#xff0c;因此笔者提供如下链接&#xff0c;可以直接访问并呼出W…

Swift学习内容精选(二)

Swift 类是构建代码所用的一种通用且灵活的构造体。 我们可以为类定义属性&#xff08;常量、变量&#xff09;和方法。 与其他编程语言所不同的是&#xff0c;Swift 并不要求你为自定义类去创建独立的接口和实现文件。你所要做的是在一个单一文件中定义一个类&#xff0c;系…

详解带头双向循环列表

目录 前言 一、带头双向循环链表的结构 二、 带头双向循环链表的实现 2.1链表的创建 2.2开辟新的结点 2.3初始化 2.4释放销毁 2.5链表的打印 2.7尾插 2.8尾删 2.9头插 2.10头删 三、带头双向循环链表中间随机值的插入和删除 3.1在pos位置插入x 3.2删除pos位置的…

C#回调函数学习1

回调函数&#xff08;Callback Function&#xff09;是一种函数指针&#xff0c;它指向的是由用户自己定义的回调函数。我们将这个回调函数的指针作为参数传递给另外一个函数&#xff0c;在这个函数工作完成后&#xff0c;它将通过这个回调函数的指针来回调通知调用者处理结果。…

XREAL 联合创始人吴克艰谈AR:下一代计算平台及其关键技术

// 编者按&#xff1a;一种行业观点是&#xff0c;AR或是未来十年、三十年的革命性技术&#xff0c;是下一代计算平台。近半个世纪&#xff0c;我们总能听到苹果在AR行业的创新动作&#xff0c;开辟了新的硬件范式。AR/VR行业为苹果不断欢呼的同时&#xff0c;激发了人们的好…