SpringBoot整合SpringCloudStream3.1+版本Kafka

news2025/1/12 19:07:30

SpringBoot整合SpringCloudStream3.1+版本Kafka

下一节直通车

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

为什么用SpringCloudStream3.1

  1. Springcloud架构提供,基于spring生态
  2. 能够快速切换市面上常见的MQ产品
  3. 3.1后使用配置文件的形式定义channel,不再需要3.1前的硬编码

Jar

<!--SpringCloudStream Kafka-->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
	<version>3.2.4</version>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
	<version>3.2.4</version>
</dependency>
<!--Kafka相关-->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.8.6</version>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
	<version>2.6.8</version>
</dependency>
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<version>1.18.24</version>
</dependency>

配置文件yml

kafka基本配置(application-mq.yml)

server:
  port: 7105
spring:
  application:
	name: betrice-message-queue
  config:
	import:
	- classpath:application-bindings.yml
  cloud:
	stream:
	  kafka:
		binder:
		  brokers: localhost:9092
		  configuration:
			key-serializer: org.apache.kafka.common.serialization.StringSerializer
			value-serializer: org.apache.kafka.common.serialization.StringSerializer
		  consumer-properties:
			enable.auto.commit: false
	  binders:
		betrice-kafka:
		  type: kafka
		  environment:
			spring.kafka:
			  bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}

classpath:application-bindings.yml 引入通道绑定配置文件,即消息生产、消费者的关系。

通道绑定配置(application-bindings.yml)

参数含义:
spring.cloud.stream.function.definition:定义channel名字,每个channel又可以作为生产者(in)与消费者(out)
spring.cloud.stream.bindings: 是一个map形式(具体看源码的Properties定义)
1. destination(生产/消费者通向的topic);
2. group:消费者组名;
3. binder:绑定当前使用的MQ类型(见betrice-kafka);
4. content-type:消息序列/反序列化的类型(见源码的支持的类型)

spring:
  cloud:
	stream:
	  betrice-default-binder: betrice-kafka
	  function:
		# 声明两个channel,transfer接收生产者的消息,处理完后给sink
		definition: transfer;sink;gather;gatherEcho
	  bindings:
		# 添加生产者binding,输出到destination对应的topic
		Evad05:
		  destination: Evad07
		  binder: ${spring.cloud.stream.betrice-default-binder}
		transfer-in-0:
		  destination: Evad07
		  binder: ${spring.cloud.stream.betrice-default-binder}
		transfer-out-0:
		  destination: Evad08
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain
		sink-in-0:
		  destination: Evad08
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain

Controller

/**
 * @author Evad.Wu
 * @Description 消息队列 控制类
 * @date 2023-02-10
 */
@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {
	@Resource(name = "betriceKafkaProducer")
	private Producer<String, String> producer;
	@Resource(name = "streamBridgeUtils")
	private StreamBridge streamBridge;

	@PostMapping("send")
	public void send(String topic, String key, String message) {
		try {
			producer.send(new ProducerRecord<>(topic, key, message));
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}

	@PostMapping("streamSend")
	public void streamSend(String topic, String message) {
		try {
			streamBridge.send(topic, message);
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}
}

Channel(生产、消费者通道)

/**
 * @author Evad.Wu
 * @Description mq消息通道 配置类
 * @date 2023-02-11
 */
@Configuration
public class BetriceMqSubChannel {
	/**
	 * Function方式声明binding,相当于同时声明了一个Producer的Bindng和一个Consumer的Binding。
	 * transfer-in-0 表示消费者  transfer-out-0 表示生产者
	 * 其作用就在于,echo-in-0收到的消息,立即就会通过echo-out-0发送出去。
	 */
	@Bean
	public Function<String, String> transfer() {
		return message -> {
			System.out.println("transfer: " + message);
			throw new RuntimeException("死信队列测试!");
//            return "transfer:" + message;
		};
	}

	/**
	 * Consumer声明一个消息消费者,sink1就对应sink-in-0
	 */
	@Bean
	public Consumer<String> sink() {
		return message -> {
			System.out.println("******************");
			System.out.println("At Sink1");
			System.out.println("******************");
			System.out.println("Received message " + message);
		};
	}
}

结果

在这里插入图片描述

参考网址

SpringCloudStream实战拆解以及3.1后新版本特性分析
@EnableBinding @deprecated 自 3.1 起支持函数式编程模型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/770048.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python接口自动化测试之详解post请求

前言 在HTTP协议中&#xff0c;与get请求把请求参数直接放在url中不同&#xff0c;post请求的请求数据需通过消息主体(request body)中传递。 且协议中并没有规定post请求的请求数据必须使用什么样的编码方式&#xff0c;所以其请求数据可以有不同的编码方式&#xff0c;服务…

C#线性插值,三角插值

什么是插值&#xff1f; 是什么&#xff1f;很简单&#xff01; 已知两点&#xff0c;推断中间的每一点的过程。 有什么用&#xff1f; 很简单&#xff01;位置从30到40耗时3秒求每一时刻的位置&#xff01; 1.线性插值 设v是结果&#xff0c;start是开始&#xff0c;end是结…

【贪心算法part02】| 122.买卖股票的最佳时机||、55.跳跃游戏、45.跳跃游戏||

目录 &#x1f388;LeetCode122.买卖股票的最佳时机|| &#x1f388;LeetCode55.跳跃游戏 &#x1f388;LeetCode45.跳跃游戏|| &#x1f388;LeetCode122.买卖股票的最佳时机|| 链接&#xff1a;122.买卖股票的最佳时机|| 给你一个整数数组 prices &#xff0c;其中 price…

Day 61-62 决策树(ID3)

代码&#xff1a; package dl;import java.io.FileReader; import java.util.Arrays; import weka.core.*;/*** The ID3 decision tree inductive algorithm.*/ public class ID3 {/*** The data.*/Instances dataset;/*** Is this dataset pure (only one label)?*/boolean …

Revit中如何创建水的效果及基坑?

一、Revit中如何创建水的效果? 我们在创建建筑的时候会遇上小池塘啊小池子之类的装饰景观&#xff0c;Revit又不像专业的3D软件那样可以有非常真实的水的效果&#xff0c;那么我们该如何简单创建水呢?下面来看步骤&#xff1a; 1、 在水池位置创建一块楼板&#xff0c;并将该…

C语言——文件操作(超全超详细)

C语言——文件操作 1. 什么是文件 磁盘上的文件是文件 但是在程序设计中&#xff0c;我们一般谈的文件有两种&#xff1a;程序文件、数据文件&#xff08;从文件功能的角度来分类的&#xff09; 1.1 程序文件 包括源程序文件&#xff08;后缀为.c&#xff09;&#xff0c;目…

椒图--分析中心

护网的时候我们要把右边的开关开启。开启就会对系统全量的记录&#xff0c;包含有网络行为日志&#xff0c;就会检测我们服务器里面的链接&#xff0c;端口箭头&#xff0c;内内网暴露的链接&#xff1b;进程操作日志&#xff0c;就可以看我们系统创建了哪些进程&#xff0c;就…

【操作系统】Liunx项目自动化构建工具-make/Makefile

Yan-英杰的主页 悟已往之不谏 知来者之可追 C程序员&#xff0c;2024届电子信息研究生 目录 一、背景 二、Makefile 实现 Makefile依赖 依赖关系 makefile的工作原理 项目清理 补充&#xff1a; .PHONY是什么&#xff1f; Linux如何进行多行注释&#xff1a; 说明&#xf…

app 元素定位失败了,怎么办?一看我的做法,惊呆了!

粉丝们在日常的android app自动化测试工作当中&#xff0c;元素定位时会遇到以下类似的报错&#xff1a; 然后来问博主&#xff0c;这是啥情况&#xff1f; 我一般都会送上亲切的关怀&#xff1a; 1&#xff09;adb能识别到设备吗&#xff1f; 2&#xff09;设备有被其它的程…

VUE+element Input框 实现输入内容可自适应文本高度,换行(空格换行,enter发送)阻止文本域的回车事件

需求 输入框实现输入内容自适应高度 以及可以换行 使用官方文档提供的属性 代码 <el-input clearable autosize type"textarea" :placeholder"$t(navbar.pleaseInput)"v-model"inputText" change"inputChange" keyup.enter.na…

数据分析之Matplotlib

文章目录 1. 认识数据可视化和Matplotlib安装2. 类型目录3. 图标名称title4. 处理图形中的中文问题5. 设置坐标轴名称&#xff0c;字体大小&#xff0c;线条粗细6. 绘制多个线条和zorder叠加顺序7. 设置x轴刻度xticks和y轴刻度yticks8. 显示图表show9. 设置图例legend10. 显示线…

Ceph简介及部署

Ceph Ceph一、存储基础1、单机存储设备2、Ceph 简介3、Ceph 优势5、Ceph 架构6、Ceph 核心组件7、OSD 存储后端8、Ceph 数据的存储过程9、Ceph 版本发行生命周期10、Ceph 集群部署 二、部署ceph-deploy Ceph 集群前环境配置1、关闭 selinux 与防火墙2、根据规划设置主机名3、配…

全网火爆,pytest自动化测试框架从0-1精通实战,你的进阶之路...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、运行方式 命令…

(68Ga)Nota-prgd2,1345968-02-6,通过镓标记的NOTA衍生物

文章编辑来自于&#xff1a;陕西新研博美生物科技有限公司MISS.wu​ (68Ga)Nota-prgd2| CAS&#xff1a;1345968-02-6| 纯度&#xff1a;95% PART1-----(68Ga)Nota-prgd2结构式 PART2-----试剂参数信息 CAS&#xff1a;1345968-02-6 外观&#xff08;Appearance&#xff09;…

网络安全(黑客)学习笔记

0基础学网安或者提升巩固网安技术的小伙伴有福了&#xff01; 本篇整合了网络安全全知识点&#xff0c;零基础也适用&#xff01; 本篇涵盖内容及其全面&#xff0c;强烈推荐收藏&#xff01; 一、学习网络安全会遇到什么问题呢&#xff1f; 1、学习基础内容多时间长 2、难…

周报(1)

文章预览&#xff1a; 本周内容&#xff1a;Python语言的学习和pytorch安装配置1 Python基础知识1.1 交互式解释器1.2 数和表达式1.3 变量1.4 获取用户输入1.5 函数1.6 模块1.7 字符串1.7.1 单引号字符串以及对引号转义1.7.2 拼接字符串1.7.3 字符串表示str 和repr Pytorch 的安…

vue 限制表情输入

在main.js中加入下列代码 import emoji from ./util/emojiVue.directive(emoji,emoji) 在util文件夹中加入emoji.js 下列代码 const findEle (parent, type) > { return parent.tagName.toLowerCase() type ? parent : parent.querySelector(type)}const emoji {bi…

vite vue3进行多环境打包配置

需求&#xff1a; 对此vite创建的vu3项目进行构建&#xff0c;项目分为四个环境&#xff1a;本地、测试、预发、生产 1.在项目根目录创建四个文件夹 .env.development .env.test .env.pre .env.production 2.配置不同环境的 地址和打包文件名 具体例子如下&#xff1a; …

手机验证码登录 -- 手把手教你做ssm+springboot入门后端项目黑马程序员瑞吉外卖(七)

文章目录 前言一、短信发送1. 短信服务介绍2. 阿里云短信服务3. 代码开发 二、手机验证码登录1. 需求分析2. 数据模型3. 代码开发4. 功能测试 总结 前言 为了巩固所学的知识&#xff0c;作者尝试着开始发布一些学习笔记类的博客&#xff0c;方便日后回顾。当然&#xff0c;如果…

项目名称:网络聊天室

目录 一&#xff0c;简述 二&#xff0c;项目要求 三&#xff0c;程序流程图 服务器端&#xff1a; 客户端&#xff1a; 四&#xff0c;相关知识点 通信流程&#xff1a; 函数接口&#xff1a; 五&#xff0c;代码实现 客户端&#xff1a; 服务器&#xff1a; 主程序…