【RocketMQ】004-Spring Boot 集成 RocketMQ

news2025/1/11 4:31:57

【RocketMQ】004-Spring Boot 集成 RocketMQ

文章目录

  • 【RocketMQ】004-Spring Boot 集成 RocketMQ
  • 一、基本使用
    • 1、创建 Spring Boot 项目,并引入 `RocketMQ` 依赖
    • 2、`application.yml` 配置
    • 3、消息生产者
    • 4、消息消费者
    • 5、消息调用接口
    • 6、启动 `RocketMQ`
    • 7、启动项目,并访问
    • 8、启动 `rocketmq-dashboard` ,并查看
      • 主题
      • 消费者
      • 消息
  • 二、常用消息种类
    • 1、常用消息种类
    • 2、普通消息
      • 代码示例
      • 代码解释
      • 单向消息
      • 同步消息
      • 异步消息
    • 3、延时消息
    • 4、顺序消息
    • 5、事务消息
    • 6、批量消息
  • 三、参考文章

一、基本使用

1、创建 Spring Boot 项目,并引入 RocketMQ 依赖

<!--Rocket MQ-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2、application.yml 配置

rocketmq:
  # NameServer地址
  name-server: localhost:9876
  # 生产者
  producer:
    # 发送同一类消息的设置为同一个group,保证唯一
    group: springboot_producer_group
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 10000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false
  # 消费者
  consumer:
    group: springboot_consumer_group
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
    pull-batch-size: 10

3、消息生产者

package com.example.mq.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * mq 生产者
 *
 * @author zibo
 * @date 2023/5/17 15:48
 * @slogan 慢慢学,不要停。
 */
@Slf4j
@Service
public class MQProducerService {

    // 直接注入使用,用于发送消息到 broker 服务器
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送普通消息
     *
     * @param msg 消息可以是任何对象,如:String、Map、对象等
     */
    public void send(String msg) {
        // 写法一
        rocketMQTemplate.convertAndSend("springboot_topic:test", msg);
        // 写法二
        // rocketMQTemplate.send("springboot_topic:test", MessageBuilder.withPayload(msg).build());
    }
}

4、消息消费者

package com.example.mq.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * mq 消费者
 *
 * @author zibo
 * @date 2023/5/17 15:48
 * @slogan 慢慢学,不要停。
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "springboot_topic", selectorExpression = "test", consumerGroup = "springboot_consumer_group")
public class MQConsumerService implements RocketMQListener<String> {

    // 监听到消息就会执行此方法
    @Override
    public void onMessage(String msg) {
        log.info("监听到消息:msg={}", msg);
    }
}

5、消息调用接口

package com.example.mq.controller;

import com.example.mq.service.MQProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * mq 接口
 *
 * @author zibo
 * @date 2023/5/17 15:48
 * @slogan 慢慢学,不要停。
 */
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {

    @Autowired
    private MQProducerService mqProducerService;

    @GetMapping("/send")
    public void send() {
        mqProducerService.send("测试消息");
    }

}

6、启动 RocketMQ

  • 启动 NameServer :双击 mqnamesrv.cmd 启动即可!
  • 启动 Broker :双击 mqbroker.cmd 启动即可!

7、启动项目,并访问

http://localhost:8080/rocketmq/send

控制台日志

2023-05-17 16:04:54.126  INFO 19772 --- [onsumer_group_1] c.example.mq.service.MQConsumerService   : 监听到消息:msg=测试消息

8、启动 rocketmq-dashboard ,并查看

主题

image-20230517160840104

消费者

image-20230517161012491

消息

image-20230517161106989

二、常用消息种类

1、常用消息种类

  1. 普通消息(Normal Message):普通消息是最常用的消息类型,一旦发送就会立即被投递给消费者进行消费
  2. 延时消息(Delay Message):延时消息是指发送后延迟一段时间后再投递给消费者。你可以指定延时级别,例如延迟 10 秒、1 分钟、1 小时等。
  3. 顺序消息(Orderly Message):顺序消息是指保证消息按照发送的顺序被消费的消息类型。你可以为同一消息队列中的消息指定相同的消息队列选择器(Message Queue Selector),从而保证消息按照发送顺序被消费。
  4. 事务消息(Transaction Message):事务消息是指将消息发送与本地事务操作相结合,可以保证消息和本地事务的最终一致性。发送事务消息时,你需要实现事务监听器(Transaction Listener)来执行本地事务和提交事务状态。
  5. 批量消息(Batch Message):批量消息是一次发送多条消息的方式,可以减少网络开销和提高消息吞吐量。你可以将多个消息封装成一个消息列表,然后使用批量发送方法一次性发送。

2、普通消息

代码示例

rocketMQTemplate.convertAndSend("springboot_topic:test", msg);

代码解释

  • rocketMQTemplate:是 RocketMQ 提供的用于发送消息的模板类,需要在 Spring Boot 中配置和注入。
  • convertAndSend方法:是 RocketMQTemplate 类的方法,用于将消息对象转换并发送消息。它接受两个参数:
    • "springboot_topic:test":是要发送消息的目标主题和标签。在这个示例中,主题"springboot_topic"标签"test"。你可以根据实际情况修改主题和标签。
    • msg:是要发送的消息内容。它可以是字符串、对象或其他数据类型。RocketMQTemplate 会根据消息内容的类型进行转换。

单向消息

/**
 * 发送单向消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public void sendOneWay(String msg) {
    rocketMQTemplate.sendOneWay("springboot_topic:test", msg);
}

同步消息

/**
 * 发送同步消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public SendResult sendSync(String msg) {
    SendResult result = rocketMQTemplate.syncSend("springboot_topic:test", msg);
    log.info("发送结果:{}", result);
    return result;
}

异步消息

/**
 * 发送异步消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public void sendAsync(String msg) {
    rocketMQTemplate.asyncSend("springboot_topic:test", msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("发送成功");
        }

        @Override
        public void onException(Throwable throwable) {
            log.info("发送失败");
        }
    });
}

3、延时消息

/**
 * 发送延时消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public void sendDelay(String msg) {
    rocketMQTemplate.syncSendDelayTimeSeconds("springboot_topic:test", msg, 5);
}

4、顺序消息

/**
 * 发送顺序消息
 *
 * @param msg 消息可以是任何对象,如:String、Map、对象等
 */
public void sendOrderly(String msg) {
    // 第一条
    rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "1");
    // 第二条
    rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "2");
    // 第三条
    rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "3");
}

5、事务消息

6、批量消息

/**
 * 发送批量消息
 *
 * @param msgList 消息列表
 */
public void sendBatch(List<String> msgList) {
    List<Message<String>> rocketMQMessages = new ArrayList<>();
    for (String msg : msgList) {
        rocketMQMessages.add(MessageBuilder.withPayload(msg).build());
    }
    rocketMQTemplate.syncSend("springboot_topic:test", rocketMQMessages);
}

三、参考文章

SpringBoot整合RocketMQ,老鸟们都是这么玩的!

https://juejin.cn/post/7220075270664405052

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/538579.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(1)LED

LED正负极&#xff1a;大红旗——负极&#xff0c;小红旗——正极 如何看原理图电阻/电容值&#xff1a; eg&#xff1a; 102 10 2 10 * 10 ^ 2 1000 473 47 3 47 * 10 ^ 3 47000单片机使用TTL电频&#xff1a;高电平&#xff08;逻辑1&#xff09;5V 低电平&#xff…

C语言操作符详解(上)

C语言操作符详解&#xff08;上&#xff09; 前言1. 算术操作符2. 移位操作符2.1 左移操作符(<<)2.2 右移操作符&#xff08;>>&#xff09; 3. 位操作符3.1 按位与&#xff08;&&#xff09;3.2 按位或&#xff08;|&#xff09;3.4 按位异或&#xff08;^&am…

(4)定时器

51单片机的定时器属于单片机的内部资源&#xff0c;其电路的连接和运转均在单片机内部完成 作用&#xff1a; 用于计时系统替代长时间Delay&#xff0c;提高运行效率和速度任务切换 STC89C52定时器资源&#xff1a; 定时器个数&#xff1a;3个&#xff08;T0,T1,T2&#xf…

【MySQL】MySQL 运算符

目录 一、运算符简述 二、运算符使用 1.算术运算符 1.1 加法运算符 1.2 减法运算符 1.3 乘法与除法运算符 1.4 求模&#xff08;求余&#xff09;运算符 2.比较运算符 2.1 等号运算符 2.2 安全等于运算符 2.3 不等于运算符 2.4 空运算符 2.5 非空运算符 2.6 最小…

深度剖析Mybatis-plus Injector SQL注入器

背景 在项目中需要同时操作Sql Server 以及 MySQL 数据库&#xff0c;可能平时直接使用 BaseMapper中提供的方法习惯 了&#xff0c;不用的话总感觉影响开发效率&#xff0c;但是两个数据库的SQL语法稍微有点差别&#xff0c;有些暴露的方法并不能直接使用&#xff0c;所以便想…

WebSocket的那些事(3-STOMP实操篇)

目录 一、序言二、STOMP详解1、STOMP简单介绍2、STOMP协议内容3、使用STOMP的好处 三、代码示例1、Maven依赖2、开启WebSocket消息代理3、控制器4、前端页面greeting.html 四、测试1、连接服务端2、发送消息 五、STOMP消息传播流程六、结语 一、序言 上节中我们在 WebSocket的…

(11)LCD1602液晶显示屏

LCD1602&#xff08;Liquid Crystal Display&#xff09;液晶显示屏是一种字符型液晶显示模块&#xff0c;可以显示ASCII码的标准字符和其它的一些内置特殊字符&#xff0c;还可以有8个自定义字符&#xff0c;自带芯片扫描 显示容量&#xff1a;162个字符&#xff0c;每个字符…

【C++】STL六大组件简介

STL(standard template libaray-标准模板库)&#xff1a;是C标准库的重要组成部分&#xff0c;不仅是一个可复用的组件库&#xff0c;而且是一个包罗数据结构与算法的软件框架。 1.STL的版本介绍 原始版本 Alexander Stepanov、Meng Lee 在惠普实验室完成的原始版本&#xff…

Unity里面CG和HLSL在写法上的一些区别

大家好&#xff0c;我是阿赵。这里继续讲URP相关的内容。 这次想讲的是CG和HLSL在写法上的一些区别。 一、为什么开始用HLSL 首先&#xff0c;基本上大家都知道的事情再说一遍。 三种Shader编程语言&#xff1a; 1、基于OpenGL的OpenGL Shading Language&#xff0c;缩写GLSL…

接口测试中postman环境和用例集

postman的环境使用 postman里有环境的设置&#xff0c;就是我们常说的用变量代替一个固定的值&#xff0c;这样做的好处是可以切换不同的域名、不同的环境变量&#xff0c;不同的线上线下账户等等场景。下面就看下怎么用吧。 创建一个Environment postman有一个envrionment&am…

Java是如何实现双亲委托机制的

Java 是一种面向对象的编程语言&#xff0c;它有一套独特的类加载机制。其中&#xff0c;双亲委托加载机制是 Java 类加载机制中的一个重要概念。本文将介绍 Java 的双亲委托加载机制是如何实现的&#xff0c;并解释其作用和优点。 Java 类加载机制 在 Java 中&#xff0c;类的…

瀑布流组件陷入商品重复怪圈?我是如何用心一解的!

目录 背景 瀑布流组件 什么是瀑布流组件 如何实现一个瀑布流组件 商品重复的原因 解决方案 方法一&#xff08;复杂&#xff0c;不推荐&#xff09;&#xff1a;标记位大法 方法二&#xff08;优雅&#xff0c;推荐&#xff09;&#xff1a;Promise 队列 大法 总结 背…

解决新思路#报错:ping: www.baidu.com: 未知的名称或服务--照着步骤来还是ping不通的可能原因

最近由ubantu转为centos7,配置hadoop&#xff0c;配置静态ip的过程中一直ping不通。之前配置ubantu的也是&#xff0c;终于这次在重启虚拟机和主机后发现了原因。 中途尝试过: 1.三次以上命令行反复操作 2.图形界面设置 3.看是否主机的网络适配器的网关与设置的IP地址产生冲突…

JavaScript实现计算100之间能被5整除的数的代码

以下为实现计算100之间能被5整除的数的程序代码和运行截图 目录 前言 一、计算100之间能被5整除的数 1.1 运行流程及思想 1.2 代码段 1.3 JavaScript语句代码 1.4 运行截图 前言 1.若有选择&#xff0c;您可以在目录里进行快速查找&#xff1b; 2.本博文代码可以根据题…

2023最新100道渗透测试面试题(附答案)

眨眼间2023年快过去一半了&#xff0c;不知道大家有没有找到心仪的工作呀&#xff0c;今天我给大家整理了100道渗透测试面试题给大家&#xff0c;需要答案的话可以在评论区给我留言哦~ 第一套渗透面试题 什么是渗透测试&#xff1f;它的目的是什么&#xff1f; 渗透测试的五个…

DirectX12 简单入门(一)

在很久以前写过关于DirectX9的一些应用&#xff0c;直到现在DirectX12已经普及了。写完几个DirectX12测试代码之后&#xff0c;写一篇DirectX12简单入门介绍一下基本概念&#xff0c;以及环境搭建和编程过程。 编程环境 与DirectX9不同&#xff0c;在DirectX12开发中微软将需…

『MySQL 实战 45 讲』“order by” 是怎么工作的

“order by” 是怎么工作的 首先创建一个表 CREATE TABLE t ( id int(11) NOT NULL, city varchar(16) NOT NULL, name varchar(16) NOT NULL, age int(11) NOT NULL, addr varchar(128) DEFAULT NULL, PRIMARY KEY (id), KEY city (city) ) ENGINEInnoDB;全字段排序 在 cit…

自己搭建go web 框架

思想base部分day1:封装gee封装context上下文封装tree路由树分组封装group与中间件封装文件解析封装封装错误处理 思想 web框架服务主要围绕着请求与响应来展开的 搭建一个web框架的核心思想 1 便捷添加响应路径与响应函数(base) 2 能够接收多种数据类型传入(上下文context) 3 构…

【Linux】Linux入门学习之常用命令五

介绍 这里是小编成长之路的历程&#xff0c;也是小编的学习之路。希望和各位大佬们一起成长&#xff01; 以下为小编最喜欢的两句话&#xff1a; 要有最朴素的生活和最遥远的梦想&#xff0c;即使明天天寒地冻&#xff0c;山高水远&#xff0c;路远马亡。 一个人为什么要努力&a…

支付系统设计五:对账系统设计01-总览

文章目录 前言一、对账系统构建二、执行流程三、获取支付渠道数据1.接口形式1.1 后台配置1.2 脚本编写1.2.1 模板1.2.2 解析脚本 2.FTP形式2.1 后台配置2.2 脚本编写2.2.1 模板2.2.2 解析脚本 四、获取支付平台数据五、数据比对1. 比对模型2. 比对器 总结 前言 从《支付系统设…