芋道 Spring Cloud Alibaba 消息队列 RocketMQ 入门

news2025/3/25 23:25:57

1. 概述

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

Spring Cloud Stream

2.1 Spring Cloud Stream 是什么

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,它通过 Spring Integration 与消息中间件(如 RabbitMQ、Kafka、RocketMQ)进行连接。

2.2 核心概念

  • Binder:与消息中间件集成的组件,负责创建对应的 Binding。
  • Binding:消息中间件与应用程序之间的桥梁,分为 Input Binding(用于消费消息)和 Output Binding(用于生产消息)。

2.3 Broker 的角色

Broker 是消息队列中间件的代理服务器,负责存储消息、转发消息。例如,在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储,同时为消费者的拉取请求作准备。

三、快速入门

3.1 搭建生产者

3.1.1 引入依赖

pom.xml 中引入 Spring Cloud Alibaba RocketMQ 相关依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
3.1.2 配置文件

application.yaml 中添加 Spring Cloud Alibaba RocketMQ 相关配置:

spring:
  application:
    name: demo-producer-application
  cloud:
    stream:
      bindings:
        demo01-output:
          destination: DEMO-TOPIC-01
          content-type: application/json
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          demo01-output:
            producer:
              group: test
              sync: true
3.1.3 创建 MySource 接口

声明名字为 Output Binding:

public interface MySource {
    @Output("demo01-output")
    MessageChannel demo01Output();
}
3.1.4 创建 Demo01Message 类

作为示例消息:

public class Demo01Message {
    private Integer id;
    // getter 和 setter 方法
}
3.1.5 创建 Demo01Controller 类

提供发送消息的 HTTP 接口:

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {
    @Autowired
    private MySource mySource;

    @GetMapping("/send")
    public boolean send() {
        Demo01Message message = new Demo01Message().setId(new Random().nextInt());
        Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();
        return mySource.demo01Output().send(springMessage);
    }
}
3.1.6 创建 ProducerApplication 类

启动应用:

@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

3.2 搭建消费者

3.2.1 引入依赖

与生产者类似,引入 Spring Cloud Alibaba RocketMQ 相关依赖。

3.2.2 配置文件

application.yaml 中添加消费者相关的配置:

spring:
  application:
    name: demo-consumer-application
  cloud:
    stream:
      bindings:
        demo01-input:
          destination: DEMO-TOPIC-01
          content-type: application/json
          group: demo01-consumer-group-DEMO-TOPIC-01
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          demo01-input:
            consumer:
              enabled: true
              broadcasting: false
3.2.3 创建 MySink 接口

声明名字为 Input Binding:

public interface MySink {
    String DEMO01_INPUT = "demo01-input";

    @Input(DEMO01_INPUT)
    SubscribableChannel demo01Input();
}
3.2.4 创建 Demo01Message 类

与生产者一致。

3.2.5 创建 Demo01Consumer 类

消费消息:

@Component
public class Demo01Consumer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @StreamListener(MySink.DEMO01_INPUT)
    public void onMessage(@Payload Demo01Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }
}
3.2.6 创建 ConsumerApplication 类

启动应用:

@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

四、定时消息

4.1 定时消息的概念

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

4.2 实现定时消息

在发送消息时,通过设置消息的延迟级别来实现定时消息。例如:

@GetMapping("/send_delay")
public boolean sendDelay() {
    Demo01Message message = new Demo01Message().setId(new Random().nextInt());
    Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 设置延迟级别为 3,10 秒后消费
            .build();
    return mySource.demo01Output().send(springMessage);
}

五、消费重试

5.1 消费重试的机制

当消息消费失败时,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer,让 Consumer 有机会重新消费消息。

5.2 配置消费重试

在配置文件中设置消费重试相关的配置项:

spring:
  cloud:
    stream:
      bindings:
        demo01-input:
          consumer:
            max-attempts: 1
      rocketmq:
        bindings:
          demo01-input:
            consumer:
              delay-level-when-next-consume: 0

六、消费异常处理机制

6.1 异常处理的方式

Spring Cloud Stream 提供了通用的消费异常处理机制,可以通过 @ServiceActivator@StreamListener 注解订阅错误通道,实现自定义的异常处理逻辑。

6.2 实现异常处理

在消费者中添加异常处理方法:

@Component
public class Demo01Consumer {
    // ...

    @ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")
    public void handleError(ErrorMessage errorMessage) {
        logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
    }

    @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void globalHandleError(ErrorMessage errorMessage) {
        logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
    }
}

七、广播消费

7.1 广播消费的概念

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

7.2 配置广播消费

在配置文件中设置 broadcasting 配置项为 true

spring:
  cloud:
    stream:
      bindings:
        demo01-input:
          consumer:
            broadcasting: true

八、顺序消息

8.1 顺序消息的概念

RocketMQ 支持普通顺序消息和完全严格顺序消息,确保消息按顺序消费。

8.2 实现顺序消息

在生产者中设置分区 key 表达式,在消费者中设置顺序消费:

# 生产者配置
spring:
  cloud:
    stream:
      bindings:
        demo01-output:
          producer:
            partition-key-expression: payload['id']
      rocketmq:
        bindings:
          demo01-output:
            producer:
              group: test
              sync: true

# 消费者配置
spring:
  cloud:
    stream:
      bindings:
        demo01-input:
          consumer:
            orderly: true

九、消息过滤

9.1 消息过滤的方式

RocketMQ 提供基于 Tag 和 SQL92 的消息过滤方式。

9.2 基于 Tag 过滤

在生产者中设置消息的 Tag,在消费者中设置过滤的 Tag:

// 生产者
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
        .setHeader(MessageConst.PROPERTY_TAGS, "yunai")
        .build();

// 消费者配置
spring:
  cloud:
    stream:
      bindings:
        demo01-input:
          consumer:
            tags: yunai || yutou

9.3 基于 SQL92 过滤

在消费者中设置 SQL92 过滤表达式:

spring:
  cloud:
    stream:
      bindings:
        demo01-input:
          consumer:
            sql: "id > 100"

十、事务消息

10.1 事务消息的概念

RocketMQ 提供完整的事务消息功能,确保分布式事务的最终一致性。

10.2 实现事务消息

在生产者中发送事务消息,并实现事务监听器:

@GetMapping("/send_transaction")
public boolean sendTransaction() {
    Demo01Message message = new Demo01Message().setId(new Random().nextInt());
    Args args = new Args().setArgs1(1).setArgs2("2");
    Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
            .setHeader("args", JSON.toJSONString(args))
            .build();
    return mySource.demo01Output().send(springMessage);
}

@RocketMQTransactionListener(txProducerGroup = "test")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务逻辑
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 回查本地事务状态
        return RocketMQLocalTransactionState.COMMIT;
    }
}

十一、监控端点

11.1 监控端点的作用

Spring Cloud Stream 提供了自定义监控端点,用于获取 Binding 和 Channel 信息,以及 RocketMQ 客户端的健康状态。

11.2 配置监控端点

pom.xml 中引入 Spring Boot Actuator 相关依赖,并在配置文件中开放监控端点:

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      enabled: true
      show-details: ALWAYS

十二、更多的配置项信息

12.1 RocketMQ Binder Properties

配置项包括 name-serveraccess-keysecret-key 等。

12.2 RocketMQ Consumer Properties

配置项包括 enabletagssqlbroadcastingorderly 等。

12.3 RocketMQ Provider Properties

配置项包括 enablegroupmaxMessageSizetransactional 等。

十三、接入阿里云的消息队列 RocketMQ

13.1 配置阿里云 RocketMQ

在配置文件中设置访问阿里云 RocketMQ 的账号、Namesrv 地址等参数:

spring:
  cloud:
    stream:
      bindings:
        demo01-output:
          destination: TOPIC_YUNAI_TEST
      rocketmq:
        binder:
          name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
          access-key: ${ALIYUN_ACCESS_KEY}
          secret-key: ${ALIYUN_SECRET_KEY}
        bindings:
          demo01-output:
            producer:
              group: GID_PRODUCER_GROUP_YUNAI_TEST
              sync: true

总结

本文详细介绍了如何在 Spring Cloud Alibaba 中使用 RocketMQ 作为消息队列,从基础概念到快速入门,再到高级特性,如定时消息、消费重试、广播消费等,帮助开发者全面了解并应用 RocketMQ 到实际项目中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2321597.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

html css js网页制作成品——HTML+CSS+js迪奥口红网站网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…

PPT 转高精度图片 API 接口

PPT 转高精度图片 API 接口 文件处理 / 图片处理&#xff0c;将 PPT 文件转换为图片序列。 1. 产品功能 支持将 PPT 文件转换为高质量图片序列&#xff1b;支持 .ppt 和 .pptx 格式&#xff1b;保持原始 PPT 的布局和样式&#xff1b;转换后的图片支持永久访问&#xff1b;全…

python学习笔记--实现简单的爬虫(二)

任务&#xff1a;爬取B站上最爱欢迎的编程课程 网址&#xff1a;编程-哔哩哔哩_bilibili 打开网页的代码模块&#xff0c;如下图&#xff1a; 标题均位于class_"bili-video-card__info--tit"的h3标签中&#xff0c;下面通过代码来实现&#xff0c;需要说明的是URL中…

【颠覆性缓存架构】Caffeine双引擎缓存实战:CPU和内存双优化,命中率提升到92%,内存减少75%

千万级QPS验证&#xff01;Caffeine智能双缓存实现 92%命中率&#xff0c;内存减少75% 摘要&#xff1a; 本文揭秘千万级流量场景下的缓存革命性方案&#xff01;基于Caffeine打造智能双模式缓存系统&#xff0c;通过冷热数据分离存储与精准资源分配策略&#xff0c;实现CPU利…

智能汽车图像及视频处理方案,支持视频智能包装能力

美摄科技的智能汽车图像及视频处理方案&#xff0c;通过深度学习算法与先进的色彩管理技术&#xff0c;能够自动调整图像中的亮度、对比度、饱和度等关键参数&#xff0c;确保在各种光线条件下&#xff0c;图像都能呈现出最接近人眼的自然色彩与细节层次。这不仅提升了驾驶者的…

jenkins+1panel面板java运行环境自动化部署java项目

本文章不包含1panel面板安装、jenkins部署、jenkins连接git服务器等操作教程&#xff0c;如有需要可以抽空后期补上 jenkins安装插件Publish Over SSH 在系统配置添加服务器 查看项目的工作空间 项目Configure->构Post Steps选择Send files or execute commands over SSH…

C语言 【实现电脑关机小游戏】非常好玩

引言 在时间限制内做出正确的回答&#xff0c;时间一到&#xff0c;电脑自动关机&#xff0c;听起来是不是很有意思&#xff0c;下面来看看怎么实现吧。 注意&#xff1a;该游戏只在windows系统下可以玩&#xff0c; 一、游戏原理&#xff1a; 在Windows系统下&#xff0c;通…

[网络安全] 滥用Azure内置Contributor角色横向移动至Azure VM

本文来源于团队的超辉老师&#xff0c;其系统分析了Azure RBAC角色模型及其在权限滥用场景下的攻击路径。通过利用AADInternals工具提升用户至Contributor角色&#xff0c;攻击者可在Azure VM中远程执行命令&#xff0c;创建后门账户&#xff0c;实现横向移动。文中详述了攻击步…

vue3,element-plus 表格单选、多选、反选、全选

准备 定义数据 // 表格 const table ref(); // 表格数据 import type { User } from "/interface"; const tableData ref<User[]>([]); // 表格选集 const tableSelection ref<User[]>([]); // 表格选择行 const tableSelectedRow ref<User>…

【Linux】从开发到系统管理深入理解环境变量

文章目录 前言一、环境变量概念1.1 为什么需要环境变量&#xff1f;1.2 环境变量的本质特征 二、环境变量PATH2.1 PATH的运作机制2.2 常见环境变量及其作用2.3 环境变量操作指南 三、再谈环境变量3.1main函数命令行参数解析3.2 环境变量的继承机制3.3 本地变量与内部构建命令 总…

【CGE】社会核算矩阵构建(一):SAM基本结构

【CGE】社会核算矩阵构建&#xff08;一&#xff09;&#xff1a;SAM基本结构 社会核算矩阵构建&#xff08;一&#xff09;&#xff1a;SAM基本结构一、SAM的概念和基本特点二、SAM的基本结构1.开放经济体的SAM表结构2.SAM表各账户的主要核算内容&#xff08;1&#xff09;社会…

Ubuntu 系统部署 Ollama + DeepSeek + Docker + Ragflow

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; Mysql数据库规范 一、Ol…

第三讲 | C/C++内存管理完全手册

C/C内存管理 一、 C/C内存分布二、 C语言中动态内存管理方式&#xff1a;malloc/calloc/realloc/free三、 C内存管理方式1. new/delete操作内置类型2. new和delete操作自定义类型 四、operator new和operator delete函数&#xff08;重点&#xff09;五、new和delete的实现原理…

2021年蓝桥杯第十二届CC++大学B组真题及代码

目录 1A&#xff1a;空间&#xff08;填空5分_单位转换&#xff09; 2B&#xff1a;卡片&#xff08;填空5分_模拟&#xff09; 3C&#xff1a;直线&#xff08;填空10分_数学排序&#xff09; 4D&#xff1a;货物摆放&#xff08;填空10分_质因数&#xff09; 5E&#xf…

秒杀业务优化之从分布式锁到基于消息队列的异步秒杀

一、业务场景介绍 优惠券、门票等限时抢购常常出现在各类应用中&#xff0c;这样的业务一般为了引流宣传而降低利润&#xff0c;所以一旦出现问题将造成较大损失&#xff0c;那么在业务中就要求我们对这类型商品严格限时、限量、每位用户限一次、准确无误的创建订单&#xff0c…

纯vue手写流程组件

前言 网上有很多的vue的流程组件&#xff0c;但是本人不喜欢很多冗余的代码&#xff0c;喜欢动手敲代码&#xff1b;刚开始写的时候&#xff0c;确实没法下笔&#xff0c;最后一层一层剥离&#xff0c;总算实现了&#xff1b;大家可以参考我写的代码&#xff0c;可以拿过去定制…

WPS宏开发手册——使用、工程、模块介绍

目录 系列文章前言1、开始1.1、宏编辑器使用步骤1.2、工程1.3、工程 系列文章 使用、工程、模块介绍 JSA语法 第三篇练习练习题&#xff0c;持续更新中… 前言 如果你是开发人员&#xff0c;那么wps宏开发对你来说手拿把切。反之还挺吃力&#xff0c;需要嘻嘻&#xf…

django入门教程之request和reponse【二】

接上节&#xff1a;入门【一】 再创建一个orders子应用&#xff0c;python manager.py startapp orders&#xff0c;orders目录中新建一个urls.py文件。结构如图&#xff1a; 通过上节课&#xff0c;我们知道在views.py文件中编写函数时&#xff0c;有一个默认入参request&…

RAG优化:python从零实现[吃一堑长一智]循环反馈Feedback

本文将介绍一种有反馈循环机制的RAG系统,让当AI学会"吃一堑长一智",给传统RAG装了个"后悔"系统,让AI能记住哪些回答被用户点赞/拍砖,从此告别金鱼记忆: 每次回答都像在玩roguelike:失败结局会强化下次冒险悄悄把优质问答变成新知识卡牌,实现"以…

【Linux】VMware17 安装 Ubuntu24.04 虚拟机

目录 安装教程 一、下载 Ubuntu 桌面版iso映像 二、安装 VMware 三、安装 Ubuntu 桌面版 VMware 创建虚拟机 挂载 Ubuntu ISO 安装 Ubuntu 系统 安装教程 一、下载 Ubuntu 桌面版iso映像 链接来自 清华大学开源软件镜像站 ISO文件地址&#xff1a;ubuntu-24.04.2-des…