【实战】Spring Cloud Stream3.0 整合RocketMq

news2024/11/13 9:21:38

文章目录

    • 前言
    • 技术积累
      • Spring Cloud Stream3.0新特性
      • RocketMq简介
    • 实战演示
      • 引入Maven依赖
      • 增加application配置
      • 消息生产者
      • 消息消费者

前言

相信很多同学用使用过rocketmq消息中间件,且大多情况下是使用原生的rocketmq-spring-boot-starter 进行集成然后创建一个rocketMQTemplate发送的生产者和@RocketMQMessageListener监听的消费者。今天我们就不按常理出牌,使用Spring Cloud Stream来进行整合RocketMq。如果我们有在一个项目中需要引入多个MQ的需求,用Spring Cloud Stream简直不要太好。当然,我们是直接使用Spring Cloud Stream3.0,不再像之前低版本那样需要引入通道类进行指定,3.0版本可用直接字配置文件进行粘接绑定信道,简直不要太爽。

技术积累

Spring Cloud Stream3.0新特性

Spring Cloud Stream 3.0 引入了一些新特性,包括对新版本Spring Boot和Spring Cloud的支持,以及对反序列化错误处理的改进。
以下是一些主要的新特性:
支持Spring Boot 2.x和Spring Cloud 2020.0.x。
改进了消息中间件的错误处理,提供了更好的异常传播和提供了更多的配置选项来自定义错误处理。
提供了对函数式编程模型的支持。
提供了对Kafka消息传递保证的配置选项。
提供了对消息转换器的支持,可以在发送和接收消息之前进行自定义转换。

RocketMq简介

‌RocketMQ是‌Apache基金会下的一个开源分布式消息中间件,设计用于云原生环境,支持高吞吐量和流处理,广泛应用于金融级稳定性场景。 它具备以下核心特性:
云原生:RocketMQ设计为与云和容器技术(如Kubernetes)友好,支持无限弹性的扩缩。
高吞吐:能够保证万亿级别的吞吐量,满足微服务与大数据场景的需求。
流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎。
金融级稳定性:广泛用于交易核心链路,确保系统的稳定运行。
架构极简:采用零外部依赖的Shared-nothing架构,简化系统设计和维护。
生态友好:无缝对接微服务、实时计算、数据湖等周边生态,便于集成和使用。
支持多种消息类型:包括普通消息、顺序消息、事务消息、批量消息、定时(延时)消息、消息回溯等,满足不同业务场景需求。
易用性与灵活性:提供多种发送与消费模式,丰富的客户端支持,以及易于运维与管理的工具和界面。
在这里插入图片描述

实战演示

今天的重点不要RocketMq的使用,而是Spring Cloud Stream3.0如何整合RocketMq。以下是一个简单的整合DEMO,仅供学习使用,如果需要应用与生产环境需要增加一些额外的方案。比如死信或者消费失败重试机制等等。

引入Maven依赖

这里需要注意SpringBoot与SpriingCloud版本对应,SpringCloud版本与RocketMq Starter版本对应

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>8</java.version>
    <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
</dependencies>
    <!--rocketmq-->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <version>2.2.2.RELEASE</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-acl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.7.1</version>
    </dependency>

</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

增加application配置

testChannel可以作为输入输出信道

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      binders:
        my-rocketmq:
          type: rocketmq
      function:
        definition: testChannel
      bindings:
        testChannel-in-0:
          binder: my-rocketmq
          destination: test-rocket-topic
          group: test-rocket-group
          content-type: text/plain
          # 设置spring cloud stream次数1,表示禁用,异常情况下只消费一次消息
          consumer:
            max-attempts: 1
        testChannel-out-0:
          binder: my-rocketmq
          destination: test-rocket-topic
          content-type: text/plain

消息生产者

直接可以用过StreamBridge 进行手动发送

@RestController
@RequestMapping("/base")
public class BaseController {

    @Resource
    private StreamBridge streamBridge;

    //@Resource
    private MqChannel mqChannel;

    @GetMapping("/send")
    public Boolean sendMessage(String msg) {
        boolean send = streamBridge.send("testChannel-out-0", MessageBuilder.withPayload("rocket测试:" + msg).build());
     
        return true;
    }
}

消息消费者

直接监听testChannel通道,默认监听testChannel-input-0信道

/**
 * RocketChannel
 * @author senfel
 * @version 1.0
 * @date 2024/7/23 12:20
 */
@Configuration
public class RocketChannel {

    /**
     * testChannel 消费者
     * @author senfel
     * @date 2024/7/23 12:26
     * @return java.util.function.Consumer<java.lang.String>
     */
    @Bean
    public Consumer<Message<String>> testChannel(){
        return message -> {
            System.out.println("接收到消息Payload:" + message.getPayload());
            System.out.println("接收到消息Header:" + message.getHeaders());
        };
    }
}

测试用例
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1945211.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity显示泰语且兼容泰语音标

前言&#xff1a;使用Unity开发的游戏需要支持泰语本地化&#xff0c;以及解决显示泰语时Unity的bug 目录 1、Text组件显示泰语2、TextMeshPro组件显示泰语 现在很多游戏都需要显示泰语&#xff0c;下面将介绍Unity如何显示泰语&#xff0c;&#xff08;仅介绍Unity字体方面的设…

Transformer自然语言处理实战pdf阅读

一.第一章 欢迎来到transformer的世界 1.解码器-编码器框架 在Transformer出现之前&#xff0c;NLP的最新技术是LSTM等循环架构。这些架 构通过在神经网络连接使用反馈循环&#xff0c;允许信息从一步传播到另一 步&#xff0c;使其成为对文本等序列数据进行建模的理想选择。如…

多表查询时条件写在where和join on的区别

文章目录 一、初始数据二、问题分析三、总结 先说结论&#xff0c; 1.如果想要拿到主表不受到关联表查询条件的数据的话&#xff0c;那么建议直接将查询条件放到on之后。 2.如果将关联表的条件查询放在where之后&#xff0c;可能会将主表中的数据进行排除。 所以如果想实现的查…

总结一些vue3小知识3

1.限制时间选择器只能选择后面的日期 说明&#xff1a;disabled-date属性是一个用来判断该日期是否被禁用的函数&#xff0c;接受一个 Date 对象作为参数。 应该返回一个 Boolean 值。 <el-date-picker class"w180" v-model"datas.form.timeDate[0]" …

6.6 使用dashboard商城搜索导入模板

本节重点介绍 : 模板商城中搜索模板导入模板修改模板 大盘模板商城地址 免费的 地址 https://grafana.com/grafana/dashboards 搜索模板技巧 详情 导入dashboard 两种导入模式 url导入id导入json文件导入 导入 node_exporter模板 https://grafana.com/grafana/dashboa…

萤石举办2024夏季新品发布会,全力推进“2+5+N”智能家居新生态

7月24日&#xff0c;“智动新生&#xff0c;尽在掌控”2024萤石夏季新品发布会在杭州成功举办。本次发布会上&#xff0c;“智慧生活守护者”萤石深入挖掘应用场景&#xff0c;重磅发布了包括智能健康手表、智能家居AI主机、生态控制器、智家APP等多款创新性的产品及应用&#…

短视频时代,云微客AI批量混剪技术有多厉害?

在数字媒体日益盛行的今天&#xff0c;视频剪辑已经成为了一种热门的技能&#xff0c;加上短视频创作在当下也变得越来越流行&#xff0c;因此云微客短视频AI批量剪辑就显得非常必要。近些年&#xff0c;随着人工智能技术的发展&#xff0c;在很大程度上简化了批量剪辑的过程&a…

linux之网络子系统-本机发包到本机 实现

一、前言 在linux之网络子系统-网络协议栈 发包收包详解-CSDN博客 文章中&#xff0c;详细介绍了跨主机之间的数据包发送的源码流程。除了跨主机&#xff0c;还有本机发包到本机是如何实现的&#xff1f;就是 saddr ip地址为 127.0.0.1 . 二、发送数据包到 127.0.0.1 首先&a…

Go语言编程 学习笔记整理 第2章 顺序编程 前半部分

前言&#xff1a;《Go语言编程》编著 许式伟 吕桂华 等 1.1 变量 var v1 int var v2 string var v3 [10]int // 数组 var v4 []int // 数组切片 var v5 struct { f int } var v6 *int // 指针 var v7 map[string]int // map&#xff0c;key为string类型&#xff0c;value为in…

神经网络理论(机器学习)

motivation 如果逻辑回归的特征有很多&#xff0c;会造出现一些列问题&#xff0c;比如&#xff1a; 线性假设的限制&#xff1a; 逻辑回归是基于线性假设的分类模型&#xff0c;即认为特征与输出之间的关系是线性的。如果特征非常多或者特征与输出之间的关系是非线性的&#…

职场新人必备神器:四款PDF转Word在线转换工具大比拼

关于PDF文件格式转换这件事&#xff0c;其实已经变成了职场人都要会的基础技能了&#xff0c;那么要如何才能够快速且完成的PDF转换为Word呢&#xff1f;今天就让我用自己的毕生所学给大家说说四款pdf转word在线转换免费的工具吧&#xff0c;下面一起来了解一下吧。 一、福昕PD…

【C++】流插入和流提取运算符重载

目录 前言ostream和istream自定义类型的流插入重载自定义类型的流提取重载解决私有问题日期类总接口 前言 我们在上一节实现日期类时&#xff0c;在输入和输出打印时&#xff0c;经常会调用两个函数&#xff1a; void Insert()//输入函数{cin >> _year;cin >> _mo…

ARDUINO 上传失败:上传错误:退出状态 2常见原因及解决方法Failed uploading: uploading error: exit status

前言&#xff1a; 串口监视器可显示各种ESP32打印信息 下述均为USB TYPEC 数据线正常的情况下的报错&#xff0c;如果数据线或串口有问题 原因1&#xff1a;无法连接到ESP32&#xff1a;串行数据流停止&#xff1a;可能存在串行噪音或损坏 解决方法&#xff1a;ESP32电路板是…

一文速览llama 3.1及其微调:长度终于到128K,故可让paper-review数据集直接微调

前言 llama3 刚出来时&#xff0c;其长度只有8K对于包括我司在内的大模型开发者是个小小的缺憾&#xff0c;好在很快&#xff0c;在7.23日&#xff0c;Meta发布了Llama 3.1&#xff0c;其意义在于 很明显&#xff0c;随着llama的影响力越来越大&#xff0c;Meta想让llama类似…

(雷达数据处理中的)跟踪算法(1) --- 整体目录

说明 目标跟踪是雷达数据处理中核心的步骤之一&#xff0c;基于雷达的各项应用往往需要跟踪模块所输出的结果。比如在车载雷达领域&#xff0c;目标跟踪位于点云聚类和ADAS功能实现之间(关于聚类&#xff0c;可以参考我之前的博文[1]&#xff1a;&#xff08;毫米波雷达数据处理…

macOS 10.15中屏蔽Microsoft Edge浏览器的更新提示

文章目录 1.效果对比2.安装描述文件3.停用描述文件4.高级操作&#xff08;可选&#xff09;参考文献 最近在macOS10.15系统&#xff0c;打开Microsoft Edge浏览器&#xff0c;每次打开都有个烦人的提示“ 要获取将来的 microsoft edge 更新&#xff0c;需要 macos 10.15 或更高…

C#实战 | 天行健、上下而求索

本文介绍C#开发入门案例。 01、项目一&#xff1a;创建控制台应用“天行健&#xff0c;君子以自强不息” 项目说明&#xff1a; 奋斗是中华民族的底色&#xff0c;见山开山&#xff0c;遇水架桥&#xff0c;正是因为自强不息的奋斗&#xff0c;才有了辉煌灿烂的中华民族。今…

单向链表知识汇总

提示&#xff1a;本文章参考知乎大佬和一位博主大佬 单向链表 1.前置知识(部分最好记忆)1.1 链表组成1.2 链表插入分三种情况1.2.1头插1.2.2 中间插1.2.2 结尾插 1.3 链表的删除1.51.61.7 2.链表各种接口的实现2.1 链表的打印2.1 链表的节点的申请2.2 单链表节点增加2.2.1 单链…

【常微分方程】

框架 常微分方程的概念一阶微分方程可变离分量齐次方程一阶线性微分方程可降阶的高阶微分方程二阶常系数齐次线性微分方程二阶常系数非齐次线性微分方程 讲解 【1】 常微分方程&#xff1a;是微分方程的特殊情况&#xff1b; 阶&#xff1a;是方程未知函数的最高阶导数的阶数&…

关于pycharm上push项目到gitee失败原因

版权声明&#xff1a;本文为博主原创文章&#xff0c;如需转载请贴上原博文链接&#xff1a;https://blog.csdn.net/u011628215/article/details/140577821?spm1001.2014.3001.5502 前言&#xff1a;最近新建项目push上gitee都没有问题&#xff0c;但是当在gitee网站进行了一个…