spring Cloud Stream 实战应用深度讲解

news2024/7/6 17:47:48

springCloudStream

简介

Spring Cloud Stream是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。

该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。

核心模块

  • Destination Binders: 负责提供与外部消息系统集成的组件
  • Destination Bindings: 外部消息系统和用户程序代码之间的桥梁(生产者-使用者之间的桥梁)
  • Message:生产者和消费者用于与Destination Binders(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

历史

Spring 的数据集成之旅始于 Spring Integration。通过其编程模型,它提供了一致的开发人员体验来构建应用程序,这些应用程序可以采用企业集成模式来连接外部系统,例如数据库、消息代理等。

快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,可以无缝开发独立的、基于 Spring 的生产级微服务。

为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被放在一个新项目中。Spring Cloud Stream 诞生了。

架构模型

在这里插入图片描述

这张图是spring-stream官网的,里面的Middleware指的就是RabbitMQ或者KafKa这些消息队列。

下图是我们原来和消息队列通信的方式。我们的程序直接发送数据给MQ或者监听到MQ的数据。

在这里插入图片描述

通过spring stream来做的话,就增加了Binder层来做统一调度,我们的程序只需要和Binder层通信,不需要关注底层的MQ是RabbitMQ还是Kafka

目前官方提供了两个Binder,分别是RabbitMQ的和Kafka的,其余队列的有一些第三方维护的。同时我们也可以自己实现Binder

一开始图中的InputOutput是对于spring stream来说的,input就是输入消息到stream中,output就是输出消息到我们的程序中。

简单介绍一下Binder,其实就是策略模式,统一接口实现,比如MQ1里面发送消息到MQ的方法叫Publish,MQ2里面发送消息到MQ的方法叫Release,但是在Binder接口里面提供了一个方法,就叫做add。也只需要提供一个Message消息。

public interface Binder{
    function add(Message msg);
}

// 连接MQ1的Binder
public class Binder1 implements Binder{
    public function add(Message msg){
        // 消息处理
        // 发送到MQ1
        publish(msg);
    }
}

// 连接MQ2的Binder
public class Binder2 implements Binder{
    public function add(Message msg){
        // 消息处理
        // 发送到MQ2
        release(msg);
    }
}

当我们使用的时候只需要自己决定使用哪个Binder就可以了。就是就和连接数据库一样,不需要关心连接的是Mysql还是PostgreSql。

public class main{
    public static function main() {
        Binder binder = new Binder1();
        Message msg = new Message();
        binder.add(msg);
    }
}
Bindings

Bindings作为一个桥梁,负责连接MQ和用户代码。比如绑定一个代码作为input往某一个Queue里面输入信息,绑定一个代码作为output从某个Queue里面接收信息。然后我们使用Binder来实现推送消息到MQ和消费消息。

这里是官网原文:The application communicates with the outside world by establishing bindings between destinations exposed by the external brokers and input/output arguments in your code. Broker specific details necessary to establish bindings are handled by middleware-specific Binder implementations.

下图为Bindings和Binder的关系

在这里插入图片描述

source 和 sink

source其实就是发送方的发送的Message. sink就是接收方接受的Message

注解实现

注解的实现已经被彻底删除,只有之前低版本的还能使用

函数式编程实现示例

依赖引入

将下面的代码加入pom文件,然后使用maven导入相关依赖即可。

// 引入spring cloud stream依赖
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
// 引入spring cloud stream的rabbit binder依赖
// 如果是kafka,那么把这个换成kafka的binder
// 在这个binder里面已经引入了 rabbit MQ依赖,所以不需要再单独引入rabbit MQ了
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream: # stream的配置
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

生产者

配置文件修改

对于函数式编程来说,spring cloud stream有一些约定或者说规定。比如我们注册了一个logPubBean,那么它对应的bindings配置的名称就是logPub-in-0或者logPub-out-0,前面是我们的方法名,中间表示生产者或消费者,in表示消费者,out表示生产者。这里的in or out是对于我们的代码来说的。后面的0就是一个序号。

写生产者之前我们需要加上对应的bindings配置。如果注册了多个Bean作为生产者或消费者,那么还需要配置哪些Bean是生产者和消费者。


spring:
  cloud:
    function: # 配置哪些Bean是Stream可以用的
        definition: log;logPub;sendLog
    stream: # stream的配置
        bindings: # 服务的整合处理
            logPub-out-0:
                destination: log # 表示要使用的Exchange名称定义,不存在会自动创建
                content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
写代码

随便新建一个类,并标记为@Component,主要是要让spring知道这个类。类名可以随便起。

@Component
public class logProducer {

}

然后开始编写生产者的代码。加入主要的方法log,方法名可以随便起,只需要记得把这个方法注册为一个Bean就可以了。一定要在上面加@Bean注解。

方法的返回值只能是Supplier函数接口类型。不能是其他的。

方法里面可以写生产者的具体代码。会注册一个名为logPubBean作为生产者。

@Component
public class logListener {
    @Bean
    public Supplier<logListener.Person> logPub() {
        return () -> {
            Person person = new Person();
            person.setName("张三");
            System.out.println("生产者:"+person);
            return person;
        };
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        @Override
        public String toString() {
            return this.name;
        }
    }
}

关于Supplier,这个是java提供的函数式编程的接口。从java8开始提供的,java8里面的stream功能也用到了函数式编程。

下面是Supplier的注释和定义

//Represents a supplier of results.
//There is no requirement that a new or distinct result be returned each time the supplier is invoked.
//This is a functional interface whose functional method is get().

public interface Supplier<T>

翻译过来大概就是:一个结果的提供者或者一个结果的生产者。正好对应我们的生产者。该接口只有一个方法T get(),没有参数并且仅返回一个结果。

运行

运行的话会发现控制台一直在打印。我们的队列里面也一直在新增。

在这里插入图片描述

StreamBridge

当前的运行方式是当写完生产者以后,spring cloud stream会1/s次来调用我们的生产者,但是我们一般是自己来控制生产者的调用。就可以使用下面的方法。

我们可以通过StreamBridge来做到这一点。他有四个send方法。

  • public boolean send(String bindingName, Object data):第一个参数是bindingName,我们输入的是sendLog,就需要增加sendLog的配置,我们也可以用之前的logPub-out-0。第二个参数是发送的数据。
  • public boolean send(String bindingName, Object data, MimeType outputContentType):比上面的多了一个数据类型。
  • public boolean send(String bindingName, @Nullable String binderName, Object data):还可以指定Binder的name
  • public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType): 四个参数放在一起了。
@RestController
public class logController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/sendLog")
    public void sendLog() {
        logListener.Person person = new logListener.Person();
        person.setName("李四");
        System.out.println("生产者发送消息"+person);
        streamBridge.send("sendLog", person);
    }
}

消费者

随便新建一个类,并标记为@Component,主要是要让spring知道这个类。类名可以随便起。

@Component
public class logListener {

}

然后开始编写消费者的代码。加入主要的方法log,方法名可以随便起,只需要记得把这个方法注册为一个Bean就可以了。一定要在上面加@Bean注解。

方法的返回值可以是Consumer,也可以是Function。不能是其他的。

方法里面就可以写消费的具体代码了。

@Component
public class logListener {
    @Bean
    public Consumer<logListener.Person> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        @Override
        public String toString() {
            return this.name;
        }
    }
}

关于ConsumerFunction,这两个是java提供的函数式编程的接口。从java8开始提供的,java8里面的stream功能也用到了函数式编程。

下面是Consumer接口的注释和接口的定义。

//Represents an operation that accepts a single input argument and returns no result. Unlike most other functional interfaces, Consumer is expected to operate via side-effects.
//This is a functional interface whose functional method is accept(Object).

public interface Consumer<T>

翻译过来大概就是说Consumer接口仅接收一个参数并且没有返回值,我们的代码里面也可以看到,接收了一个person参数,没有return。

该接口只有一个方法void accept(T t),T类型就是我们的Person类型。

下面是Function接口的注释和定义

//Represents a function that accepts one argument and produces a result.
//This is a functional interface whose functional method is apply(Object).
public interface Function<T, R>

翻译过来大概就是说Function接口仅接收一个参数并且返回一个结果。该接口只有一个方法R apply(T t),接收一个T类型的参数,返回一个R类型的结果。

在这里插入图片描述

手动ACK

通过禁止使用死信队列来执行手动的ACK,这个时候如果抛出异常,则会重试。如果开启了死信队列,那么抛出异常以后则会进入死信队列。

log-in-0:
    consumer:
        auto-bind-dlq: false

队列持久化

上面可以看出来,创建的都是匿名队列,当程序启动的时候自动创建,当程序关闭的时候自动删除。

但是正常开发中,很少使用这种,都会指定一个持久化的队列,不管程序是否运行,队列都存在。

我们可以在bindings的配置里面增加group配置来显式指定哪个队列,我们指定log123队列。

log-in-0:
    destination: log
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    group: log123
sendLog:
    destination: log
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    group: log123

再次运行程序,可以看到该队列被创建。接下来停止程序,可以看到队列还存在那里。

bindings重命名

默认约定的名称为log-in-0这种形式

但是我们也可以将它重命名。通过配置文件可以将log-in-0重命名为input,不过这样的话,所有的log-in-0的bindings配置都需要修改成input,使用上也是。注意官方并不推荐这种做法,他们认为在大多数情况下,这有点矫枉过正。

spring:
    cloud:
        stream:
            function:
                bindings:
                    log-in-0: input

显式绑定创建

默认约定的是log-in-0负责输入,log-out-0负责输出,我们也可以显式的创建这些。

通过配置文件

spring:
    cloud:
        stream:
            input-bindings: login;fooin
            output-bindings: logout;fooout

轮询配置属性

spring:
    integration:
        poller:
            # 全局配置
            fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L 
            maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1L
            cron: none # Cron 触发器的 Cron 表达式值。默认 none
            initialDelay: 0 # 周期性触发的初始延迟。 默认0
            timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS

也可以单独为某个bindings来配置

spring:
    cloud:
        stream:
            bindings:
                log-out-0:
                    producer:
                        poller:
                            # log-out-0的单独配置
                            fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L 
                            maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1L
                            cron: none # Cron 触发器的 Cron 表达式值。默认 none
                            initialDelay: 0 # 周期性触发的初始延迟。 默认0
                            timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS

函数组合

假设我们有两个处理Bean,enrich负责检查header,如果缺少foo,就添加为foo,bar。然后第二个echo则负责检查是否包含foo这个Header然后输出消息内容。

@Bean
public Function<Message<String>, Message<String>> enrich() {
    return message -> {
        Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
        return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
    };
}

@Bean
public Function<Message<String>, Message<String>> echo() {
    return message -> {
        Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
        System.out.println("Incoming message " + message);
        return message;
    };
}

通过配置将这两个bean组合起来,组合之后,这个bean名称就编程了enrich|echo,后续的配置都需要这种冗长的名称,所以这里官方推荐使用重命名的方式将它变成简单的名称。

spring:
    cloud:
        function:
            definition: enrich|echo # 函数组合
        stream:
            function: 
                bindings:
                    enrich|echo-in-0: input # 重命名

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1406266.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Maven】-- 打包添加时间戳的两种方法

一、需求 在执行 mvn clean package -Dmaven.test.skiptrue 后&#xff0c;生成的 jar 包带有自定义系统时间。 二、实现 方法一&#xff1a;使用自带属性&#xff08;不推荐&#xff09; 使用系统时间戳&#xff0c;但有一个问题&#xff0c;就是默认使用 UTC0 的时区。举例…

什么叫特征分解?

特征分解&#xff08;Eigenvalue Decomposition&#xff09;是将一个方阵分解为特征向量和特征值的过程。对于一个 nn 的方阵A&#xff0c;其特征向量&#xff08;Eigenvector&#xff09;v 和特征值&#xff08;Eigenvalue&#xff09; λ 满足以下关系&#xff1a; 这可以写…

Python批量自动处理文件夹

假如在你的电脑硬盘某文件夹里有这样一堆不同格式的文件&#xff0c;看起来非常混乱&#xff0c;详情如图所示&#xff1a; 为了方便自己快速检索到文件&#xff0c;我们需要将这些文件按照不同格式分类整理到不同的文件夹中&#xff0c;应该怎么做呢&#xff1f;源码如下&…

v39.for循环while循环

1.循环引入&#xff08;loops&#xff09; 2.while loop 当括号中表达式expression返回真值时&#xff0c;代码块执行 。之后将再次检查expression &#xff0c;如果仍然返回真值&#xff0c;继续执行......直到expression返回假值。 3.for loop 有初始化、条件、递增/减 步骤。…

Zookeeper集群 + Kafka集群,Filebeat+Kafka+ELK

目录 什么是Zookeeper&#xff1f; Zookeeper 工作机制 Zookeeper 特点 Zookeeper 数据结构 Zookeeper 选举机制 实验 部署 Zookeeper 集群 1.安装前准备 安装 JDK 下载安装包 2.安装 Zookeeper 修改配置文件 拷贝配置好的 Zookeeper 配置文件到其他机器上 在每个节…

uni-app (安卓、微信小程序)接口封装 token失效自动获取新的token

一、文件路径截图 1、新建一个文件app.js存放接口 //这里存放你需要的接口import {request} from /utils/request.js //这个文件是请求逻辑处理 module.exports {// 登录 -- 注册perssonRegister: (data) > { // 供应商注册 return request({url: manageWx/Login/perssonR…

npm i 报一堆版本问题

1&#xff0c;先npm cache clean --force 再下载 插件后缀加上 --legacy-peer-deps 2&#xff0c; npm ERR! code CERT_HAS_EXPIRED npm ERR! errno CERT_HAS_EXPIRED npm ERR! request to https://registry.npm.taobao.org/yorkie/download/yorkie-2.0.0.tgz failed, reason…

【echarts图表】提示暂无数据

【echarts图表】提示暂无数据 背景&#xff1a;echarts图表数据有时候为空数组[ ]&#xff0c;这时候渲染图表异常&#xff0c;需要提示暂无数据 // 提示 暂无数据 : noDataBox 样式 this.$nextTick(() > {const dom document.getElementById("echartsId");dom…

JAVA的面试题四

1.电商行业特点 &#xff08;1&#xff09;分布式&#xff1a; ①垂直拆分:根据功能模块进行拆分 ②水平拆分:根据业务层级进行拆分 &#xff08;2&#xff09;高并发&#xff1a; 用户单位时间内访问服务器数量,是电商行业中面临的主要问题 &#xff08;3&#xff09;集群&…

响应式Web开发项目教程(HTML5+CSS3+Bootstrap)第2版 例4-9 HTML5 表单验证

代码 <!doctype html> <html> <head> <meta charset"utf-8"> <title>HTML5 表单验证</title> </head><body> <form action"#" method"get">请输入您的邮箱:<input type"email&q…

如何基于 ESP32 芯片测试 WiFi 连接距离、获取连接的 AP 信号强度(RSSI)以及 WiFi吞吐测试

测试说明&#xff1a; 测试 WiFi 连接距离&#xff0c;是将 ESP32 作为 WiFi Station 模式来连接路由器&#xff0c;通过在开阔环境下进行拉距来测试。另外&#xff0c;可以通过增大 WiFi TX Power 来增大连接距离。 获取连接的 AP 信号强度&#xff0c;一般可以通过 WiFi 扫描…

matlab appdesigner系列-常用18-表格

表格&#xff0c;常用来导入外部表格数据 示例&#xff1a; 导入外界excel数据&#xff1a;data.xlsx 姓名年龄城市王一18长沙王二21上海王三56武汉王四47北京王五88成都王六23长春 操作步骤如下&#xff1a; 1&#xff09;将表格拖拽到画布上 2&#xff09;对app1右键进行…

【深度学习:集中偏差】减少计算机视觉数据集中偏差的 5 种方法

【深度学习&#xff1a;集中偏差】减少计算机视觉数据集中偏差的 5 种方法 有偏差的计算机视觉数据集会导致哪些问题&#xff1f;如何减少计算机视觉数据集中偏差的示例观察并监控带注释样本的类别分布确保数据集代表模型适用的人群明确定义对象分类、标记和注释的流程为标签质…

【书生·浦语】大模型实战营——第六课笔记

视频链接&#xff1a;https://www.bilibili.com/video/BV1Gg4y1U7uc/?vd_source5d94ee72ede352cb2dfc19e4694f7622 教程文档&#xff1a;https://github.com/InternLM/tutorial/blob/main/opencompass/opencompass_tutorial.md 仓库&#xff1a;https://github.com/open-compa…

(学习日记)2024.01.23:结构体、位操作和枚举类型

写在前面&#xff1a; 由于时间的不足与学习的碎片化&#xff0c;写博客变得有些奢侈。 但是对于记录学习&#xff08;忘了以后能快速复习&#xff09;的渴望一天天变得强烈。 既然如此 不如以天为单位&#xff0c;以时间为顺序&#xff0c;仅仅将博客当做一个知识学习的目录&a…

【JavaEE Spring】MyBatis 操作数据库 - 进阶

MyBatis 操作数据库 - 进阶 1. 动态SQL1.1 \<if>标签1.2 \<trim>标签1.3 \<where>标签1.4 \<set>标签1.5 \<foreach>标签1.6 \<include>标签 1. 动态SQL 动态 SQL 是Mybatis的强⼤特性之⼀&#xff0c;能够完成不同条件下不同的 sql 拼接…

[完美解决]Vue/React项目运行时出现this[kHandle] = new _Hash(algorithm, xofLen)

问题出现的原因 出现这个问题是node.js 的版本问题&#xff0c;因为 node.js V17开始版本中发布的是OpenSSL3.0, 而OpenSSL3.0对允许算法和密钥大小增加了严格的限制&#xff0c;可能会对生态系统造成一些影响。故此以前的项目在使用 nodejs V17以上版本后会报错。而github项目…

【6】密评中对服务端采用“挑战-响应”机制进行身份鉴别的验证

对服务端采用“挑战-响应”机制进行身份鉴别的验证 1、提取出服务端的签名值 签名值&#xff08;hex&#xff09;&#xff1a; 3045022100e4795b5a947526f8e7cbd0edd571ea8749e0efd24323799346ea2c740c006c5a0220026189e51c19d20d40a82606d0ed72cb9530a189bbb94c09e4559d7d8f…

[C++]使用yolov8的onnx模型仅用opencv和bytetrack实现目标追踪

【官方框架地址】 yolov8: https://github.com/ultralytics/ultralytics bytetrack: https://github.com/ifzhang/ByteTrack 【算法介绍】 随着人工智能技术的不断发展&#xff0c;目标追踪已成为计算机视觉领域的重要研究方向。Yolov8和ByTetrack作为当前先进的算法&…

【GitHub项目推荐--Git 教程】【转载】

本开源项目是 Will 保哥在 2013 第 6 界 IT 邦帮忙铁人赛年度大奖的得奖著作。这是一个 Git 教程&#xff0c;这个开源教程用 30 天的时间&#xff0c;带领大家详细了解使用 Git 。 重点介绍了 Git 的一些常用操作&#xff0c;以及日常工作中实际应用场景讲解&#xff0c;下图…