Spring AMQP 随笔 8 Retry MessageRecoverer ErrorHandler

news2024/7/6 19:50:11

0. 列位,响应式布局好麻烦的 …

有意思的,chrome devtool 在调试响应式的分辨率的时候,比如说在 宽度远远大于 768 的时候,按说浏览器也知道大概率是 web端方式打开,样式也是如此渲染,但一些事件(没有鼠标悬停,还是维持触控的感觉)却还是移动端上面的。


前面看过了 spring amqp 的官方文档,但是它的文档不是按一个个 完整的流程 写的,还是老一套 按目录(或者叫 纲目?)拆解 出来的。这就导致,我了解了 RetryTemplate, MessageRecoverer, ErrorHandler 的作用,但是很难想象他们一起工作的样子。

所以,临时起意,走一遍源码,好久没有做这个事儿了。


本文不会按照这几个组件 加载,运行的顺序展开。主打一个 debug 的顺序打开

1. 翻开源码前

debug源码,首先需要写一个极简的demo,我这边的做法是:

  • 配置: 关闭 拒绝时重排(这个跟本文无关,只是补充一下)
  • 配置:开启 消费端 重试
  • 代码:在消费端抛出一个异常,诱发重试
  • 现象:重试耗尽后,消息 转投 死信队列

1.1 最简单的,先见名知意

建议看一下 接口定义中 文档注释,比较详细

本文默认读者有 RetryTemplate 的经验

  • MessageRecoverer 消息的恢复器(… 这个确实很抽象,作用的位置也深,实现非常简单)
    如果猜不出来它的作用,建议看下他的几个实现类,数量很少,代码就几行
  • ErrorHandler 错误处理器

1.2 容易纠结的点

因为,大概看了一下 运行时堆栈,基本跟 MessageRecoverer 不沾边。所以,比较纠结—— Retry 拦截器链 跟 ErrorHandler 的关系,到底是 一先一后的顺序,还是 内外嵌套的结构

解开这个,只需要 轻点debug,看一下运行时堆栈信息即可

// user method annotated by @RabbitListener
onSubscribeTestQueue:93, ExampleQueueClientConfiguration (org.pajamas.example.starter.integration.module.client)

invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
doInvoke:169, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invoke:119, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invoke:77, HandlerAdapter (org.springframework.amqp.rabbit.listener.adapter)
invokeHandler:263, MessagingMessageListenerAdapter (org.springframework.amqp.rabbit.listener.adapter)
invokeHandlerAndProcessResult:209, MessagingMessageListenerAdapter (org.springframework.amqp.rabbit.listener.adapter)
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#doInvokeListener(org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener, com.rabbitmq.client.Channel, java.lang.Object)
onMessage:148, MessagingMessageListenerAdapter (org.springframework.amqp.rabbit.listener.adapter)
doInvokeListener:1670, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
actualInvokeListener:1589, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
invokeListener:-1, 1153028279 (org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer$$Lambda$1982)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
invokeJoinpointUsingReflection:344, AopUtils (org.springframework.aop.support)
invokeJoinpoint:198, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:163, ReflectiveMethodInvocation (org.springframework.aop.framework)
doWithRetry:97, RetryOperationsInterceptor$1 (org.springframework.retry.interceptor)
doExecute:329, RetryTemplate (org.springframework.retry.support)
execute:225, RetryTemplate (org.springframework.retry.support)
invoke:122, RetryOperationsInterceptor (org.springframework.retry.interceptor)
proceed:186, ReflectiveMethodInvocation (org.springframework.aop.framework)

// aop拦截器链:可以看到只有 Retry 一个advice (上面这一坨都是)
invoke:215, JdkDynamicAopProxy (org.springframework.aop.framework)
invokeListener:-1, $Proxy256 (org.springframework.amqp.rabbit.listener)

// 这里是 aop 跟 amqp.listenerContainer 之间的分界线
// 在这里停留,看一下 我们代码抛出给 spring.retry 的异常 到这里 怎么是往下走的?
// step into ...
invokeListener:1577, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
doExecuteListener:1568, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
executeListener:1512, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
doReceiveAndExecute:994, SimpleMessageListenerContainer (org.springframework.amqp.rabbit.listener)
receiveAndExecute:941, SimpleMessageListenerContainer (org.springframework.amqp.rabbit.listener)
access$1600:85, SimpleMessageListenerContainer (org.springframework.amqp.rabbit.listener)
mainLoop:1319, SimpleMessageListenerContainer$AsyncMessageProcessingConsumer (org.springframework.amqp.rabbit.listener)
run:1225, SimpleMessageListenerContainer$AsyncMessageProcessingConsumer (org.springframework.amqp.rabbit.listener)
run:748, Thread (java.lang)

Note: 因为只有一个advice, 即 Retry 的拦截器,所以,Spring.Retry 与 Spring.Amqp.ErrorHandler 必然是 一先一后 的顺序结构

2. 源码,启动!

// org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#executeListener

/**
 * Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
 * @param channel the Rabbit Channel to operate on
 * @param data the received Rabbit Message
 * @see #invokeListener
 * @see #handleListenerException
 */
protected void executeListener(Channel channel, Object data) {
	if (!isRunning()) {
		if (logger.isWarnEnabled()) {
			logger.warn(
					"Rejecting received message(s) because the listener container has been stopped: " + data);
		}
		throw new MessageRejectedWhileStoppingException();
	}
	Object sample = null;
	if (this.micrometerHolder != null) {
		sample = this.micrometerHolder.start();
	}
	try {
		// aop >>> user code 
		doExecuteListener(channel, data);
		if (sample != null) {
			this.micrometerHolder.success(sample, data instanceof Message
					? ((Message) data).getMessageProperties().getConsumerQueue()
					: queuesAsListString());
		}
	}
	// ListenerExecutionFailedException(shorten as 'LEFE') extends AmqpException ( extends RuntimeException )
	// 	cause here is AmqpRejectAndDontRequeueException(shorten as 'ARADRE')
	catch (RuntimeException ex) {
		if (sample != null) {
			this.micrometerHolder.failure(sample, data instanceof Message
					? ((Message) data).getMessageProperties().getConsumerQueue()
					: queuesAsListString(), ex.getClass().getSimpleName());
		}
		Message message;
		if (data instanceof Message) {
			message = (Message) data;
		}
		else {
			message = ((List<Message>) data).get(0);
		}
		// if message.properties.finalRetryForMessageWithNoId(default false) then rethrow
		checkStatefulRetry(ex, message);
		// ConditionalRejectingErrorHandler.handleError(t)
		// 	if is-not-ARADRE && exceptionStrategy.isfatal(t)
		// 	 	if 
		//			this.errorHandler.discardFatalsWithXDeath(default true) 
		//			&& is-a-LEFE
		//			&& t has fail message
		//	  			throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present")
		//	 	else
		//			throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal");
		//	else 
		//		do nothing ...
		handleListenerException(ex);
		// step to 
		throw ex;
	}
}

// 最后这一个异常,直接跳转到 catch 的地方,已经到 main looooooop了
// org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop
// catch ARADRE but do nothing ... (bacuse of it wrapped in LEFE had been handled)
// return main looooooop
  • 这里很多注释,是我自己写的,如果没有看过官方文档,也许会对一些名词(比如那些个 异常类型)有些陌生

3. 考虑到读者 也许没有心情 看官方文档

大概 解释一下 为什么要有这几个 异常类型

官网不舍得放图,么事,我不惜笔墨的

在这里插入图片描述

在这里插入图片描述

这样看,有没有体会到:官方文档没有把这俩放在同一个流程中

4. 伪代码,启动!

现在,这个顺序结构已经有了

try {
	// aop(retry) -> invoke listener mothod
} catch {
	// listenContainer.ErrorHandler ...
}

5. 那么好 …

当然,源码继续启动,看一下几个关键的时刻

5.1 Retry max-attempts exhausted

代码位置,可以在控制台的异常日志找到,或者 跟踪用户抛出的异常,一层层往外抛出时候,总会被这里捕获一次的

// org.springframework.retry.support.RetryTemplate#handleRetryExhausted
	// invoke spring.retry.recover callback
	// org.springframework.retry.interceptor.RetryOperationsInterceptor.ItemRecovererCallback#recover(retryContext)
		// org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean#recover(args[], retryContext.latest-throwable)
				// throw new LEFE(this.messageSupplier.get(), new ARADRE(latest-throwable), message);
/* 
	args: [
		logger,
		messageRecoverer, (default RejectAndDontRequeueRecoverer)
		retryTemplate
	]
*/
  • MessageRecoverer 做了 new ARADRE(user ex)
  • spring.amqp 的 一个 retry recover-callback 做了 new LEFE(new ARADRE(user ex))

现在对 MessageRecoverer 的理解不难了,因为这个案例中,我们抛出的异常 不在其配置的strategy 里面(不会因为这种失败,而触发重排), 这就是 ARADRE 的语义

5.1.1 Locate MessageRecoverer

debug可以看到 这个 MessageRecoverer 不是显式声明出来(step into 被直接跳过了),我们看不到他的声明位置,说明是一个lambda 表达式

可以通过所在类 StatelessRetryOperationsInterceptorFactoryBean,找进去

	protected Object recover(Object[] args, Throwable cause) {
		// this.messageRecoverer
		MessageRecoverer messageRecoverer = getMessageRecoverer();
		Object arg = args[1];
		if (messageRecoverer == null) {
			this.logger.warn("Message(s) dropped on recovery: " + arg, cause);
		}
		else if (arg instanceof Message) {
			messageRecoverer.recover((Message) arg, cause);
		}
		else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecoverer) {
			((MessageBatchRecoverer) messageRecoverer).recover((List<Message>) arg, cause);
		}
		return null;
	}

FactoryBean 是在 Spring.amqp configuration 加载配置的时候用的,这是泛泛的说法,不难理解

5.1.2 回过头看 configuration 加载配置的地方

// org.springframework.boot.autoconfigure.amqp.RabbitAnnotationDrivenConfiguration#simpleRabbitListenerContainerFactory
// org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer#configure
// org.springframework.boot.autoconfigure.amqp.AbstractRabbitListenerContainerFactoryConfigurer#configure(T, org.springframework.amqp.rabbit.connection.ConnectionFactory, org.springframework.boot.autoconfigure.amqp.RabbitProperties.AmqpContainer)

protected void configure(T factory, ConnectionFactory connectionFactory,
		RabbitProperties.AmqpContainer configuration) {
	Assert.notNull(factory, "Factory must not be null");
	Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
	Assert.notNull(configuration, "Configuration must not be null");
	factory.setConnectionFactory(connectionFactory);
	if (this.messageConverter != null) {
		factory.setMessageConverter(this.messageConverter);
	}
	factory.setAutoStartup(configuration.isAutoStartup());
	if (configuration.getAcknowledgeMode() != null) {
		factory.setAcknowledgeMode(configuration.getAcknowledgeMode());
	}
	if (configuration.getPrefetch() != null) {
		factory.setPrefetchCount(configuration.getPrefetch());
	}
	if (configuration.getDefaultRequeueRejected() != null) {
		factory.setDefaultRequeueRejected(configuration.getDefaultRequeueRejected());
	}
	if (configuration.getIdleEventInterval() != null) {
		factory.setIdleEventInterval(configuration.getIdleEventInterval().toMillis());
	}
	factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());
	factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());
	ListenerRetry retryConfig = configuration.getRetry();
	if (retryConfig.isEnabled()) {
		RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
				: RetryInterceptorBuilder.stateful();
		RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
				.createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
		builder.retryOperations(retryTemplate);
		// set message recoverer
		MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
				: new RejectAndDontRequeueRecoverer();
		builder.recoverer(recoverer);
		// step into ...
		// 这里构造 args[]
		factory.setAdviceChain(builder.build());
	}
}


// org.springframework.amqp.rabbit.config.RetryInterceptorBuilder#build
// org.springframework.amqp.rabbit.config.RetryInterceptorBuilder.StatelessRetryInterceptorBuilder#build
// org.springframework.amqp.rabbit.config.RetryInterceptorBuilder#applyCommonSettings

6. 总结一下

就这样,吃饭!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1693776.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

题解:P9535 [YsOI2023] 连通图计数

题意 求&#xff1a;在所有 n n n 个点 m m m 条边的无向简单连通图中&#xff0c;满足把第 i i i 个点删去后图被分为 a i a_i ai​​ 个连通块。 n − 1 ≤ m ≤ n 1 n-1\le m\le n1 n−1≤m≤n1。 思路 将 m n − 1 , m n , m n 1 mn-1,mn,mn1 mn−1,mn,mn1​ 三…

二叉树——堆详解

目录 前言&#xff1a; 一、堆的结构 二、向上调整和向下调整 2.1 向上调整 2.2 向下调整 2.3 向上调整和向下调整时间复杂度比较 三、堆的实现 3.1 堆的初始化 3.2 堆的销毁 3.3 堆的插入 3.4堆的删除 3.5 取堆顶元素 3.6 对堆判空 四、堆排序 五、TOP-K 问题 六、代码总…

电商公司需不需要建数字档案室呢

建立数字档案室对于电商公司来说是非常有必要的。以下是一些原因&#xff1a; 1. 空间节约&#xff1a;数字档案室可以将纸质文件转化为电子文件&#xff0c;节省了大量存储空间。这对于电商公司来说尤为重要&#xff0c;因为他们通常会有大量的订单、客户信息和供应商合同等文…

Python01:初入Python(Mac)

Python环境准备 下载Python&#xff1a;官网https://www.python.org/ 下载PyCharm&#xff1a;官网https://www.jetbrains.com/pycharm/download Python与PyCharm的关系 Python&#xff08;解释器&#xff09;&#xff1a;机器语言—>翻译人员–>翻译成电脑能读懂的 PyC…

DatePicker日期选择框(antd-design组件库)简单使用

1.DatePicker日期选择框 输入或选择日期的控件。 2.何时使用 当用户需要输入一个日期&#xff0c;可以点击标准输入框&#xff0c;弹出日期面板进行选择。 组件代码来自&#xff1a; 日期选择框 DatePicker - Ant Design 3.本地验证前的准备 参考文章【react项目antd组件-demo:…

2022蓝桥杯大赛软件类国赛Java大学B组 左移右移 空间换时间+双指针

import java.util.Scanner;public class Main {static Scanner scnew Scanner(System.in);public static void main(String[] args) {int nsc.nextInt();//数组长度int tsc.nextInt();//操作次数int arr[]new int[n];char arr1[] new char[t];int arr2[] new int[t];int vis…

金融信贷风控系统设计模式应用之模版方法

背景介绍 风控系统每种场景 如个人消费贷 都需要跑很多规则 规则1 申请人姓名身份证号实名验证规则2 申请人手机号码实名认证规则3 银行卡预留手机号码实名认证规则4 申请人银行卡预留手机号码在网状态检验规则5 申请人银行借记卡有效性核验规则6 户籍地址与身份证号归属地比…

后量子密码解决方案

什么是后量子密码学 (PQC)&#xff0c;为什么准备工作如此重要? 量子计算正在迅速发展;用不了多久&#xff0c;量子网络攻击就会成为可能。量子网络攻击将能够在几分钟内瘫痪大型网络。我们今天赖以保护我们的连接和交易的一切都将受到量子计算机的威胁&#xff0c;危及所有密…

Django中model中的抽象类

Django中model中的抽象类 当我们在app中models.py文件中定义model表并执行python manage.py makemigrations和python manage.py migrate后&#xff0c;Django就会在数据库中创建表 但是我们也可以对其默认配置修改&#xff0c;定义model类但是不在数据库中创建 from django.…

Behind the Code:Polkadot 如何重塑 Web3 未来

2024 年 5 月 17 日 Polkadot 生态 Behind the Code 第二季第一集 《创造 Web3 的未来》正式上线。第一集深入探讨了 Polkadot 和 Web3 技术在解决数字身份、数据所有权和去中心化治理方面的巨大潜力。 &#x1f50d; 查看完整视频&#xff1a; https://youtu.be/_gP-M5nUidc?…

基于Python对评论进行情感分析可视化

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景与意义 在当今数字化时代&#xff0c;用户生成内容&#xff08;UGC&#xff09;如在线评论、社交媒体…

需求响应+配网重构!含高比例新能源和用户需求响应的配电网重构程序代码!

前言 配电网重构作为配电网优化运行的手段之一&#xff0c;通过改变配电网的拓扑结构&#xff0c;以达到降低网损、改善电压分布、提升系统的可靠性与经济性等目的。近年来&#xff0c;随着全球能源消耗快速增长以及环境的日趋恶化&#xff0c;清洁能源飞速发展&#xff0c;分…

线性回归模型

目录 1.概述 2.线性回归模型的定义 3.线性回归模型的优缺点 4.线性回归模型的应用场景 5.线性回归模型的未来展望 6.小结 1.概述 线性回归是一种广泛应用于统计学和机器学习的技术&#xff0c;用于研究两个或多个变量之间的线性关系。在本文中&#xff0c;我们将深入探讨…

GM Bali,OKLink受邀参与Polygon AggIsland大会

5月16日-17日&#xff0c;OKLink 受到生态合作伙伴 Polygon 的特别邀请&#xff0c;来到巴厘岛参与以 AggIsland 为主题的大会活动并发表演讲&#xff0c;详细介绍 OKLink 为 Polygon 所带来的包括多个浏览器和数据解析等方面的成果&#xff0c;并与 Polygon 一起&#xff0c;对…

【maven与tomcat配置】如何正确配置maven及tomcat环境变量及运行Java项目 (附图文说明及下载包)

maven及tomcat配置详解 &#x1f354;涉及知识&#x1f964;写在前面&#x1f367;一、maven和tomcat是啥&#xff1f;&#x1f367;二、maven环境变量配置2.1获取maven包2.2创建本地仓库及修改配置A&#xff0e;校验是否安装javaB&#xff0e;创建本地maven存放仓库C&#xff…

C++vector的简单模拟实现

文章目录 目录 文章目录 前言 一、vector使用时的注意事项 1.typedef的类型 2.vector不是string 3.vector 4.算法sort 二、vector的实现 1.通过源码进行猜测vector的结构 2.初步vector的构建 2.1 成员变量 2.2成员函数 2.2.1尾插和扩容 2.2.2operator[] 2.2.3 迭代器 2…

OpenHarmony系统使用gdb调试init

前言 OpenAtom OpenHarmony&#xff08;简称“OpenHarmony”&#xff09;适配新的开发板时&#xff0c;启动流程init大概率会出现问题&#xff0c;其为内核直接拉起的第一个用户态进程&#xff0c;问题定位手段只能依赖代码走读和增加调试打印&#xff0c;初始化过程中系统崩溃…

单片机设计注意事项

1.电源线可以30mil走线&#xff0c;信号线可以6mil走线 2.LDO推荐 SGM2019-3.3,RT9013,RT9193,1117-3.3V。 3.单片机VCC要充分滤波后再供电&#xff0c;可以接0.1uf的电容 4.晶振附件不要走其他元件&#xff0c;且放置完单片机后就放置晶振&#xff0c;晶振靠近X1,X2。

【C++】d1

关键字&#xff1a; 运行、前缀、输入输出、换行 运行f10 前缀必须项&#xff1a; #include <iostream> using namespace std; 输入/输出&#xff1a; cin >> 输入 cout << 输出 语句通过>>或<<分开 换行 endl或者"\n"

前端日志收集(monitor-report v1)

为什么 为什么自己封装而不是使用三方 类似 Sentry 这种比较全面的 因为 Sentry 很大我没安装成功&#xff0c;所有才自己去封装的 为什么使用 可以帮助你简单解决前端收集错误日志、收集当前页面访问量&#xff0c;网站日活跃&#xff0c;页面访问次数&#xff0c;用户行…