背景:
2023月7月份入职新公司,初来乍到还没参入到具体的项目中,技术负责人安排写一个批量处理数据的服务,于是便有了以下文章。
数据流程大概是这样,从clickhouse表中获取数据,并从elasticserach中根据业务序号增加若干属性,批量保存到mysql中,最后再更新clickhouse中数据状态为已生成。
本文以10000条数据示例(本机环境 docker(mysql+clickhouse)、es测试服务器环境)。
思考:
很简单的处理流程,刚参加工作的时候这样的服务写过很多次(jdbc Class.forName Connection PreparedStatment Resulset一些列操作),顺手拈来的事情。
- 技术框架:springboot+mybatis plus+spring data elasticserach + dynamic datasource
- 实现思路:
- orm框架选择mybatis处理mysql和clickhouse,使用mybatis plus封装好的saveOrUpdateBatch方法省得自己写sql,
- 使用spring data elasticserach封装好的方法进行es查询;
- 要同时操作clickhouse和mysql并且需要支持事务,选择dynamic datasource;
和相关同事沟通每天的数据量大概在5-6K(这里解释下数据量为什么自己不去看,内网环境需要vpn,刚入职暂未分配账号),于是开始了编码工作。
过程实践:
定义方法:
1、生成保存到mysql的数据;
1.1、clickhouse批量查询数据;
1.2、查询数据结果根据业务编号从es中查询数据信息;
1.3、2个结果合并属性并生成mysql入库对象集合。
部分代码:
public List<QcBaseDataInfo> generate(String day, int limit) {
/**
* 1、从clickhouse取昨日数据
* 2、从es中取数据(根据序列号)
* 3、封装智能质检数据
*/
Assert.notNull(day, "业务数据日期不允许为空");
List<ChWicDataInfo> chWicDataInfos = queryFormCh(day, limit);
List<QcBaseDataInfo> qcBaseDataInfoList = getPropertiesFromEs(chWicDataInfos);
return qcBaseDataInfoList;
}
@Override
public List<ChWicDataInfo> queryFormCh(String day, int limit) {
Assert.notNull(day, "业务数据日期不允许为空");
List<ChWicDataInfo> queryResult = chWicDataInfoService.list(new QueryWrapper<ChWicDataInfo>().lambda()
.eq(ChWicDataInfo::getAcctDay, day)
.eq(ChWicDataInfo::getIsDown, STATUS_ASR_DEAL_FINISHED)
.eq(ChWicDataInfo::getHadToDataLake, false)
.last(" limit " + limit));
return queryResult;
}
@Override
public List<QcBaseDataInfo> getPropertiesFromEs(List<ChWicDataInfo> chWicDataInfos) {
List<String> serialNumbers = new ArrayList<>();
chWicDataInfos.forEach(q -> {
serialNumbers.add(q.getId());
});
log.info("start query serialNumbers:{} from es", serialNumbers);
List<EsWicDataInfo> esWicDataInfos = esWicDataRepository.findBySerialNumberIn(serialNumbers);
log.info("query from es end...");
List<QcBaseDataInfo> qcBaseDataInfoList = construct(chWicDataInfos, esWicDataInfos);
return qcBaseDataInfoList;
}
2、数据批量保存/更新到mysql中;
部分代码:
@Override
public boolean saveOrUpdateBatch(List<QcBaseDataInfo> data) {
boolean result = qcBaseDataInfoService.saveOrUpdateBatch(data, BATCH_SIZE);
return result;
}
3、clickhouse中更新数据状态为已更新;
部分代码:
本地测试后放到生产环境执行,发现5万多数据2个多小时才执行完,纳尼,不能够啊。
于是开始想问题出在了那里,开始认为可能是es查询费事,经测试后无问题;
batchSize(原设置为2000)设置的过大?测试后亦无问题;
自始至终就没想过是mybatis plus的批量更新updateBatchById的问题。
一直找不到问题于是便开始了分步骤测试。
分步骤测试:
思路:针对每步操作进行测试,排查用时过长的操作。
- mysql保存 10000条 batchSize=1024
批量保存用时:saveBatch 23s
批量保存或更新用时:saveOrUpdateBatch 111s
- mysql保存+clickhouse更新 10000条 batchSize=1024
批量保存用时:saveBatch 600s
批量保存或更新用时:saveOrUpdateBatch 未测试
- mysql保存+clickhouse更新 10000条 batchSize=1024 替换mybatis plus updateBatchById 为 update table set column={value} where id in ({ids})
批量保存用时:saveBatch 22s
批量保存或更新用时:saveOrUpdateBatch 180S
生产环境每天的数据量约60k/每天,现每天约18分钟处理完成,执行效率提高了好几个数量级。
排查到是mybatis plus的updateBatchById耗时过长,查看了源代码是批量执行,不知道为啥这么慢。
mybatis plus updateBatcheById源代码:
总结:
一次小事情反应出一个大问题,想当然的以为框架封装的方法肯定是最优的,加上经验使然导致了本次问题,以下一句话可总结,分享。
纸上得来终觉浅,觉知此事要躬行。
纸上得来终觉浅,觉知此事要躬行。
纸上得来终觉浅,觉知此事要躬行。
本文完。