1:需求描述
支持NVR升级后通道数变更,完成升级后,设备SDK上报通道数量给A平台,A平台将NVR通道数量同步给B平台,B平台自动调用C平台接口,同步通道数量给C平台,C平台重新生成通道序列号,并完成序列号入库,再由C平台将通道序列号同步给A平台,A平台完成序列号入库。
2:需求分析
我所在的小组维护的是B平台;拿到需求后,开始分析:
逻辑很简单:设备SDK在启动后,会将信息上报到A平台,A会同步到B平台,B再上报给C;所以只需要在kafka里加入更新后的通道数就可以
现有接口:A平台在设备重启后会通过Kafka上报设备信息给C、获取NVR的通道信息接口
存在问题:获取NVR通道信息的接口是http请求,可能出现网络异常掉用失败的情况,所以需要设计一个补偿机制
3:补偿逻辑
当走网络掉接口失败后,将失败后的kafka信息存入数据库,用定时任务去扫描失败信息进行kafka再次发送;考虑到以后还会有同类型的升级,补偿机制可以设计成通用的。
4:表设计
固件版本同步失败记录表:
重要字段:status:判断是否同步成功;biz_type: 业务类型,本次升级可以设为1(需要补偿的业务类型);biz_msg:业务信息(需要补偿的信息)
5:代码逻辑
更新版本信息接口:
@Override
public String updateVersionInfo(String macId, String currentVersion, String modelId) {
/**更新前置代码**/
try {
//将更新后的设备信息用kafka同步给C,加入升级后的通道数
JSONObject kafkaData = new JSONObject();
kafkaData.put("macId", macId);
kafkaData.put("version", currentVersion);
kafkaData.put("modelId", modelId);
kafkaData.put("videoCapability", 2);
//升级通道数的是nvr总线设备(1nvr总线设备(总设备)可以理解为总设备是一个大的机盒,下面挂了许多子设备,这次升级的就是设备SDK可以上报通道数量(可以挂子设备的数量)
if (cameraDaoMybatis.getDbCameraInfo(macId).getNvrType() == 1) {
//通过macID获取nvr的信息(设备id,(state 1:在线),nvrList(这个的size就是通道数));因为获取nvr信息的方法是走网络掉接口,所有可能出现网络异常掉用失败的情况
NvrDetailInfo nvrChannels = dcsManager.getNvrChannels(macId);
//当调用成功,并且nvr状态为在线
if (nvrChannels != null && nvrChannels.getState() == 1) {
//在kafka中加入升级后获取到的通道数
kafkaData.put("channelNumber", nvrChannels.getNvrChannelList().size());
}
//当调用失败,将设备id,版本,已经kafka的信息全部存入固件版本同步失败记录表
Integer row = syncFailureRecordMybatis.addSyncFailureMsg(macId, currentVersion, kafkaData.toString());
LOG.info("addSyncFailureMsg success, rows={} affected, macId={}", row, macId);
}
/**更新后置代码**/
} catch (Exception e) {
LOG.error("updateVersionInfo error: ", e);
result = JsonUtil.addErrorCode(EErrorCode.ERROR_SYSTEM, result);
}
return result.toString();
}
6:定时任务补偿
@Component
@Slf4j
public class SyncFailureRecordJob {
@Autowired
private LittlecConfig littlecConfig;
@Autowired
private SyncFailureRecordMybatis syncFailureRecordMybatis;
@Autowired
private DcsManager dcsManager;
@Autowired
private DeviceVersionProducer deviceVersionProducer;
@XxlJob("SyncFailureRecord")
public ReturnT<String> syncFailureRecord(String param) {
//配置一次升级的条数
long limitSize = littlecConfig.getConfigAsLong("camera.SyncFailureRecordMybatis.querySyncFailureRecord.limitSize", 5000L);
//记录每次启动定时器,开始扫描的起始id
Integer startId = 0;
log.info("syncFailureRecord param:limitSize:{} startId:{}", limitSize, startId);
boolean isCompleted = false;
//记录循环次数
int reconTimes = 0;
//查t_firmware_version_sync_failure_record表,一天内的数据
while (!isCompleted) {
reconTimes++;
//通过limitSize和startId查出的list集合
List<SyncFailureRecordDO> syncFailureRecordDOList = syncFailureRecordMybatis.querySyncFailureRecordList(limitSize, startId);
//取出list里的所有mcaid
String macIds = syncFailureRecordDOList.stream().map(n -> n.getMacId()).collect(Collectors.joining(","));
//日志记录循环次数,与这一次扫描的所有macid
log.info("In querySyncFailureRecordList: recon {} time with syncFailureRecordDOList size {} , collect macIds={}", reconTimes, syncFailureRecordDOList.size(),macIds);
//如果扫描出的集合小于定下的升级条数,结束循环(代表已经扫描到了最后的一组数据)
if (syncFailureRecordDOList.size() < limitSize) {
isCompleted = true;
}
//如果扫描出的集合大小大于0(代表查出数据)
if (syncFailureRecordDOList.size() > 0) {
//修改下次扫描的起始id为集合的最后一个id
startId =syncFailureRecordDOList.get(syncFailureRecordDOList.size() - 1).getId();
//循环取出list里的对象
for (SyncFailureRecordDO dto : syncFailureRecordDOList) {
if (dto != null) {
//同步需要放入kafuka的所有数据给安防管理平台
NvrDetailInfo nvrChannels = dcsManager.getNvrChannels(dto.getMacId());
JSONObject bizMsgObject = JSONObject.parseObject(dto.getBizMsg());
org.json.JSONObject kafkaData = new org.json.JSONObject();
kafkaData.put("macId", bizMsgObject.getString("macId"));
kafkaData.put("version", bizMsgObject.getString("version"));
kafkaData.put("modelId", bizMsgObject.getString("modelId"));
kafkaData.put("videoCapability", 2);
//加入升级后的通道数
kafkaData.put("channelNumber", nvrChannels.getNvrChannelList().size());
//同步方法
deviceVersionProducer.sendDeviceVersionMessage(dto.getMacId(), kafkaData.toString());
log.info("sendDeviceVersionMessage success, macId={}, channelNumber={}", dto.getMacId(), nvrChannels.getNvrChannelList().size());
//修改表中的status,为已经升级
Integer row = syncFailureRecordMybatis.updateRecordStatus(dto.getMacId());
log.info("updateRecordStatus success,rows={} affected,macId={}",row,dto.getMacId());
}
}
}
}
log.info("syncFailureRecord success");
return ReturnT.SUCCESS;
}
}
7:mybatis
List<SyncFailureRecordDO> querySyncFailureRecordList(@Param("limitSize") long limitSize, @Param("startId") Integer startId);
Integer updateRecordStatus(@Param("macId") String macId);
8:xml
<select id="querySyncFailureRecordList"
resultType="com.cmcc.littlec.camera.dao.entity.SyncFailureRecordDO">
SELECT uc.mac_id AS macId, uc.biz_msg AS bizMsg
FROM t_firmware_version_sync_failure_record uc
WHERE uc.id<![CDATA[ > ]]>#{startId}
AND uc.biz_type = 1
AND uc.status = 0
AND created >= DATE_SUB(NOW(), INTERVAL 1 DAY)
</select>
<update id="updateRecordStatus">
UPDATE t_firmware_version_sync_failure_record
SET modified = NOW(),
status = 1
WHERE mac_id = #{macId}
</update>
9:提交任务
开开心心提交代码给老大code review,老大说:这补偿为什么要我们来做,不合理;几个电话加个会议后决定:
让A平台上报SDK信息的时候带上升级后的通道数,我们只需要加个字段就可以了。。。。
好嘛!两天白干
10:总结
遇到问题能抛出去就抛出去!能麻烦别人就别麻烦自己