0. 列位,响应式布局好麻烦的 …
有意思的,chrome devtool 在调试响应式的分辨率的时候,比如说在 宽度远远大于 768 的时候,按说浏览器也知道大概率是 web端方式打开,样式也是如此渲染,但一些事件(没有鼠标悬停,还是维持触控的感觉)却还是移动端上面的。
前面看过了 spring amqp 的官方文档,但是它的文档不是按一个个 完整的流程 写的,还是老一套 按目录(或者叫 纲目?)拆解 出来的。这就导致,我了解了 RetryTemplate
, MessageRecoverer
, ErrorHandler
的作用,但是很难想象他们一起工作的样子。
所以,临时起意,走一遍源码,好久没有做这个事儿了。
本文不会按照这几个组件 加载,运行的顺序展开。主打一个 debug 的顺序打开
1. 翻开源码前
debug源码,首先需要写一个极简的demo,我这边的做法是:
- 配置: 关闭 拒绝时重排(这个跟本文无关,只是补充一下)
- 配置:开启 消费端 重试
- 代码:在消费端抛出一个异常,诱发重试
- 现象:重试耗尽后,消息 转投 死信队列
1.1 最简单的,先见名知意
建议看一下 接口定义中 文档注释,比较详细
本文默认读者有 RetryTemplate 的经验
MessageRecoverer
消息的恢复器(… 这个确实很抽象,作用的位置也深,实现非常简单)
如果猜不出来它的作用,建议看下他的几个实现类,数量很少,代码就几行ErrorHandler
错误处理器
1.2 容易纠结的点
因为,大概看了一下 运行时堆栈,基本跟 MessageRecoverer
不沾边。所以,比较纠结—— Retry
拦截器链 跟 ErrorHandler
的关系,到底是 一先一后的顺序,还是 内外嵌套的结构 ?
解开这个,只需要 轻点debug,看一下运行时堆栈信息即可
// user method annotated by @RabbitListener
onSubscribeTestQueue:93, ExampleQueueClientConfiguration (org.pajamas.example.starter.integration.module.client)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
doInvoke:169, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invoke:119, InvocableHandlerMethod (org.springframework.messaging.handler.invocation)
invoke:77, HandlerAdapter (org.springframework.amqp.rabbit.listener.adapter)
invokeHandler:263, MessagingMessageListenerAdapter (org.springframework.amqp.rabbit.listener.adapter)
invokeHandlerAndProcessResult:209, MessagingMessageListenerAdapter (org.springframework.amqp.rabbit.listener.adapter)
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#doInvokeListener(org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener, com.rabbitmq.client.Channel, java.lang.Object)
onMessage:148, MessagingMessageListenerAdapter (org.springframework.amqp.rabbit.listener.adapter)
doInvokeListener:1670, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
actualInvokeListener:1589, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
invokeListener:-1, 1153028279 (org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer$$Lambda$1982)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
invokeJoinpointUsingReflection:344, AopUtils (org.springframework.aop.support)
invokeJoinpoint:198, ReflectiveMethodInvocation (org.springframework.aop.framework)
proceed:163, ReflectiveMethodInvocation (org.springframework.aop.framework)
doWithRetry:97, RetryOperationsInterceptor$1 (org.springframework.retry.interceptor)
doExecute:329, RetryTemplate (org.springframework.retry.support)
execute:225, RetryTemplate (org.springframework.retry.support)
invoke:122, RetryOperationsInterceptor (org.springframework.retry.interceptor)
proceed:186, ReflectiveMethodInvocation (org.springframework.aop.framework)
// aop拦截器链:可以看到只有 Retry 一个advice (上面这一坨都是)
invoke:215, JdkDynamicAopProxy (org.springframework.aop.framework)
invokeListener:-1, $Proxy256 (org.springframework.amqp.rabbit.listener)
// 这里是 aop 跟 amqp.listenerContainer 之间的分界线
// 在这里停留,看一下 我们代码抛出给 spring.retry 的异常 到这里 怎么是往下走的?
// step into ...
invokeListener:1577, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
doExecuteListener:1568, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
executeListener:1512, AbstractMessageListenerContainer (org.springframework.amqp.rabbit.listener)
doReceiveAndExecute:994, SimpleMessageListenerContainer (org.springframework.amqp.rabbit.listener)
receiveAndExecute:941, SimpleMessageListenerContainer (org.springframework.amqp.rabbit.listener)
access$1600:85, SimpleMessageListenerContainer (org.springframework.amqp.rabbit.listener)
mainLoop:1319, SimpleMessageListenerContainer$AsyncMessageProcessingConsumer (org.springframework.amqp.rabbit.listener)
run:1225, SimpleMessageListenerContainer$AsyncMessageProcessingConsumer (org.springframework.amqp.rabbit.listener)
run:748, Thread (java.lang)
Note: 因为只有一个advice, 即 Retry 的拦截器,所以,Spring.Retry 与 Spring.Amqp.ErrorHandler 必然是 一先一后 的顺序结构
2. 源码,启动!
// org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#executeListener
/**
* Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
* @param channel the Rabbit Channel to operate on
* @param data the received Rabbit Message
* @see #invokeListener
* @see #handleListenerException
*/
protected void executeListener(Channel channel, Object data) {
if (!isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn(
"Rejecting received message(s) because the listener container has been stopped: " + data);
}
throw new MessageRejectedWhileStoppingException();
}
Object sample = null;
if (this.micrometerHolder != null) {
sample = this.micrometerHolder.start();
}
try {
// aop >>> user code
doExecuteListener(channel, data);
if (sample != null) {
this.micrometerHolder.success(sample, data instanceof Message
? ((Message) data).getMessageProperties().getConsumerQueue()
: queuesAsListString());
}
}
// ListenerExecutionFailedException(shorten as 'LEFE') extends AmqpException ( extends RuntimeException )
// cause here is AmqpRejectAndDontRequeueException(shorten as 'ARADRE')
catch (RuntimeException ex) {
if (sample != null) {
this.micrometerHolder.failure(sample, data instanceof Message
? ((Message) data).getMessageProperties().getConsumerQueue()
: queuesAsListString(), ex.getClass().getSimpleName());
}
Message message;
if (data instanceof Message) {
message = (Message) data;
}
else {
message = ((List<Message>) data).get(0);
}
// if message.properties.finalRetryForMessageWithNoId(default false) then rethrow
checkStatefulRetry(ex, message);
// ConditionalRejectingErrorHandler.handleError(t)
// if is-not-ARADRE && exceptionStrategy.isfatal(t)
// if
// this.errorHandler.discardFatalsWithXDeath(default true)
// && is-a-LEFE
// && t has fail message
// throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present")
// else
// throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal");
// else
// do nothing ...
handleListenerException(ex);
// step to
throw ex;
}
}
// 最后这一个异常,直接跳转到 catch 的地方,已经到 main looooooop了
// org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop
// catch ARADRE but do nothing ... (bacuse of it wrapped in LEFE had been handled)
// return main looooooop
- 这里很多注释,是我自己写的,如果没有看过官方文档,也许会对一些名词(比如那些个 异常类型)有些陌生
3. 考虑到读者 也许没有心情 看官方文档
大概 解释一下 为什么要有这几个 异常类型
官网不舍得放图,么事,我不惜笔墨的
这样看,有没有体会到:官方文档没有把这俩放在同一个流程中
4. 伪代码,启动!
现在,这个顺序结构已经有了
try {
// aop(retry) -> invoke listener mothod
} catch {
// listenContainer.ErrorHandler ...
}
5. 那么好 …
当然,源码继续启动,看一下几个关键的时刻
5.1 Retry max-attempts exhausted
代码位置,可以在控制台的异常日志找到,或者 跟踪用户抛出的异常,一层层往外抛出时候,总会被这里捕获一次的
// org.springframework.retry.support.RetryTemplate#handleRetryExhausted
// invoke spring.retry.recover callback
// org.springframework.retry.interceptor.RetryOperationsInterceptor.ItemRecovererCallback#recover(retryContext)
// org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean#recover(args[], retryContext.latest-throwable)
// throw new LEFE(this.messageSupplier.get(), new ARADRE(latest-throwable), message);
/*
args: [
logger,
messageRecoverer, (default RejectAndDontRequeueRecoverer)
retryTemplate
]
*/
MessageRecoverer
做了new ARADRE(user ex)
spring.amqp
的 一个 retry recover-callback 做了new LEFE(new ARADRE(user ex))
现在对 MessageRecoverer
的理解不难了,因为这个案例中,我们抛出的异常 不在其配置的strategy 里面(不会因为这种失败,而触发重排), 这就是 ARADRE
的语义
5.1.1 Locate MessageRecoverer
debug可以看到 这个 MessageRecoverer
不是显式声明出来(step into 被直接跳过了),我们看不到他的声明位置,说明是一个lambda 表达式
可以通过所在类 StatelessRetryOperationsInterceptorFactoryBean
,找进去
protected Object recover(Object[] args, Throwable cause) {
// this.messageRecoverer
MessageRecoverer messageRecoverer = getMessageRecoverer();
Object arg = args[1];
if (messageRecoverer == null) {
this.logger.warn("Message(s) dropped on recovery: " + arg, cause);
}
else if (arg instanceof Message) {
messageRecoverer.recover((Message) arg, cause);
}
else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecoverer) {
((MessageBatchRecoverer) messageRecoverer).recover((List<Message>) arg, cause);
}
return null;
}
FactoryBean 是在 Spring.amqp configuration 加载配置的时候用的,这是泛泛的说法,不难理解
5.1.2 回过头看 configuration 加载配置的地方
// org.springframework.boot.autoconfigure.amqp.RabbitAnnotationDrivenConfiguration#simpleRabbitListenerContainerFactory
// org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer#configure
// org.springframework.boot.autoconfigure.amqp.AbstractRabbitListenerContainerFactoryConfigurer#configure(T, org.springframework.amqp.rabbit.connection.ConnectionFactory, org.springframework.boot.autoconfigure.amqp.RabbitProperties.AmqpContainer)
protected void configure(T factory, ConnectionFactory connectionFactory,
RabbitProperties.AmqpContainer configuration) {
Assert.notNull(factory, "Factory must not be null");
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
Assert.notNull(configuration, "Configuration must not be null");
factory.setConnectionFactory(connectionFactory);
if (this.messageConverter != null) {
factory.setMessageConverter(this.messageConverter);
}
factory.setAutoStartup(configuration.isAutoStartup());
if (configuration.getAcknowledgeMode() != null) {
factory.setAcknowledgeMode(configuration.getAcknowledgeMode());
}
if (configuration.getPrefetch() != null) {
factory.setPrefetchCount(configuration.getPrefetch());
}
if (configuration.getDefaultRequeueRejected() != null) {
factory.setDefaultRequeueRejected(configuration.getDefaultRequeueRejected());
}
if (configuration.getIdleEventInterval() != null) {
factory.setIdleEventInterval(configuration.getIdleEventInterval().toMillis());
}
factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());
factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());
ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful();
RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
.createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
builder.retryOperations(retryTemplate);
// set message recoverer
MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
: new RejectAndDontRequeueRecoverer();
builder.recoverer(recoverer);
// step into ...
// 这里构造 args[]
factory.setAdviceChain(builder.build());
}
}
// org.springframework.amqp.rabbit.config.RetryInterceptorBuilder#build
// org.springframework.amqp.rabbit.config.RetryInterceptorBuilder.StatelessRetryInterceptorBuilder#build
// org.springframework.amqp.rabbit.config.RetryInterceptorBuilder#applyCommonSettings
6. 总结一下
就这样,吃饭!