重学SpringBoot3-集成RocketMQ(一)

news2025/1/23 15:00:05

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍

重学SpringBoot3-集成RocketMQ(一)

  • 环境准备
  • 1. 配置项目依赖
  • 2. 配置 RocketMQ 信息
    • 2.1配置文件
    • 2.2导入自动配置类
    • 2.3创建Topic
  • 3. 生产者代码示例
    • 3.1同步消息
    • 3.2 异步消息
    • 3.3 单向消息
    • 3.4顺序消息
    • 3.5延时消息
  • 4. 消费者代码示例
  • 5. 调用生产者发送消息
  • 6. 启动项目并验证
  • 7. 整合总结

Spring Boot 3 与 RocketMQ 整合,可以通过 Spring Messaging 结合 RocketMQ 的 rocketmq-spring-boot-starter 实现。在这个整合过程中,RocketMQ 作为消息队列系统,Spring Boot 负责提供应用框架,整合可以让开发者更加便捷地使用 RocketMQ 的生产和消费功能。今天就先介绍下SpringBoot3整合RocketMQ5.x,并给出常见消息类型代码示例。

环境准备

  • Spring Boot 3.x 项目
  • RocketMQ 服务器:版本V5.3,包括 NameServerBroker,可以本地搭建或者使用云服务,搭建部分后面单独出教程。
  • RocketMQ 依赖:Spring Boot 与 RocketMQ 的整合依赖 rocketmq-spring-boot-starter

1. 配置项目依赖

在 Spring Boot 项目的 pom.xml 中添加 RocketMQ 相关依赖。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version> <!-- 或选择最新稳定版本 -->
</dependency>

2. 配置 RocketMQ 信息

application.yml 文件中配置 RocketMQ 的相关连接信息,包括 name-server 和其他基础配置。

2.1配置文件

rocketmq-spring-boot-starter 2.2.0(不含)以下版本

spring:
  rocketmq:
    name-server: localhost:9876  # NameServer 地址,集群使用';'隔开
    producer:
      group: springboot-producer-group  # 生产者组名称
      send-message-timeout: 3000
      retry-times-when-send-failed: 2
      retry-next-server: true
      access-key: RocketMQ    # 若启用了 ACL 功能
      secret-key: 12345678    # 若启用了 ACL 功能
    consumer:
      group: springboot-consumer-group  # 消费者组名称
      topic: test-topic  # 订阅的主题
      access-key: RocketMQ    # 若启用了 ACL 功能
      secret-key: 12345678    # 若启用了 ACL 功能

rocketmq-spring-boot-starter 2.2.0及其以上版本

rocketmq:
  name-server: localhost:9876  # NameServer 地址,集群使用';'隔开
  producer:
    group: springboot-producer-group  # 生产者组名称
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-next-server: true
    access-key: RocketMQ    # 若启用了 ACL 功能
    secret-key: 12345678    # 若启用了 ACL 功能
  consumer:
    group: springboot-consumer-group  # 消费者组名称
    topic: test-topic  # 订阅的主题
    access-key: RocketMQ    # 若启用了 ACL 功能
    secret-key: 12345678    # 若启用了 ACL 功能

2.2导入自动配置类

按照之前介绍的自动配置,想让 RocketMQ 配生效,需要在启动类上添加如下代码或单独写个配置类:

@Import(RocketMQAutoConfiguration.class)

否在会报错:A component required a bean of type ‘org.apache.rocketmq.spring.core.RocketMQTemplate’ that could not be found.

@SpringBootApplication
@Import(RocketMQAutoConfiguration.class)
public class SpringBoot308RocketmqApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBoot308RocketmqApplication.class, args);
    }
}

导入自动配置类

2.3创建Topic

示例代码仅一本地一个服务,即一个生产者和消费者,只需选一个broker,否在有些消息将无法消费。

创建Topic

3. 生产者代码示例

在 Spring Boot 项目中创建一个生产者服务,可以作为工具类,使用 RocketMQ 发送消息。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class RocketMQProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // 发送简单消息
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
        System.out.println("Message sent: " + message);
    }
}

3.1同步消息

同步发送消息是指,Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。RocketMQ 同步消息的方法形如 syncXx()。

    /**
     * 同步类型消息
     *
     * @param topic
     * @param message
     */
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.syncSend(topic, message);
        System.out.println("Message sent: " + message);
    }

同步消息

3.2 异步消息

异步发送消息是指,Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。RocketMQ 同步消息的方法形如 asyncXx()。

/**
 * 异步类型消息
 *
 * @param topic
 * @param message
 */
public void asyncSendMessage(String topic, String message) {
    rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("Async message sent: " + message);
        }

        @Override
        public void onException(Throwable e) {
            System.out.println("Async message error: " + e);
        }
    });
    System.out.println("Message sent: " + message);
}

异步消息

3.3 单向消息

单向发送消息是指,Producer 仅负责发送消息,不等待、不处理 MQ 的 ACK。该发送方式时 MQ 也不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。

    /**
     * 发送单向消息 
     *
     * @param topic
     * @param message
     */
    public void sendOneWayMessage(String topic, String message) {
        rocketMQTemplate.sendOneWay(topic, message);
        System.out.println("One way message sent: " + message);
    }

单向消息

3.4顺序消息

顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。

    /**
     * 发送顺序消息
     */
    public void sendOrderlyMessage(String topic, String message, String shardingKey) {
        for (int i = 0; i < 10; i++) {
            String orderlyMessage = message + i;
            rocketMQTemplate.syncSendOrderly(topic, orderlyMessage, shardingKey);
            System.out.println("Orderly message sent: " + orderlyMessage + " with shardingKey: " + shardingKey);
        }
    }

顺序消息

3.5延时消息

当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级默认有18个,可以在broker.conf中增加配置,然后重启broker:

# 延时等级
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

代码很简单:

    /**
     * 发送延迟消息
     *
     * @param topic
     * @param message
     * @param delayLevel
     */
    public void sendDelayedMessage(String topic, String message, int delayLevel) {
        rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 3000, delayLevel);
        System.out.println("Delayed message sent: " + message + " with delayLevel: " + delayLevel);
    }

延时消息

除此之外,RocketMQ 还支持事务消息、批量消息、消息过滤等,后面再详细介绍。

4. 消费者代码示例

使用 @RocketMQMessageListener 注解来订阅主题并监听消息的到达,处理消息的消费逻辑。

package com.example.boot308rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * @author CoderJia
 * @create 2024/09/09 15:12
 * @Description
 **/
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "springboot-consumer-group")
public class RocketMQConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理接收到的消息
        System.out.println("Received message: " + message);
    }
}

5. 调用生产者发送消息

为了便于测试,创建一个简单的 Spring Boot Controller层代码,用于调用生产者发送消息。

/*
 * Copyright 2013-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.boot308rocketmq.controller;

import com.example.boot308rocketmq.RocketMQProducer;
import jakarta.annotation.Resource;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

/**
 * @author CoderJia
 * @create 2024/9/9 下午 15:08
 * @Description
 **/
@Controller
public class MessageController {

    @Resource
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/sendMessage")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        rocketMQProducer.sendOneWayMessage("test-topic", message);
        return ResponseEntity.ok("Message sent: " + message);
    }

    @GetMapping("/sendOrderlyMessage")
    public ResponseEntity<String> sendOrderlyMessage(@RequestParam String message) {
        rocketMQProducer.sendOrderlyMessage("test-topic", message, "orderKey");
        return ResponseEntity.ok("Message sent: " + message);
    }

    @GetMapping("/sendDelayedMessage")
    public ResponseEntity<String> sendDelayedMessage(@RequestParam String message, @RequestParam int delayLevel) {
        rocketMQProducer.sendDelayedMessage("test-topic", message, delayLevel);
        return ResponseEntity.ok("Delayed message sent: " + message + " with delayLevel: " + delayLevel);
    }
}

6. 启动项目并验证

  1. 启动 RocketMQ 的 NameServerBroker
  2. 启动 Spring Boot 项目。
  3. 打开浏览器或者使用 Postman 访问发送消息的接口:

普通消息

http://localhost:8080/sendMessage?message=HelloRocketMQ

顺序消息

http://localhost:8080/sendOrderlyMessage?message=HelloRocketMQ

延迟消息

http://localhost:8080/sendDelayedMessage?message=HelloDelayedRocketMQ&delayLevel=3

7. 整合总结

  • 生产者:通过 RocketMQTemplate 提供了发送消息的方法,包括同步消息、异步消息、顺序消息、延迟消息等。
  • 消费者:使用 @RocketMQMessageListener 注解,能够便捷地监听指定主题并消费消息。
  • 事务消息:RocketMQ 还支持事务消息,适合实现两阶段提交的事务模型,后面会着重介绍。

这种整合方式在 Spring Boot 3 中非常自然,并且 rocketmq-spring-boot-starter 进一步简化了配置和集成,使得开发者可以专注于业务逻辑的实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2134727.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ERP进销存多仓库管理系统源码 带完整的安装代码包以及搭建部署教程

系统概述 ERP进销存多仓库管理系统是一款专为中小企业量身定制的集成化管理软件&#xff0c;它集成了采购管理、销售管理、库存管理、财务管理以及多仓库协同作业等核心模块。通过统一的平台&#xff0c;企业可以实时掌握商品从入库到出库的全过程&#xff0c;实现库存的自动化…

【OpenGL】OpenGL学习笔记(一):绘制三角形、初识VAO和VBO

文章目录 前言绘制目标 前言 最近开始研究用 QT 做开发来学习 OpenGL &#xff0c;想着别学完了就忘了&#xff0c;所以准备新开一个 OpenGL 专栏。开发环境已经搭好了&#xff0c;但是没弄教程&#xff0c;最近比较忙&#xff0c;暂时先把核心代码放过来&#xff0c;先开个草…

微信支付开发-需求整理及需求设计

一、客户要求 1、通过唤醒机器人参与答题项&#xff0c;机器人自动获取题目&#xff0c;用户进行答题&#xff1b; 2、用户答对题数与后台设置的一样或者更多&#xff0c;则提醒用户可以领取奖品&#xff0c;但是需要用户支付邮费&#xff1b; 3、用户在几天之内不能重复领取奖…

分布式新能源的能量管理

在新能源的概念下&#xff0c;可以将其定义为&#xff1a;新能源是指具有一定能量的清洁的可再生能源&#xff0c;它是一种新型的可持续发展的资源和能源。生物质能是借助各类绿色植物的光合作用实现能量转换。地热能主要来源于地球熔岩内部存在的天然热能&#xff0c;海洋能通…

Vue3项目打包报错-内存溢出解决方法

错误&#xff1a;FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory 1、安装cross-env和increase-memory-limit 命令行&#xff1a;npm install cross-env increase-memory-limit 2、package.json添加如下内容&a…

胤娲科技:一场前所未有的运维革命

嘿&#xff0c;朋友们&#xff0c;你们有没有想过&#xff0c;如果电信网络里突然来了位“超级大脑”&#xff0c;我们的生活会是啥样&#xff1f; 以前啊&#xff0c;网络一有点小情绪&#xff0c;运维小哥就得像侦探一样&#xff0c;层层抽丝剥茧找问题。但现在&#xff0c;大…

CMU 10423 Generative AI:HW1(编程部分:在GPT-2模型中实现RoPE、GQA)

完整代码和PDF笔记&#xff1a;https://github.com/YM2025/CMU_10423_2024S 文章目录 1 概述Rotary Positional Embeddings (RoPE)Grouped Query Attention (GQA)实验任务 2 项目文件1. requirements.txt2. input.txt3. chargpt.py4. mingpt/a. model.pyb. trainer.pyc. utils.…

从 Postgres 到 ClickHouse:数据建模指南

本文字数&#xff1a;7149&#xff1b;估计阅读时间&#xff1a;18 分钟 作者&#xff1a;Sai Srirampur 本文在公众号【ClickHouseInc】首发 上个月&#xff0c;我们收购了专注于 Postgres CDC 的 PeerDB。PeerDB 使得数据从 Postgres 复制到 ClickHouse 变得既快速又简单。Pe…

iceoryx共享内存通信

共享内存原理 当POSIX系统中的进程启动时,它会被赋予自己的虚拟地址空间。 虚拟地址空间跨越的范围对于不同的进程可能是相同的,但是在特定地址可访问的数据对于每个进程可能是不同的。 在进程的虚拟地址空间内,有许多“内存区域”用于加载或映射数据。这些内存区域通常是…

内存魔术师:精通内存函数的艺术

嘿嘿,家人们,今天咱们来详细剖析C语言中的内存函数,好啦,废话不多讲,开干! 目录 1.memcpy使用与模拟实现 1.1:memcpy的使用 1.2:memcpy的模拟实现 2:memmove的使用与模拟实现 2.1:memmove的使用 2.1.1:memcpy处理重叠空间 2.1.2:memmove处理重叠空间 2.2:memove的模拟实…

【机器学习随笔】基于kmeans的车牌类型分类注意点

kmeans是无监督的聚类算法&#xff0c;可用于数据的分类。本文尝试用kmeans对车牌类型进行分类&#xff0c;记录使用过程中的注意点。 kmeans使用过程中涉及两个大部分&#xff0c;模型与分析。模型部分包括训练模型和使用模型&#xff0c;分析部分主要为可视化分析。两部分的主…

这东西有点上头,不小心刷到天亮了。。。

相信很多每天勤奋刷题的小伙伴已经发现了&#xff0c;面试鸭又又又升级更新了&#xff01; 打开首页就让人眼前一亮&#xff0c;优化了岗位分类导航栏&#xff0c;找起目标题库更轻松了。毕竟鸭鸭目前已经有 6000 道面试题、上百个题库&#xff0c;一不小心就会淹没在浩瀚题海…

如何优化MySql的性能

优化MySQL的性能是一个复杂但至关重要的任务&#xff0c;它涉及到多个层面的调整和优化。以下是一些关键的步骤和策略&#xff0c;可以帮助你提高MySQL数据库的性能&#xff1a; 1. 优化数据库设计 选择合适的数据类型&#xff1a;确保你使用的数据类型是适合你的数据的&#…

Three.js 实战【4】—— 3D地图渲染

初始化场景&准备工作 在vue3threejs当中&#xff0c;初始化场景的代码基本上是一样的&#xff0c;可以参考前面几篇文章的初始化场景代码。在这里进行渲染3D地图还需要用到d3这个库&#xff0c;所以需要安装一下d3&#xff0c;直接npm i即可。 再从阿里云这里提供的全国各…

SQL server 6.5升级到SQL server 2019的方法

背景&#xff1a; 对日项目&#xff0c;客户的旧系统的数据库用的是SQL server 6.5&#xff0c;操作系统是windows NT。新系统要求升级到SQL server 2019&#xff0c;查了下资料发现旧系统的版本实在是太久远了&#xff0c;90年代的。 数据库部分的升级思路是这样的&#xff…

git 更新LingDongGui问题解决

今天重新更新灵动gui的代码&#xff0c;以便使用最新的arm-2d&#xff0c;本来以为是比较简单的一件事情&#xff08;因为以前已经更新过一次&#xff09;&#xff0c;却搞了大半天&#xff0c;折腾不易啊&#xff0c;简单记录下来&#xff0c;有同样遇到问题的同学参考&#x…

AI算法部署方式对比分析:哪种方案性价比最高?

随着人工智能技术的飞速发展&#xff0c;AI算法在各个领域的应用日益广泛。AI算法的部署方式直接关系到系统的性能、实时性、成本及安全性等多个方面。本文将探讨AI算法分析的三种主要部署方式&#xff1a;本地计算、边缘计算和云计算&#xff0c;并详细分析它们的优劣性。 一、…

基于vue框架的宠物交流平台1n2n3(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;会员,宠物信息,宠物类型,团队信息,申请领养,团队申请,领养宠物 开题报告内容 基于Vue框架的宠物交流平台开题报告 一、项目背景 随着现代生活节奏的加快与人们情感需求的日益增长&#xff0c;宠物已成为众多家庭不可或缺的重要成员。…

基于Python的影视数据可视化---附源码75141

摘 要 本文基于Python语言&#xff0c;设计并实现了一个影视数据可视化系统&#xff0c;包括首页、公告通知、新闻资讯和电影信息等功能模块。通过对影视数据的采集、处理和可视化展示&#xff0c;该系统旨在为用户提供全面的影视信息和数据分析服务。在研究背景中&#xff0c…

编译运行 webAssembly(wasm)

环境准备&#xff1a; lunix下docker 参考https://hub.docker.com/r/emscripten/emsdk 拉编译环境 docker pull emscripten/emsdk 编译 随便找个目录&#xff0c;敲下面命令&#xff0c;编译一个webAssembly 程序 # create helloworld.cpp cat << EOF > hellowo…