SpringCloudStream原理和深入使用

news2024/10/7 18:23:31

简单概述

Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。

应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:Spring Cloud Stream能够屏蔽底层消息中间件【RabbitMQ,kafka等】的差异,降低切换成本,统一消息的编程模型

相关概念

Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。

在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。

  • Input(输入):Input通道用于消费者从消息代理接收消息。消费者可以通过监听Input通道来实时接收传入的消息

  • Output(输出):Output通道用于生产者向消息代理发送消息。生产者可以通过向Output通道发送消息来发布新的消息

Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。

Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。

**消费者组:**在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息)

分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理

Spring Message

Spring Message是Spring Framework的一个模块,其作用就是统一消息的编程模型。

package org.springframework.messaging;

public interface Message<T> {
    T getPayload();

    MessageHeaders getHeaders();
}

消息通道 MessageChannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:

@FunctionalInterface
public interface MessageChannel {

	long INDEFINITE_TIMEOUT = -1;

	default boolean send(Message<?> message) {
		return send(message, INDEFINITE_TIMEOUT);
	}

	boolean send(Message<?> message, long timeout);

}

消息通道里的消息由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅

public interface SubscribableChannel extends MessageChannel {

	boolean subscribe(MessageHandler handler);

	boolean unsubscribe(MessageHandler handler);

}

MessageHandler真正地消费/处理消息

@FunctionalInterface
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}

Spring Integration

Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

它提出了不少新的概念,包括消息路由MessageRoute、消息分发MessageDispatcher、消息过滤Filter、消息转换Transformer、消息聚合Aggregator、消息分割Splitter等等。同时还提供了MessageChannel和MessageHandler的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

Spring-Cloud-Stream的架构

img

快速入门

引入依赖

        <!--stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

增加配置文件

spring:
    cloud:
        stream:
            # 定义消息中间件
            binders:
              MyRabbit:
                  type: rabbit
                  environment:
                    spring:
                        rabbitmq:
                            host: localhost
                            port: 5672
                            username: root
                            password: root
                            vhost: /
            bindings:
            # 生产者中定义,定义发布对象
              myInput:
                destination: myStreamExchange
                group: myStreamGroup
                binder: MyRabbit
            # 消费者中定义,定义订阅的对象
              myOutput-in-0:
                destination: myStreamExchange
                group: myStreamGroup
                binder: MyRabbit
        # 消费者中定义,定义输出的函数
        function:
            definition: myOutput

生产者

	@Resource
	private StreamBridge streamBridge;

	public void sendNormal() {
		streamBridge.send("myInput", "hello world");
	}

消费者

	@Bean("myOutput")
	public Consumer<Message<String>> myOutput() {
		return (message) -> {
			MessageHeaders headers = message.getHeaders();
			System.out.println("myOutput head is : " + headers);
			String payload = message.getPayload();
			System.out.println("myOutput payload is : " + payload);
		};
	}

如何自定义Binder

  1. 添加spring-cloud-stream依赖
  2. 提供ProvisioningProvider的实现
  3. 提供MessageProducer的实现
  4. 提供MessageHandler的实现
  5. 提供Binder的实现
  6. 创建Binder的配置
  7. META-INF/spring.binders中定义绑定器

添加spring-cloud-stream依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring.cloud.stream.version}</version>
</dependency>

提供ProvisioningProvider的实现

ProvisioningProvider负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。

public class FileProvisioningProvider implements ProvisioningProvider<
	ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>> {

	public FileProvisioningProvider() {
		super();
	}

	@Override
	public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<FileProducerProperties> properties) throws ProvisioningException {
		return new FileMessageDestination(name);
	}


	@Override
	public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws ProvisioningException {
		return new FileMessageDestination(name);
	}


	private static class FileMessageDestination implements ProducerDestination, ConsumerDestination {

		private final String destination;

		private FileMessageDestination(final String destination) {
			this.destination = destination;
		}

		@Override
		public String getName() {
			return destination.trim();
		}

		@Override
		public String getNameForPartition(int partition) {
			throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
		}

	}

}

提供MessageProducer的实现

MessageProducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。

		super.onInit();
		executorService = Executors.newScheduledThreadPool(1);
	}

	@Override
	public void doStart() {
		executorService.scheduleWithFixedDelay(() -> {
			String payload = getPayload();
			if (payload != null) {
				Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
				sendMessage(receivedMessage);
			}

		}, 0, 50, TimeUnit.MILLISECONDS);
	}

	@Override
	protected void doStop() {
		executorService.shutdownNow();
	}

	private String getPayload() {
		try {
			List<String> allLines = Files.readAllLines(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));
			String currentPayload = allLines.get(allLines.size() - 1);
			if (!currentPayload.equals(previousPayload)) {
				previousPayload = currentPayload;
				return currentPayload;
			}
		} catch (IOException e) {
			FileUtil.touch(new File(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));
		}

		return null;
	}
}

提供MessageHandler的实现

MessageHandler提供产生事件所需的逻辑。

public class FileMessageHandler extends AbstractMessageHandler {
	FileExtendedBindingProperties fileExtendedBindingProperties;
	ProducerDestination destination;

	public FileMessageHandler(ProducerDestination destination, FileExtendedBindingProperties fileExtendedBindingProperties) {
		this.destination = destination;
		this.fileExtendedBindingProperties = fileExtendedBindingProperties;
	}

	@Override
	protected void handleMessageInternal(Message<?> message) {
		try {
			if (message.getPayload() instanceof byte[]) {
				Files.write(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"), (byte[]) message.getPayload());
			} else {
				throw new RuntimeException("处理消息失败");
			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}
}

提供Binder的实现

提供自己的Binder抽象实现:

  • 扩展AbstractMessageChannelBinder
  • 将自定义的 ProvisioningProvider 指定为 AbstractMessageChannelBinder 的通用参数
  • 重写createProducerMessageHandlercreateConsumerEndpoint方法
public class FileMessageChannelBinder extends AbstractMessageChannelBinder
	<ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>, FileProvisioningProvider>
	implements ExtendedPropertiesBinder<MessageChannel, FileConsumerProperties, FileProducerProperties> {

	FileExtendedBindingProperties fileExtendedBindingProperties;

	public FileMessageChannelBinder(String[] headersToEmbed, FileProvisioningProvider provisioningProvider, FileExtendedBindingProperties fileExtendedBindingProperties) {
		super(headersToEmbed, provisioningProvider);
		this.fileExtendedBindingProperties = fileExtendedBindingProperties;
	}


	@Override
	protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<FileProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
		FileMessageHandler fileMessageHandler = new FileMessageHandler(destination, fileExtendedBindingProperties);
		return fileMessageHandler;
	}


	@Override
	protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws Exception {
		FileMessageProducerAdapter fileMessageProducerAdapter = new FileMessageProducerAdapter(destination, fileExtendedBindingProperties);

		return fileMessageProducerAdapter;
	}


	@Override
	public FileConsumerProperties getExtendedConsumerProperties(String channelName) {
		return fileExtendedBindingProperties.getExtendedConsumerProperties(channelName);
	}

	@Override
	public FileProducerProperties getExtendedProducerProperties(String channelName) {
		return fileExtendedBindingProperties.getExtendedProducerProperties(channelName);
	}


	@Override
	public String getDefaultsPrefix() {
		return fileExtendedBindingProperties.getDefaultsPrefix();
	}

	@Override
	public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
		return fileExtendedBindingProperties.getExtendedPropertiesEntryClass();
	}
}

创建Binder的配置

严格要求创建一个 Spring 配置来初始化你的绑定器实现的 bean

@EnableConfigurationProperties(FileExtendedBindingProperties.class)
@Configuration
public class FileMessageBinderConfiguration {
	@Bean
	@ConditionalOnMissingBean
	public FileProvisioningProvider fileMessageBinderProvisioner() {
		return new FileProvisioningProvider();
	}

	@Bean
	@ConditionalOnMissingBean
	public FileMessageChannelBinder fileMessageBinder(FileProvisioningProvider fileMessageBinderProvisioner, FileExtendedBindingProperties fileExtendedBindingProperties) {
		return new FileMessageChannelBinder(null, fileMessageBinderProvisioner, fileExtendedBindingProperties);
	}


	@Bean
	public FileProducerProperties fileConsumerProperties() {
		return new FileProducerProperties();
	}
}

详细的代码见https://gitee.com/xiaovcloud/spring-cloud-stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1834011.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI工具快速制作爆火的影视视频混剪

今天给大家发一个有意思的工具&#xff0c;影视混剪大家应该都刷到过&#xff0c;像下面这种视频&#xff0c;播放量都超级高。 这种视频都是怎么做的呢&#xff1f; 现在AI工具这么多样性&#xff0c;先用 AI 写一段具有网感的对话段子&#xff0c;然后找影视剧片段混剪成一…

笑脸金融测试社招面经,期望20K

面经哥只做互联网社招面试经历分享&#xff0c;关注我&#xff0c;每日推送精选面经&#xff0c;面试前&#xff0c;先找面经哥 测试总监一面 1、问一些测试理论相关的知识。 自我介绍、质量模型 2、登录如何设计测试用例。 3、给你一个东西你会从哪些方面去考虑设计测试用…

【数据结构初阶】--- 堆的应用:topk

堆的功能&#xff1a;topk 为什么使用topk 先举个例子&#xff0c;假如说全国有十万家奶茶店&#xff0c;我现在想找到评分前十的店铺&#xff0c;现在应该怎么实现&#xff1f; 第一想法当然是排序&#xff0c;由大到小排序好&#xff0c;前十就能拿到了。这是一种方法&…

2024 年最新 Python 基于 LangChain 框架基础案例详细教程(更新中)

LangChain 框架搭建 安装 langchain pip install langchain -i https://mirrors.aliyun.com/pypi/simple/安装 langchain-openai pip install langchain-openai -i https://mirrors.aliyun.com/pypi/simple/ChatOpenAI 配置环境变量 环境变量 OPENAI_API_KEYOpenAI API 密钥…

在IDEA 2024.1.3 (Community Edition)中创建Maven项目

本篇博客承继自博客Windows系统Maven下载安装-CSDN博客 Maven版本&#xff1a;maven-3.9.5 修改设置&#xff1a; 首先先对Idea的Maven依赖进行设置&#xff1b;打开Idea&#xff0c;选择“Costomize”&#xff0c;选择最下边的"All settings" 之后找到Maven选项&…

聚四氟乙烯离心管 四氟反应管 消解管 PTFE螺口带盖管 特氟龙试管

一、产品介绍 样品悬浮液盛放在管状试样容器中&#xff0c;在离心机的高速旋转下&#xff0c;由于巨大的离心力作用&#xff0c;使悬浮的微小颗粒 以一定的速度沉降&#xff0c;从而与溶液得以分离。这种带密封盖或压盖的管状试样容器&#xff0c;就是离心管。 PTFE离心管&…

编程精粹—— Microsoft 编写优质无错 C 程序秘诀 03:强化你的子系统

这是一本老书&#xff0c;作者 Steve Maguire 在微软工作期间写了这本书&#xff0c;英文版于 1993 年发布。2013 年推出了 20 周年纪念第二版。我们看到的标题是中译版名字&#xff0c;英文版的名字是《Writing Clean Code ─── Microsoft’s Techniques for Developing》&a…

【面试干货】常见的编译时异常(运行时异常)及其处理

【面试干货】常见的编译时异常&#xff08;运行时异常&#xff09;及其处理 1、SQLException2、IOException3、FileNotFoundException4、ClassNotFoundException5、EOFException6、总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Jav…

电能表厂家的研发能力是实力的体现

电能表厂家的研发能力无疑是其整体实力的核心体现。一个拥有强大研发能力的电能表厂家&#xff0c;不仅能够持续推出具有竞争力的新产品&#xff0c;满足市场需求&#xff0c;还能引领行业发展&#xff0c;塑造企业品牌形象。 一、研发能力对电能表厂家的重要性 研发能力是电…

图纸管理的方法、图纸管理软件

图纸管理是一个复杂且关键的过程&#xff0c;它涉及到图纸的创建、存储、共享、修改、审核、存档和检索等多个环节。以下是根据参考文章总结的图纸管理的具体内容和方法&#xff1a; 一、图纸管理的目的 1、确保图纸的准确性&#xff1a;通过规范的管理流程和质量控制措施&…

Failed to execute goal org.apache.maven.plugins:maven-antrun-plugin:1.8:

Mvan 点击执行 mvn install https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.8/maven-antrun-plugin-1.8.pom

小米手机怎么用代理换ip:步骤详解与实用指南

在数字化时代&#xff0c;网络安全与隐私保护日益受到重视。对于小米手机用户而言&#xff0c;使用代理换IP已成为提升网络安全性、访问特定网站或绕过地域限制的有效手段。本文将详细介绍如何在小米手机上设置代理以更换IP地址&#xff0c;帮助用户更好地保护个人信息和享受更…

【NOI-题解】1448. 随机体能测试1469. 数的统计1511. 数字之和为13的整数1846. 阿尔法乘积

文章目录 一、前言二、问题问题&#xff1a;1448. 随机体能测试问题&#xff1a;1469. 数的统计问题&#xff1a;1511. 数字之和为13的整数问题&#xff1a;1846. 阿尔法乘积 三、感谢 一、前言 本章节主要对嵌套循环的题目进行讲解&#xff0c;包括《1448. 随机体能测试》《1…

Swift开发——存储属性与计算属性

Swift语言开发者建议程序设计者多用结构体开发应用程序。在Swift语言中,结构体具有了很多类的特性(除类的与继承相关的特性外),具有属性和方法,且为值类型。所谓的属性是指结构体中的变量或常量,所谓的方法是指结构体中的函数。在结构体中使用属性和方法是因为:①匹别于结…

泛微开发修炼之旅--19ecode获取用户人员信息方案汇总及代码示例(含pc端和移动端)

文章详情链接&#xff1a;19ecode获取用户人员信息方案汇总及代码示例&#xff08;含pc端和移动端&#xff09;

Android Basis - Google Keybox

什么是Keybox Keybox 又称为Gooogle attestation key&#xff0c;是Google用于管理、统计运行GMS套件设备的一种手段。 通常我们会向Google申请keybox&#xff0c;结合可能得出货量&#xff0c;提供如下信息给到的Google。 1. fingerprint 2. device id 列表 举个例子&am…

(done) AFL 都有哪些阶段? Stage progress

参考资料&#xff1a;https://afl-1.readthedocs.io/en/latest/user_guide.html 所有阶段如下&#xff0c;包括详细的解释

下载lombok.jar包,简化类的代码

Download (projectlombok.org) 去这个网站下载lombok.jar包 打开这个包文件的位置,拖到项目lib文件夹: 在这里右键添加为库(Add as library)。 添加这三个注解即可&#xff0c;类里面不需要其他东西了

基于Python的垃圾分类检测识别系统(Yolo4网络)【W8】

简介&#xff1a; 垃圾分类检测识别系统旨在利用深度学习和计算机视觉技术&#xff0c;实现对不同类别垃圾的自动识别和分类。应用环境包括Python编程语言、主流深度学习框架如TensorFlow或PyTorch&#xff0c;以及图像处理库OpenCV等&#xff0c;通过这些工具集成和优化模型&a…

物联网技术-第3章物联网感知技术-3.3传感技术

目录 1.1什么是传感器 1.1.1生活中的传感器 1.1.2人的五官与传感器 1.1.3传感器的定义 1.1.4传感器的组成 1.2传感器的特性 1.2.1传感器的静态特征 1、灵敏度&#xff08;静态灵敏度&#xff09; 2.精度 3.线性度&#xff08;非线性误差&#xff09; 4.最小检测量&a…