RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ

news2025/1/12 13:11:14

文章目录

    • 添加 RocketMQ 依赖
    • 消费者 Consumer
      • YAML 配置
      • 创建监听器
      • 消息过滤
        • Tag 过滤
    • 生产者 Producer
      • YAML 配置
      • 发送同步消息
      • 发送异步消息
      • 发送单向消息
      • 发送延迟消息
      • 发送顺序消息
      • 发送批量消息
      • 发送集合消息

添加 RocketMQ 依赖

  1. 在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:

    image-20230527214713414

  2. 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:

    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
    

消费者 Consumer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:
  name-server: 192.168.68.121:9876     # rocketMq的nameServer地址

创建监听器

创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:@Component@RocketMQMessageListener,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:

@Component
@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody());
        System.out.println("消息id:"+msgId+"消息内容:"+msg);
    }

}

@RocketMQMessageListener 注解参数如下:

参数描述
topic消费者订阅的主题
consumerGroup消费者组
consumeMode消费模式:并发接收消息 | 有序接收消息【ConsumeMode.CONCURRENTLY or ConsumeMode.ORDERLY
messageModel消息模式:集群模式 | 广播模式【MessageModel.CLUSTERING or MessageModel.BROADCASTING
selectorType过滤消息的方式:Tag | SQL92【SelectorType.TAG or SelectorType.SQL92
selectorExpression过滤消息的表达式:Tag | SQL92【`tag1
maxReconsumeTimes消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。
delayLevelWhenNextConsume并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组)

消息过滤

Tag 过滤

消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

编写并启动消费者项目订阅 tagTopic 主题:

@Component
@RocketMQMessageListener(topic = "tagTopic",
        consumerGroup = "boot-mq-group-consumer",
        selectorType = SelectorType.TAG,
        selectorExpression = "java")
public class MQMsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }

}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送一个带 Tag 的同步消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/tag")
    public String sendSyncMessage() {
        SendResult result = rocketMQTemplate.syncSend("tagTopic:java", "这是一个带有 java tag 的消息");
        return "发送状态:" + result.getSendStatus() + "<br>消息id:" + result.getMsgId();
    }
    
}

运行项目,访问接口:http://localhost:8080/send/tag

image-20230528191958989

查看 RocketMQ 控制台,可以看到消息带有 java tag:

image-20230528191938535

查看消费者项目的 IDEA 控制台:

image-20230528191142421

生产者 Producer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:
  name-server: 192.168.68.121:9876     # rocketMq的nameServer地址
  producer:
    group: boot-mq-group-producer # 生产者组名

注:生产者需要标注生产者组名,否则会报异常:'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.

发送同步消息

编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/sync/{msg}")
    public String sendSyncMessage(@PathVariable String msg){
        SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);
        return "发送状态:"+result.getSendStatus()+"<br>消息id:"+result.getMsgId();
    }

}

运行项目,访问接口:http://localhost:8080/send/sync/同步消息

image-20230527231022909

访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:

image-20230527231142472

发送异步消息

不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。

编写 Controller,使用 RocketMQTemplate 的 asyncSend() 方法发送异步消息,并使用回调接口打印发送的结果:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/async/{msg}")
    public String sendAsyncMessage(@PathVariable String msg) {
        rocketMQTemplate.asyncSend("asyncTopic", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步消息发送成功");
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步消息发送失败");
            }
        });
        System.out.println("异步消息已发送完成");
        return "发送异步消息";
    }
  
}

运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:

image-20230527232838438

访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:

image-20230527233249499

发送单向消息

编写 Controller,使用 RocketMQTemplate 的 sendOneWay() 方法发送单向消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/oneWay/{msg}")
    public String sendOneWayMessage(@PathVariable String msg) {
        rocketMQTemplate.sendOneWay("oneWayTopic",msg);
        return "单向消息发送成功";
    }

}

运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息

image-20230527233640217

访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:

image-20230527233751658

发送延迟消息

编写并启动消费者项目订阅 delayTopic 主题:

@Component
@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody());
        System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+new Date());
    }

}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/delay/{msg}")
    public String sendDelayMessage(@PathVariable String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        // 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        SendResult result = rocketMQTemplate.syncSend("delayTopic", message, 2000, 3);
        return "发送状态:" + result.getSendStatus() + "<br>消息id:" + result.getMsgId()+"<br>消息发送时间:"+new Date();
    }

}

运行项目,访问接口:http://localhost:8080/send/delay/延迟消息

image-20230528141811562

查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。

image-20230528141834080

发送顺序消息

编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:

public class Order {
    //订单号
    private String orderId;
    //订单名称
    private String orderName;
    //订单的流程顺序
    private String seq;
}

编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:

@Component
@RocketMQMessageListener(topic = "orderlyTopic",
        consumerGroup="boot-mq-group-consumer",
        consumeMode = ConsumeMode.ORDERLY)
public class MQMsgListener implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order message) {
        System.out.println("消费者:"+message);
    }

}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSendOrderly() 方法发送同步顺序消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/orderly")
    public String sendOrderlyMessage() {
        List<Order> orders = Arrays.asList(
                new Order(UUID.randomUUID().toString(), "下订单", "1"),
                new Order(UUID.randomUUID().toString(), "发短信", "1"),
                new Order(UUID.randomUUID().toString(), "物流", "1"),
                new Order(UUID.randomUUID().toString(), "签收", "1"),

                new Order(UUID.randomUUID().toString(), "下订单", "2"),
                new Order(UUID.randomUUID().toString(), "发短信", "2"),
                new Order(UUID.randomUUID().toString(), "物流", "2"),
                new Order(UUID.randomUUID().toString(), "签收", "2")
        );
        //控制流程:下订单->发短信->物流->签收
        //将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费
        orders.forEach(order -> {
            rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());
        });
        return "发送成功";
    }

}

运行项目,访问接口:http:localhost:8080/send/orderly

image-20230528152807514

查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:

image-20230528152925141

查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:

image-20230528152848032

发送批量消息

编写并启动消费者项目订阅 batchOrderly 主题:

@Component
@RocketMQMessageListener(topic = "batchOrderly",
        consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order message) {
        System.out.println(Thread.currentThread().getName()+":"+message);
    }
  
}

编写生产者 Controller,将消息打包成 Collection<Message> msgs 传入 syncSend() 方法中发送:

@RestController
public class ProducerController {

  @Autowired
  private RocketMQTemplate rocketMQTemplate;

  @GetMapping("/send/batch")
  public String sendOrderlyMessage() {

    List<Message> messages = Arrays.asList(
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build()
    );
    return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();
    
  }

}

运行项目,访问接口:http:localhost:8080/send/batch

image-20230528161620859

查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:

image-20230528161706194

查看消费者项目的 IDEA 控制台,多个线程并发进行消费:

image-20230528161804943

发送集合消息

编写并启动消费者项目订阅 listTopic 主题:

@Component
@RocketMQMessageListener(topic = "listTopic",
        consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<List<Order>> {

    @Override
    public void onMessage(List<Order> orders) {
        orders.forEach(o -> {
            System.out.println(Thread.currentThread().getName()+":"+o);
        });
    }

}

编写生产者 Controller,将集合传入 syncSend() 方法中发送:

@RestController
public class ProducerController {

  @Autowired
  private RocketMQTemplate rocketMQTemplate;

  @GetMapping("/send/list")
  public String sendOrderlyMessage() {

    List<Order> orders = Arrays.asList(
      new Order(UUID.randomUUID().toString(), "下订单", "1"),
      new Order(UUID.randomUUID().toString(), "下订单", "1"),
      new Order(UUID.randomUUID().toString(), "下订单", "1"),
      new Order(UUID.randomUUID().toString(), "下订单", "1")
    );
    rocketMQTemplate.syncSend("listTopic",orders);
    return "发送成功";
  }

}

运行项目,访问接口:http:localhost:8080/send/list

image-20230528161620859

查看 RocketMQ 控制台,可以看到队列中一条消息:

image-20230528163701846

查看消费者项目的 IDEA 控制台,进行消费:

image-20230528163745691

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/580086.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Latex在同一figure中排版多张图片的方法

Latex在同一figure中排版多张图片的方法 主要使用了minipage&#xff08;子图&#xff09;语法。minipage可以嵌套&#xff0c;子图还可以分解为更多子图&#xff0c;功能很好玩&#xff0c;无聊可以自己试试。下面介绍几种常用效果的实现方法。 并排显示两张图&#xff0c;并…

StarRocks 中的数据模型和索引使用

一、StarRocks 数据模型 StarRocks 支持四种数据模型&#xff0c;分别是明细模型 (Duplicate Key Model)、聚合模型 (Aggregate Key Model)、更新模型 (Unique Key Model) 和主键模型 (Primary Key Model)。 1.1 明细模型 明细模型是默认的建表模型。如果在建表时未指定任何…

chatgpt赋能python:Python自动截屏教程

Python 自动截屏教程 介绍 Python 是一种高级程序设计语言&#xff0c;已广泛用于各种应用中&#xff0c;从 Web 开发到机器学习都有很多案例。其中&#xff0c;Python 的截屏功能得到了越来越多的关注&#xff0c;因为在很多应用场景中&#xff0c;自动截屏可以帮助我们更有…

【JVM】11. 垃圾回收及回收算法算法

文章目录 11.1. 垃圾回收概述11.1.1. 什么是垃圾&#xff1f;什么是垃圾&#xff1f; 11.1.2. 为什么需要GC11.1.3. 早期垃圾回收11.1.4. Java垃圾回收机制担忧GC主要关注的区域 11.2. 垃圾回收相关算法11.2.1. 标记阶段&#xff1a;引用计数算法方式一&#xff1a;引用计数算法…

C++数据结构:散列表简单实现(hash表)

文章目录 前言一、设计思想二、实现步骤1、定义节点2、定义Hash表类 三、数据示例总结 前言 散列表是一种常用的数据结构&#xff0c;它可以快速地存储和查找数据。散列表的基本思想是&#xff0c;将数据的关键字映射到一个有限的地址空间中&#xff0c;然后在该地址空间中存储…

Nacos源码-从Demo出发研究事件驱动与观察者模式的应用

在我们分析 Nacos 源码时&#xff0c;会看见大量的事件发布的动作&#xff0c;不管是客户端注册/下线、服务改变、服务订阅等等都是利用了事件发布。 下面我在自己的项目中&#xff0c;引入Nacos的依赖进行一个简单的demo的演示&#xff0c;我个人认为其和spring容器的listene…

Koa学习1:初始化项目

前言 作为前端开发者&#xff0c;最适合我们的后端就是node了&#xff0c;node的框架挺多的。选择Koa是因为国内用的挺多的、关于这方面的教程也很多、而且比较适合小项目。 学习教程是&#xff1a;【杰哥课堂】-项目实战-NodeKoa2从零搭建通用API服务 写这些文章&#xff0…

K8s in Action 阅读笔记——【5】Services: enabling clients to discover and talk to pods

K8s in Action 阅读笔记——【5】Services: enabling clients to discover and talk to pods 你已了解Pod以及如何通过ReplicaSets等资源部署它们以确保持续运行。虽然某些Pod可以独立完成工作&#xff0c;但现今许多应用程序需要响应外部请求。例如&#xff0c;在微服务的情况…

在Python中载入大量图片型数据集,与matlab结合使用时,如何解决RAM的占用爆炸性增长的问题

在Python中载入大量图片时&#xff0c;由于每张图片都会被转换成Numpy数组并存储在内存中&#xff0c;因此可能会导致RAM的占用爆炸性增长。为了减少RAM的使用&#xff0c;可以考虑采用以下方法&#xff1a; Python和Matlab结合使用。首先&#xff0c;可以使用Python的Pillow库…

【Linux】遇事不决,可先点灯,LED驱动的进化之路---1

【Linux】遇事不决&#xff0c;可先点灯&#xff0c;LED驱动的进化之路---1 前言&#xff1a; 一、最简单的LED驱动程序 1.1 字符设备驱动程序框架 1.2 程序实战 1.2.1 驱动程序&#xff08;led_drive_simple.c&#xff09; 1.2.2 应用程序&#xff08;led_test_simple.c…

C#,码海拾贝(25)——求解“三对角线方程组”的“追赶法”之C#源代码,《C#数值计算算法编程》源代码升级改进版

using System; namespace Zhou.CSharp.Algorithm { /// <summary> /// 求解线性方程组的类 LEquations /// 原作 周长发 /// 改编 深度混淆 /// </summary> public static partial class LEquations { /// <summary> /…

Apache Kafka - 理解Kafka内部原理

文章目录 Kafka的实现机制1. 集群成员关系&#xff1a;2. 控制器*&#xff1a;3. Kafka的复制&#xff1a;4. 请求处理&#xff1a;5. 物理存储&#xff1a; 导图 Kafka的实现机制 作为Kafka专家&#xff0c;我很高兴为您深入解释Kafka的实现机制。我将从以下几个方面对Kafka进…

ARM体系结构与异常处理

目录 一、ARM体系架构 1、ARM公司概述 ARM的含义 ARM公司 2.ARM产品系列 3.指令、指令集 指令 指令集 ARM指令集 ARM指令集 Thumb指令集 &#xff08;属于ARM指令集&#xff09; 4.编译原理 5.ARM数据类型 字节序 大端对齐 小端对齐 …

VTK安装和运行

创建日期: 2019-04-02 09:19:00 开始 学习资源 官方网站&#xff1a;https://vtk.org/ GitHub&#xff1a;https://github.com/Kitware/VTK 官方教程&#xff1a;https://vtk.org/Wiki/VTK/Tutorials 官方文档&#xff1a;https://vtk.org/documentation/ 用户手册&#…

RocketMQ 学习教程——(一)安装 RocketMQ

文章目录 RocketMQ 安装下载安装上传服务器配置环境变量修改 runserver.sh修改 runbroker.sh修改 broker.conf启动 安装 RocketMQ 控制台安装Linux 防火墙命令 Docker 安装 RocketMQ拉取镜像启动 NameServer 服务启动 Broker 服务启动控制台 RocketMQ 官网&#xff1a; http://…

​【编写UI自动化测试集】Appium+Python+Unittest+HTMLRunner​

简介 获取AppPackage和AppActivity 定位UI控件的工具 脚本结构 PageObject分层管理 HTMLTestRunner生成测试报告 启动appium server服务 以python文件模式执行脚本生成测试报告 下载与安装 下载需要自动化测试的App并安装到手机 获取AppPackage和AppActivity 方法一 有源码的…

算法11.从暴力递归到动态规划4

算法|11.从暴力递归到动态规划4 1.最长公共子序列 题意&#xff1a;给定两个字符串str1和str2&#xff0c;返回这两个字符串的最长公共子序列长度 比如 &#xff1a; str1 “a12b3c456d”,str2 “1ef23ghi4j56k” 最长公共子序列是“123456”&#xff0c;所以返回长度6 解…

【PowerShell】PowerShell 7.1 之后版本的安装

当前以下操作系统支持PowerShell 7.1 版本的安装,非Windows 系统支持的版本和要求有一定的限制。 Windows 8.1/10 (including ARM64)Windows Server 2012 R2, 2016, 2019, and Semi-Annual Channel (SAC)Ubuntu 16.04/18.04/20.04 (including ARM64)Ubuntu 19.10 (via Snap pa…

图的邻接矩阵表示

设图有n个顶点&#xff0c;则邻接矩阵是一个n*n的方阵&#xff1b;若2个顶点之间有边&#xff0c;则方阵对应位置的值为1&#xff0c;否则为0&#xff1b; 看几个例子&#xff1b; 此图的邻接矩阵是 0 1 1 1 1 0 1 0 1 1 0 1 1 0…

学习 xss+csrf 组合拳

目录 1.xss基础铺垫 1.1反射型xss 1.2存储型xss 1.3基于DOM的xss 1.4xss漏洞的危害 1.5xss漏洞的黑盒测试 1.6xss漏洞的白盒测试 2.csrf基础铺垫 2.1csrf攻击原理 2.2csrf攻击防护 3.应用案例 3.1存储型xsscsrf组合拳 3.2csrfselfxss组合拳 1.xss基础铺垫 跨站脚…