SpringCloudStream 3.x rabbit 使用

news2025/1/16 16:44:24

1. 前言

今天带来的是SpringCloudStream 3.x 的新玩法,通过四大函数式接口的方式进行数据的发送和监听。本文将通过 rabbitMQ 的方式进行演示

3.x版本后是 可以看到 @StreamListener 和 @EnableBinding 都打上了@Deprecated 注解。后续的版本更新中会逐渐替换成函数式的方式实现。 既然通过四大函数式接口的方式替换了注解的方式 那么
该如何进行绑定呢?通过:spring.cloud.stream.function.definition: 名称 的方式进行绑定 公开topic。不管是创建 Consumer 还是 Supplier 或者是 Function Stream都会将其的 方法名称 进行 一个topic 拆封 和 绑定 假设 创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 in 和 out 两个通道 input - < functionName > + -in- + < index > output - <
functionName > + -out- + < index > 格式拆分 myTopic-in-0 myTopic-out-0

2. 项目演练

spring boot用的是2.7.0的

2.1 引用依赖

 <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2.2 修改配置文件

server:
  port: 8080
# rabbitmq 消费者配置
spring:
  rabbitmq:
    host: localhost  # rabbitmq服务地址
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        testSupplier-out-0: # 生产者配置
          content-type: application/json
          destination: demo-destination #交换机
          binder: rabbit # mq类型
        testConsumer-in-0: # 消费者配置
          content-type: application/json
          destination: demo-destination #交换机
          group: demo-group #消费者分组
          binder: rabbit
        testSupplier1-out-0: # 生产者配置
          content-type: application/json
          destination: demo1-destination
          binder: rabbit
        testFunction-in-0: # 消费者配置
          content-type: application/json
          destination: demo1-destination
          group: demo1-group
          binder: rabbit
        testFunction-out-0: # 生产者配置
          content-type: application/json
          destination: demo2-destination
          binder: rabbit
        testConsumer1-in-0: # 消费者配置
          content-type: application/json
          destination: demo2-destination
          group: demo2-group
          binder: rabbit
    function:
      definition:  testSupplier;testConsumer;testSupplier1;testFunction;testConsumer1; # 绑定

2.3 具体使用

2.3.1 自动发送消息

修改配置文件
在这里插入图片描述
在这里插入图片描述
定义生产者bean

  /**
     * 注意方法名称 testSupplier 要与配置文件中的spring.cloud.stream.bindings.testSupplier-out-0 保持一致
     * 其中 -out-0 是固定写法,out 标识生产者类型,0是生产者索引
     */
    @Bean
    public Supplier<Person> testSupplier() {
        return ()->{
            Person person = new Person();
            person.setName("zhang");
            System.out.println("testSupplier生产消息:"+person);
            return person;
        };
    }

使用Supplier函数作为生产者,这个生产者,会一直自动生产消息。
在这里插入图片描述

定义消费者bean

 /**
     * 注意方法名称 testConsumer 要与配置文件中的spring.cloud.stream.bindings.testConsumer-in-0 保持一致
     * 其中 -in-0 是固定写法,in 标识消费者类型,0是消费者索引
     */
    @Bean
    public Consumer<Person> testConsumer() {
        return msg -> {
            System.out.println("testConsumer消费消息: " + msg);
        };
    }

使用Consumer函数作为消费者,是自动检测的,只要队列中有数据就会取出来消费,本项目中该消费者配置如下:

    testConsumer-in-0: # 消费者配置
          content-type: application/json
          destination: demo-destination #交换机
          group: demo-group #消费者分组
          binder: rabbit

该消费者会一直监控队列destination.group ,也就是demo-destination.demo-group
在这里插入图片描述
在这里插入图片描述

2.3.2 手动发送消息

只绑定消费者,生产者不绑定,其他的和自动发送消息一样不变
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
发送消息

    @GetMapping("sendMsg")
    public String sendMsg(){
        Person person = new Person();
        person.setName("controller测试");
        Message<Person> message = MessageBuilder.withPayload(person)
               .build();
        // 发送消息
        streamBridge.send("testSupplier-out-0", message);
        return "发送成功";
    }

在这里插入图片描述
在这里插入图片描述

2.3.3 加工消息

  1. 修改配置文件
    在这里插入图片描述
  2. 生产者定义
   @Bean
    public Supplier<Person> testSupplier1() {
        return ()->{
            Person person = new Person();
            person.setName("测试function");
            System.out.println("testSupplier1生产消息:"+person);
            return person;
        };
    }
  1. 消费者定义
    @Bean
    public Consumer<Person> testConsumer1() {
        return msg -> {
            System.out.println("testConsumer1消费消息: " + msg);
        };
    }
  1. 加工funtion定义
    @Bean
    public Function<Person, Person> testFunction() {
        return msg -> {
            msg.setName(msg.getName()+"_加工消息");
            return msg;
        };
    }
  1. 结果
    在这里插入图片描述

3 项目源码

3.1 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zcl</groupId>
    <artifactId>rabitMQDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabitMQDemo</name>
    <description>rabitMQDemo</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>2021.0.3</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.2 application.yaml

server:
  port: 8080
--- # rabbitmq 消费者配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        testSupplier-out-0:
          content-type: application/json
          destination: demo-destination
          group: demo-group
          binder: rabbit
        testConsumer-in-0:
          content-type: application/json
          destination: demo-destination
          group: demo-group
          binder: rabbit
        testSupplier1-out-0:
          content-type: application/json
          destination: demo1-destination
          group: demo1-group
          binder: rabbit
        testFunction-in-0:
          content-type: application/json
          destination: demo1-destination
          group: demo1-group
          binder: rabbit
        testFunction-out-0:
          content-type: application/json
          destination: demo2-destination
          group: demo2-group
          binder: rabbit
        testConsumer1-in-0:
          content-type: application/json
          destination: demo2-destination
          group: demo2-group
          binder: rabbit
    function:
      definition: testSupplier1;testFunction;testConsumer1;

3.3 RabbitMqComponent.java

package com.zcl.component;

import com.zcl.RabitMqDemoApplication;
import com.zcl.entity.Person;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

@Component
public class RabbitMqComponent {
    /**
     * 注意方法名称 testConsumer 要与配置文件中的spring.cloud.stream.bindings.testConsumer-in-0 保持一致
     * 其中 -in-0 是固定写法,in 标识消费者类型,0是消费者索引
     */
    @Bean
    public Consumer<Person> testConsumer() {
        return msg -> {
            System.out.println("testConsumer消费消息: " + msg);
        };
    }
    /**
     * 注意方法名称 testSupplier 要与配置文件中的spring.cloud.stream.bindings.testSupplier-out-0 保持一致
     * 其中 -out-0 是固定写法,out 标识生产者类型,0是生产者索引
     */
    @Bean
    public Supplier<Person> testSupplier() {
        return ()->{
            Person person = new Person();
            person.setName("zhang");
            System.out.println("testSupplier生产消息:"+person);
            return person;
        };
    }
    @Bean
    public Supplier<Person> testSupplier1() {
        return ()->{
            Person person = new Person();
            person.setName("测试function");
            System.out.println("testSupplier1生产消息:"+person);
            return person;
        };
    }
    @Bean
    public Function<Person, Person> testFunction() {
        return msg -> {
            msg.setName(msg.getName()+"_加工消息");
            return msg;
        };
    }
    @Bean
    public Consumer<Person> testConsumer1() {
        return msg -> {
            System.out.println("testConsumer1消费消息: " + msg);
        };
    }
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1636965.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

修改Ubuntu远程登录欢迎提示信息

无论何时登录公司的某些生产系统&#xff0c;你都会看到一些登录消息、警告或关于你已登录服务器的信息&#xff0c;如下所示。 修改方式 1.打开ubuntu终端,进入到/etc/update-motd.d目录下面 可以发现目录中的文件都是shell脚本, 用户登录时服务器会自动加载这个目录中的文件…

深度学习之基于Matlab NN的伦敦房价预测

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景 房价预测是房地产领域的一个重要问题&#xff0c;对于投资者、开发商以及政策制定者等都具有重要的指…

【写作吗。月入过万,8年写作路,10w+订阅创作者的18条建议】

Tom Kuegler &#xff1a;62k粉丝的medium独立作者&#xff0c;全网累计10w人订阅。他不是高深莫测&#xff0c;有资源背景的大V&#xff0c;也不是有运营团队的流量博主&#xff0c;而是从零开始&#xff0c;坚持不断写作的最普通创作者之一。 通过写作&#xff0c;他收入过万…

SpirngBoot整合快递100

目录 一、注册快递100 二、技术文档地址 三、需要认证的key和comcumer 四、spring boot 整合快递 100使用 4.1 引入快递100和hutool的依赖 4.2 将key和comcumer写入application.properties文件中 4.3 新建一个modle,用于将查出来的json数据转成对象 4.4 新建一个controll…

【yolov8】yolov8剪枝训练流程

yolov8剪枝训练流程 流程&#xff1a; 约束剪枝微调 一、正常训练 yolo train model./weights/yolov8s.pt datayolo_bvn.yaml epochs100 ampFalse projectprun nametrain二、约束训练 2.1 修改YOLOv8代码&#xff1a; ultralytics/yolo/engine/trainer.py 添加内容&#…

机器学习高频问答题总结

机器学习问答题总结 第一章 线性回归1.什么是线性回归&#xff1f;解释主要原理2.解释线性回归中最小二乘法的原理吗&#xff1f;3.如何评估线性回归模型的性能&#xff1f;4.线性回归中正则化的目的是什么吗&#xff1f;L1正则化和L2正则化有什么不同&#xff1f; 第二章 逻辑…

深入解析yolov5,为什么算法都是基于yolov5做改进的?(一)

YOLOv5简介 YOLOv5是一种单阶段目标检测算法&#xff0c;它在YOLOv4的基础上引入了多项改进&#xff0c;显著提升了检测的速度和精度。YOLOv5的设计哲学是简洁高效&#xff0c;它有四个版本&#xff1a;YOLOv5s、YOLOv5m、YOLOv5l、YOLOv5x&#xff0c;分别对应不同的模型大小…

神经网络与深度学习--网络优化与正则化

文章目录 前言一、网络优化1.1网络结构多样性1.2高维变量的非凸优化1.鞍点2.平坦最小值3.局部最小解的等价性 1.3.改善方法 二、优化算法2.1小批量梯度下降法&#xff08;Min-Batch&#xff09;2.2批量大小选择2.3学习率调整1.学习率衰减&#xff08;学习率退火&#xff09;分段…

MouseBoost PRO for Mac激活版:强大的 鼠标增强软件

在追求高效工作的今天&#xff0c;MouseBoost PRO for Mac成为了许多Mac用户的得力助手。这款功能强大的鼠标增强软件&#xff0c;以其独特的智能化功能和丰富的实用工具&#xff0c;让您的电脑操作更加便捷、高效。 MouseBoost PRO for Macv3.4.0中文激活版下载 MouseBoost PR…

nginxconfig.io项目nginx可视化配置--搭建-视频

项目地址 https://github.com/digitalocean/nginxconfig.io搭建视频 nginxconfig.io搭建 nginxconfig.io搭建 展示效果 找到这个项目需要的docker镜像&#xff0c;有项目需要的node的版本 docker pull node:20-alpine运行这个node容器,在主机中挂载一个文件夹到容器中 主机&a…

Python 与 TensorFlow2 生成式 AI(四)

原文&#xff1a;zh.annas-archive.org/md5/d06d282ea0d9c23c57f0ce31225acf76 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 第九章&#xff1a;文本生成方法的崛起 在前几章中&#xff0c;我们讨论了不同的方法和技术来开发和训练生成模型。特别是在第六章“使用 …

LLM应用:让大模型prompt总结生成Mermaid流程图

生成内容、总结文章让大模型Mermaid流程图展示&#xff1a; mermaid 美人鱼, 是一个类似 markdown&#xff0c;用文本语法来描述文档图形(流程图、 时序图、甘特图)的工具&#xff0c;您可以在文档中嵌入一段 mermaid 文本来生成 SVG 形式的图形 Prompt 示例&#xff1a;用横向…

基于OSAL 实现UART、LED、ADC等基础示例 4

1 UART 实验目的 串口在我们开发单片机项目是很重要的&#xff0c;可以观察我们的代码运行情况&#xff0c;本节的目的就 是实现串口双工收发。 虽然说 osal 相关的代码已经跟硬件关系不大了&#xff0c;但是我们还是来贴出相关的硬件原理图贴出来。 1.1 串口初始化 osal_ini…

latex使用bib引用参考文献时,正文编号顺序乱序解决办法,两分钟搞定!

一、背景 用Latex写文章时&#xff0c;使用bib添加参考文献是一种最为简便的方式。但有的期刊模板&#xff0c;如机器人顶会IROS&#xff0c;会出现正文参考文献序号没按顺序排列的情况&#xff0c;如下图所示。按理说文献[4]应该是文献[2]&#xff0c;[2]应该是[3]&#xff0…

Go中为什么不建议用锁?

Go语言中是不建议用锁&#xff0c;而是用通道Channel来代替(不要通过共享内存来通信&#xff0c;而通过通信来共享内存)&#xff0c;当然锁也是可以用&#xff0c;锁是防止同一时刻多个goroutine操作同一个资源&#xff1b; GO语言中&#xff0c;要传递某个数据给另一个gorout…

Java项目:88 springboot104学生网上请假系统设计与实现

作者主页&#xff1a;源码空间codegym 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 本学生网上请假系统管理员&#xff0c;教师&#xff0c;学生。 管理员功能有个人中心&#xff0c;学生管理&#xff0c;教师管理&#xff0c;…

22 重构系统升级-实现不停服的数据迁移和用户切量

专栏的前 21 讲&#xff0c;从读、写以及扣减的角度介绍了三种特点各异的微服务的构建技巧&#xff0c;最后从微服务的共性问题出发&#xff0c;介绍了这些共性问题的应对技巧。 在实际工作中&#xff0c;你就可以参考本专栏介绍的技巧构建新的微服务&#xff0c;架构一个具备…

vue3 安装-使用之第一篇

首先需要node版本高于V16.14.1 安装 执行 npm create vitelatest 具体选择按照自己实际需要的来 Project name:项目名称 Select a framework:选择用哪种框架 &#xff08;我选择vue&#xff09; Select a variant: 选择用JS还是TS&#xff08;我选择JS&#xff09;找到项目&…

STM32 HAL库F103系列之IIC实验

IIC总线协议 IIC总线协议介绍 IIC&#xff1a;Inter Integrated Circuit&#xff0c;集成电路总线&#xff0c;是一种同步 串行 半双工通信总线。 总线就是传输数据通道 协议就是传输数据的规则 IIC总线结构图 ① 由时钟线SCL和数据线SDA组成&#xff0c;并且都接上拉电阻…

(7)快速调优

文章目录 前言 1 安装脚本 2 运行 QuikTune 3 高级配置 前言 VTOL QuikTune Lua 脚本简化了为多旋翼飞行器的姿态控制参数寻找最佳调整的过程。 脚本会缓慢增加相关增益&#xff0c;直到检测到振荡。然后&#xff0c;它将增益降低 60%&#xff0c;并进入下一个增益。所有增…