SpringCloudStream整合MQ

news2025/1/27 12:59:51

目录

概念

快速搭建SCS环境

一秒切换MQ

组件

1. Binder

2. Binding

3. Message

分组消费


概念

        Spring Cloud Stream(SCS) 的主要目标是一套代码,兼容所有MQ, 降低MQ的学习成本,提供一致性的编程模型,让开发者能更容易地集成/切换消息组件(如 Apache Kafka、RabbitMQ、RocketMQ)

官网地址:Spring Cloud Stream

快速搭建SCS环境

1. 引入pom依赖

    <dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
			<version>2021.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-acl</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-client</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.9.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-acl</artifactId>
			<version>4.9.1</version>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-dependencies</artifactId>
				<version>2.3.4.RELEASE</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Hoxton.SR6</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

2. 配置文件application.properties

# mq地址
spring.cloud.stream.rocketmq.binder.name-server=192.168.6.128:9876

spring.cloud.stream.bindings.output.destination=scstreamExchange
spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain

3. 生产者和消费者代码

@RestController
public class SendMessageController {

    @Autowired
    private Source source;

    @GetMapping("/send")
    public Object send(String message) {
        MessageBuilder<String> messageBuilder =
                MessageBuilder.withPayload(message);
        source.output().send(messageBuilder.build());
        return "message sended : "+message;
    }
}
@Component
public class MessageConsumer {

    @StreamListener(Sink.INPUT)
    public void process(Object message) {
        System.out.println("received message : " + message);
    }
}

4. 验证生产消息,消费消息

 

一秒切换MQ

修改pom文件, 改成目标MQ依赖

        <!--Kafka依赖-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>
		
		<!--RocketMq依赖-->
		<!--<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
			<version>2021.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-acl</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-client</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.9.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-acl</artifactId>
			<version>4.9.1</version>
		</dependency>-->
		
		<!--RabbitMQ依赖-->
		<!--<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
		</dependency>-->

组件

1. Binder

        SCS通过Binder定义一个外部消息服务器。默认情况下,SCS会使用对应的 SpringBoot插件来构建Binder。

例如RabbitMQ默认值配置

spring.rabbitmq.host=local
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

        在SCS中,支持配置多个Binder访问不同的外部消息服务器。这些配置是通过spring.cloud.stream.binders. [bindername].environment.[props]=[value]的方式进行配置。另外,如果配置了多个Binder,也可以通过spring.cloud.stream.default-binder属性指定默认的 Binder。

spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.host=loca
lhost
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.username=
guest
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.password=
guest
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.virtualhost=/
# 指定默认binder
spring.cloud.stream.default-binder=testbinder

 

2. Binding

        Binding是SCS中实际进行消息交互的桥梁。在SCS中,通过Binding和 Binder建立绑定关系,客户端就通过Binding实现的消息收发。 在SCS框架中,配置Binding首先对他进行声明。声明Binding的方式,是在启动类中引入@EnableBinding注解。应用会向Spring容器中注入一个Binding接口的实例对象。在SCS中,默认提供了 Source、Sink、Process三个接口对象,分别代表消息的生产者、消费者和中间处理者。

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

public interface Processor extends Source, Sink {
}

 binding配置项

# 队列名
spring.cloud.stream.bindings.output.destination=scstreamExchange
# 指定binder。
spring.cloud.stream.bindings.output.binder=testbinder

spring.cloud.stream.bindings.input.destination=scstreamExchange
# 消费群组
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain
# 指定binder。
spring.cloud.stream.bindings.input.binder=testbinder

# 最大重试次数
spring.cloud.stream.bindings.input.consumer.max-attempts=3

3. Message

 在不同的MQ产品中,对于消息的定义其实也是不相同,SCS框架就需要对这些消息类型进行统一。消息结构包括请求头和消息体。

public interface Message<T> {
    T getPayload();

    MessageHeaders getHeaders();
}

         Payload就是消息体,在SCS中定义成了一个泛型,可以直接传递对象。MessageHeaders是消息的头部属性,也可以说是消息的补充属性。不同的MQ产品下,就可以通过不同的MessageHeaders属性来代表各自的消息差异,消息内容可以通过Payload统一。

        例如,RabbitMQ中有一个非常重要的概念routingKey。通过routingKey可以定制Exchange与Queue之间的路由关系。这个routingKey就可以通过在Headers当中指定一个routingkey属性来实现。

MessageBuilder<String> messageBuilder =
MessageBuilder.withPayload(message).setHeader("routingkey","info");

分组消费

分组消费机制:是在生产者实例和消费者实例之间建立一种对应关系,生产者实例发出的消息只会被对应的消费者消费

1. 设置分区规则, 提前设置好分区ID, 用ID匹配

# 生产者配置
spring.cloud.stream.bindings.output.destination=scstreamExchange
# 指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2
# 只有消费端分区ID为1的消费端能接收到消息
spring.cloud.stream.bindings.output.producer.partition-key-expression=1

# 消费者配置
spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=myinput
# 启动消费分区 新版本这个属性已经取消,改为由分区表达式自动判断
spring.cloud.stream.bindings.input.consumer.partitioned=true
# 参与分区的消费端节点个数
spring.cloud.stream.bindings.input.consumer.instance-count=2
# 设置该实例的消费端分区ID
spring.cloud.stream.bindings.input.consumer.instance-index=1

 2. 设置分区规则2, 根据请求头属性匹配

分区提取器

// 增加分区提取器-提取匹配键值
public class MyPartitionKeyExtractor implements PartitionKeyExtractorStrategy {
    public static final String PARTITION_PROP="partition";
    @Override
    public Object extractKey(Message<?> message) {
        return message.getHeaders().get(MyPartitionKeyExtractor.PARTITION_PROP);
    }
}

分区匹配器

public class MyPartitionSelectorStrategy implements PartitionSelectorStrategy {
    @Override
    public int selectPartition(Object key, int partitionCount) {
        return Integer.parseInt(key.toString()) % partitionCount;
    }
}

分区配置文件

# 添加生产者的分区配置
spring.cloud.stream.bindings.output.destination=scstreamExchange
spring.cloud.stream.bindings.output.binder=testbinder

# 指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2

#只有消费端分区ID为1的消费端能接收到消息
spring.cloud.stream.bindings.output.producer.partition-key-expression=1

# 动态生成分区键
spring.cloud.stream.bindings.output.producer.partition-key-extractorname=myPartitionKeyExtractor
spring.cloud.stream.bindings.output.producer.partition-selectorname=myPartitionSelector

发送消息

    @GetMapping("/send2")
    public Object send2(String message) {
        // 发送4条消息, 设置请求头0 1 2 3
        for (int i = 0; i < 4; i++) {
            MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message)
                    .setHeader(MyPartitionKeyExtractor.PARTITION_PROP, i);
            source.output().send(messageBuilder.build());
        }
        return "message sended : "+message;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1420472.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt之窗口位置

Qt提供了很多关于获取窗体位置及显示区域大小的函数&#xff0c;如x&#xff08;&#xff09;&#xff0c;y()和pos()&#xff0c;rect()&#xff0c;size()&#xff0c;geometry()等&#xff0c;统称为"位置相关函数"或"位置函数"。几种主要位置函数及其之…

Python爬虫实践指南:利用cpr库爬取技巧

引言 在信息时代&#xff0c;数据是无价之宝。为了获取网络上的丰富数据&#xff0c;网络爬虫成为了不可或缺的工具。在Python这个强大的编程语言中&#xff0c;cpr库崭露头角&#xff0c;为网络爬虫提供了便捷而高效的解决方案。本文将深入探讨如何利用cpr库实现数据爬取的各…

SpringBoot整合EasyCaptcha图形验证码

简介 EasyCaptcha&#xff1a;https://github.com/ele-admin/EasyCaptcha Java图形验证码&#xff0c;支持gif、中文、算术等类型&#xff0c;可用于Java Web、JavaSE等项目。 添加依赖 <dependency><groupId>com.github.whvcse</groupId><artifactId…

2023最新版克魔助手抓包教程(9) - 克魔助手 IOS 数据抓包

引言 在移动应用程序的开发中&#xff0c;了解应用程序的网络通信是至关重要的。数据抓包是一种很好的方法&#xff0c;可以让我们分析应用程序的网络请求和响应&#xff0c;了解应用程序的网络操作情况。克魔助手是一款非常强大的抓包工具&#xff0c;可以帮助我们在 Android…

Shell脚本⑦awk

目录 一.awk概述 1.awk介绍 2.基本格式 3.工作原理 4.常见的内建变量 二.awk基本操作 1.打印文本内容 &#xff08;1&#xff09;打印磁盘使用情况 &#xff08;2&#xff09;打印字符串 &#xff08;3&#xff09;打印字符串确定文件有多少行 2.根据$n以及NR提取字…

http和https的区别是什么?https有什么优缺点?

HTTP&#xff08;Hypertext Transfer Protocol&#xff0c;超文本传输协议&#xff09;是一个简单的请求-响应协议&#xff0c;它通常运行在TCP之上。它指定了客户端可能发送给服务器什么样的消息以及得到什么样的响应。这个简单模型是早期Web成功的有功之臣&#xff0c;因为它…

【JVM】运行时数据区域,内存如何分配和对象在内存中的组成

目录 一.运行时数据区域 1.线程独享 2.线程共享 二.内存如何分配 1.指针碰撞法 2.空闲列表法 3.TLAB 三.对象在内存中的组成 ​编辑1.对象头 2.实例数据 3.对齐填充 一.运行时数据区域 1.线程独享 &#xff08;1&#xff09;栈 虚拟机栈&#xff1a;每个 Java 方法在…

如何在centos云服务器上持续运行

一、直接上命令 cd到jar包所在目录 输入命令运行 nohup java -jar xxx.jar & 退出当前命令 二、云服务器上安装宝塔管理面板 直接用宝塔的进程守护&#xff0c;设置好当前进程输入参数保存就ok

Linux第40步_移植ST公司的uboot

一、查看ST公司的uboot源码包 ST公司的uboot源码包在虚拟机中的路径&#xff1a; “/home/zgq/linux/atk-mp1/stm32mp1-openstlinux-5.4-dunfell-mp1-20-06-24/sources/arm-ostl-linux-gnueabi/u-boot-stm32mp-2020.01-r0”&#xff1b; “u-boot-stm32mp-2020.01-r0”就是S…

Android MTE技术详解

1.MTE概念 MTE&#xff08;内存标记扩展&#xff09;是ARM v8.5-A新增的一项缓解内存安全的机制。在Android Linux现有的安全机制中&#xff0c;类似的机制有ASAN、HWSAN。但两者因为性能开销代价高昂&#xff0c;不适用于广泛部署&#xff08;仅调试使用&#xff09;。MTE当前…

springboot131企业oa管理系统

企业OA管理系统 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了企业OA管理系统的开发全过程。通过分析企业OA管理系统管理的不足&#xff0c;创建了一个计算机管理企业OA管理系统的方案。文章介绍了企业OA管…

定制红酒:为您的爱情、友情、亲情定制专属红酒

红酒&#xff0c;这种充满浪漫与情感的饮品&#xff0c;早已超越了单纯的味觉享受&#xff0c;成为人们表达情感、传递心意的载体。云仓酒庄洒派定制红酒&#xff0c;正是为那些珍视爱情、友情、亲情的人们提供了一种表达情感的新方式。 ① 爱情之酒 当你们即将步入婚姻的礼堂…

美国DDOS服务器:应对攻击的策略与技术

分布式拒绝服务(DDOS)攻击是一种常见的网络攻击手段&#xff0c;旨在通过大量无用的请求拥塞目标服务器&#xff0c;使其无法正常处理合法请求。美国作为全球互联网技术的领先者&#xff0c;其DDOS服务器在应对这类攻击时具有一系列先进的技术和策略。本文将详细介绍美国DDOS服…

浏览器内存泄漏排查指南

1、setTimeout执行原理 使用setInterval/setTimeOut遇到的坑 - 掘金 2、Chrome自带的Performance工具 当我们怀疑页面发生了内存泄漏的时候&#xff0c;可以先用Performance录制一段时间内页面的内存变化。 点击开始录制执行可能引起内存泄漏的操作点击停止录制 如果录制结束…

C# OpenCvSharp DNN Gaze Estimation 视线估计

目录 介绍 效果 模型信息 项目 代码 frmMain.cs GazeEstimation.cs 下载 C# OpenCvSharp DNN Gaze Estimation 介绍 训练源码地址&#xff1a;https://github.com/deepinsight/insightface/tree/master/reconstruction/gaze 效果 模型信息 Inputs ----------------…

利用牛顿方法求解非线性方程(MatLab)

一、算法原理 1. 牛顿方法的算法原理 牛顿方法&#xff08;Newton’s Method&#xff09;&#xff0c;也称为牛顿-拉弗森方法&#xff0c;是一种用于数值求解非线性方程的迭代方法。其基本思想是通过不断迭代来逼近方程的根&#xff0c;具体原理如下&#xff1a; 输入&#…

菜单栏应用管理 -- Bartender 4

Bartender 4是一款旨在优化和简化Mac菜单栏管理的强大工具。它具有以下特色功能&#xff1a; 组织和管理菜单栏图标&#xff1a;Bartender 4允许用户轻松组织和管理菜单栏中的图标&#xff0c;可以隐藏不常用的图标&#xff0c;保持菜单栏的整洁和简洁。同时&#xff0c;用户还…

RAG——应用——七个最常见的故障点

近日&#xff0c;国外研究者发布了一篇论文《Seven Failure Points When Engineering a Retrieval Augmented Generation System》&#xff0c;探讨了在实际工程落地RAG应用过程中容易出的七类问题。 论文地址&#xff1a;https://arxiv.org/pdf/2401.05856.pdf 一、丢失内容&…

MacOS安装反编译工具JD-GUI以及解决无法打开的问题

目录 一.下载地址 二.安装 三.问题 四.解决办法 1.显示包内容 2.找到Contents/MacOS/universalJavaApplicationStub.sh 3.修改sh文件 4.保存后再次打开即可 一.下载地址 Java Decompiler 二.安装 将下载下来的 jd-gui-osx-1.6.6.tar 解压&#xff0c;然后将 JD-GUI.a…

提升工作效率,畅享便捷PDF编辑体验——Adobe Acrobat Pro DC 2023

作为全球领先的PDF编辑软件&#xff0c;Adobe Acrobat Pro DC 2023将为您带来前所未有的PDF编辑体验。无论您是个人用户还是企业用户&#xff0c;Adobe Acrobat Pro DC 2023将成为您提高工作效率、简化工作流程的得力助手。 一、全面编辑功能 Adobe Acrobat Pro DC 2023提供了…