【XXL-JOB】XXL-JOB定时处理视频转码
文章目录
- 【XXL-JOB】XXL-JOB定时处理视频转码
- 1. 准备工作
- 1.1 高级配置
- 1.2 分片广播
- 2. 需求分析
- 2.1 作业分片方案
- 2.2 保证任务不重复执行
- 2.2.1 保证幂等性
- 3. 视频处理业务流程
- 3.1 添加待处理任务
- 3.2 查询待处理任务
- 3.3 更新任务状态
- 3.4 工具介绍
- 3.4.1 什么是视频编码
- 3.4.2 ffmpeg的基本使用
- 3.5 执行任务
- 4. 相关面试题
1. 准备工作
XXL-JOB的搭建可以参考:参考博客
1.1 高级配置
在介绍分片广播之前,我们先熟悉一下执行器在集群部署下调度中心的调度策略。
![image-20230304173459457](http://blog.tempeisite.xyz/blog/image-20230304173459457.png
任务的路由策略:
- FIRST(第一个):固定选择第一台机器。
- LAST(最后一个):固定选择最后一台机器。
- ROUND(轮询):按顺序依次选择每台机器。
- RANDOM(随机):随机选择一台机器。
- CONSISTENT_HASH(一致性HASH):每个任务按照
HASH
算法固定选择一台机器,且所有任务均匀散列在不同机器上。 - LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举。
- LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举。
- FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度。
- BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度。
- SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务。
子任务ID:
- 每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。
调度过期策略:
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
阻塞处理策略:(调度过于密集执行器来不及处理时的处理策略)
- 单击串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
- 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
- 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
**任务超时时间:**支持自定义任务超时时间,任务运行超时将会主动中断任务。
**失败重试次数:**支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试。
1.2 分片广播
分片是指调度中心将集群中的执行器标上序号,0,1,2,3…,广播是指每次调度会向集群中所有执行器发送调度请求,请求中携带分片参数。
每个执行器收到调度请求根据分片参数自行决定是否执行任务。
另外xxl-job还支持动态分片,当执行器数量有变更时,调度中心会动态修改分片的数量。
适用场景:
- 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
- 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
使用说明:
“分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
Java语言任务获取分片参数方式:
BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务:
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// 分片序号,从0开始
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
//......
}
1)在调度中心添加任务
添加成功:
启动任务,观察日志:
2. 需求分析
当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?
2.1 作业分片方案
执行器收到调度请求后各自查询属于自己的任务,这样就保证了执行器之间不会重复执行任务。
xxl-job设计作业分片就是为了分布式执行任务,XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号并向执行器传递分片总数、分片序号这些参数,开发者需要自行处理分片项与真实数据的对应关系。
下图表示了多个执行器获取视频处理任务的结构:
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
1 % 2 = 1 执行器2执行
2 % 2 = 0 执行器1执行
3 % 2 = 1 执行器2执行
以此类推.
2.2 保证任务不重复执行
通过作业分片方案保证了执行器之间分配的任务不重复,另外如果同一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
我们在调度中心中编辑任务,设置为调度过期策略为“忽略”,设置阻塞处理策略为“丢弃后续调度”。
这样就可以避免重复调度了。
不过,我们还需要注意保证任务处理的幂等性。
2.2.1 保证幂等性
任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。执行器接收调度请求去执行任务,要有办法去判断该任务是否处理完成,如果处理完则不再处理,即使重复调度处理相同的任务也不能重复处理相同的视频。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用的方案:
1)数据库约束,比如:唯一索引,主键。
2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。
3. 视频处理业务流程
既然确定了分片方案,下边就可以梳理整个视频上传及处理的业务流程了。
上传视频成功向视频处理待处理表添加记录。
视频处理的详细流程如下:
- 任务调度中心广播作业分片。
- 执行器收到广播作业分片,从数据库读取待处理任务。
- 执行器根据任务内容从MinIO下载要处理的文件。
- 执行器启动多线程去处理任务。
- 任务处理完成,上传处理后的视频到MinIO。
- 将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。
如下图是待处理任务表:
完成任务历史表的结构与待处理任务表相同。
3.1 添加待处理任务
上传视频成功向视频处理待处理表添加记录,暂时只添加对avi视频的处理记录。
根据MIME Type去判断是否是avi视频,下边列出部分MIME Type
avi视频的MIME Type是video/x-msvideo。
在视频上传的时候,我们同时也要将视频的信息录入数据库:
重点关注第三步,在第三步将视频信息存入待处理表中。
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId, String fileId, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName) {
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
//1.封装数据
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileId);
mediaFiles.setFileId(fileId);
mediaFiles.setCompanyId(companyId);
mediaFiles.setFilename(fileId);
mediaFiles.setBucket(bucket);
mediaFiles.setFilePath(objectName);
//2.图片、MP4视频可以设置url
//2.1获取扩展名
String extension = null;
String filename = uploadFileParamsDto.getFilename();
if (StringUtils.isNotEmpty(filename) && filename.indexOf(".") >= 0) {
extension = filename.substring(filename.lastIndexOf("."));
}
//2.2媒体类型
String mimeType = getMimeTypeByextension(extension);
if (mimeType.indexOf("image") >= 0 || mimeType.indexOf("mp4") >= 0) {
mediaFiles.setUrl("/" + bucket + "/" + objectName);
}
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus("1");
mediaFiles.setAuditStatus("002003");
//2.3插入文件表
mediaFilesMapper.insert(mediaFiles);
//3.对avi视频添加到待处理任务表
if (mimeType.equals("video/x-msvideo")) {
MediaProcess mediaProcess = new MediaProcess();
BeanUtils.copyProperties(mediaFiles, mediaProcess);
mediaProcess.setStatus("1");//未处理状态
mediaProcessMapper.insert(mediaProcess);
}
}
return mediaFiles;
}
3.2 查询待处理任务
我们如何保证查询到的待处理视频记录不重复?
我们可以根据调度中心传递的分片参数和执行器总数设定特定执行器执行特定任务。
编写根据分片参数获取待处理任务的dao方法:
/**
* @description 根据分片参数获取待处理任务
* @param shardTotal 分片总数
* @param shardindex 分片序号
* @param count 任务数
* @return java.util.List<com.xuecheng.media.model.po.MediaProcess>
*/
@Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} limit #{count}")
public List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal,
@Param("shardIndex") int shardIndex,
@Param("count") int count);
定义service接口和service实现方法:
我这里跳过定义接口,直接到service实现方法:
@Override
public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {
return mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);
}
3.3 更新任务状态
我们先跳过视频处理那一部,假设我们现在已经处理完视频了,我们需要对任务进行一系列的操作。
比如:
- 根据任务是否成功去修改任务的状态
- 如果成功,则修改待处理表的状态,将媒资表中的url更新一下,将任务添加进任务历史表中,最后删除待处理表中的任务。
- 如果失败,则修改待处理表的状态。
@Override
@Transactional
public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {
//查询这个任务
MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);
if (mediaProcess == null) {
log.debug("更新任务状态时此任务:{}为空", taskId);
return;
}
LambdaQueryWrapper<MediaProcess> queryWrapper = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);
//判断是否成功
if ("3".equals(status)) {
//任务失败
MediaProcess mediaProcess_u = new MediaProcess();
mediaProcess_u.setStatus("3");//处理失败
mediaProcess_u.setErrormsg(errorMsg);
//这里修改是为了保证幂等性,防止一个任务重复执行
mediaProcessMapper.update(mediaProcess_u, queryWrapper);
return;
}
//处理成功,更新状态
if ("2".equals(status)) {
//更新待处理任务表
mediaProcess.setStatus("2");
mediaProcess.setUrl(url);
mediaProcess.setFinishDate(LocalDateTime.now());
//这里修改是为了保证幂等性,防止一个任务重复执行
mediaProcessMapper.updateById(mediaProcess);
//更新文件表中的url字段
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);
mediaFiles.setUrl(url);
mediaFilesMapper.updateById(mediaFiles);
}
//如果成功将任务添加到历史纪录表
MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();
BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);
mediaProcessHistoryMapper.insert(mediaProcessHistory);
//如果成功则将待处理表的记录删除
mediaProcessMapper.deleteById(taskId);
}
3.4 工具介绍
在处理视频之间先了解一下什么是视频编码。
3.4.1 什么是视频编码
首先我们要分清文件格式和编码格式:
- 文件格式:是指.mp4、.avi、.rmvb等 这些不同扩展名的视频文件的文件格式 ,视频文件的内容主要包括视频和音频,其文件格式是按照一 定的编码格式去编码,并且按照该文件所规定的封装格式将视频、音频、字幕等信息封装在一起,播放器会根据它们的封装格式去提取出编码,然后由播放器解码,最终播放音视频。
- 音视频编码格式:通过音视频的压缩技术,将视频格式转换成另一种视频格式,通过视频编码实现流媒体的传输。比如:一个.avi的视频文件原来的编码是a,通过编码后编码格式变为b,音频原来为c,通过编码后变为d。
3.4.2 ffmpeg的基本使用
FFmpeg被许多开源项目采用,QQ影音、暴风影音、VLC等。
下载:FFmpeg https://www.ffmpeg.org/download.html#build-windows
我们可通过如下命令将一个.avi后缀的视频文件转成.mp4文件:
ffmpeg.exe -i 1.avi 1.mp4
3.5 执行任务
视频处理采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过服务器的cpu核数。
逻辑:
- 调用service方法,获得待处理任务列表
- 创建一个大小和待处理任务表的大小相同的线程池
- 循环待处理任务列表,每个任务交给一个线程执行。
定义VideoTask类:
@Component
@Slf4j
public class VideoTask {
@Autowired
private MediaFileProcessService mediaFileProcessService;
@Autowired
private MediaFileService mediaFileService;
@Value("${videoprocess.ffmpegpath}")
private String ffmpegpath;
/**
* 2、分片广播任务
*/
@XxlJob("videoJobHander")
public void videoJobHander() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
//查询待处理任务,一次处理的任务数和cpu核心数一样
List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, 2);
if (CollectionUtils.isEmpty(mediaProcessList)) {
log.debug("查询到的待处理任务为0");
return;
}
//要处理的任务数
int size = mediaProcessList.size();
//创建size个线程数量的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(size);
//计数器
CountDownLatch countDownLatch = new CountDownLatch(size);
//遍历mediaProcessList,将任务放入线程池
mediaProcessList.forEach(mediaProcess -> {
//任务执行逻辑
threadPool.execute(() -> {
//视频处理状态
String status = mediaProcess.getStatus();
//保证幂等性
//"2"表示处理完成
if ("2".equals(status)) {
log.debug("视频已经处理不用再次处理,视频信息:{}", mediaProcess);
//计数器-1
countDownLatch.countDown();
return;
}
//桶
String bucket = mediaProcess.getBucket();
//原视频的MD5值
String fileId = mediaProcess.getFileId();
//存储路径
String filePath = mediaProcess.getFilePath();
//原视频文件名称
String filename = mediaProcess.getFilename();
//将要处理的文件下载到服务器上
File originalFile = null;
//处理结束的视频文件
File mp4File = null;
try {
originalFile = File.createTempFile("original", null);
mp4File = File.createTempFile("mp4", ".mp4");
} catch (IOException e) {
log.error("处理视频前创建临时文件失效");
//计数器-1
countDownLatch.countDown();
return;
}
try {
//将原视频下载到本地(avi)
originalFile = mediaFileService.downloadFileFromMinIO(originalFile, bucket, filePath);
} catch (Exception e) {
log.error("下载原始文件过程出错:{},文件信息:{}", e.getMessage(), mediaProcess);
//计数器-1
countDownLatch.countDown();
return;
}
//调用工具类将avi转成MP4
//转换后MP4文件的名称
String mp4_name = fileId + ".mp4";
//转换后MP4文件的路径
String mp4_path = mp4File.getAbsolutePath();
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath, originalFile.getAbsolutePath(), mp4_name, mp4_path);
//开始视频转换,成功将返回success,失败则返回失败原因
String result = videoUtil.generateMp4();
//默认处理失败
String statusNew = "3";
//最终访问路径
String url = null;
if ("success".equals(result)) {
//转换成功
//上传至minIO的路径
String objectName = getFilePathByMd5(fileId, ".mp4");
try {
//上传至minio
mediaFileService.addMediaFilesToMinIO(mp4_path, bucket, objectName);
//处理成功
statusNew = "2";
url = "/" + bucket + "/" + objectName;
} catch (Exception e) {
log.debug("上传文件出错:{}", e.getMessage());
//计数器-1
countDownLatch.countDown();
return;
}
}
try {
//记录任务处理结果
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), statusNew, fileId, url, result);
//删除临时文件 originalFile 和 mp4File
if(originalFile.exists()){
originalFile.delete();
}
if(mp4File.exists()){
mp4File.delete();
}
} catch (Exception e) {
log.debug("保存任务处理结果出错:{}",e.getMessage());
//计数器-1
countDownLatch.countDown();
return;
}
//计数器-1
countDownLatch.countDown();
});
});
//阻塞是为了使线程池中的任务都完成,不阻塞的话方法一下子就结束了,任务也没时间执行
//阻塞到任务执行完成,当"countDownLatch"计数器归零,解除阻塞
//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
countDownLatch.await(30,TimeUnit.MINUTES);
}
private String getFilePathByMd5(String fileMd5, String fileExt) {
//将文件MD5值的第一位数作为一级目录,第二位数作为二级目录
return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;
}
}
至此,视频处理业务就完成了。
4. 相关面试题
XXL-JOB的工作原理是什么?XXL-JOB是什么,怎么工作?
答:XXL-JOB分布式任务调度服务由调度中心和执行器组成,调度中心负责按任务调度策略向执行器下发任务,执行器负责接收任务执行任务。
- 手续爱你部署并启动xxl-job调度中心。(一个java工程)
- 然后在我们的微服务中添加xxl-job依赖,在微服务中配置执行器。
- 启动微服务,执行器向调度中心上报自己。
- 在微服务中写一个任务方法并用xxl-job的注解去标记执行任务的方法名称。
- 在调度中心配置任务调度策略。
- 在调度中心启动任务。
- 调度中心根据任务调度策略,到达时间就开始下发任务给执行器。
- 执行器收到任务就开始执行任务。
如何保证任务不重复执行?
- 调度中心按分片广播的方式下发任务。
- 执行器收到作业分片广播的参数(分片总数和分片序号),计算任务id除以分片总数得到一个余数,如果余数等于分片序号这时就去执行这个任务,保证了不同执行器执行不同的任务。
- 配置调度过期策略为“忽略”,避免同一个执行器多次重复执行同一个任务。
- 配置任务阻塞处理策略为“丢弃后续调度”,注意:丢弃也没事,下一次调度又可以执行。
- 另外还要保证任务处理的幂等性,执行过的任务可以打一个状态标记已完成,下次再调度执行该任务判断该任务已完成就不再执行。
任务幂等性如何保证?
幂等性描述了一次和多次请求某一资源对于资源本身应该具有同样的结果。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用方案:
- 数据库约束,比如:唯一索引,主键。同一个主键不可能两次都插入成功。
- 乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
- 唯一序列号,请求前生成唯一的序列号,携带序列号去请求,执行时在redis记录该序列号表示以该序列号的请求执行过了,如果相同的序列号再次来执行说明是重复执行。