7.1 分布式任务处理
7.1.1 什么是分布式任务调度
视频上传成功需要对视频的格式进行处理,如何用Java程序对视频进行处理呢?这里有一个关键的需求就是当视频比较多的时候我们如何可以高效处理。
如何去高效处理一批任务呢?
1、多线程
多线程是充分利用单机的资源。
2、分布式加多线程
充分利用多台计算机,每台计算机使用多线程处理。
方案2可扩展性更强。
方案2是一种分布式任务调度的处理方案。
什么是分布式任务调度?
我们可以先思考一下下面业务场景的解决方案:
某电商系统需要在每天上午10点,下午3点,晚上8点发放一批优惠券。
某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。
某电商平台每天凌晨3点,要对订单中的无效订单进行清理。
12306网站会根据车次不同,设置几个时间点分批次放票。
电商整点抢购,商品价格某天上午8点整开始优惠。
商品成功发货后,需要向客户发送短信提醒。
类似的场景还有很多,我们该如何实现?
以上这些场景,就是任务调度所需要解决的问题。
任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。
如何实现任务调度?
多线程方式实现:
学过多线程的同学,可能会想到,我们可以开启一个线程,每sleep一段时间,就去检查是否已到预期执行时间。
以下代码简单实现了任务调度的功能:
Javapublic static void main(String[] args) { //任务执行间隔时间 final long timeInterval = 1000; Runnable runnable = new Runnable() { public void run() { while (true) { //TODO:something try { Thread.sleep(timeInterval); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread thread = new Thread(runnable); thread.start(); } |
---|
上面的代码实现了按一定的间隔时间执行任务调度的功能。
Jdk也为我们提供了相关支持,如Timer、ScheduledExecutor,下边我们了解下。
Timer方式实现:
Javapublic static void main(String[] args){ Timer timer = new Timer(); timer.schedule(new TimerTask(){ @Override public void run() { //TODO:something } }, 1000, 2000); //1秒后开始调度,每2秒执行一次 } |
---|
Timer 的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行。
ScheduledExecutor方式实现:
Javapublic static void main(String [] agrs){ ScheduledExecutorService service = Executors.newScheduledThreadPool(10); service.scheduleAtFixedRate( new Runnable() { @Override public void run() { //TODO:something System.out.println(“todo something”); } }, 1, 2, TimeUnit.SECONDS); } |
---|
Java 5 推出了基于线程池设计的 ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。
Timer 和 ScheduledExecutor 都仅能提供基于开始时间与重复间隔的任务调度,不能胜任更加复杂的调度需求。比如,设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据等等。
Quartz 是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求,Quartz 设计的核心类包括 Scheduler, Job 以及 Trigger。其中,Job 负责定义需要执行的任务,Trigger 负责设置调度策略,Scheduler 将二者组装在一起,并触发任务开始执行。Quartz支持简单的按时间间隔调度、还支持按日历调度方式,通过设置CronTrigger表达式(包括:秒、分、时、日、月、周、年)进行任务调度。
第三方Quartz方式实现:
Javapublic static void main(String [] agrs) throws SchedulerException { //创建一个Scheduler SchedulerFactory schedulerFactory = new StdSchedulerFactory(); Scheduler scheduler = schedulerFactory.getScheduler(); //创建JobDetail JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class); jobDetailBuilder.withIdentity(“jobName”,“jobGroupName”); JobDetail jobDetail = jobDetailBuilder.build(); //创建触发的CronTrigger 支持按日历调度 CronTrigger trigger = TriggerBuilder.newTrigger() .withIdentity(“triggerName”, “triggerGroupName”) .startNow() .withSchedule(CronScheduleBuilder.cronSchedule(“0/2 * * * * ?”)) .build(); //创建触发的SimpleTrigger 简单的间隔调度 /SimpleTrigger trigger = TriggerBuilder.newTrigger() .withIdentity(“triggerName”,“triggerGroupName”) .startNow() .withSchedule(SimpleScheduleBuilder .simpleSchedule() .withIntervalInSeconds(2) .repeatForever()) .build();/ scheduler.scheduleJob(jobDetail,trigger); scheduler.start(); } public class MyJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext){ System.out.println(“todo something”); } } |
---|
通过以上内容我们学习了什么是任务调度,任务调度所解决的问题,以及任务调度的多种实现方式。
什么是分布式任务调度?
通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:
分布式调度要实现的目标:
不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:
1、并行任务调度
并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
2、高可用
若某一个实例宕机,不影响其他实例来执行任务。
3、弹性扩容
当集群中增加实例就可以提高并执行任务的处理效率。
4、任务管理与监测
对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
5、避免任务重复执行
当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。
7.1.2 XXL-JOB介绍
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
官网:https://www.xuxueli.com/xxl-job/
文档:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B
XXL-JOB主要有调度中心、执行器、任务:
调度中心:
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;
主要职责为执行器管理、任务管理、监控运维、日志管理等
任务执行器:
负责接收调度请求并执行任务逻辑;
只要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
任务: 负责执行具体的业务处理。
调度中心与执行器之间的工作流程如下:
执行流程:
1.任务执行器根据配置的调度中心的地址,自动注册到调度中心
2.达到任务触发条件,调度中心下发任务
3.执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
4.执行器消费内存队列中的执行结果,主动上报给调度中心
5.当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
7.1.3 搭建XXL-JOB
7.1.3.1 调度中心
首先下载XXL-JOB
GitHub:https://github.com/xuxueli/xxl-job
码云:https://gitee.com/xuxueli0323/xxl-job
项目使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
也可从课程资料目录获取,解压xxl-job-2.3.1.zip
使用IDEA打开解压后的目录
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
doc :文档资料,包含数据库脚本
在下发的虚拟机的MySQL中已经创建了xxl_job_2.3.1数据库
如下图:
执行sh /data/soft/restart.sh自动启动xxl-job
访问:http://192.168.101.65:8088/xxl-job-admin/
账号和密码:admin/123456
如果无法使用虚拟机运行xxl-job可以在本机idea运行xxl-job调度中心。
7.1.3.2 执行器
下边配置执行器,执行器负责与调度中心通信接收调度中心发起的任务调度请求。
1、首先在媒资管理模块的service工程添加依赖,在项目的父工程已约定了版本2.3.1
XML com.xuxueli xxl-job-core |
---|
2、在nacos下的media-service-dev.yaml下配置xxl-job
YAMLxxl: job: admin: addresses: http://localhost:8080/xxl-job-admin executor: appname: media-process-service address: ip: port: 9999 logpath: /data/applogs/xxl-job/jobhandler logretentiondays: 30 accessToken: default_token |
---|
注意配置中的appname这是执行器的应用名,稍后在调度中心配置执行器时要使用。
3、配置xxl-job的执行器
将示例工程下配置类拷贝到媒资管理的service工程下
拷贝至:
4、下边进入调度中心添加执行器
点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。
添加成功:
到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器,下边准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动媒资管理模块的接口工程。
启动后观察日志,出现下边的日志表示执行器在调度中心注册成功
同时观察调度中心中的执行器界面
在线机器地址处已显示1个执行器。
7.1.3.3 执行任务
下边编写任务,任务类的编写方法参考示例工程,如下图:
在service包下新建jobhandler存放任务类,下边参考示例工程编写一个任务类
Javapackage com.xuecheng.media.service.jobhandler; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** * @description 测试执行器 * @author Mr.M * @date 2022/9/13 20:32 * @version 1.0 / @Component @Slf4j public class SampleJob { /* * 1、简单任务示例(Bean模式) */ @XxlJob(“testJob”) public void testJob() throws Exception { log.info(“开始执行…”); } } |
---|
下边在调度中心添加任务,进入任务管理
点击新增,填写任务信息
注意红色标记处:
调度类型选择Cron,并配置Cron表达式设置定时策略。
Cron表达式是一个字符串,通过它可以定义调度策略,格式如下:
{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
xxl-job提供图形界面去配置:
一些例子如下:
30 10 1 * * ? 每天1点10分30秒触发
0/30 * * * * ? 每30秒触发一次
- 0/10 * * * ? 每10分钟触发一次
运行模式有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。
JobHandler任务方法名填写@XxlJob注解中的名称。
添加成功,启动任务
通过调度日志查看任务执行情况
下边启动媒资管理的service工程,启动执行器。
观察执行器方法的执行。
如果要停止任务需要在调度中心操作
任务跑一段时间注意清理日志
7.1.4 分片广播
掌握了xxl-job的基本使用,下边思考如何进行分布式任务处理呢?如下图,我们会启动多个执行器组成一个集群,去执行任务。
执行器在集群部署下调度中心有哪些调度策略呢?
查看xxl-job官方文档,阅读高级配置相关的内容:
SQL高级配置: - 路由策略:当执行器集群部署时,提供丰富的路由策略,包括; FIRST(第一个):固定选择第一个机器; LAST(最后一个):固定选择最后一个机器; ROUND(轮询):; RANDOM(随机):随机选择在线的机器; CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。 LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举; LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举; FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度; BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度; SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务; |
---|
第一个:每次调度选择集群中第一台执行器。
最后一个:每次调度选择集群中最后一台执行器。
轮询:按照顺序每次调度选择一台执行器去调度。
随机:每次调度随机选择一台执行器去调度。
CONSISTENT_HASH:按任务的hash值选择一台执行器去调度。
其它策略请自行阅读文档。
下边要重点说的是分片广播策略,分片是指是调度中心将集群中的执行器标上序号:0,1,2,3…,广播是指每次调度会向集群中所有执行器发送调度请求,请求中携带分片参数。
如下图:
每个执行器收到调度请求根据分片参数自行决定是否执行任务。
另外xxl-job还支持动态分片,当执行器数量有变更时,调度中心会动态修改分片的数量。
作业分片适用哪些场景呢?
分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
使用说明:
“分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
Java语言任务获取分片参数方式:
BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务"ShardingJobHandler":
Java/** * 2、分片广播任务 */ @XxlJob(“shardingJobHandler”) public void shardingJobHandler() throws Exception { // 分片序号,从0开始 int shardIndex = XxlJobHelper.getShardIndex(); // 分片总数 int shardTotal = XxlJobHelper.getShardTotal(); … |
---|
下边测试作业分片:
1、定义作业分片的任务方法
Java/** * 2、分片广播任务 */ @XxlJob(“shardingJobHandler”) public void shardingJobHandler() throws Exception { // 分片参数 int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); log.info(“分片参数:当前分片序号 = {}, 总分片数 = {}”, shardIndex, shardTotal); log.info(“开始执行第”+shardIndex+“批任务”); } |
---|
2、在调度中心添加任务
高级配置说明:
Plain Text - 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。 - 调度过期策略: - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间; - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间; - 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略; 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行; 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败; 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务; - 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务; - 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试; |
---|
添加成功:
启动任务,观察日志
下边启动两个执行器实例,观察每个实例的执行情况
首先在nacos中配置media-service的本地优先配置:
YAML#配置本地优先 spring: cloud: config: override-none: true |
---|
将media-service启动两个实例
两个实例的在启动时注意端口不能冲突:
实例1 在VM options处添加:-Dserver.port=63051 -Dxxl.job.executor.port=9998
实例2 在VM options处添加:-Dserver.port=63050 -Dxxl.job.executor.port=9999
例如:
启动两个实例
观察任务调度中心,稍等片刻执行器有两个
观察两个执行实例的日志:
另一实例的日志如下:
从日志可以看每个实例的分片序号不同。
到此作业分片任务调试完成,此时我们可以思考:
当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?
7.3 需求分析
7.3.1 作业分片方案
掌握了xxl-job的作业分片调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。
任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会重复执行任务?
执行器收到调度请求后各自己查询属于自己的任务,这样就保证了执行器之间不会重复执行任务。
xxl-job设计作业分片就是为了分布式执行任务,XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号并向执行器传递分片总数、分片序号这些参数,开发者需要自行处理分片项与真实数据的对应关系。
下图表示了多个执行器获取视频处理任务的结构:
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
1 % 2 = 1 执行器2执行
2 % 2 = 0 执行器1执行
3 % 2 = 1 执行器2执行
以此类推.
7.3.1 保证任务不重复执行
通过作业分片方案保证了执行器之间分配的任务不重复,另外如果同一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
首先配置调度过期策略:
查看文档如下:
- 调度过期策略:
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
这里我们选择忽略,如果立即执行一次可能会重复调度。
其次,再看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束时调度时间到达到,此时该如何处理。
查看文档如下:
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
这里选择 丢弃后续调度,避免重复调度。
最后,也就是要注意保证任务处理的幂等性,什么是任务的幂等性?任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。执行器接收调度请求去执行任务,要有办法去判断该任务是否处理完成,如果处理完则不再处理,即使重复调度处理相同的任务也不能重复处理相同的视频。
什么是幂等性?
它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用的方案:
1)数据库约束,比如:唯一索引,主键。
2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。
7.3.2 业务流程
确定了分片方案,下边梳理整个视频上传及处理的业务流程。
上传视频成功向视频处理待处理表添加记录。
视频处理的详细流程如下:
1、任务调度中心广播作业分片。
2、执行器收到广播作业分片,从数据库读取待处理任务。
3、执行器根据任务内容从MinIO下载要处理的文件。
4、执行器启动多线程去处理任务。
5、任务处理完成,上传处理后的视频到MinIO。
6、将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。
下图是待处理任务表:
完成任务历史表与结果与待处理任务相同。
7.4 查询待处理任务
7.4.1添加待处理任务
上传视频成功向视频处理待处理表添加记录,暂时只添加对avi视频的处理记录。
根据MIME Type去判断是否是avi视频,下边列出部分MIME Type
avi视频的MIME Type是video/x-msvideo
修改文件信息入库方法,如下:
| Java@Transactional public MediaFiles addMediaFilesToDb(Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket,String objectName){ //根据文件名称取出媒体类型 //扩展名 String extension = null; if(objectName.indexOf(“.”)>=0){ extension = objectName.substring(objectName.lastIndexOf(“.”)); } //获取扩展名对应的媒体类型 String contentType = getMimeTypeByExtension(extension); //从数据库查询文件 MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5); if (mediaFiles == null) { mediaFiles = new MediaFiles(); //拷贝基本信息 BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles); mediaFiles.setId(fileMd5); mediaFiles.setFileId(fileMd5); mediaFiles.setCompanyId(companyId); //图片及mp4文件设置url if(contentType.indexOf(“image”)>=0 || contentType.indexOf(“mp4”)>=0){ mediaFiles.setUrl(“/” + bucket + “/” + objectName); } mediaFiles.setBucket(bucket); mediaFiles.setFilePath(objectName); mediaFiles.setCreateDate(LocalDateTime.now()); mediaFiles.setAuditStatus(“002003”); mediaFiles.setStatus(“1”); //保存文件信息到文件表 int insert = mediaFilesMapper.insert(mediaFiles); if (insert < 0) { XueChengPlusException.cast(“保存文件信息失败”); } //如果是avi视频添加到视频待处理表 if(contentType.equals(“video/x-msvideo”)){ MediaProcess mediaProcess = new MediaProcess(); BeanUtils.copyProperties(mediaFiles,mediaProcess); mediaProcess.setStatus(“1”);//未处理 mediaProcessMapper.insert(mediaProcess); } } return mediaFiles; } |
| ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
7.4.2 查询待处理任务
如何保证查询到的待处理视频记录不重复?
编写根据分片参数获取待处理任务的DAO方法,定义DAO接口如下:
Javapublic interface MediaProcessMapper extends BaseMapper { /** * @description 根据分片参数获取待处理任务 * @param shardTotal 分片总数 * @param shardindex 分片序号 * @param count 任务数 * @return java.util.List<com.xuecheng.media.model.po.MediaProcess> * @author Mr.M * @date 2022/9/14 8:54 / @Select("SELECT t. FROM media_process t WHERE t.id % #{shardTotal} = #{shardindex} and t.status=‘1’ limit #{count}") List selectListByShardIndex(@Param(“shardTotal”) int shardTotal, @Param(“shardindex”) int shardindex, @Param(“count”) int count); } |
---|
定义Service接口,查询待处理
Javapackage com.xuecheng.media.service; import com.xuecheng.base.model.PageParams; import com.xuecheng.base.model.PageResult; import com.xuecheng.base.model.RestResponse; import com.xuecheng.media.model.dto.QueryMediaParamsDto; import com.xuecheng.media.model.dto.UploadFileParamsDto; import com.xuecheng.media.model.dto.UploadFileResultDto; import com.xuecheng.media.model.po.MediaFiles; import com.xuecheng.media.model.po.MediaProcess; import org.springframework.transaction.annotation.Transactional; import java.io.File; import java.util.List; /** * @author Mr.M * @version 1.0 * @description 媒资文件处理业务方法 * @date 2022/9/10 8:55 / public interface MediaFileProcessService { /* * @description 获取待处理任务 * @param shardIndex 分片序号 * @param shardTotal 分片总数 * @param count 获取记录数 * @return java.util.List<com.xuecheng.media.model.po.MediaProcess> * @author Mr.M * @date 2022/9/14 14:49 */ public List getMediaProcessList(int shardIndex,int shardTotal,int count); } |
---|
service接口实现
Javapackage com.xuecheng.media.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.xuecheng.media.mapper.MediaFilesMapper; import com.xuecheng.media.mapper.MediaProcessHistoryMapper; import com.xuecheng.media.mapper.MediaProcessMapper; import com.xuecheng.media.model.po.MediaFiles; import com.xuecheng.media.model.po.MediaProcess; import com.xuecheng.media.model.po.MediaProcessHistory; import com.xuecheng.media.service.MediaFileProcessService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureOrder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.List; /** * @description TODO *** @author Mr.M * @date 2022/9/14 14:41 * @version 1.0 */ @Slf4j @Service public class MediaFileProcessServiceImpl implements MediaFileProcessService { @Autowired MediaFilesMapper mediaFilesMapper; @Autowired MediaProcessMapper mediaProcessMapper; @Override public List getMediaProcessList(int shardIndex, int shardTotal, int count) { List mediaProcesses = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count); return mediaProcesses; } } |
---|
7.5 更新任务状态
任务处理完成需要更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录。
在Service接口添加方法
Java/** * @description 保存任务结果 * @param taskId 任务id * @param status 任务状态 * @param fileId 文件id * @param url url * @param errorMsg 错误信息 * @return void * @author Mr.M * @date 2022/10/15 11:29 */ void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg); |
---|
service接口方法实现如下:
Javapackage com.xuecheng.media.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.xuecheng.media.mapper.MediaFilesMapper; import com.xuecheng.media.mapper.MediaProcessHistoryMapper; import com.xuecheng.media.mapper.MediaProcessMapper; import com.xuecheng.media.model.po.MediaFiles; import com.xuecheng.media.model.po.MediaProcess; import com.xuecheng.media.model.po.MediaProcessHistory; import com.xuecheng.media.service.MediaFileProcessService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureOrder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.List; /** * @description TODO *** @author Mr.M * @date 2022/9/14 14:41 * @version 1.0 */ @Slf4j @Service public class MediaFileProcessServiceImpl implements MediaFileProcessService { @Autowired MediaFilesMapper mediaFilesMapper; @Autowired MediaProcessMapper mediaProcessMapper; @Autowired MediaProcessHistoryMapper mediaProcessHistoryMapper; @Transactional @Override public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) { //查出任务,如果不存在则直接返回 MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId); if(mediaProcess == null){ return ; } //处理失败,更新任务处理结果 LambdaQueryWrapper queryWrapperById = new LambdaQueryWrapper().eq(MediaProcess::getId, taskId); if(status.equals(“3”)){ MediaProcess mediaProcess_u = new MediaProcess(); mediaProcess_u.setStatus(“3”); mediaProcess_u.setErrormsg(errorMsg); mediaProcessMapper.update(mediaProcess_u,queryWrapperById); return ; } MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId); if(mediaFiles!=null){ mediaFiles.setUrl(url); mediaFilesMapper.updateById(mediaFiles); } //处理成功,更新url和状态 mediaProcess.setUrl(url); mediaProcess.setStatus(“2”); mediaProcess.setFinishDate(LocalDateTime.now()); mediaProcessMapper.updateById(mediaProcess); //添加到历史记录 MediaProcessHistory mediaProcessHistory = new MediaProcessHistory(); BeanUtils.copyProperties(mediaProcess, mediaProcessHistory); mediaProcessHistoryMapper.insert(mediaProcessHistory); //删除mediaProcess mediaProcessMapper.deleteById(mediaProcess.getId()); } @Override public List getMediaProcessList(int shardIndex, int shardTotal, int count) { List mediaProcesses = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count); return mediaProcesses; } } |
---|