学成在线 - 第3章任务补偿机制实现 + 分块文件清理

news2025/1/12 0:56:49

7.9 额外实现

7.9.1 任务补偿机制

问题:如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。

单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。

任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。

大家思考这个sql该如何实现?

大家尝试自己实现此任务补偿机制。

数据库表结构

media_process:

在这里插入图片描述

根据status字段判断视频文件是否正在处理,如果status值为4(正在处理),且当前时间减去create_date超过30分钟,就把status的值修改为3(处理失败),并更新其他字段的值。

具体实现流程
  1. 编写检查超时的sql

    SELECT * FROM `media_process_history`
    WHERE TIMESTAMPDIFF(MINUTE,create_date,finish_date) < 30
    
  2. 在media_process的mapper接口中添加相应的接口

    MediaProcessMapper.java

    /**
     * 查询是否有执行超过30分钟的视频处理任务
     * @param nowDate 当前任务执行的时间
     * @return List<MediaProcess>
     */
    @Select("SELECT * FROM media_process_history WHERE TIMESTAMPDIFF(MINUTE,create_date,#{date}) > 30")
    List<MediaProcess> selectTimeoutProcess(@Param("date")Date nowDate);
    
  3. 实现service代码

    /**
     * 超时任务列表
     * @param shardIndex 分片序号
     * @param shardTotal 分片总数
     * @param count 获取记录数(cpu核心数)
     * @return List<MediaProcess>
     */
    @Override
    public List<MediaProcess> getTimeoutMediaProcessList(int shardIndex, int shardTotal, int count) {
        List<MediaProcess> mediaTimeoutProcessList = mediaProcessMapper.selectTimeoutProcess(LocalDateTime.now(), shardIndex, shardTotal, count);
        return mediaTimeoutProcessList;
    }
    
  4. 业务逻辑在task中实现

    查询相应超时记录,把status和errMsg字段更新后保存到media_process表里。

    /**
     * 处理视频超时任务
     * @throws Exception
     */
    @XxlJob("videoTimeoutJobHandler")
    public void videoTimeoutJobHandler() throws Exception {
        // 分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        List<MediaProcess> mediaTimeoutProcessList = null;
        int size = 0;
        try {
            // 取出cpu核心数作为一次检查超时的记录条数
            int availableProcessors = Runtime.getRuntime().availableProcessors();
            mediaTimeoutProcessList = mediaFileProcessService.getTimeoutMediaProcessList(LocalDateTime.now(), shardIndex, shardTotal, availableProcessors);
            size = mediaTimeoutProcessList.size();
            log.debug("取出的超时任务数量是 {} 条", size);
            if (size <= 0) {
                return ;
            }
        } catch (Exception e) {
            log.error("取出的超时任务超时", size);
            e.printStackTrace();
        }
        // 启动size个线程的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(size);
        // 定义size个插销
        CountDownLatch countDownLatch = new CountDownLatch(size);
        mediaTimeoutProcessList.forEach(mediaProcess -> {
            // 将任务加入线程池
            fixedThreadPool.execute(() -> {
                try {
                    // 把status值设置为3,并把errMsg更新
                    Long taskId = mediaProcess.getId();
                    String fileId = mediaProcess.getFileId();
                    mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "处理视频超时(30分钟)");
                    log.debug("删除超时任务");
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("删除超时任务失败,失败原因: {}", e.getMessage());
                } finally {
                    countDownLatch.countDown();
                }
            });
        });
        // 等待,给一个充裕的超时事件,防止无限等待,到达超时时间还没有处理完成则结束任务
        countDownLatch.await(30, TimeUnit.MINUTES);
    }
    

7.9.2 达到最大失败次数(暂时未实现)

问题:需要找到前端对应的接口。

当任务达到最大失败次数时一般就说明程序处理此视频存在问题,这种情况就需要人工处理,在页面上会提示失败的信息,人工可手动执行该视频进行处理,或通过其它转码工具进行视频转码,转码后直接上传mp4视频。

7.9.3 分块文件清理问题

上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?

1、在数据库中有一张文件表记录minio中存储的文件信息。

2、文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。(实现过程中并没有往文件表中插入正在上传的文件记录,只是在media_minio_files临时保存了分块信息)

3、当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。

实现思路

难点:怎么判断文件上传了一半不再上传了?上传分块文件只在上传视频时有用,上传分块文件时不会往media_files表里添加记录,只有所有分块上传完成,并合并成功后,才会往media_files里插入数据。所以要建立一个media_minio_files表,每上传一个分块就往media_minio_files表里插入一条分块信息,记录包括分块的上传时间。如果超过30分钟分块记录还没有被删除,说明上传到一半不传了,把minio里的分块目录删除,并删除对应的数据库里的记录。在文件合并成功后,数据库里的分块文件上传记录也要删除。

media_minio_files表结构

在这里插入图片描述

具体实现流程
  1. 上传文件块的时候,上传一个文件块完毕要把这个块的信息保存到表里。

    /**
     * 把文件分块信息入库
     * @param fileMd5 文件md5
     * @param fileName 文件名称
     * @param bucket minio桶
     * @param objectName minio中块的存储路径
     * @param chunkSize 块大小
     * @return MediaMinioFiles
     */
    @Transactional
    @Override
    public MediaMinioFiles addMediaChunkToDb(String fileMd5, String fileName, String bucket, String objectName, Long chunkSize) {
        MediaMinioFiles mediaMinioFile = new MediaMinioFiles();
        mediaMinioFile.setFileId(fileMd5);
        mediaMinioFile.setFilename(fileName);
        mediaMinioFile.setBucket(bucket);
        mediaMinioFile.setFilePath(objectName);
        mediaMinioFile.setFileSize(chunkSize);
        mediaMinioFile.setStatus("4");  // 上传中
        int insert = mediaMinioFilesMapper.insert(mediaMinioFile);
        if (insert < 0) {
            log.error("保存分块文件信息到数据库失败,{}", mediaMinioFile.toString());
            XueChengPlusException.cast("保存分块文件信息失败");
        }
        log.debug("保存分块文件信息到数据库成功,{}", mediaMinioFile.toString());
        return mediaMinioFile;
    }
    
    /**
     * 上传分块
     * @param fileMd5 文件md5
     * @param chunk 分块序号
     * @param localChunkFilePath 分块文件本地路径
     * @return RestResponse
     */
    @Override
    public RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {
        // 得到分块文件的目录路径
        String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
        // 得到分块文件的路径
        String chunkFilePath = chunkFileFolderPath + chunk;
        // mimeType
        String mimeType = getMimeType(null);
        // 将文件存储到 minio
        boolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucketVideoFiles, chunkFilePath);
        // 将分块信息存到数据库中
        currentProxy.addMediaChunkToDb(fileMd5, null, bucketVideoFiles, chunkFilePath, 1024 * 5L);
        if (!b) {
            log.debug("上传分块文件失败: {}", chunkFilePath);
            return RestResponse.validfail(false, "上传分块失败");
        }
        log.debug("上传分块文件成功: {}", chunkFilePath);
        return RestResponse.success(true);
    }
    
  2. 如果没有上传失败,在成功合并块文件之后,要把上面数据库中记录删除。

    /**
     * 删除数据库中的分块文件上传记录
     * @param fileMd5 文件md5
     */
    @Transactional
    public void clearChunkFromDb(String fileMd5) {
        LambdaQueryWrapper<MediaMinioFiles> deleteQueryWrapper = new LambdaQueryWrapper<>();
        deleteQueryWrapper.eq(MediaMinioFiles::getFileId, fileMd5);
        try {
            mediaMinioFilesMapper.delete(deleteQueryWrapper);
            log.debug("删除分块上传记录成功");
        } catch (Exception e) {
            e.printStackTrace();
            log.error("删除分块上传记录失败");
        }
    }
    

    在合并分块的最后几行:

    // 文件入库
    currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucketVideoFiles, mergeFilePath);
    // 清除文件分块
    clearChunkFiles(chunkFileFolderPath, chunkTotal);
    // 清除数据库中文件分块上传信息
    currentProxy.clearChunkFromDb(fileMd5);
    return RestResponse.success(true);
    
  3. 如果上传文件一半失败,要根据数据库中media_minio_files表中存的块的路径删除所有超时块。

    首先查询所有的超时块,得到上传的超时块列表:

    /**
     * 拿到数据库中所有上传超时的文件分块信息
     * @param time 当前任务执行时间
     * @return List<MediaMinioFiles>
     */
    @Override
    public List<MediaMinioFiles> getChunkTimeoutFiles(LocalDateTime time) {
        List<MediaMinioFiles> mediaMinioFiles = mediaMinioFilesMapper.selectTimeoutChunks(DateUtil.toDateTime(time));
        return mediaMinioFiles;
    }
    

    删除minio中所有超时块,并且删除所有超时记录

    @XxlJob("videoChunkTimeoutJobHandler")
    public void videoChunkTimeoutJobHandler() {
        log.debug(">>>>>>>>>> 开始执行检查上传超时块任务");
        // 拿到所有的超时分块任务
        List<MediaMinioFiles> chunkTimeoutFiles = mediaFileService.getChunkTimeoutFiles(LocalDateTime.now());
        if (chunkTimeoutFiles == null) {  // 没有超时任务
            log.debug("没有超时任务");
            return ;
        }
        // 根据记录中的file_path,删除minio中的所有文件
        chunkTimeoutFiles.forEach(chunk -> {
            String filePath = chunk.getFilePath();
            mediaFileService.clearSingleChunkFile(filePath);
        });
        // 删除数据库中对应的记录
        mediaFileService.clearChunkFromDb(null, LocalDateTime.now());
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1650975.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

scikit-learn实现单因子线性回归模型

1.是什么&#xff1a; 针对机器学习提供了数据预处理&#xff0c;分类&#xff0c;回归等常见算法的框架 2.基于scikit-learn求解线性回归的问题&#xff1a; 2.1.求解a&#xff0c;b对新数据进行预测&#xff1a; 2.2评估模型表现&#xff08;y和y’的方差MSE&#xff09;…

论文查重率高,有什么办法降重吗?

现在大部分学校已经进入到论文查重降重的阶段了。如果查重率居高不下&#xff0c;延毕的威胁可能就在眼前。对于即将告别校园的学子们&#xff0c;这无疑是个噩梦。四年磨一剑&#xff0c;谁也不想在最后关头功亏一篑。 查重率过高&#xff0c;无非以下两种原因。要么是作为“…

小程序搜索排名优化 三步操作提升

搜索排名优化最直接的一个目的就是为了提升小程序的排名和流量&#xff0c;获取用户的信任度。当用户在搜索关键词的时候&#xff0c;能让用户看到小程序&#xff0c;增加被发现和点击的机会。 一、关键词优化&#xff1a; 1.选择合适的关键词&#xff1a;选择与小程序内容高…

解决Gitlab集成Jira时报SSL证书问题

1. 问题描述 在gitlab中集成jira的时候&#xff0c;由于jira是企业内部网址&#xff0c;并使用自己签名的SSL证书&#xff0c;一直会报证书验证不过的问题&#xff0c;报错信息如下&#xff1a; Connection failed. Check your integration settings. SSL_connect returned1 …

odoo实施之各种导航设计

odoo各种基础能力&#xff1a;活动、讨论 玩转odoo&#xff0c;真有玩的体验 odoo消息提醒能力 odoo 讨论模块 odoo 通过new message触发任务 安装odoo studio进行拖拉拽设计 查阅官方文档&#xff0c;向官方提issue 欧洲和美国&#xff0c;虽然都是英语&#xff0c;但日期格式…

win10下,svn上传.so文件失败

问题&#xff1a;win10下使用TortoiseSVN&#xff0c;svn上传.so文件失败 解决&#xff1a;右键&#xff0c;选择Settings&#xff0c;Global ignore pattern中删除*.so&#xff0c;保存即可。

Verilog中4bit超前进位加法器

4bit超前进位加法器的逻辑表达式如下&#xff1a; 中间变量GiAiBi&#xff0c;PiAi⊕BiGi​Ai​Bi​&#xff0c;Pi​Ai​⊕Bi​ 和&#xff1a;SiPi⊕Ci−1Si​Pi​⊕Ci−1​&#xff0c;进位&#xff1a;CiGiPiCi−1Ci​Gi​Pi​Ci−1​ 用Verilog语言采用门级描述方式&am…

页面嵌套,界面套娃,除了用iframe,还有其他方式吗?

UIOTOS可以了解下&#xff0c;uiotos.net&#xff0c;通过连线来代替脚本逻辑开发&#xff0c;复杂的交互界面&#xff0c;通过页面嵌套轻松解决&#xff0c;是个很新颖的思路&#xff0c;前端零代码&#xff01; 蓝图连线尤其是独创的页面嵌套和属性继承技术&#xff0c;好家…

如何使用dockerfile文件将项目打包成镜像

要根据Dockerfile文件来打包一个Docker镜像&#xff0c;你需要遵循以下步骤。这里假设你已经安装了Docker环境。 1. 准备Dockerfile 确保你的Dockerfile文件已经准备就绪&#xff0c;并且位于你希望构建上下文的目录中。Dockerfile是一个文本文件&#xff0c;包含了用户可以调…

vue3专栏项目 -- 项目介绍以及准备工作

这是vue3TS的项目&#xff0c;是一个类似知乎的网站&#xff0c;可以展示专栏和文章的详情&#xff0c;可以登录、注册用户&#xff0c;可以创建、删除、修改文章&#xff0c;可以上传图片等等。 这个项目全部采用Composition API 编写&#xff0c;并且使用了TypeScript&#…

【k8s多集群管理平台开发实践】八、client-go实现service读取列表、创建service、读取yaml配置并更新

文章目录 简介 一.k8s的service列表1.1.controllers控制器代码1.2.models模型代码 二.创建service2.1.controllers控制器代码2.2.models模分代码 三.读取和更新service的yaml配置3.1.controllers控制器代码3.2.models模型代码 四.路由设置4.1.路由设置 五.前端代码5.1.列表部分…

湖仓一体 - Apache Arrow的那些事

湖仓一体 - Apache Arrow的那些事 Arrow是高性能列式内存格式标准。它的优势&#xff1a;高效计算&#xff1a;所有列存的通用优势&#xff0c;CPU缓存友好、SIMD向量化计算友好等&#xff1b;零序列化/反序列化&#xff1a;arrow的任何数据结构都是一段连续的内存&#xff0c;…

基于单片机的无线数据传输系统设计

摘要:基于单片机的无线数据传输系统的设计,实现了温度和湿度的自动采集、无线通讯和报警功能。该系统包括了LCD1602显示电路、DHT11温湿度采集电路等,完成了基于无线数据传输的方法来实现温湿度的采集。 关键词:温湿度检测;N RF 24 L 01;单片机 0 引言 随着科技水平的提高,…

将矩阵按对角线排序(Lc1329)——排序

矩阵对角线 是一条从矩阵最上面行或者最左侧列中的某个元素开始的对角线&#xff0c;沿右下方向一直到矩阵末尾的元素。例如&#xff0c;矩阵 mat 有 6 行 3 列&#xff0c;从 mat[2][0] 开始的 矩阵对角线 将会经过 mat[2][0]、mat[3][1] 和 mat[4][2] 。 给你一个 m * n 的整…

Mac 链接 HP 136w 打印机步骤

打开 WI-FI 【1】打开打印机左下角Wi-Fi网络设计【或者点击…按钮进入WI-FI菜单】&#xff0c;找到NetWork选项OK进入&#xff1b; 【2】设置WI-FI选项&#xff1a;在菜单内找到Wi-Fi选项OK进入&#xff1b; 【3】在菜单内找到Wi-Fi Direct选项OK进入&#xff1b; 【4】在菜单…

flutter开发实战-webview_flutter 4.x版本使用

flutter开发实战-webview_flutter 4.x版本使用 在之前使用的webview_flutter版本是3.x的&#xff0c;升级到4.x后&#xff0c;使用方式有所变化。 一、webview_flutter 在工程的pubspec.yaml中引入插件 webview_flutter: ^4.4.2二、使用webview_flutter 在4.x版本中&#…

使用mxnet中的img2rec.py制作rec数据集

源码链接&#xff1a;mxnet/tools/im2rec.py at master apache/mxnet GitHub 重点关注入参函数即可&#xff0c; def parse_args():"""Defines all arguments.Returns-------args object that contains all the params"""parser argparse.A…

每日OJ题_贪心算法三②_力扣553. 最优除法

目录 力扣553. 最优除法 解析代码 力扣553. 最优除法 553. 最优除法 难度 中等 给定一正整数数组 nums&#xff0c;nums 中的相邻整数将进行浮点除法。例如&#xff0c; [2,3,4] -> 2 / 3 / 4 。 例如&#xff0c;nums [2,3,4]&#xff0c;我们将求表达式的值 "…

人大金仓V8R6迁移mysql8.0

人大金仓数据库迁移mysql mysql版本&#xff1a;mysql 8.0.22 人大金仓版本;KingbaseES V008R006C008B0014 on x64 打开数据迁移工具 等待执行完成后使用命令窗口中提示的地址在浏览器中打开&#xff1a; 登录。此处登录不用修改任何信息&#xff0c;点击登录即可 新建源数…

便携式显示器芯片组-->LDR6282 +RTD2556T(1HDMI+2Typec(DP) 点1080P eDP屏)

RTD2555TLDR6282实现 1VGA1MiniHDMI2Typec(DP In)点1080P eDP屏 LDR6282 PD 芯片负责通过CC线与电脑沟通&#xff0c;让电脑送出DP显示信号和协商相关的Typec充电电压。 Scaler负责接收两路Typec-->DP信号&#xff0c;Typec正反插时LDR6282会反馈给RTD2555T相应的IO状态&a…