优质博文:IT-BLOG0CN
一、挑战/注意事项
【1】生产者发送消息后,QMQ
国内外服务器进行了双向同步,如果消费者国内外都部署了系统,就会导致消息重复消费的问题:
目前封装了共同的SDK
重构消费者的代码,生产者发送的消息中必须带有能够找到UDL
的属性,根据UDL
和配置的UCS
策略,在消费端对消息进行过滤,从而解决重复消费的问题;
二、QMQ消息互通
默认情况下,AWS
和私有云的消息是隔离的。如果业务需要可以申请AWS
和私有云的消息进行互通,这样公有云的消息可以在私有云消费,同时私有云的消息可以在公有云消费,消息同步存在延迟(300-500ms)或者更长。
互通的缺陷:同步生产者发送的消息,会产生两个问题:
【1】AWS
和私有云上有同名的ConsumerGroup
,但每条消息都会在两边各消费一次;
【2】私有云上已经存在的ConsumerGroup
,在公有云第一次部署这个消费组时,仍然会遇到消息堆积的问题,需要重置offset
;
跨Region
场景: 需要跨Region
的MQ
通过BaseSubject
同步到中心机房Region
,来保证正常业务流程。
消息处理的改造流程图如下图所示:
三、数据互通后过滤/路由方案
为了实现海内外数据隔离,需要对QMQ
进行过滤,国内机房只消费境内QMQ
,海外机房只消费境外QMQ
【1】消费者根据UID
/是否英网订单过滤:
■ QMQ
业务逻辑中涉及到用户行为和订单信息时,可以封装组件,QMQ
生产时强制带上UID
字段,并在QMQ
消费入口层统一做拦截,根据UID
区分是否为海外数据,实现过滤功能。前提是生产流量完全切换需要依赖UID
清洗全部完成,且公共部门提供接口/规则来判断UID
属于国内或国外。
■ QMQ
中含有订单号信息时,可以在消费入口层查询订单是否属于海外数据,实现统一拦截和过滤。依赖:不需要依赖UID
清洗的进度,但是需要提供机票的订单号与海内外关系的查询接口(订单详情过重性能较差,与降低压力的目标违背),且查询接口需要海内外均部署以降低响应时间。
注:是否应该专门建立一个表存储订单号,
uid
和region/UDL
信息的映射,并按照UDL
通过DRC
同步至海外对应region
的机房?这样查询接口才能最快速的返回结果。
可以使用SDK
中UCSFilteredExecutor
类中的系类方法,集成UCS
策略。QConfig
上添加ucs_strategy_resource_idc.json
配置文件,内容如下:
{
"mqConsumeEnabledTopics": { // 当前应用作为消费者,控制topic粒度可处理的idc;
"bbz.oi.account.drop.deleted.orders": [
"SIN-AWS"
]
},
"ucsStrategies": [
{
"strategyId": 200, // UCS策略id,需要线下申请;strategyId对应的灰度比例:0%:232 1%:213 5%:214 10%:215 20%:216 30%:217 40%:218 50%:219 75%:220 100%:221
"effectiveTime": "2023-03-17 10:00:00", // 是用来控制strategyId生效时间;
"topics": [
"bbz.oi.account.drop.deleted.orders"
],
"names": [
"xxxWorker", // names中资源每个idc都可以执行,需要在它portal上开启;
"yyyName"
]
},
{ // 如果同一个topic/name资源需要灰度切换,需要在ucsStrategies新增
"strategyId": 201,
"effectiveTime": "2023-03-17 15:00:00",
"topics": [
"bbz.oi.account.drop.deleted.orders"
]
}
]
}
QMQ + UCS改造前:
@QmqConsumer(prefix = SUBJECT, consumerGroup = "xxx")
public void onMessage(Message message) throws SOAServiceException {
process(message);
}
QMQ + UCS改造后:
@QmqConsumer(prefix = SUBJECT, consumerGroup = "xxx")
public void onMessage(Message message) throws SOAServiceException {
UCSFilteredExecutor.execute(this::process, message);
}
【2】QMQ
框架根据地域UDL
过滤: UDL - User Data Location
,指用户数据所属的国家与地区,具体为用户注册时使用的国家与地区。该值用户注册后便不会发生变动。目前存在使用UID
查询UDL
接口。
UDL
的来源包括: 1)、用户登录后操作,调用接口时会下发UID
和UDL
信息。2)、后台系统自动发起的操作,需要使用UID
信息查询上述接口/SDK
获得UDL
信息。
QMQ
已支持框架级别的根据UDL
路由功能,即设置了UDL=SG
的消息,只会在AWS-SIN
被消费,不会在SH
被消费了。
限制: 1)、一个
QMQ
消息不能包含多个UID
,尤其是跨UDL
的UID
的数据,此时无法确定需要在哪个数据中心消费。2)、目前来看,初期改造可能仍然需要靠生产者手动使用UID
查询接口后设置UDL
信息,且该接口似乎有白名单,不清楚后续是否会开放给业务团队使用。3)、不支持流量的灰度。
【3】生产者带tag
,消费者根据tag
过滤: 整体思路和方案1基本一致,但是判断逻辑移动到生产端。好处在于生产者可以复用订单数据(是否英网订单),且可以减少消费端实际消费的消息量,节约系统资源。
四、跨域幂等校验
QMQ
是会将国内的QMQ
消息同步到AWS
上,但消息消费情况,是不会进行同步和其他特殊处理的,导致的问题是一条消息会在私有云和AWS
上各消费一次,如果涉及写数据,就会产生数据冲突。
QConfig
可以根据不同的region
读取不同的配置文件,目前是通过配置来进行过滤。比如按照订单号的尾号进行过滤,私有云消费尾号为0-6的订单号,公有云消费尾号7-9的订单号。
问题:做灰度切换时,由于网络延迟等原因,双边的配置版本在短时间不一致,可能出现重复消费消息或者漏掉部分消息。如下图,在T3和T4两者间隔的时间,私有云在按尾号0-5过滤,而AWS
在按尾号7-9过滤,而尾号为6的消息被完全丢弃掉。反之,若AWS
先收到并启动新版本的配置,则会导致尾号为6的消息在两边各消费一次,可能导致后续的业务不正确。对于丢弃问题,可以尝试把丢弃通过延迟转化为重复消费问题,对于重复消费问题,采用强一致幂等的方针。对于不一致的区间,灰度流量减少的机房需要等待灰度流量增多的机房确认后,才能使新的配置生效减少流量,中间产生的重复消费,靠强制幂等来解决。
解决方案:跨域幂等,严格保证单条消息会在单边进行消费。
采用类型幂等的处理方式,跨域幂等应用的输入为MessageID
和IDC
机房,收到请求后会尝试将数据以SetNX
写入Redis
,key
为MessageID
,Value
为IDC
机房名称,若写入成功,则返回true
,已经存在则判断已存在的value
与请求的IDC
机房是否相同,并将判断结果作为Response
。
跨域幂等应用部署在单边,另一个机房的应用需要跨域进行访问。
由于该问题只会在灰度配置变更后短时间内产生,所以我们可以设定一个时间,应用监听灰度配置的变化,当灰度配置发生变更后的指定时间内,才需要使用这个机制去保证幂等,过了该时间后可以直接全量消费。这样可以降低总体的处理延迟。网络通信出问题调用幂等检查失败时,需要抛出NeedRetryException
,QMQ
框架会在当前regoin
内发起重试。
五、灰度依赖对DB同步的依赖
问题:生产流量逐步导入AWS
时,可能是基于订单号进行灰度判断,如传入订单号,然后在订单信息中查询到对应的用户ID
,然后按用户ID
来判断确定在私有云还是AWS
进行处理。如用户在私有云下订单,然后在AWS
进行灰度判断时,可能因为数据同步的延迟,而无法获取用户ID
,导致无法判断
时序信息如下 :
T1 : 私有云接收到下单请求,生成OrderID=10001
的订单,订单信息写入DB
T2 : 开始DB
同步,将私有云数据发向AWS
T3 : 私有云应用发送后续处理的QMQ
T4 : AWS
应用接收到T3
时发送的QMQ
,需要对其中的订单号进行灰度判断
T5 : AWS
上的DB
接收到T2
时的数据同步请求,将OrderID=10001
的数据写入AWS
的DB
问题点 : T4
时接收的请求中有OrderID=10001
,但此时AWS
的DB
中没有这个订单的信息,无法获取这个订单的UID
信息,也就无法进行灰度判断
当前解决办法:下单时进行双写,强制将灰度判断必需的信息进行双写
时序信息如下 :
T1 : 私有云接收到下单请求,生成OrderID=10001
的订单,订单信息写入DB
T2 : 私有云发起请求,将订单号和UID
信息发送到AWS
T3 : AWS
收到请求后,将单号和UID
写入DB
,并返回成功
T4 : 私有云收到AWS
的应答,确认AWS
上已经存储订单号和UID
信息
T5 : 开始DB
同步,将私有云数据发向AWS
T6 : 私有云应用发送后续处理的QMQ
T7 : AWS
应用接收到T3时发送的QMQ
,需要对其中的订单号进行灰度判断
T8 : AWS
上的DB
接收到T2
时的数据同步请求,将OrderID=10001
的数据写入AWS
的DB
重点 :T7
时需要进行灰度判断时,可以直接使用T3
写入的数据,保证能够获取到灰度判断必需的信息