如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费

news2024/11/13 3:57:04

文章目录

      • 准备工作
      • 项目依赖
      • 配置 RocketMQ
      • 生产批量消息
      • 消费批量消息
      • 测试批量消息发送和消费
      • 总结
      • 推荐阅读文章

RocketMQ 是一款分布式消息队列,支持高吞吐、低延迟的消息传递。对于需要一次处理多条消息的场景,RocketMQ 提供了批量消费的机制,这篇文章将展示如何在 Spring Boot 中实现这一功能。

准备工作

在开始之前,请确保你已经安装和配置好 RocketMQ。如果还没安装,请参考 RocketMQ 官网 获取安装指南。

项目依赖

首先,我们需要在 Spring Boot 项目中添加 RocketMQ 的依赖。打开 pom.xml 文件,添加以下内容:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

这个依赖包包含了与 RocketMQ 集成所需的所有内容。

配置 RocketMQ

application.yml 文件中添加 RocketMQ 的相关配置:

rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: batchConsumerGroup
  producer:
    group: batchProducerGroup
  • name-server:RocketMQ 服务的地址
  • consumer.group:消息消费的分组
  • producer.group:消息生产的分组

确保 name-server 地址是正确的,指向你的 RocketMQ 服务。

生产批量消息

创建一个消息生产者,用于发送批量消息。以下是 BatchProducer.java 的示例代码:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class BatchProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendBatchMessages() {
        List<Message<String>> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload("Hello RocketMQ " + i).build();
            messages.add(message);
        }
        
        rocketMQTemplate.syncSend("BatchTopic", messages, 10000);
        System.out.println("批量消息发送成功!");
    }
}
  • 这里,我们创建了 10 条消息并将它们添加到列表 messages 中。
  • 调用 rocketMQTemplate.syncSend 方法将消息批量发送到主题 BatchTopic

消费批量消息

接下来,我们创建一个消息消费者,用于批量消费消息。以下是 BatchConsumer.java 的示例代码:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@RocketMQMessageListener(topic = "BatchTopic", consumerGroup = "batchConsumerGroup", selectorExpression = "*", consumeMessageBatchMaxSize = 10)
public class BatchConsumer implements RocketMQListener<List<String>> {

    @Override
    public void onMessage(List<String> messages) {
        System.out.println("批量接收到消息:");
        messages.forEach(message -> System.out.println("消息内容:" + message));
    }
}

在这段代码中:

  • @RocketMQMessageListener 注解用于标识这是一个 RocketMQ 的消息监听器,指定了监听的主题 BatchTopic 和消费分组 batchConsumerGroup
  • consumeMessageBatchMaxSize = 10 表示每次批量消费最多 10 条消息。
  • onMessage 方法会处理接收到的消息列表,并逐条打印出消息内容。

测试批量消息发送和消费

创建一个简单的 Spring Boot 控制器,用于触发批量消息发送。以下是 MessageController.java 的代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private BatchProducer batchProducer;

    @GetMapping("/sendBatchMessages")
    public String sendBatchMessages() {
        batchProducer.sendBatchMessages();
        return "批量消息已发送";
    }
}

通过访问 http://localhost:8080/sendBatchMessages 触发消息发送。

  • 调用这个接口会将批量消息发送到 RocketMQ 主题 BatchTopic
  • BatchConsumer 会自动接收并批量处理这些消息。

总结

我们成功在 Spring Boot 中实现了 RocketMQ 的批量消息发送与消费:

  1. 使用 BatchProducer 类批量发送消息。
  2. 使用 BatchConsumer 类批量消费消息,并设置最大批量大小。
  3. 通过简单的 REST API 控制消息发送,确保一切顺利。

批量消息处理可以提高消息传递的效率,适合高并发场景。这种方式可以减少网络开销,并有效利用系统资源。

推荐阅读文章

  • 由 Spring 静态注入引发的一个线上T0级别事故(真的以后得避坑)

  • 如何理解 HTTP 是无状态的,以及它与 Cookie 和 Session 之间的联系

  • HTTP、HTTPS、Cookie 和 Session 之间的关系

  • 什么是 Cookie?简单介绍与使用方法

  • 什么是 Session?如何应用?

  • 使用 Spring 框架构建 MVC 应用程序:初学者教程

  • 有缺陷的 Java 代码:Java 开发人员最常犯的 10 大错误

  • 如何理解应用 Java 多线程与并发编程?

  • 把握Java泛型的艺术:协变、逆变与不可变性一网打尽

  • Java Spring 中常用的 @PostConstruct 注解使用总结

  • 如何理解线程安全这个概念?

  • 理解 Java 桥接方法

  • Spring 整合嵌入式 Tomcat 容器

  • Tomcat 如何加载 SpringMVC 组件

  • “在什么情况下类需要实现 Serializable,什么情况下又不需要(一)?”

  • “避免序列化灾难:掌握实现 Serializable 的真相!(二)”

  • 如何自定义一个自己的 Spring Boot Starter 组件(从入门到实践)

  • 解密 Redis:如何通过 IO 多路复用征服高并发挑战!

  • 线程 vs 虚拟线程:深入理解及区别

  • 深度解读 JDK 8、JDK 11、JDK 17 和 JDK 21 的区别

  • 10大程序员提升代码优雅度的必杀技,瞬间让你成为团队宠儿!

  • “打破重复代码的魔咒:使用 Function 接口在 Java 8 中实现优雅重构!”

  • Java 中消除 If-else 技巧总结

  • 线程池的核心参数配置(仅供参考)

  • 【人工智能】聊聊Transformer,深度学习的一股清流(13)

  • Java 枚举的几个常用技巧,你可以试着用用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2238095.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于梧桐数据库的实时数据分析解决方案

一、背景 在当今信息时代&#xff0c;数据的价值不言而喻。然而&#xff0c;处理海量数据并将其转化为有意义的洞察力是一项艰巨的任务。传统的数据处理方法已经无法满足我们日益增长的需求。为了满足这一挑战&#xff0c;实时数据处理系统应运而生。 ​ 实时数据处理系统是一…

javascript实现国密sm4算法(支持微信小程序)

概述&#xff1a; 本人前端需要实现sm4计算的功能&#xff0c;最好是能做到分多次计算。 本文所写的代码在现有sm4的C代码&#xff0c;反复测试对比计算过程参数&#xff0c;成功改造成sm4的javascript代码&#xff0c;并成功验证好分多次计算sm4数据 测试平台&#xff1a; …

jmeter常用配置元件介绍总结之jsr223执行python脚本

系列文章目录 安装jmeter jmeter常用配置元件介绍总结之jsr223执行python脚本 1.安装jsr223执行python插件2.基础语法介绍2.1.log2.2.parameters向脚本传参与接参2.3.vars2.4.props2.5.prev 3.常用脚本3.1.MD5加密单个参数&#xff1a;3.2.MD5加密多个参数&#xff1a;3.3.URLe…

【MongoDB】MongoDB的聚合(Aggregate、Map Reduce)与管道(Pipline) 及索引详解(附详细案例)

文章目录 MongoDB的聚合操作&#xff08;Aggregate&#xff09;MongoDB的管道&#xff08;Pipline操作&#xff09;MongoDB的聚合&#xff08;Map Reduce&#xff09;MongoDB的索引 更多相关内容可查看 MongoDB的聚合操作&#xff08;Aggregate&#xff09; 简单理解&#xff…

快速了解SpringBoot 统一功能处理

拦截器 什么是拦截器&#xff1a; 拦截器是Spring框架提供的重要功能之一&#xff0c;主要进行拦截用户请求&#xff0c;在指定方法前后&#xff0c;根据业务需求&#xff0c;执行预先设定的代码。 也就是说,允许开发⼈员提前预定义⼀些逻辑,在⽤⼾的请求响应前后执⾏.也可以…

百兆网络变压器在无人机系统起到什么作用

华强盛电子 导读&#xff1a; 百兆网络变压器&#xff08;通常指的是100M Ethernet Transformer&#xff09;在无人机系统中扮演着重要的角色&#xff0c;尤其是在网络通信和电气隔离方面 1.电气隔离 网络变压器通过提供电气隔离&#xff0c;帮助保护无人机的电子设备免受电流…

在双显示器环境中利用Sunshine与Moonlight实现游戏串流的同时与电脑其他任务互不干扰

我和老婆经常会同时需要操作家里的电脑&#xff0c;在周末老婆有时要用电脑加班上网办公&#xff0c;而我想在难得的周末好好地Game一下&#xff08;在客厅用电视机或者平板串流&#xff09;&#xff0c;但是电脑只有一个&#xff0c;以往我一直都是把电脑让给老婆&#xff0c;…

【Vue】Vue3.0(十七)Vue 3.0中Pinia的深度使用指南(基于setup语法糖)

上篇文章&#xff1a; 【Vue】Vue3.0&#xff08;十一&#xff09;Vue 3.0 中 computed 计算属性概念、使用及示例 &#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;Vue专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年11月10日15点23分 文章…

深度学习基础练习:从pytorch API出发复现LSTM与LSTMP

2024/11/5-2024/11/7&#xff1a; 前置知识&#xff1a; [译] 理解 LSTM(Long Short-Term Memory, LSTM) 网络 - wangduo - 博客园 【官方双语】LSTM&#xff08;长短期记忆神经网络&#xff09;StatQuest_哔哩哔哩_bilibili 大部分思路来自于&#xff1a; PyTorch LSTM和LSTMP…

【芯智雲城】Sigmastar星宸科技图传编/解码方案

一、图传技术简介 图传是指将图像或媒体内容从一个设备传输到另外一个设备的技术&#xff0c;传输的媒介可以是无线电波、光纤、以太网等。图传系统主要由图像采集设备、传输设备和接收设备组成&#xff0c;图像采集设备负责采集实时图像&#xff0c;传输设备将采集到的图像转…

JavaFX史上最全教程 - Shape - JavaFX矩形椭圆

avaFX Shape类定义了常见的形状&#xff0c;如线&#xff0c;矩形&#xff0c;圆&#xff0c;Arc&#xff0c;CubicCurve&#xff0c;Ellipse和QuadCurve。 在场景图上绘制矩形需要宽度&#xff0c;高度和左上角的&#xff08;x&#xff0c;y&#xff09;位置。 要在JavaFX中…

【Windows修改Docker Desktop(WSL2)内存分配大小】

记录一下遇到使用Docker Desktop占用内存居高不下的问题 自从使用了Docker Desktop&#xff0c;电脑基本每天都需要重启&#xff0c;内存完全不够用&#xff0c;从16g扩展到24&#xff0c;然后到40G&#xff0c;还是不够用&#xff1b;打开Docker Desktop 运行时间一长&#x…

使用QLoRA和自定义数据集微调大模型

大家好&#xff0c;大语言模型&#xff08;LLMs&#xff09;对自然语言处理&#xff08;NLP&#xff09;的影响是非常深远的&#xff0c;不仅提高了任务效率&#xff0c;还催生出新能力&#xff0c;推动了模型架构和训练方法的创新。尽管如此强大&#xff0c;但LLMs也有局限&am…

Mac M1 Docker创建Rocketmq集群并接入Springboot项目

文章目录 前言Docker创建rocketmq集群创建rocketmq目录创建docker-compose.yml新增broker.conf文件启动容器 Springboot 接入 rocketmq配置maven依赖修改appplication.yml新增消息生产者新增消费者测试发送消息 总结 前言 最近公司给配置了一台mac&#xff0c;正好有时间给装一…

golang分布式缓存项目 Day2

注&#xff1a;该项目原作者&#xff1a;https://geektutu.com/post/geecache-day1.html。本文旨在记录本人做该项目时的一些疑惑解答以及部分的测试样例以便于本人复习。 支持并发读写 接下来我们使用 sync.Mutex 封装 LRU 的几个方法&#xff0c;使之支持并发的读写。在这之…

abap 可配置通用报表字段级日志监控

文章目录 1.功能需求描述1.1 功能1.2 效果展示2.数据库表解释2.1 表介绍3.数据库表及字段3.1.应用日志数据库抬头表:ZLOG_TAB_H3.2.应用日志数据库明细表:ZLOG_TAB_P3.3.应用日志维护字段配置表:ZLOG_TAB_F4.日志封装类5.代码6.调用方式代码7.调用案例程序demo1.功能需求描述 …

材质(三)——材质参数集和材质函数

a.之前是针对材质在材质蓝图里面 类似 于静态更改的方法&#xff0c; b.材质参数集 &#xff0c;对外开放参数&#xff0c;可以手动更改&#xff0c;已然是一种封闭的静态更改方法 c.那么材质函数&#xff0c;将参数集对外开放&#xff0c;可以在关卡蓝图 通过程序 算法 去动…

随机采样之接受拒绝采样

之前提到的逆变换采样&#xff08;Inverse Transform Sampling&#xff09;是一种生成随机样本的方法&#xff0c;它利用累积分布函数&#xff08;CDF&#xff09;的逆函数来生成具有特定分布的随机变量。以下是逆变换采样的缺点&#xff1a; 计算复杂性&#xff1a;对于某些分…

软件设计师:排序算法总结

一、直接插入 排序方式&#xff1a;从第一个数开始&#xff0c;拿两个数比较&#xff0c;把后面一位跟前面的数比较&#xff0c;把较小的数放在前面一位 二、希尔 排序方式&#xff1a;按“增量序列&#xff08;步长&#xff09;”分组比较&#xff0c;组内元素比较交换 假设…

信息安全工程师(78)网络安全应急响应技术与常见工具

前言 网络安全应急响应是指为应对网络安全事件&#xff0c;相关人员或组织机构对网络安全事件进行监测、预警、分析、响应和恢复等工作。 一、网络安全应急响应技术 网络安全应急响应组织 构成&#xff1a;网络安全应急响应组织主要由应急领导组和应急技术支撑组构成。领导组负…