背景
当前与算法交互均通过rocketMQ异步交互,绝大部分场景一条请求mq消息应对应一条返回mq,但由于各种原因(消息积压、程序bug),可能会导致返回mq超时未返回或者消息丢失。工程侧针对一些重要场景 case by case的通过定时任务不断检查返回mq的状态,并在一定时间后手动触发告警提醒返回mq出现问题。该方案也能够一定程度的解决问题,但存在一些弊端:
- 需要case by case的开发,开发成本高
- 容易遗漏
- 手动触发告警容易导致告警过多,产生疲劳,真正的问题被隐藏在大量的告警中。
因此这里介绍一种通用方案,预期能够解决以上问题。
现有方案
主要问题
- 如何唯一标识请求mq对应的返回mq
- 目前的方案是根据场景的不同,选择不同的业务字段,如resourceId
- 方案一:统一约定公共字段,如bizMsgId,所有与算法交互请求和返回mq均带有该字段
- 优点:能够实现唯一标识的目标
- 缺点:工程、XX、算法均需要改造,成本较高
- 方案二(推荐):通过traceId串联请求和返回mq,即返回mq将请求mq带下去的traceId带回来。
- 优点:
- 有很多现成的框架可以集成,对java工程来说开发成本低
- 集成了traceId之后无论是工程还是算法都可以用来做全链路追踪,方面问题的排查。
- 待确认点
- 算法rocketMq以何种方式向下传递 traceId
- 优点:
- 如何进行状态检查
- 后端维护一张通用的任务表,在发送请求mq和收到算法返回mq时进行拦截,发送时向表中增加一条数据,更新状态为已发送,待返回。
- 收到算法返回mq时更新对应行的状态
- 定时任务,捞取30分钟(待定)前未返回的消息详情,并埋点
- 告警如何触发
- 通过SLS配置定时检查任务,按照告警规则配置进行告警分组(解决同一类型告警重复报多次的问题)、告警分级(解决大量告警淹没重要告警的问题)通知告警处理人
- 一分钟内,<5条 飞书群提醒
-
5条 短信提醒
-
10条 短信、电话提醒
- 通过SLS配置定时检查任务,按照告警规则配置进行告警分组(解决同一类型告警重复报多次的问题)、告警分级(解决大量告警淹没重要告警的问题)通知告警处理人
- 告警升级策略
- 10分钟不认领,短信、电话提醒TL
- 30分钟未处理升级。
- 算法 纳入全链路监控
技术方案
数据模型
其他
- 有些场景是通过dubbo发起请求,通过mq返回结果,需要梳理场景
- 有些场景是发送请求给搜推mq,但返回mq是由算法发送,待梳理
参考
- rocketmq发送、接收拦截器
- 生产者发送拦截
public class TraceProducerInterceptor implements ProducerInterceptor {
private static final String TRACE_ID = "traceId";
private static final String SPAN_ID = "spanId";
@Overridepublic Message<?> beforeConvert(Message<?> message) {
String traceId = TraceContext.getTraceId();
String spanId = TraceContext.getSpanId();
if (traceId != null && spanId != null) {
MessageHeaders headers = message.getHeaders();
headers.put(TRACE_ID, traceId);
headers.put(SPAN_ID, spanId);
if (message instanceof ErrorMessage) {
ErrorMessage errorMessage = (ErrorMessage) message;
Message<?> originalMessage = errorMessage.getOriginalMessage();
if (originalMessage != null) {
originalMessage = beforeConvert(originalMessage);
errorMessage = new ErrorMessage(
errorMessage.getPayload(), originalMessage, errorMessage.getHeaders(), errorMessage.getCause());
}
return errorMessage;
}
}
return message;
}
}
- 消费者消费拦截
public class TraceConsumerInterceptor implements ConsumerInterceptor {
private static final String TRACE_ID = "traceId";
private static final String SPAN_ID = "spanId";
@Overridepublic void afterReceive(Message<?> message) {
String traceId = message.getHeaders().get(TRACE_ID, String.class);
String spanId = message.getHeaders().get(SPAN_ID, String.class);
if (traceId != null && spanId != null) {
MDC.put(TRACE_ID, traceId);
MDC.put(SPAN_ID, spanId);
}
}
}
- 配置拦截器
rocketmq:
producer:
interceptor:
- com.example.TraceProducerInterceptor
consumer:
interceptor:
- com.example.TraceConsumerInterceptor
-
rocketmq集成opentelemetry trace