Springboot集成RocketMQ——简单使用

news2025/1/11 8:05:49

目录

1.MQ选型

2.RocketMQ基本架构

3.Springboot集成RocketMQ

4.顺序消息

5.延时消息

6.事务消息


1.MQ选型

目前市面上的MQ选型:主要分为3个类型

  1. Kafka:吞吐量大,且性能好,集群高可用;会丢失数据,功能较为单一(即场景单一,适合于数据量大且频繁,如日志分析等)
  2. RabbitMQ:消息可靠性高,功能全面;吞吐量较低,并发性能不高,消息积累会严重影响性能(即消息消费需较快)
  3. RocketMQ:高吞吐、高性能、高可用;官方文档及周边生态不成熟,客户端只支持java。

简而言之,Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。

2.RocketMQ基本架构

RocketMQ的基本架构如下图所示:

Producer,生产者:消息的生产者,一般为上游系统。

Topic,主题:消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。 主题的作用主要如下:(1)定义数据的分类隔离:将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储的隔离性和订阅隔离性。(2)定义数据的身份和权限:由于消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。

Queue,队列:队列是 RocketMQ 中消息存储和传输的实际容器,也是 RocketMQ 消息的最小存储单元。 RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。

Subscription,订阅关系:订阅关系是 RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。通过配置订阅关系,可控制如下传输行为:(1)消息过滤规则:用于控制消费者在消费消息时,选择主题内的哪些消息进行消费。(2)消费状态:RocketMQ 服务端默认提供订阅关系持久化的能力,即消费者分组在服务端注册订阅关系后,当消费者离线并再次上线后,可以获取离线前的消费进度并继续消费。

ConsumerGroup,消费者分组:消费者分组是 RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。同一分组下的多个消费者将按照分组内统一的消费行为和负载均衡策略消费消息。

Comsumer,消费者:消息的消费者,即对消息进行接收和处理的相关下游系统。

一般来说,在RocketMQ中,生产者生产出消息后,指定对应的Topic、订阅关系(Tags参数)、队列(hashkey参数)后,将消息发送至RocketMQ客户端;消费者对RocketMQ客户端进行监听,当监听到有自己订阅的Topic下的消息时,进行接收并进行消费。

3.Springboot集成RocketMQ

首先,引入相关依赖:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

其次,对RocketMQ进行配置:

server:
  port: 8080

spring:
  application:
    name: cloud-rocket-mq

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: test-group   #生产者组名,规定在一个应用里面必须唯一
    send-message-timeout: 5000  #消息发送的超时时间,单位ms
    retry-times-when-send-async-failed: 5   #异步消息发送失败重试的次数

RocketMQ支持我们异步发送普通消息。

普通消息是指:上游系统(生产者)将用户下单支付这一业务事件封装成独立的普通消息并发送至Apache RocketMQ服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。 

(1)生产者代码编写:

@Slf4j
@RestController
public class SendMessageController {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/send")
    public void send(@RequestParam("message") String message) throws InterruptedException {
        //发送异步消息,参数:topic、消息
        rocketMQTemplate.convertAndSend("topic_test:tagA",message+"tagA");
        rocketMQTemplate.convertAndSend("topic_test:tagB",message+"tagB");
        log.info("已发送异步消息");
    }
}

(2)消费者代码编写:

@Service
@Slf4j
@RocketMQMessageListener(topic = "topic_test", consumerGroup = "consumer_topic_test",selectorExpression = "tagA || tagC")
public class MessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("收到消息:"+s);
    }
}

(3)代码逻辑:

在生产者端,我们发送了一个消息到 topic_test 这一Topic下,并指定tagA订阅规则下的消费者组可以进行消费。

在消费者端,我们定义其消费者组名称,订阅关系为:订阅 topic_test 下的 tagA 或者 tagB消息,并进行消费。

可以看到,消费者成功监听到 topic_test:tagA 下的消息。

4.顺序消息

RocketMQ中可以发送顺序消息,即支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。

如上图所示,在分布式系统中,我们有多个生产者,执行同一套代码,顺序消息可以保证系统按照多个生产者发出消息的前后顺序,进行顺序消费,如:以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

 代码:

//发送顺序消息,参数:topic,消息,hashkey,相同hashkey发送至同一个队列
rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 1).build(),"queue");
rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 2).build(),"queue");

5.延时消息

即消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 RocketMQ 定时消息可以实现超时任务的检查触发。

代码:

//发送延时消息
rocketMQTemplate.syncSend("topic_test:tagA", MessageBuilder.withPayload(message).build(), 3000, 2);

其中,第四个参数为延时级别,分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2h 

6.事务消息

 分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。

事务消息就是在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

简单来说就是,保证本地事务执行成功,消费者才会接受消息进行消费。

执行过程:

代码:

(1)生产者:

    @RequestMapping("/send/transaction")
    public void sendTransactionMessage(@RequestParam("msg") String msg){
        //发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等
        //参数一:topic;参数二:消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topic_test:tagA"
                , MessageBuilder.withPayload(msg).build(),null);
        //发送状态
        String sendStatus = result.getSendStatus().name();
        //本地事务执行状态
        String localState = result.getLocalTransactionState().name();
        log.info("发送状态:"+sendStatus+";本地事务执行状态"+localState);

    }

(2)消费者端代码和上文相同,保持不变。

(3)本地事务:

/**
 * 生产者消息监听器:
 *    用于监听本地事务执行的状态和检查本地事务状态。
 * @author qzz
 */
@RocketMQTransactionListener
@Slf4j
public class TransactionMsgConfig implements RocketMQLocalTransactionListener {

    /**
     * 执行本地事务(在发送消息成功时执行)
     * @param message
     * @param o
     * @return commit or rollback or unknown
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

        try {
            //处理业务
            String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
            log.info("执行本地业务,消息为:"+jsonStr);
            //模拟网络波动
            //Thread.sleep(35000);
            //被除数为0,模拟业务出错
            //int a = 10/0;
        }catch (Exception e){
            log.error("事务执行出错:"+e.getMessage());
            //返回ROLLBACK状态,进行回滚
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        log.info("事务提交,消息正常处理");
        //返回COMMIT状态的消息会立即被消费者消费到
        return RocketMQLocalTransactionState.COMMIT;

    }

    /**
     * 检查本地事务的状态
     * @param message
     * @return
     */
    @Override
    //超时、事务状态unknown等会调用该方法
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("消息回查");
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}


我们需要编写一个本地事务执行类继承 RocketMQLocalTransactionListener 类。

在该类中我们对本地事务的异常进行捕捉,如果出现异常,则返回 ROLLBACK执行状态,顺利执行,则最终返回 COMMIT状态。

如果出现超时等网络波动或是UNKNOWN状态等情况,该类则会调用 checkLocalTransaction方法,返回方法中定义的事务状态。

(4)执行:

1.顺利执行,消费者成功消费:

可以看到,消息成功发送,消费者成功消费。

2.本地事务出现异常:

 可以看到,本地事务抛出了异常,事务进行了回滚,消费者没有进行消费。

3.模拟超时

可以看到,当事务在一段时间内未返回对应事务状态 ,则会调用对应回查方法,直至事务成功返回事务执行状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/927915.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux安装FileBrowser(简洁版)

项目简介安装目录 mkdir -p /opt/filebrowser/data && cd /opt/filebrowser 安装包下载 wget https://github.com/filebrowser/filebrowser/releases/download/v2.24.2/linux-amd64-filebrowser.tar.gz 注意&#xff1a;https://github.com/filebrowser/filebrowser/re…

Django(5)-视图函数和模板渲染

Django 中的视图的概念是「一类具有相同功能和模板的网页的集合」 在我们的投票应用中&#xff0c;我们需要下列几个视图&#xff1a; 问题索引页——展示最近的几个投票问题。 问题详情页——展示某个投票的问题和不带结果的选项列表。 问题结果页——展示某个投票的结果。 投…

vr内容编辑软件降低了虚拟现实项目开发门槛

VR虚拟场景编辑器是一种专门用于创建、修改和设计虚拟场景的工具。它利用vr虚拟现实技术&#xff0c;让用户可以在三维空间中直接对场景进行操作和编辑。这种编辑器的出现&#xff0c;使得用户可以更加直观、自由地进行场景设计和制作&#xff0c;为诸多领域带来了新的可能性。…

Qt串口通信学习文档

这是官方文档&#xff0c;我也在学习。 QSerialPort Class | Qt Serial Port 5.15.14https://doc.qt.io/qt-5/qserialport.html

AIGC 图表可视化案例实操

AIGC ChatGPT 对于职场办公提高办公效率那是肯定的。例如做数据分析与可视化如下图&#xff1a; 做这样的一个图表可视化分析&#xff0c;我们会用到HTML &#xff0c;JS&#xff0c;Echarts。 但是代码测试起来&#xff0c;与调试比较费时间&#xff0c;所以我们可以让AIGC …

前端需要理解的跨平台知识

混合开发是指使用多种开发模开发App的一种开发模式&#xff0c;涉及到两大类技术&#xff1a;原生 Native、Web H5。原生 Native 主要指 iOS&#xff08;Objective C&#xff09;、Android&#xff08;Java&#xff09;&#xff0c;原生开发效率较低&#xff0c;开发完成需要重…

Metasploit提权

一、bypassuac 用户账户控制&#xff08;User Account Control&#xff0c;简写作UAC)是微软公司在其Windows Vista及更高版本操作系统中采用的一种控制机制。其原理是通知用户是否对应用程序使用硬盘驱动器和系统文件授权&#xff0c;以达到帮助阻止恶意程序&#xff08;有时也…

编写c语言程序调用openssl编译出的动态链接库

文章目录 一、编译生成链接库二、示例一&#xff1a;调用RAND_bytes函数三、示例二&#xff1a;调用SHA256 一、编译生成链接库 下载安装openssl并编译生成链接库的过程在我的另一篇文章中已经详细说明了&#xff1a;Ubuntu中安装OpenSSL 此外&#xff0c;我们还需要提前了解…

Android studio之GridView使用

目录 效果图&#xff1a;![在这里插入图片描述](https://img-blog.csdnimg.cn/86e4a48a71164dec82613d58b1fbaa1c.jpeg)代码&#xff1a; 效果图&#xff1a; 代码&#xff1a; UserGridviewAdapter package com.example.gridviewpro.Adapter;import android.content.Contex…

Python练习 函数取列表最小数

练习2&#xff1a;构造一个功能函数&#xff0c;可以解决如下问题&#xff1a; 要求如下&#xff1a; 1&#xff0c;任意输入一个列表&#xff0c;函数可以打印出列表中最小的那个数&#xff0c; 例&#xff1a;输入: 23,56,67,4,17,9 最小数是 &#xff1a;4 方法一: #内置函…

AURIX TriCore内核架构学习笔记

名词缩写 ISA - Instruction Set Architecture&#xff0c;指令集架构PC - Program Counter, holds the address of the instruction that is currently runningGPRs - 32 General Purpose RegistersPSW - Program Status WordPCXI - Previous Context InformationCSA - Conte…

C语言练习2(巩固提升)

C语言练习2 选择题 前言 “志之所趋&#xff0c;无远弗届&#xff0c;穷山距海&#xff0c;不能限也。”对想做爱做的事要敢试敢为&#xff0c;努力从无到有、从小到大&#xff0c;把理想变为现实。要敢于做先锋&#xff0c;而不做过客、当看客&#xff0c;让创新成为青春远航的…

word电子报刊制作过程

随之网络的迅猛发展&#xff0c;利用计算机排版技术编辑制作电子报刊也很普及了。这里教大家如何将WODR转换成翻页的电子报刊 我们可以使用FLBOOK制作电子报刊&#xff0c;操作很简单 1.搜索FLBOOK制作电子杂志平台 2.点击登录与注册&#xff0c;可支持QQ、微信登录 3.现在点击…

Unity中的数学基础——贝塞尔曲线

一&#xff1a;前言 一条贝塞尔曲线是由一组定义的控制点P0到 Pn&#xff0c;n1为线性&#xff0c;n2为二次......第一个和最后一个控制点称为起点和终点&#xff0c;中间的控制点一般不会位于曲线上 获取两个点之间的点就是通过线性插值&#xff08; Mathf.Lerp&#xff09…

C++信息学奥赛1127:图像旋转

这段代码的功能是输入一个二维数组 arr&#xff0c;然后按列逆序输出该数组的元素。 #include<iostream> #include<cmath> #include <iomanip> using namespace std; int main() {int n,m; // 定义变量n和m&#xff0c;表示数组的行数和列数cin>>n>…

W5500-EVB-PICO进行UDP组播数据回环测试(九)

前言 上一章我们用我们的开发板作为UDP客户端连接服务器进行数据回环测试&#xff0c;那么本章我们进行UDP组播数据回环测试。 什么是UDP组播&#xff1f; 组播是主机间一对多的通讯模式&#xff0c; 组播是一种允许一个或多个组播源发送同一报文到多个接收者的技术。组播源将…

<C++> 类和对象-类的默认成员函数

1.类的默认成员函数 默认成员函数&#xff1a;用户没有显式实现&#xff0c;编译器会生成的成员函数称为默认成员函数。 如果一个类中什么成员都没有&#xff0c;简称为空类。 空类中真的什么都没有吗&#xff1f;并不是&#xff0c;任何类在什么都不写时&#xff0c;编译器会…

kali安装使用dirsearch扫描文件

apt-get install dirsearch使用 irsearch -u [协议://域名 | ip端口]可以在这里面看到扫出来的结果 利用 发现了 通过githack将源码下来 知乎&#xff1a;AJEST安全实验室

[技术杂谈]MobaXterm中文乱码编码问题一种解决方法

今日使用mobaxterm连接树莓派发现安装出现乱码&#xff0c;看不清文字是什么。最最简单方式是ssh设置终端字体&#xff0c;具体步骤为&#xff1a; 1. 右键会话&#xff0c;点击编辑会话 2.在以下画面点击终端字体设置 3.选择编码&#xff1a;GBK或者ISO-8859-1

Git 版本控制系统

git相关代码 0、清屏幕&#xff1a;clear 1、查看版本号 git -v2、暂存、更改、提交 3、当前项目下暂存区中有哪些文件 git ls-files4、查看文件状态 git status -s5、暂时存储&#xff0c;可以临时恢复代码内容 git restore 目标文件 //&#xff08;注意&#xff1a;完全…