SpringCloud之Stream框架集成RocketMQ消息中间件

news2025/1/12 8:42:55

        Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

     目前 Spring Cloud Stream只支持 RabbitMQ 和 Kafka 的自动化配置。

     Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口.

在 SpringCloudStream 3.x 版本前是通过 @StreamListener 和 @EnableBinding 进行消息的发送和消费的,springCloudStream 3.x 版本后 @StreamListener 和 @EnableBinding 都打上了@Deprecated 注解,不建议使用了;后续的版本更新中替换成函数式的方式实现。

既然通过四大函数式接口的方式替换了注解的方式 那么该如何进行绑定呢?

通过 spring.cloud.stream.function.definition:名称的方式进行绑定 公开 topic。

不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个 topic拆封 和 绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 In 和 out 两个通道:

  • 输入 - + -in- + < index >

     例如:myTopic-in-0

  • 输出 - + -out- + < index >

       例如:myTopic-out-0

注意:这里的 functionName需要和代码声明的函数名称还有spring.cloud.stream.function.definition下的名称保持一致(后面还会在项目实战中展示一遍)

代码示例:

----------------------------------项目实战--------------------------------------

看下我们项目中的配置,配置文件是放在nacos上面的:

消息发送:

/**
 * @ClassName MessageParamParentDto
 * @Author zxd
 * @Version 1.0.0
 * @Description TODO
 * @CreateTime 2023/6/13 11:27 - 星期二
 */
@Data
public class MessageParamParentDto implements Serializable {

    private static final long serialVersionUID = 7963819193258646924L;


    private  String routeUrl;

}

--------------------------------------------------------------------------------------------------------------

/**
 * @ClassName MessageParamDto
 * @Author kch
 * @Version 1.0.0
 * @Description 消息队列接收系统消息实体对象
 * @CreateTime 2022/9/18 15:16 - 星期日
 */
@Data
public class MessageParamDto  extends MessageParamParentDto implements Serializable {

    private static final long serialVersionUID = 7111819193258646924L;

    /**
     * 消息模板code
     */
    @NotNull(message = "消息模板不能为空")
    private String templateCode;

    /**
     * 可变参数,必传字段
     * 该参数匹配模板字符串中的变量和URL中的变量,所以模板和URL中的变量名不能重复
     */
    @NotNull(message = "参数不能为空")
    private Map<String, String> params;

    /**
     * 消息详情跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)
     * 例如:userId=1&userCode=test
     */
//    private String routerParams;

    /**
     * 消息操作跳转路径参数(没有不传,有参数按照URL参数拼接规范拼接,不加?号)
     * 例如:userId=1&userCode=test
     */
//    private String contentPathParams;

    /**
     * 接收者租户
     */
    @NotNull(message = "接收者租户ID不能为空")
    private Long tenantId;

    /**
     * 接收人
     */
    @NotNull(message = "接收者用户ID不能为空")
    @Size(min = 1, message = "接收者用户ID不能为空")
    private List<RecipientUser> recipientUsers;

    @Valid
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class RecipientUser implements Serializable {

        /**
         * 接收人id
         */
        @NotNull(message = "接收者用户ID不能为空")
        private Long recipientId;

        /**
         * 接收人手机号
         */
        @Pattern(regexp = RegexPool.MOBILE, message = "手机格式错误")
        private String phone;

    }

}

-----------------------------------------------------------------------------------------------------------

/**
 * @ClassName MessageMqBinding
 * @Author zpp
 * @Version 1.0.0
 * @Description TODO
 * @CreateTime 2023/2/10 15:37 - 星期五
 */
public interface MessageMqBinding {

    /**
     * 系统消息生产者交换机
     */
    String MESSAGE_MQ_OUTPUT = "dyzsMessageProvider-out-0";
}

----------------------------------------------------------------------------------------

@Slf4j
@RestController
@RequestMapping("/mq")
public class MessageMqController {
    @Resource
    private StreamBridge streamBridge;
    
    /**
     * @param :
     * @Author zpp
     * @Description 发送系统消息
     * @Date 2023/2/10 15:27
     * @Return com.zysy.common.api.entity.Result<java.lang.Boolean>
     */
    @PostMapping
    public Result<Boolean> sendMessage(@RequestBody @Validated MessageParamDto dto) {
        log.info("接收到系统消息发送请求:{}", JSONObject.toJSONString(dto));
        MessageMQParamDto paramDto = new MessageMQParamDto(dto);
        paramDto.setCreateBy(UserUtil.getUserId());
        paramDto.setCreateDept(UserUtil.getDeptId());
        List<MessageMQParamDto> paramDtoList = new ArrayList<>();
        paramDtoList.add(paramDto);
        MessageBuilder builder = MessageBuilder.withPayload(paramDtoList)
                .setHeader("Content-Type", "application/json");
        return Result.success(streamBridge.send(MessageMqBinding.MESSAGE_MQ_OUTPUT, builder.build()));
    }

------------------------------------------------------------------------------------------------------

消息消费:

          下图是在代码中配置的消息消费者,这里的函数名称要和上图中的function.definition配置的名称一样;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1071342.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Jenkins配置钉钉通知

Jenkins 作为最流行的开源持续集成平台&#xff0c;其强大的拓展功能一直备受测试人员及开发人员的青睐。大家都知道我们可以在 Jenkins 中安装 Email 插件支持构建之后通过邮件将结果及时通知到相关人员。 但其实 Jenkins 还可以支持钉钉消息通知&#xff0c;其主要通过 Ding…

Android原生实现控件阴影方案(API28及以上)

Android控件的阴影效果的实现方式有很多种&#xff0c;这里介绍一下另一种Android原生的阴影实现方案&#xff08;API28及以上&#xff09;。 我们利用elevation、outlineAmbientShowColor、outlineSpotShadowColor来实现一个带阴影的Button。 实现效果如下图&#xff0c;阴影宽…

第0章 前言

大家好&#xff0c;我叫 Rick Blyth&#xff0c;我是一名软件开发人员、企业家、创始人、博主和父亲 &#x1f44b; 几年前&#xff0c;在成功构建和扩展了一些自筹资金的 Micro SaaS&#xff08;微型 SaaS&#xff09; 应用后&#xff0c;我放弃了&#xff08;薪水不错但很糟…

SpringBoot 实现数据脱敏

SpringBoot 实现数据脱敏 前言Hutool 实现数据脱敏引入依赖脱敏工具类代码实现 使用注解的方式定义枚举自定义序列化类定义注解测试 前言 数据脱敏是指对某些敏感信息通过脱敏规则进行数据的变形&#xff0c;实现敏感隐私数据的可靠保护。 数据脱敏常用规则有替换、重排、加密…

接口测试和性能测试的区别

最近我在一个论坛上看到了一个关于性能测试和接口测试的经典问题&#xff0c;问题如下&#xff1a; 问题&#xff1a;后端性能测试&#xff0c;一个功能其实都是由后台多个接口组成的。 例如一个单据的保存&#xff0c;可能后台需要调用几个接口。用LR录制这个功能做性能测试。…

Java如何进行数据脱敏

1.SQL数据脱敏实现 MYSQL(电话号码,身份证)数据脱敏的实现 1 2 3 4 5 6 7 8 -- CONCAT()、LEFT()和RIGHT()字符串函数组合使用&#xff0c;请看下面具体实现 -- CONCAT(str1,str2,…)&#xff1a;返回结果为连接参数产生的字符串 -- LEFT(str,len)&#xff1a;返回从字符串st…

Vue Router的进阶

进阶 导航守卫 官方文档上面描述的会比较深奥&#xff0c;而守卫类型也比较多&#xff0c;其中包含了全局前置守卫、全局解析守卫、全局后置钩子、路由独享守卫、组件内守卫。每一种守卫的作用和用法都不相同。这会使得大家去学习的时候觉得比较困难&#xff0c;这边主要介绍…

如何平衡需求的优先级冲突?

每个项目都有各种需求&#xff0c;如业务需求、技术需求、用户需求、系统需求。我们需要对这些需求进行优先级排序&#xff0c;平衡不同利益相关者的需求&#xff0c;以更好满足客户需求&#xff0c;确保关键业务目标得到优先满足&#xff0c;并合理分配资源&#xff0c;避免资…

移植 NetXDuo 到 STM32F4 芯片

移植 NetXDuo 到 STM32F4 芯片 1. NetXDuo 和 ThreadX 源码获取2. 准备工作2.1 基本工程模板获取 —— CubeMx 3.ThreadX 移植3.1 添加到工程3.2 文件修改3.3 补充完成回调函数 4. NetXDuo 移植4.1 将 NetXDuo 添加到工程4.2 驱动层实现4.3 测试 1. NetXDuo 和 ThreadX 源码获取…

RT-Thread 中断管理(学习二)

中断的底半处理 RT-Thread不对中断服务程序所需要的处理时间做任何假设、限制&#xff0c;但如同其它实时操作系统或非实时操作系统一样&#xff0c;用户需要保证所有的中断服务程序在尽可能短的时间内完成。这样在发生中断嵌套&#xff0c;或屏蔽了相应中断源的过程中&#x…

小黑开始了拉歌训练,第一次进入部室馆,被通知要去当主持人心里有些紧张的leetcode之旅:337. 打家劫舍 III

小黑代码&#xff08;小黑卡在了bug中&#xff0c;上午一步步探索做出&#xff0c;非常NB!!!&#xff09; # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # self.left lef…

Hive窗口函数回顾

1.语法 1.1 基于行的窗口函数 Hive的窗口函数分为两种类型&#xff0c;一种是基于行的窗口函数&#xff0c;即将某个字段的多行限定为一个范围&#xff0c;对范围内的字段值进行计算&#xff0c;最后将形成的字段拼接在该表上。 注意&#xff1a;在进行窗口函数计算之前&#…

X86指令基本格式

X86指令基本格式 1 什么是机器码2 X86指令基本格式3 指令前缀3.1 第一组&#xff1a;封锁和重复执行前缀3.2 第二组&#xff1a;段前缀3.3 第三组&#xff1a;修改操作数默认长度3.4 第四组&#xff1a;修改默认地址长度 4 操作码5 ModR/M与SIB5.1 ModR/M字节5.2 SIB字节 6 地址…

uCharts常用图表组件demo

带渐变阴影的曲线图 <view class"charts-box"><qiun-data-charts type"area" :opts"opts" :chartData"chartData" :ontouch"true":background"rgba(256,256,256,0)" /> </view>data(){return{…

嵌入式学习(1)HAL库

文章目录 1.HAL库文件介绍2.HAL库编程目录结构3.使用cubemx生成HAL库编程目录结构 1.HAL库文件介绍 2.HAL库编程目录结构 3.使用cubemx生成HAL库编程目录结构

【JavaEE重点知识归纳】第7节:类和对象

目录 一&#xff1a;了解面向对象 1.什么是面向对象 2.面向对象和面向过程区分 二&#xff1a;类定义和使用 1.什么是类 2.练习&#xff1a;定义一个学生类 三&#xff1a;类的实例化 1.什么是实例化 2.类和对象的说明 四&#xff1a;认识this 1.为什么要有this引用…

rails 常量自动加载和重新加载机制

在Rails中&#xff0c;有一个称为"常量自动加载和重新加载机制"的功能&#xff0c;它使得在开发和生产环境中能够自动加载和重新加载类和模块。这个机制允许您不必手动管理类的加载&#xff0c;使得开发更加方便。 快乐学习&#xff1a; 自动加载、重新加载 自动加…

Yii2全拦截路由catchAll的使用

定义&#xff1a;catchAll 路由&#xff08;全拦截路由&#xff09; 应用场景&#xff1a;网站维护的时候需要向用户抛出一个维护的页面&#xff0c;方便提醒用户 使用方法&#xff1a; 1、在应用配置中设置 yii\web\Application::catchAll 属性 2、新增对应的控制器方法 3、…

【Putty】win10 / win 11:SSH 远程连接工具 Putty 下载、安装

目录 一、Jmerter 连接 SSH 隧道的 mysql&#xff08;不可行&#xff09; 二、Putty 介绍 三、Putty 的下载 四、Putty 无需安装直接使用 五、Putty 使用 &#xff08;1&#xff09;我需要连接 ssh 隧道的 MySQL 参数如下 &#xff08;2&#xff09;Putty 使用教程 一、…

MA-SAM:模态不可知的三维医学图像分割SAM自适应

论文&#xff1a;MA-SAM: Modality-agnostic SAM Adaptation for 3D Medical Image Segmentation | Papers With Code 代码&#xff1a;GitHub - cchen-cc/MA-SAM: PyTorch implementation for MA-SAM 机构&#xff1a;a)高级医疗计算和分析中心&#xff0c;麻省总医院和哈佛…