智能化状态管理:自动状态流转处理模块

news2024/11/27 21:41:25

目录

基本背景介绍

具体实现

基本数据准备

基本数据表

状态转换常量

状态转换注解

任务处理模版

各任务实现逻辑

开启比对任务进行处理

降噪字段处理任务处理

开启业务数据比对处理

业务数据比对处理

开始核对数据生成最终报告处理

核对数据生成最终报告处理

状态逻辑分发器

定时任务定义

总结


自动流转一般都是一个很大的处理系统,其中包含的处理内容是很庞大的,就这样一个大型系统的开发思路,我后面会抽空来分享一篇全局的处理和调度实现方式,本次仅针对一般如果我们需要对一些业务流程需要进行自动化处理思维的给出一个样例的自动状态流转处理模块的代码示例。如果有写的不对的地方,请留言指正!

基本背景介绍

假设我们需要一个自动的数据比对任务处理流程,基本的状态流转如下:

其中,任务创建、任务启动、任务暂停这几项开放接口交由用户手动决策,其他流程则按指定的方式直接进行自动化处理。大致模版如上,实际业务可按实际处理方式进行替换。

具体实现

基本数据准备

基本数据表

启动以上任务以及实际实现处理上,具体表暂时定义如下:

CREATE TABLE `compare_task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `compare_task_name` varchar(512) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '比对任务名称',
  `replay_task_id` bigint(20) unsigned DEFAULT NULL COMMENT '回放任务ID',
  `status` int(10) unsigned DEFAULT NULL COMMENT '比对状态:-1-取消执行,0-任务创建;1-任务启动,2-降噪字段处理中,3-降噪字段处理完成,4-业务数据比对处理中-比对成功,5-业务数据比对处理完成,6-核对数据生成最终报告处理中,7-核对数据生成最终报告处理完成,8-比对失败',
  `failure_position` int(10) unsigned DEFAULT NULL COMMENT '中间失败停留状态记录',
  `failure_reason` varchar(200) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '失败原因',
  `noise_result` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '噪声数据结果记录',
  `compare_result` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '业务结果比对结果记录',
  `final_result` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '最终报告记录',
  `valid` int(11) DEFAULT '0' COMMENT ' 0当前在线 1已删除',
  `last_ping_time` int(11) NOT NULL DEFAULT '0' COMMENT '执行节点最后一次心跳时间',
  `version` int(11) NOT NULL DEFAULT '1' COMMENT '版本',
  `cname` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '创建人',
  `uname` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '更新人',
  `ctime` bigint(20) DEFAULT NULL COMMENT '创建时间',
  `utime` bigint(20) DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='比对任务'

状态转换常量

为了实现以上基本的数据,我们先提供一个具体的状态转换常量表,具体代码如下:

/**
 * @author yanfengzhang
 * @description 比对相关常量
 * @date 2022/5/1  23:29
 */
public class CompareCons {
    /**
     * 比对基本状态信息
     */
    public static class Status {
        /**
         * 比对任务取消执行
         */
        public static final int CANCEL = -1;
        /**
         * 比对任务创建
         */
        public static final int CREATE = 0;
        /**
         * 比对任务启动
         */
        public static final int START = 1;
        /**
         * 降噪字段处理中
         */
        public static final int NOISE_REDUCING = 2;
        /**
         * 降噪字段处理完成
         */
        public static final int NOISE_REDUCED = 3;
        /**
         * 业务数据比对处理中
         */
        public static final int BIZ_COMPARING = 4;
        /**
         * 业务数据比对处理完成
         */
        public static final int BIZ_COMPARED = 5;
        /**
         * 核对数据生成最终报告处理中
         */
        public static final int GENERATE_REPORTING = 6;
        /**
         * 核对数据生成最终报告处理完成
         */
        public static final int GENERATE_REPORTED = 7;
        /**
         * 比对失败
         */
        public static final int FAILED = 8;
    }
}

状态转换注解

自动化根据状态进行统一管理,故各个处理器实际上需要表明自己需要处理的状态行为,具体注解定义如下:

/**
 * @author yanfengzhang
 * @description
 * @date 2022/5/1  23:33
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Status {
    int status();
}

任务处理模版

基本任务处理的基类,包含通用逻辑:

  • 将提交的处理交由线程池管理。
  • 同时定义一个心跳关联到外部的Processor,当processor运行结束时,结束心跳。每个ping关联一个单独的ScheduledExecutorService,结束ping时直接shutdown线程池。
  • 每个processor在开始前需要有一定逻辑更新task的状态,否则可能导致任务被重复提交。

具体的代码实现逻辑如下:

/**
 * @author yanfengzhang
 * @description 任务处理的基类,包含任务处理的通用逻辑;
 * 核心逻辑:
 * 被提交的processor交由线程池执行;
 * 每个processor关联一个ping对象,ping实现心跳逻辑;
 * 每个processor在开始前需要有一定逻辑更新task的状态,否则可能导致任务被重复提交。
 * @date 2022/5/1  23:46
 */
public abstract class AbstractProcessor implements Runnable {

    private final Ping ping;

    private final CompareTaskPo value;

    private final Semaphore semaphore = new Semaphore(0);

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractProcessor.class);

    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(8,
            16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
        private AtomicInteger threadCount = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
        }
    });

    protected AbstractProcessor(CompareTaskPo value) {
        ping = new Ping(this);
        this.value = value;
    }

    /**
     * 心跳。
     * 关联到外部的Processor,当processor运行结束时,结束心跳。
     * 每个ping关联一个单独的ScheduledExecutorService,结束ping时直接shutdown线程池。
     */
    class Ping implements Runnable {
        private WeakReference<AbstractProcessor> weakReference;
        private ReferenceQueue<AbstractProcessor> referenceQueue = new ReferenceQueue<>();
        private ScheduledExecutorService scheduleAtFixedRate = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "task-ping"));
        private CompareTaskMapper compareTaskMapper;

        Ping(AbstractProcessor processor) {
            weakReference = new WeakReference<>(processor, referenceQueue);
            compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
        }

        void ping() {
            if (referenceQueue.poll() != null) {
                /*兜底:当其关联的processor被垃圾回收后,结束心跳*/
                LOGGER.warn("【任务处理心跳】compareTaskId:{}的心跳被动结束", value.getId());
                scheduleAtFixedRate.shutdown();
            } else {
                try {
                    int curTime = (int) (System.currentTimeMillis() / 1000);
                    compareTaskMapper.updateLastPingTime(value.getId(), curTime);
                    LOGGER.warn("【任务处理心跳】compareTaskId:{}心跳正常,当前时间:{} processor:{}", value.getId(), curTime, weakReference.get());
                } catch (Exception e) {
                    LOGGER.error("【任务处理心跳】compareTaskId:{}心跳时间更新异常,exception:", value.getId(), e);
                }
            }
        }

        @Override
        public void run() {
            ping();
        }

        void start() {
            LOGGER.warn("【任务处理心跳】compareTaskId:{}心跳正常开启", value.getId());
            scheduleAtFixedRate.scheduleWithFixedDelay(this, 2, 2, TimeUnit.SECONDS);
        }

        void close() {
            LOGGER.warn("【任务处理心跳】compareTaskId:{}心跳正常结束", value.getId());
            scheduleAtFixedRate.shutdown();
        }
    }

    protected abstract boolean actualProcess(CompareTaskPo value);

    protected abstract void end(CompareTaskPo value);

    private void done() {
        ping.close();
        semaphore.release(1);
    }

    public final void process() {
        THREAD_POOL_EXECUTOR.submit(this);
    }

    public final boolean allowRecycle() {
        return semaphore.tryAcquire();
    }

    @Override
    public final void run() {
        this.ping.start();
        try {
            /*实际状态下任务处理内容成功后进行状态流转*/
            if (actualProcess(value)) {
                end(value);
            }
        } finally {
            done();
        }
    }
}

各任务实现逻辑

主要的任务处理流程如下几个重要处理器实现,其中状态可以由用户自动暂停更新状态,更新状态后相关流程被中断,该部分实现放置在定时任务中进行处理,具体见后面的代码,同时如果各任务中间有处理失败的内容,我们也会中断流程并记录具体失败的原因是什么好方便后续问题的定位。

开启比对任务进行处理

主要功能:用户创建比对任务没有问题后,点击任务启动,自动化处理流程开始,对相关业务数据进行分析验证,然后更新相关的状态,具体样例实现如下:

/**
 * @author yanfengzhang
 * @description 开启比对任务进行处理
 * @date 2022/5/2  00:05
 */
@Status(status = CompareCons.Status.START)
@Slf4j
public class StartCompareProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public StartCompareProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开启比对任务进行处理:当前处理id为{}", value.getId());
        CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());
        /*1检查数据正确性:对应的回放信息是否满足要求,如果不满足则直接中止比对任务*/
        return startCompareProcessorCheck(compareTaskPo);
    }

    /**
     * 检查数据正确性:对应的回放信息是否满足要求,如果不满足则直接中止比对任务
     * 如果没有问题,则认为已经成功
     *
     * @param compareTaskPo 比对任务信息
     * @return true-基本检查通过;false-检查不通过
     */
    private boolean startCompareProcessorCheck(CompareTaskPo compareTaskPo) {
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开启比对任务进行处理完成,待更新状态:当前处理id为{}", value.getId());
        try {
            /*更新状态为"降噪字段处理中"*/
            compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.NOISE_REDUCING);
        } catch (Exception e) {
            log.info("开启比对任务进行处理完成异常异常异常异常:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.NOISE_REDUCING);
        }

        log.info("开启比对任务进行处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.NOISE_REDUCING);
    }
}

降噪字段处理任务处理

主要功能:假设我们通过比对两次master处理回放来分析得出一些噪声处理信息,比对处理噪声的主要代码如下:

/**
 * @author yanfengzhang
 * @description 降噪字段处理任务处理
 * @date 2022/5/2  00:29
 */
@Status(status = CompareCons.Status.NOISE_REDUCING)
@Slf4j
public class NoiseReduceProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public NoiseReduceProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("降噪字段处理任务处理:当前处理id为{}", value.getId());
        /*1.根据回放任务id来查看对应回放记录中的数据信息*/
        CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());
        if (Objects.isNull(compareTaskPo)) {
            log.error("降噪字段处理任务处理异常:比对任务{}并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "比对任务并不存在!");
            return false;
        }

        ReplayTaskApplicationService replayTaskApplicationService = BeanFactoryUtil.getBean(ReplayTaskApplicationService.class);
        ReplayDataResultValue replayDataResultValue = replayTaskApplicationService.getBdfPathListByReplayTaskId(compareTaskPo.getReplayTaskId());
        if (Objects.isNull(replayDataResultValue) || StringUtils.isBlank(replayDataResultValue.getMasterFirstBdfPath())
                || StringUtils.isBlank(replayDataResultValue.getMasterSecondBdfPath())) {
            log.error("降噪字段处理任务处理异常:比对任务{}对应回放记录id相关数据文件数据并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "对应回放记录id相关数据文件数据并不存在或不完整!");
            return false;
        }

//        String masterFirstBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.mfbdf.rpresult";
//        String masterSecondBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.msbdf.rpresult";
        String masterFirstBdfPath = replayDataResultValue.getMasterFirstBdfPath();
        String masterSecondBdfPath = replayDataResultValue.getMasterSecondBdfPath();
        /*2.检查回放记录中两次master文件对应的条数是否一致*/
        Long masterFirstBdfLines = null;
        Long masterSecondBdfLines = null;
        try {
            masterFirstBdfLines = Files.lines(Paths.get(masterFirstBdfPath)).count();
            masterSecondBdfLines = Files.lines(Paths.get(masterSecondBdfPath)).count();
        } catch (Exception e) {
            log.error("降噪字段处理任务处理异常:比对任务{}对应回放记录中两次master回放文件读取异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.NOISE_REDUCING, "两次master回放文件读取异常");
            return false;
        }
        if (!Objects.equals(masterFirstBdfLines, masterSecondBdfLines)) {
            log.error("降噪字段处理任务处理异常:比对任务{}对应回放记录中两次master文件数据条数并不一致!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.NOISE_REDUCING, "两次master文件数据条数并不一致");
            return false;
        }

        /*3.文件各行进行数据对比并进行记录*/
        try {
            String compareMasterFile = "/Users/yanfengzhang/Downloads/" + value.getCompareTaskName() + "_" + value.getId() + "_降噪比对数据.txt";
            for (int i = 1; i < masterFirstBdfLines + 1; i++) {
                String masterFirstBdfStr = FileUtils.readAppointedLineNumber(masterFirstBdfPath, i);
                String masterSecondBdfStr = FileUtils.readAppointedLineNumber(masterSecondBdfPath, i);
                JsonNode diffInfo = JsonDealUtils.getCompareJsonResult(masterFirstBdfStr, masterSecondBdfStr);
                FileUtils.writeContent(compareMasterFile, diffInfo.toString());
            }
            compareTaskMapper.updateNoiseResult(value.getId(), compareMasterFile);
        } catch (Exception e) {
            log.error("降噪字段处理任务处理异常:比对任务{}生成噪声数据异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.NOISE_REDUCING, "生成噪声数据异常");
            return false;
        }

        /*4.执行完毕无异常,进行状态变更*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("降噪字段处理任务处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"降噪字段处理完成"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.NOISE_REDUCED);
        log.info("降噪字段处理任务处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.NOISE_REDUCED);
    }
}

开启业务数据比对处理

主要功能:没有其他检查数据内容的话,可以直接进行状态转换,我这边暂时忽略检查!

/**
 * @author yanfengzhang
 * @description 开启业务数据比对处理
 * @date 2022/5/2  00:36
 */
@Status(status = CompareCons.Status.NOISE_REDUCED)
@Slf4j
public class StartBizCompareProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public StartBizCompareProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开启业务数据比对处理:当前处理id为{}", value.getId());
        /*该状态下当前不做任何处理,基本没有检查的相关启动条件*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开启业务数据比对处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"业务数据比对处理中"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.BIZ_COMPARING);
        log.info("开启业务数据比对处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.BIZ_COMPARING);
    }
}

业务数据比对处理

主要功能:对本次业务代码改动和master代码进行对比来分析对应的内容处理变化统计,具体代码如下:

/**
 * @author yanfengzhang
 * @description 业务数据比对处理
 * @date 2022/5/2  00:53
 */
@Status(status = CompareCons.Status.BIZ_COMPARING)
@Slf4j
public class BizCompareProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public BizCompareProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开启业务数据比对处理处理:当前处理id为{}", value.getId());
        CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());
        /*1.根据回放任务id来查看对应回放记录中的数据信息*/
        if (Objects.isNull(compareTaskPo)) {
            log.error("开启业务数据比对处理处理异常:比对任务{}并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "比对任务并不存在!");
            return false;
        }

        ReplayTaskApplicationService replayTaskApplicationService = BeanFactoryUtil.getBean(ReplayTaskApplicationService.class);
        ReplayDataResultValue replayDataResultValue = replayTaskApplicationService.getBdfPathListByReplayTaskId(compareTaskPo.getReplayTaskId());
        if (Objects.isNull(replayDataResultValue) || StringUtils.isBlank(replayDataResultValue.getMasterFirstBdfPath())
                || StringUtils.isBlank(replayDataResultValue.getFeatureBdfPath())) {
            log.error("开启业务数据比对处理处理异常:比对任务{}对应回放记录id相关数据文件数据并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "对应回放记录id相关数据文件数据并不存在或不完整!");
            return false;
        }

//        String masterFirstBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.mfbdf.rpresult";
//        String featureBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.fbdf.rpresult";
        String masterFirstBdfPath = replayDataResultValue.getMasterFirstBdfPath();
        String featureBdfPath = replayDataResultValue.getFeatureBdfPath();
        /*2.检查回放记录中master文件和dev文件对应的条数是否一致*/
        Long masterFirstBdfLines = null;
        Long featureBdfLines = null;
        try {
            masterFirstBdfLines = Files.lines(Paths.get(masterFirstBdfPath)).count();
            featureBdfLines = Files.lines(Paths.get(featureBdfPath)).count();
        } catch (Exception e) {
            log.error("比对任务{}对应回放记录中master回放文件或dev回放文件读取异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.BIZ_COMPARING, "master回放文件或dev回放文件读取异常");
            return false;
        }
        if (!Objects.equals(masterFirstBdfLines, featureBdfLines)) {
            log.error("比对任务{}对应回放记录中master回放文件和dev回放文件数据条数并不一致!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.BIZ_COMPARING, "master回放文件和dev回放文件数据条数并不一致");
            return false;
        }

        /*3.文件各行进行数据对比并进行记录*/
        try {
            String compareBizFile = "/Users/yanfengzhang/Downloads/" + value.getCompareTaskName() + "_" + value.getId() + "_业务比对数据.txt";
            for (int i = 1; i < masterFirstBdfLines + 1; i++) {
                String masterFirstBdfStr = FileUtils.readAppointedLineNumber(masterFirstBdfPath, i);
                String featureBdfStr = FileUtils.readAppointedLineNumber(featureBdfPath, i);
                JsonNode diffInfo = JsonDealUtils.getCompareJsonResult(masterFirstBdfStr, featureBdfStr);
                FileUtils.writeContent(compareBizFile, diffInfo.toString());
            }
            compareTaskMapper.updateCompareResult(value.getId(), compareBizFile);
        } catch (Exception e) {
            log.error("比对任务{}生成业务比对数据异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.BIZ_COMPARING, "生成业务比对数据异常");
            return false;
        }

        /*4.执行完毕无异常,进行状态变更*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开启业务数据比对处理处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"业务数据比对处理完成"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.BIZ_COMPARED);
        log.info("开启业务数据比对处理处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.BIZ_COMPARED);
    }
}

开始核对数据生成最终报告处理

主要功能:没有其他检查数据内容的话,可以直接进行状态转换,我这边暂时忽略检查!

/**
 * @author yanfengzhang
 * @description 开始核对数据生成最终报告处理
 * @date 2022/5/2  00:59
 */
@Status(status = CompareCons.Status.BIZ_COMPARED)
@Slf4j
public class StartGenerateReportProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public StartGenerateReportProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开始核对数据生成最终报告处理:当前处理id为{}", value.getId());
        /*该状态下当前不做任何处理,基本没有检查的相关启动条件(检查相关文件是否存在)*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开始核对数据生成最终报告处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"核对数据生成最终报告处理中"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.GENERATE_REPORTING);
        log.info("开始核对数据生成最终报告处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.GENERATE_REPORTING);
    }
}

核对数据生成最终报告处理

主要功能:结合前面处理生成的数据进行最终报告的比对任务生成报告,具体处理流程如下:

/**
 * @author yanfengzhang
 * @description 核对数据生成最终报告处理
 * @date 2022/5/2  01:20
 */
@Status(status = CompareCons.Status.GENERATE_REPORTING)
@Slf4j
public class GenerateReportProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public GenerateReportProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开始核对数据生成最终报告处理:当前处理id为{}", value.getId());
        CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());
        if (Objects.isNull(compareTaskPo)) {
            log.error("开始核对数据生成最终报告处理异常:比对任务{}并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "比对任务并不存在!");
            return false;
        }
        /*1.根据回放任务id来查看对应回放记录中的数据信息*/
        String compareBizResultPath = compareTaskPo.getCompareResult();
        String noiseResultPath = compareTaskPo.getNoiseResult();
        /*2.检查回放记录中master文件和dev文件对应的条数是否一致*/
        Long compareBizResultLines = null;
        Long noiseResultLines = null;
        try {
            compareBizResultLines = Files.lines(Paths.get(compareBizResultPath)).count();
            noiseResultLines = Files.lines(Paths.get(noiseResultPath)).count();
        } catch (Exception e) {
            log.error("比对任务{}对应核对数据生成最终报告读取文件异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.GENERATE_REPORTING, "对应核对数据生成最终报告读取文件异常");
            return false;
        }
        if (!Objects.equals(compareBizResultLines, noiseResultLines)) {
            log.error("比对任务{}对应核对数据生成最终报告相关文件数据条数并不一致!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.GENERATE_REPORTING, "对应核对数据生成最终报告相关文件数据条数并不一致");
            return false;
        }

        /*3.文件各行进行数据对比并进行记录*/
        try {
            String compareBizFile = "/Users/yanfengzhang/Downloads/" + value.getCompareTaskName() + "_" + value.getId() + "_最终结果报告.txt";
            for (int i = 1; i < compareBizResultLines + 1; i++) {
                String compareBizResultStr = FileUtils.readAppointedLineNumber(compareBizResultPath, i);
                String noiseResultStr = FileUtils.readAppointedLineNumber(noiseResultPath, i);
                List<CompareDataMeta> compareDataMetas = CompareDataResult.getCompareDataResult(noiseResultStr, compareBizResultStr);
                FileUtils.writeContent(compareBizFile, JSON.toJSONString(compareDataMetas));
            }
            compareTaskMapper.updateNoiseResult(value.getId(), compareBizFile);
        } catch (Exception e) {
            log.error("比对任务{}核对数据生成最终报告数据处理异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.GENERATE_REPORTING, "核对数据生成最终报告数据处理异常");
            return false;
        }

        /*4.执行完毕无异常,进行状态变更*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开始核对数据生成最终报告处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"核对数据生成最终报告处理完成"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.GENERATE_REPORTED);
        log.info("开始核对数据生成最终报告处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.GENERATE_REPORTED);
    }
}

状态逻辑分发器

我们针对以上任务处理器,对实际业务处理进行分析并将其转发到相关的处理器上进行自动化处理,具体实现逻辑如下:

/**
 * @author yanfengzhang
 * @description 负责对task任务不同状态运行逻辑的分发。
 * @date 2022/5/2  01:44
 */
public class EventDispatcher {
    private static Map<Integer, Class> status2Processor = Maps.newHashMap();
    private static Set<AbstractProcessor> curProcessors = new HashSet<>();
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);

    static {
        Reflections reflections = new Reflections("com.sankuai.tsp.product.bsap.domain.compare.event.processor.impl");
        Set<Class<?>> classSet = reflections.getTypesAnnotatedWith(Status.class);
        for (Class<?> cl : classSet) {
            Annotation[] annotations = cl.getAnnotations();
            for (Annotation a : annotations) {
                if (a instanceof Status) {
                    Status status = (Status) a;
                    status2Processor.put(status.status(), cl);
                }
            }
        }
    }

    /**
     * dispatch方法目前只有cronServer线程调用,
     * 但是为了防止出现多线程调用导致的curProcessors被并发修改问题,所以用synchronized同步
     *
     * @param status        当前任务状态
     * @param compareTaskPo 比对任务消息数据
     * @return
     */
    public static synchronized boolean dispatch(int status, CompareTaskPo compareTaskPo) {
        AbstractProcessor processor = getInstance(status, compareTaskPo);
        if (processor != null) {
            curProcessors.add(processor);
            processor.process();
            return true;
        }
        return false;
    }

    private static AbstractProcessor getInstance(int status, CompareTaskPo compareTaskPo) {
        /*zyf:主动清理一次*/
        cleanDirty();
        if (containsStatus(status)) {
            try {
                Constructor constructor = status2Processor.get(status).getConstructor(CompareTaskPo.class);
                return (AbstractProcessor) constructor.newInstance(compareTaskPo);
            } catch (Exception ex) {
                LOGGER.error("EventDispatcher dispatcher getInstance error, exception:", ex);
            }
        }
        return null;
    }

    public static boolean containsStatus(int status) {
        return status2Processor.containsKey(status);
    }

    public static synchronized void cleanDirty() {
        curProcessors.removeIf(AbstractProcessor::allowRecycle);
    }

    public static int getTaskCount() {
        return curProcessors.size();
    }
}

定时任务定义

针对以上的内容,我们内部维护一个基本的定时器来完成实际的业务自动化流转处理,主要代码和业务处理如下:

/**
 * @author yanfengzhang
 * @description 定时任务:定时读取数据库中比对数据需要处理的task任务,并分发到响应的processor处理。
 * @date 2022/5/2  02:18
 */
@Component
@DependsOn("beanFactoryUtil")
public class CronServer implements InitializingBean {

    @Autowired
    private CompareTaskMapper compareTaskMapper;

    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
    private static final Logger LOGGER = LoggerFactory.getLogger(CronServer.class);

    @Override
    public void afterPropertiesSet() throws Exception {
        SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new CompareCronTask(), 20, 3, TimeUnit.SECONDS);
    }

    class CompareCronTask implements Runnable {
        @Override
        public void run() {
            if (BsapCompareSwitch.cronServerPause()) {
                LOGGER.warn("--------------cron server pause--------------");
                return;
            }
            int taskCount = EventDispatcher.getTaskCount();
            /*清理已经完成的任务*/
            EventDispatcher.cleanDirty();
            LOGGER.warn("[--------当前正在运行的任务数量为:{}-------]", EventDispatcher.getTaskCount());
            if (taskCount != 0 && EventDispatcher.getTaskCount() == 0) {
                LOGGER.warn("[------------------------任务数量存在问题,主动进行gc处理中---------------------------]");
                System.gc();
            }
            int curSecond = (int) (System.currentTimeMillis() / 1000);
            try {
                List<CompareTaskPo> compareTaskPos = compareTaskMapper.selectCompareTaskPoByTimeRange(curSecond - 20);
                if (CollectionUtils.isEmpty(compareTaskPos)) {
                    return;
                }
                for (CompareTaskPo compareTaskPo : compareTaskPos) {
                    /*如果处理的内容不在我们规定的范围时直接跳出*/
                    if (!EventDispatcher.containsStatus(compareTaskPo.getStatus())) {
                        continue;
                    }
                    /**
                     * 思考:
                     * 尝试更新一下last_ping_update的时间,更新成功代表抢锁成功,然后执行任务。
                     * 如果更新成功但是执行失败,待后续CronServer运行时再次尝试。
                     * 每台服务器每次定时任务只运行一个任务,防止同一台服务器抢占多个任务导致压力过大、负载不均衡的问题。
                     * (由于目前任务运行周期在多台服务器是一致的,所以极端情况下可能会出现任务被一台机器抢占的情况,
                     * 后续可以考虑使不同机器的运行周期随机或者引入分布式任务分配(负载均衡)策略)
                     */
                    if (compareTaskMapper.updateLastPingTimeByVersion(compareTaskPo.getId(), curSecond - 15, compareTaskPo.getVersion()) > 0) {
                        compareTaskPo.setVersion(compareTaskPo.getVersion() + 1);
                        compareTaskPo.setLastPingTime(curSecond - 15);
                        if (EventDispatcher.dispatch(compareTaskPo.getStatus(), compareTaskPo)) {
                            LOGGER.warn("CronServer 提交一个任务,任务id为{}, 任务详细信息:{}", compareTaskPo.getId(), JSON.toJSON(compareTaskPo));
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                LOGGER.error("server cron run catch an exception:", e);
            }
        }
    }
}

总结

整体上的大致模版实现已如上进行了简化,其中有不同的理解的可留言讨论,后续复杂系统的内容后续有时间在进行分享,谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1825571.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[渗透测试学习] SolarLab-HackTheBox

SolarLab-HackTheBox 信息搜集 nmap扫描端口 nmap -sV -v 10.10.11.16扫描结果如下 PORT STATE SERVICE VERSION 80/tcp open http nginx 1.24.0 135/tcp open msrpc Microsoft Windows RPC 139/tcp open netbios-ssn Microsoft Windows n…

观光车司机N2精选考试题库(附答案)

一、判断题 1、在使用手电钻、电砂轮等手持电动工具时,为保证安全,应该装设漏电保护器。(√) 2、碳弧气刨的方法设备工具简单.操作使用安全。(√) 3、事故调查组有权向有关单位和个人了解与事故有关的情况。()(√) 4、发射药(动力药)是能产生发射和推进效应的烟火药,有粒状、粉…

SAP BOM项目类别N非库存项目简介

在BOM的项目类别中用的最多的就是L类型的库存管理,还有T类型的文本类型,但是在实际业务中也会存在物料不做库存管理,但是物料需要进行成本的管控,进入对应的工单成本中,比如在电子行业中需要烧录的正版软件,或者是电脑制造行业中需要预装的正版的Windows系统,购买的软件…

【SpringBoot】SpringBoot:简化数据库操作与API开发

文章目录 引言SpringBoot概述数据库操作简化传统数据库操作的挑战使用Spring Data JPA示例&#xff1a;定义Repository接口实现服务层 使用MyBatis示例&#xff1a;配置MyBatis定义Mapper接口 API开发简化RESTful API概述创建RESTful API示例&#xff1a;定义控制器 高级特性与…

【二】【动态规划NEW】91. 解码方法,62. 不同路径,63. 不同路径 II

91. 解码方法 一条包含字母 A-Z 的消息通过以下映射进行了 编码 &#xff1a; ‘A’ -> “1” ‘B’ -> “2” … ‘Z’ -> “26” 要 解码 已编码的消息&#xff0c;所有数字必须基于上述映射的方法&#xff0c;反向映射回字母&#xff08;可能有多种方法&#xff…

小知识点快速总结:Batch Normalization Layer(BN层)的作用

本系列文章只做简要总结&#xff0c;不详细说明原理和公式。 目录 1. 参考文章2. 主要作用3. 具体分析3.1 正则化&#xff0c;降低过拟合3.2 提高模型收敛速度&#xff0c;加速训练3.3 减少梯度爆炸或者梯度消失的情况 4. 补充4.1 BN层做的是标准化不是归一化4.2 BN层的公式4.…

洗地机提升渗透率,降价不是唯一解

作者 | 辰纹 来源 | 洞见新研社 添可2019年开创洗地机赛道时&#xff0c;看好的人不多&#xff0c;在扫地机器人正被风口吹在天上翻滚的那个年代&#xff0c;洗地机被扣上了“智商税”的标签。 洗地机到底有没有用&#xff0c;市场用脚投票。 奥维云网数据显示&#xff0c…

PS通过GTX实现SFP网络通信2

PS 程序设计 LWIP 库修改 修改原因 SDK 2017.4 自带的 LWIP 1.4.1 库的版本为 2.0 &#xff0c;直接使用该库将无法通过 SFP 实现网络通信。 因此需要进行修改。 修改的原因有 2 个&#xff0c;第 1 个原因是由于 2017.4 版本产生的新 bug 。在 2015.4 版本…

Java数据结构之ArrayList(如果想知道Java中有关ArrayList的知识点,那么只看这一篇就足够了!)

前言&#xff1a;ArrayList是Java中最常用的动态数组实现之一&#xff0c;它提供了便捷的操作接口和灵活的扩展能力&#xff0c;使得在处理动态数据集合时非常方便。本文将深入探讨Java中ArrayList的实现原理、常用操作以及一些使用场景。 ✨✨✨这里是秋刀鱼不做梦的BLOG ✨✨…

Kotlin 语言基础学习

什么是Kotlin ? Kotiln翻译为中文是:靠他灵。它是由JetBrains 这家公司开发的,JetBrains 是一家编译器软件起家的,例如常用的WebStorm、IntelliJ IDEA等软件。 Kotlin官网 JetBrains 官网 Kotlin 语言目前的现状: 目前Android 已将Kotlin 作为官方开发语言。 Spring 框…

Java—读取properties配置文件

编写配置文件 usernameroot password123456 urljdbc:mysql://localhost:3306/myDatabase driverClassNamecom.mysql.cj.jdbc.Driver 编写测试类 import java.io.FileInputStream; import java.io.IOException; import java.util.Enumeration; import java.util.Properties;/*…

vagrant putty错误的解决

使用Vagrant projects for Oracle products and other examples 新创建的虚机&#xff0c;例如vagrant-projects/OracleLinux/8。 用vagrant ssh可以登录&#xff1a; $ vagrant ssh > vagrant: Getting Proxy Configuration from Host...Welcome to Oracle Linux Server …

专业学习|博弈论-博弈论概述

&#xff08;一&#xff09;认识博弈论&#xff1a;解析复杂决策与策略 &#xff08;1&#xff09;认识博弈 博弈论广泛应用于分析个体间因利益冲突而产生的决策问题。通过构建不同模型来探讨如经贸关系、军事威胁等问题&#xff0c;旨在寻找均衡解并提供新知&#xff0c;相较…

C语言概述与历史

引言 C语言是一门历史悠久且影响深远的编程语言。它不仅为后继的许多编程语言奠定了基础&#xff0c;同时因其高效性和灵活性在系统编程和嵌入式开发领域得到了广泛应用。本篇文章将全面介绍C语言的起源与发展、设计目标与理念&#xff0c;以及C语言的标准演化历程&#xff0c;…

字符数组基础知识及题目

死识。。。 字符该如何存储呢&#xff1f;这一点我们在以前就接触过了。用char来存储。 如何输入一个单词呢&#xff1f; char a[10002]; scanf("%s",a); 就不用地址符了。 如何输入句子呢&#xff1f; char a[100002]; gets(a); gets是读入句子的&#xff0c…

利用智能交流控制设计方法实现更好的家电安全

从机电到数字控制的转变首先是通过现成的电子设备完成的——系统架构是围绕 MCU、分立晶体管和高压双向可控硅构建的。 家用电器的这场小型革命部分是由于减少能源和水的浪费以及提高易用性的需求日益增长而推动的。 随着市场及其标准的化&#xff0c;性能和成本效率一直是家…

用MATLAB绘制地球围绕太远运动而月球围绕地球运动

绘制 MATLAB代码: clc;close all;clear all;warning off;%清除变量 rand(seed, 100); randn(seed, 100); format long g;% 初始化参数 num_frames 1000; % 动画帧数 G200; dt 0.01; % 时间步长% 设置太阳、地球和月球的初始位置和半径 sun_position [0, 0]; earth_radius …

Docker MySQL Shutting down mysqld

6月初至6月15日发现MySQL无故停机多次&#xff0c;导致系统无法使用。接下来各种日志查看&#xff0c;排查原因。先附上一份Docker种MySQL的日志的截图。 一、根据Docker的日志初步估计是数据库内存飙升&#xff0c;从而被系统杀掉进程 查询Linux系统日志&#xff0c;在宿主机…

海康威视-按时间下载录像文件

目录 1、流程图 1.1、录像查找 1.2、录下下载 2、按时间下载 2.1、开启下载 2.2、后台下载 2.2.1、方式一 2.2.2、方式二 3、问题整理 3.1、错误码34 3.2、错误码10 3.3、下载的文件大小为0kb 4、错误码 由于没有在官方文档中找到通过ISAPI协议透传实现按时间下…

---String类---

在c语言中要使用字符串&#xff0c;只能通过字符指针或者字符数组&#xff0c;然后再通过函数进行各种操作&#xff0c;这种将变量和变量方法分开的方式显然不符合面向对象的编程&#xff0c;所以java中添加了String这个类 String类构造 而对于string有很多的方法 字符串长度…