SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

news2025/1/23 2:05:48

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

上一篇直通车

SpringBoot整合SpringCloudStream3.1+版本Kafka

实现死信队列步骤

  1. 添加死信队列配置文件,添加对应channel
  2. 通道绑定配置对应的channel位置添加重试配置

结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

配置文件

Kafka基本配置(application-mq.yml)

server:
  port: 7105
spring:
  application:
	name: betrice-message-queue
  config:
	import:
	- classpath:application-bindings.yml
  cloud:
	stream:
	  kafka:
		binder:
		  brokers: localhost:9092
		  configuration:
			key-serializer: org.apache.kafka.common.serialization.StringSerializer
			value-serializer: org.apache.kafka.common.serialization.StringSerializer
		  consumer-properties:
			enable.auto.commit: false
	  binders:
		betrice-kafka:
		  type: kafka
		  environment:
			spring.kafka:
		  bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}

创建死信队列配置文件(application-dql.yml)

在这里插入图片描述

spring:
  cloud:
	stream:
	  kafka:
		bindings:
		  dqlTransfer-in-0:
			consumer:
			  # When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>.
			  # messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[].
			  # By default, a failed record is sent to the same partition number in the DLQ topic as the original record.
			  enableDlq: true
			  dlqName: Evad05-message-dlq
			  keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
#              valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
			  valueSerde: com.devilvan.pojo.Evad05MessageSerde
			  autoCommitOnError: true
			  autoCommitOffset: true

注意:这里的valueSerde使用了对象类型,需要搭配application/json使用,consumer接收到消息后会转化为json字符串

通道绑定文件添加配置(application-bindings.yml)

channel对应上方配置文件的dqlTransfer-in-0

在这里插入图片描述

spring:
  cloud:
	stream:
	  betrice-default-binder: betrice-kafka
	  function:
		# 声明两个channel,transfer接收生产者的消息,处理完后给sink
		definition: transfer;sink;gather;gatherEcho;dqlTransfer;evad05DlqConsumer
	  bindings:
		# 添加生产者bindiing,输出到destination对应的topic
		dqlTransfer-in-0:
		  destination: Evad10
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  group: evad05DlqConsumer # 使用死信队列必须要有group
		  content-type: application/json
		  consumer:
			maxAttempts: 2 # 当消息消费失败时,尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
			backOffInitialInterval: 1000 # 消息消费失败后重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
			backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2,即第二次是第一次间隔时间的2倍,第三次是第二次的2倍
			backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s。
		dqlTransfer-out-0:
		  destination: Evad10
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain
		# 消费死信队列中的消息
		evad05DlqConsumer-in-0:
		  destination: Evad05-message-dlq
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain

Controller

发送消息并将消息引入死信队列

@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {
	@Resource(name = "streamBridgeUtils")
	private StreamBridge streamBridge;

	@PostMapping("streamSend")
	public void streamSend(String topic, String message) {
		try {
			streamBridge.send(topic, message);
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}

	@PostMapping("streamSendDql")
	public void streamSendDql(String topic, String message) {
		try {
			streamBridge.send(topic, message);
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}

	@PostMapping("streamSendJsonDql")
	public void streamSendJsonDql(String topic) {
		try {
			Evad05MessageSerde message = new Evad05MessageSerde();
			message.setData("evad05 test dql");
			message.setCount(1);
			streamBridge.send(topic, message);
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}
}

Channel

这里使用了transfer通道,消息从Evad10(topic)传来,经过transfer()方法后抛出异常,随后进入对应的死信队列

@Configuration
public class BetriceMqSubChannel {
	@Bean
	public Function<String, String> dqlTransfer() {
		return message -> {
			System.out.println("transfer: " + message);
			throw new RuntimeException("死信队列测试!");
		};
	}

	@Bean
	public Consumer<String> evad05DlqConsumer() {
		return message -> {
			System.out.println("Topic: evad05 Dlq Consumer: " + message);
		};
	}
}

将自定义序列化类型转换为JSON消息

步骤

1. 通道绑定文件(application-bindings.yml)的valueSerde属性添加自定义的序列化

在这里插入图片描述

2. BetriceMqController中封装该自定义类型的对象,并作为消息发送

@PostMapping("streamSendJsonDql")
public void streamSendJsonDql(String topic) {
	try {
		Evad05MessageSerde message = new Evad05MessageSerde();
		message.setData("evad05 test dql");
		message.setCount(1);
		streamBridge.send(topic, message);
		log.info("发送消息:" + message);
	} catch (Exception e) {
		log.error("异常消息:" + e);
	}
}

3. channel(BetriceMqSubChannel)接收到该消息并反序列化

@Bean
public Consumer<String> evad05DlqConsumer() {
	return message -> {
		System.out.println("Topic: evad05 Dlq Consumer: " + JSON.parseObject(message, Evad05MessageSerde.class));
	};
}

4. 结果

在这里插入图片描述
在这里插入图片描述

参考网址

Kafka 消费端消费重试和死信队列 - Java小强技术博客 (javacui.com)
spring cloud stream kafka rabbit 实现死信队列_spring cloud stream kafka 死信队列_it噩梦的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/772962.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python机器学习、数据统计分析在医疗中的应用

Python机器学习在医疗诊断领域的应用 随着人工智能技术的不断发展&#xff0c;机器学习已经在医疗领域的诊断治疗、预防等方面展现出强大的潜力。Python 作为一种广泛应用于机器学习的语言&#xff0c;在医疗领域也已经被广泛使用。本文将探讨 Python 机器学习在医疗领域的应用…

mysql 第五章

目录 1.order by 排序 2.区间判断 3.group by 分组 4.limit 5.别名 6.通配符 like 7.总结 1.order by 排序 2.区间判断 3.group by 分组 4.limit 5.别名 6.通配符 like 7.总结 对 mysql 数据库的查询&#xff0c;除了基本的查询外&#xff0c;有时候需要对查…

nginx官网与下载

官网 nginx: download 下载 解压 conf配置文件

[高通平台][WLAN] IEEE802.11mc 介绍

IEEE802.11mcWi-Fi协议(即Wi-FiRound-Trip-Time,RTT),利用此项技术及可以进行室内定位,因此为了使用此项技术,只有在硬件支持的设备上,应用才可以使用最新的RTT API以测量附近具有RTT功能的Wi-FiAP。 单面RTT :  距离是通过发送的分组和接收到的ACK之间的时间差来计算的…

SPSS中级统计--S05-5多个样本率的卡方检验及两两比较

小伙伴们&#xff0c;今天我们学习SPSS中级统计--多个样本率的卡方检验及两两比较。 例1、2 C列联表资料 上期我们学习了双向无序RC表资料&#xff08;c2&#xff09;的检验&#xff0c;案例如下&#xff0c;比较不同污染地区的动物畸形率是否有差异&#xff1f; H0&#xff…

哈医大一院电力监控系统 安科瑞 许敏

摘要&#xff1a;本文介绍基于Acrel-3000电力监控软件和电力监控仪表&#xff0c;设计并实现了一套分散式采集和集中控制管理的自动化报警系统。系统实现远程精细化及时性报警&#xff0c;避免因停电造成医疗事故&#xff0c;提高了供电质量和管理水平&#xff0c;具有简明实用…

怎么把高版本CAD转换成低版本?CAD版本转换方法分享

某些情况下&#xff0c;较新的CAD软件版本可能不被较旧的CAD软件版本所支持。如果你需要与使用较旧版本CAD的人进行交流、共享或协作&#xff0c;将高版本CAD转换为低版本可以确保文件能够顺利打开和编辑。那么问题来了&#xff0c;怎么将高版本CAD转换成低版本呢&#xff1f;教…

29,stack容器

29.1stack基本概念 概念&#xff1a;stack是一种先进后出(First In Last Out,FILO)的数据结构&#xff0c;它只有一个出口 栈容器 符合先进后出 栈中只有顶端的元素才可以被外界使用&#xff0c;因此栈不允许有遍历行为 栈可以判断容器为空与否(empty) 栈可以返回元素个数(…

【SQL】计算每个人的完成率

目录 前提任务的完成率前三名拓展&#xff1a;达梦如何去实现除法有余数拓展&#xff1a;MySQL 任务的完成率前三名 前提 达梦数据库&#xff1a; select 1/3; # 0不要求四舍五入 任务的完成率前三名 # nick_name 人名 # finishNum 当前这个人的任务完成数 # total 当前这…

Go语言之并发编程练习,GO协程初识,互斥锁,管道:channel的读写操作,生产者消费者

GO协程初识 package mainimport ("fmt""sync""time" )func read() {defer wg.Done()fmt.Println("read start")time.Sleep(time.Second * 3)fmt.Println("read end") }func listenMusci() {defer wg.Done()fmt.Println(&qu…

【lesson2】Linux基本指令1

文章目录 touch创建文件更新文件最新修改时间 lslsls -lls -als -i pwd...cdcd 路径法一&#xff1a;cd 绝对路径法二&#xff1a;cd 相对路径 cd - stattreemkdirmkdir创建一个目录mkdir -p创建一串路径目录 ~/rmdirrmrmrm -frm -rrm -i mancpcpcp -r mvnaocatcatcat -n ta…

错过后悔!!!Java岗秋招最全面试攻略!!

这里给大家整理了完整的Java完整的架构面试核心知识体系。按照这样的一个脉络&#xff0c;只要大家能够掌握这里面的绝大部分内容&#xff0c;并且有过相应的一些实践&#xff0c;那么就可以去面试自己心仪的工作了。 这份笔记总结都是作者近几年结合众多牛客的面经分享&#…

【完整版】zabbix企业级监控(概念、简单操作、页面优化、监控主机自己、监控linux、监控Win10)

第三阶段基础 时 间&#xff1a;2023年7月19日 参加人&#xff1a;全班人员 内 容&#xff1a; zabbix企业级监控 目录 一、Zabbix概述 &#xff08;一&#xff09;Zabbix简介 &#xff08;二&#xff09;Zabbix运行条件&#xff1a; &#xff08;三&#xff09;Zab…

【面试笔试避坑指南】一

从这篇文章开始 进行笔试的训练环节&#xff0c;我会在 本专栏详细介绍笔试的易错点&#xff0c;帮助大家精准避坑。 1.有如下一段代码&#xff08;unit16_t为2字节无符号整数&#xff0c;unit8_t位1字节无符号整数&#xff09;&#xff1b; 请问x.z.n在大字节序和小字节序机器…

MySQL第五章、索引事务

目录 一、索引 1.1 概念 1.2 作用 1.3 使用场景 1.4 使用 1.5 案例 二、索引背后的数据结构 2.1 B-树&#xff08;B树&#xff09; 2.2 B树&#xff08;MySQL背后数据结构&#xff09; 三、事务 3.1 为什么使用事务 3.2 事务的概念 3.3 使用 3.4并发执行事务产生…

《Java黑皮书基础篇第10版》 第12章【习题】

Java语言程序设计 习题第十二章 12.2章节习题 12.1 使用异常处理的优势是什么? 如果没有异常处理&#xff0c;方法执行(called method)出错时&#xff0c;调用者(caller)没有办法进行处理&#xff08;比如方法所在的类被封装&#xff0c;调用者无法访问&#xff09;&#xf…

ubuntu版本Linux操作系统上安装键盘中文输入法

要在ubuntu版本Linux操作系统上安装键盘中文输入法 可以按照以下步骤进行操作&#xff1a; 1、Linux终端输入&#xff1a;sudo apt-get install ibus-pinyin 这将安装一个常用的中文输入法 “ibus-pinyin”。 2、重新启动系统&#xff1a;为了使输入法生效&#xff0c;需要…

喜报!沃通CA中标河南城建学院SSL证书服务项目

沃通CA再添中标喜报&#xff01;沃通CA中标河南城建学院SSL证书服务项目&#xff0c;为河南城建学院官网及各类信息系统提供HTTPS加密及网站可信身份认证&#xff0c;保护数据传输安全、保障通信主体身份可信。 为加快教育现代化、推进新时代教育信息化发展&#xff0c;我国发布…

springboot 内嵌H2

pom 文件配置 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/></parent><dependencies><dependency>&…

List迭代器是如何实现的

我们知道当我们使用vector的迭代器时,它的操作可以让它指向下一个位置,解引用操作就可以找到这个位置的值,因为vector底层时用的一个顺序表,可以支持随机访问。对比list来说vector底层的迭代器是十分的简便可观的。虽然我们使用list的迭代器外观上和vector是大同小异的&#xf…