优质博文:IT-BLOG-CN
一、Job单元化
什么Job
需要单元化:所有会往单元化DB
更新/删除/插入数据的Job
都需要进行单元化改造。
什么是单元化DB
【1】指配置为DRC
双向复制的DB
;
【2】单元化的DB
部署在多个Zone
,每个Zone
内的实例都会被读写,以便达到:1)Zone
故障时无需等待DB
切换;2)请求的处理链路完全封闭在Zone
内,低延迟;不同Zone
负责不同的数据,例如按照OrderId%100
进行Shard
流量,Shard
和Zone
是多对一关系,以保证多个Zone
间数据的双向复制不会产生冲突。
上云改造方案
【1】Job
部署在所有Zone
,调用所有Zone
中的实例,每个Zone
的Job
实例只处理属于本Zone
的数据,不处理其他Zone
的数据。
【2】当流量切分规则发生变化时,终止当前调度,重新获取当前Job
所在Zone
应该处理哪些Shard
并用这些Shard
的进度最小值作为新的进度开始调度。同时需要保证Job
逻辑的幂等性,防止跳跃性数据修改规则后重复执行。
为什么当规则发生变化时,取最小值和保证Job数据的幂等性:
举例:SH
处理Shard S1
和S2
,SIN
处理S0
,当需要把S1
从SH
切换至SIN
时,此时SH
的进度是3,SIN
的进度是7。在SIN
响应流量切换规则变化时,发现自己此时可以处理Shard S0
和S1
的数据,如果按照之前的进度7来进行,就会漏掉SH
未处理的5。同时,JOB
逻辑需要幂等。
Job
为什么要Sharding
分Zone
执行部署数据:
因为DB
进行了单元化改造,读写操作都在Zone
的Master
上进行,上云前的框架都是依赖一个全局唯一的DB Master
进行一些互斥和可见性的操作,所以DB
单元化后JOB
执行的数据也需要分Zone
执行。
改造后的Job
确定数据是否归属当前Zone
处理: 将处理的数据orderId
传给单元化组件UCS
客户端API
判断。
流量切分规则变化: 当Zone
故障或者Zone
之间流量比例调整时,Shard
->Zone
的映射关系会发生变化,每个Zone
内的Job
实例处理的数据也会发生变化。
【1】QSchedule Job
:QSchedule
会通知每个Zone
的Job
实例终止当前调度并发起一次新的调度,Job
实例通过QSchedule
传递过来的上下文获取当前Job
实例应该处理哪些Shard
并重新开始业务处理逻辑。
【2】非QSchedule Job
:Job
实例需要自行监听UCS
客户端的策略变化,收到通知后终止当前业务处理逻辑,然后通过UCS
客户端重新获取当前Job
实例应该处理哪些Shard
并重新开始业务处理逻辑。
代码逻辑改动
每次调度执行一次批量计算的Job
改造前
public void startQSchedule(QScheduleContext ctx) {
// 从 Offset 存储中获取上次扫描过的 Offset
Offset startOffset = offsetDao.queryStartOffset();
// 根据 Offset 获取本地调度需要处理的数据
List<SourceItem> sourceItems = sourceDao.selectBatch(startOffset, maxItemCount);
// 业务逻辑处理
process(sourceItems);
// 计算并保存最新 Offset
offsetDao.save(calcucateLatestOffset(startOffset, sourceItems));
}
改造后
public void startQSchedule(QScheduleContext ctx) {
// #改造点# 通过 ctx 中本次调度负责的 Shards, 计算本次调度的起始 Offset (负责的Shards中最小的Offset)
Offset startOffset = offsetDao.queryStartOffset();
// 根据 Offset 获取本地调度需要处理的数据
List<SourceItem> sourceItems = sourceDao.selectBatch(startOffset, maxItemCount);
// #改造点# 使用 RouteContextBuilders 作为 UCS 路由 过滤本次调度处理的数据
List<SourceItem> filteredItems = UCSHelper.filterCurrentZoneItems(ctx, sourceItems, RouteContextBuilders.Builder()...build());
// 业务逻辑处理
process(filteredItems);
// #改造点# 计算并保本次调度完成后的最新 Offset,并映射到本次调度时单元化策略涉及到的 Shard
UCSHelper.saveOffset(ctx, calcucateLatestOffset(startOffset, sourceItems));
}
Long-Running Job
:指Job
启动函数里存在无限循环,触发调度后,除非循环条件不满足,否则会一直持续运行,每次循环需要做的事情由业务逻辑自行控制,典型的Long-Running Job
。
改造前
public void startJob(QScheduleContext ctx) {
// 从 Offset 存储中获取上次扫描过的 Offset
Offset startOffset = offsetDao.queryStartOffset();
// 这里是无限循环的条件,使用QSchedule提供的API来判断是否本次调度已被终止
while (!ctx.isStopped()) {
// 根据 Offset 获取本地调度需要处理的数据
List<SourceItem> sourceItems = sourceDao.selectBatch(startOffset, maxItemCount);
// 业务逻辑处理
process(sourceItems, latestScanedOffset);
// 等待一段时间
TimeUnit.SECONDS.sleep(1);
}
}
改造后
public void startJob(QScheduleContext ctx) {
// #改造点# 通过ctx中本次调度应该负责的Shard,计算本次调度的起始 Offset,一般是用多个 Shard 的最小 Offset
Offset startOffset = UCSHelper.calculateMinimumOffsetOfShards(ctx);
// 这里是无限循环的条件,使用QSchedule提供的API来判断是否本次调度已被终止
// 当单元化策略发生变化时,调度将会停止,等待 QSchedule Server 再次启动调度,此时会更新 ctx 中的 Shards 信息
while (!ctx.isStopped()) {
// 根据 Offset 获取本地调度需要处理的数据
List<SourceItem> sourceItems = sourceDao.selectBatch(startOffset, maxItemCount);
// #改造点# 使用 RouteContextBuilders 作为 UCS 路由 过滤本次调度处理的数据
List<SourceItem> filteredItems = UCSHelper.filterCurrentZoneItems(ctx, sourceItems, RouteContextBuilders.Builder()...build());
// 业务逻辑处理
process(filteredItems);
// #改造点# 计算并更新本次调度完成后的最新 Offset
startOffset = calcucateLatestOffset(startOffset, filteredItems);
// #改造点#
UCSHelper.saveOffset(ctx, startOffset);
// 等待一段时间
TimeUnit.SECONDS.sleep(1);
}
}
正确性保证
【1】QSchedule
调度上下文会封装本次调度涉及到的Shard
信息。多次调度过程看到的Shard
信息会随单元化策略变化而变化。一次完整的调度过程内部使用的Shard
信息是一致的,不会因为单元化策略变化而变化。
【2】调度过程中,下面三个步骤使用到的Shard
信息是完全相同的,保证Shard
信息的原子性:一下三个步骤如果检测到单元化策略发生变化,会及时终止本次调度,等待QSchedule
服务端再次调度。
● 根据Shard
获取对应的StartOffset
;
● 根据Shard
过滤扫描到的数据;
● 保存处理的最新Offset
和上述Shard
的对应变化;
二、PaaS基础组件多IDC接入
根据RegionCode
确定数据所在Region
,使得常用的数据查询或业务处理操作可以在单个节点上执行,以达到数据单元化处理和数据合规策略动态调整的效果,从而避免跨节点带来额外性能消耗和数据跨境合规问题。
分布式调度中心: 因为业务中大部分JOB
都是通过扫表来对数据进行批量处理,所以多IDC
场景下则基于存储的RegionCode
将任务分散到多个IDC
,数据经过单元化过滤后,进行分片处理。