项目难点——【3】分布式任务调度、线程池使用、视频转换
我们有时候在处理视频文件的时候会遇到视频格式转换问题。
1 分布式任务调度
在项目开发中我们想要提升我们项目响应的速度或者想要服务器高效处理一批任务,这个时候就有两种方式:
- 多线程:充分利用单机资源
- 分布式+多线程:充分利用多台机器,并且每台计算机使用多线程处理
方案2可扩展性更强,并且方案2是一种分布式任务调度的处理方案。
1.1 概念
我们可以先思考一下下面业务场景的解决方案:
- 某电商系统需要在每天上午10点,下午3点,晚上8点发放一批优惠券。
- 某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。
- 某电商平台每天凌晨3点,要对订单中的无效订单进行清理。
- 12306网站会根据车次不同,设置几个时间点分批次放票。
- 商品成功发货后,需要向客户发送短信提醒。
类似的场景还有很多,我们该如何实现?
以上这些场景,就是任务调度所需要解决的问题。
任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。
通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,
- 由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:
分布式调度要实现目标:
- 并行任务调度
- 高可用
- 弹性扩容
- 任务管理与监测
- 避免任务重复执行
1.2 实现方式
①多线程
开启一个线程,每sleep一段时间,就去检查是否已到预期执行时间。
public static void main(String[] args) {
//任务执行间隔时间
final long timeInterval = 1000;
Runnable runnable = new Runnable() {
public void run() {
while (true) {
//TODO:something
try {
Thread.sleep(timeInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread thread = new Thread(runnable);
thread.start();
}
②Timer、ScheduledExecutor
Jdk也为我们提供了相关支持,如Timer、ScheduledExecutor.
- Timer
Timer 的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行。
public static void main(String[] args){
Timer timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run() {
//TODO:something
}
}, 1000, 2000); //1秒后开始调度,每2秒执行一次
}
- ScheduledExecutor
public static void main(String [] agrs){
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
//TODO:something
System.out.println("todo something");
}
}, 1,
2, TimeUnit.SECONDS);
}
③第三方框(Quartz、XXL-Job)
Quartz
public static void main(String [] agrs) throws SchedulerException {
//创建一个Scheduler
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
//创建JobDetail
JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
jobDetailBuilder.withIdentity("jobName","jobGroupName");
JobDetail jobDetail = jobDetailBuilder.build();
//创建触发的CronTrigger 支持按日历调度
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName", "triggerGroupName")
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
.build();
//创建触发的SimpleTrigger 简单的间隔调度
/*SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName","triggerGroupName")
.startNow()
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();*/
scheduler.scheduleJob(jobDetail,trigger);
scheduler.start();
}
public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext){
System.out.println("todo something");
}
}
XXL-JOB
XXL-JOB主要有调度中心、执行器、任务
XXL-JOB快速入门
XXL-JOB执行流程:
1. 任务执行器根据配置的调度中心地址,自动注册到调度中心【执行器注册】
2. 到达任务触发条件,调度中心下发任务【下发任务】
3. 执行器基于线程池执行任务,并把结果放入内存队列中、把执行日志写入
日志文件中【记录执行结果】
4. 执行器消费内存队列中的执行结果,主动上报调度中心【上报调度中心】
5. 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器
读取任务日志文件并返回日志详情【日志查看】
拓展:分片广播策略
掌握了xxl-job的基本使用,下边思考如何进行分布式任务处理呢?如下图,我们会启动多个执行器组成一个集群,去执行任务。
分片是指是调度中心将集群中的执行器标上序号:0,1,2,3…,广播是指每次调度会向集群中所有执行器发送调度请求,请求中携带分片参数。
分片广播:执行器并行处理数据,并且不重复,压力均衡分散到每个执行器
作业分片适用哪些场景呢?
• 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
• 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
2 视频转换及线程池使用
2.1 视频编码
- 文件格式:是指.mp4、.avi、.rmvb等 这些不同扩展名的视频文件的文件格式 ,视频文件的内容主要包括视频和音频,其文件格式是按照一 定的编码格式去编码,并且按照该文件所规定的封装格式将视频、音频、字幕等信息封装在一起,播放器会根据它们的封装格式去提取出编码,然后由播放器解码,最终播放音视频。
- 音视频编码格式:通过音视频的压缩技术,将视频格式转换成另一种视频格式,通过视频编码实现流媒体的传输。比如:一个.avi的视频文件原来的编码是a,通过编码后编码格式变为b,音频原来为c,通过编码后变为d。
- 常见的音视频编码格式:MPEG系列、H.26X系列
2.2 FFmpeg介绍与使用
FFmpeg被许多开源项目采用,QQ影音、暴风影音、VLC等。
下载:FFmpeg https://www.ffmpeg.org/download.html#build-windows
基本命令使用范例:
将一个.avi文件转成mp4、mp3、gif等。
比如我们将nacos.avi文件转成mp4,运行如下命令:
ffmpeg -i nacos.avi nacos.mp4
转成mp3:ffmpeg -i nacos.avi nacos.mp3
转成gif:ffmpeg -i nacos.avi nacos.gif
2.3 分布式处理视频
①导入依赖
<!-- fast Json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<!-- servlet Api 依赖 -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- 通用组件 -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.11</version>
</dependency>
②编写视频处理工具类
public class Mp4VideoUtil extends VideoUtil {
String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安装位置
String video_path = "D:\\BaiduNetdiskDownload\\test1.avi";
String mp4_name = "test1.mp4";
String mp4folder_path = "D:/BaiduNetdiskDownload/Movies/test1/";
public Mp4VideoUtil(String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path){
super(ffmpeg_path);
this.ffmpeg_path = ffmpeg_path;
this.video_path = video_path;
this.mp4_name = mp4_name;
this.mp4folder_path = mp4folder_path;
}
//清除已生成的mp4
private void clear_mp4(String mp4_path){
//删除原来已经生成的m3u8及ts文件
File mp4File = new File(mp4_path);
if(mp4File.exists() && mp4File.isFile()){
mp4File.delete();
}
}
/**
* 视频编码,生成mp4文件
* @return 成功返回success,失败返回控制台日志
*/
public String generateMp4(){
//清除已生成的mp4
// clear_mp4(mp4folder_path+mp4_name);
clear_mp4(mp4folder_path);
/*
ffmpeg.exe -i lucene.avi -c:v libx264 -s 1280x720 -pix_fmt yuv420p -b:a 63k -b:v 753k -r 18 .\lucene.mp4
*/
List<String> commend = new ArrayList<String>();
//commend.add("D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe");
commend.add(ffmpeg_path);
commend.add("-i");
// commend.add("D:\\BaiduNetdiskDownload\\test1.avi");
commend.add(video_path);
commend.add("-c:v");
commend.add("libx264");
commend.add("-y");//覆盖输出文件
commend.add("-s");
commend.add("1280x720");
commend.add("-pix_fmt");
commend.add("yuv420p");
commend.add("-b:a");
commend.add("63k");
commend.add("-b:v");
commend.add("753k");
commend.add("-r");
commend.add("18");
// commend.add(mp4folder_path + mp4_name );
commend.add(mp4folder_path );
String outstring = null;
try {
ProcessBuilder builder = new ProcessBuilder();
builder.command(commend);
//将标准输入流和错误输入流合并,通过标准输入流程读取信息
builder.redirectErrorStream(true);
Process p = builder.start();
outstring = waitFor(p);
} catch (Exception ex) {
ex.printStackTrace();
}
// Boolean check_video_time = this.check_video_time(video_path, mp4folder_path + mp4_name);
Boolean check_video_time = this.check_video_time(video_path, mp4folder_path);
if(!check_video_time){
return outstring;
}else{
return "success";
}
}
public static void main(String[] args) throws IOException {
// ProcessBuilder builder = new ProcessBuilder();
// builder.command("C:\\Program Files (x86)\\Tencent\\QQ\\Bin\\QQScLauncher.exe");
// //将标准输入流和错误输入流合并,通过标准输入流程读取信息
// builder.redirectErrorStream(true);
// Process p = builder.start();
//ffmpeg的路径
String ffmpeg_path = "D:\\soft\\ffmpeg\\ffmpeg.exe";//ffmpeg的安装位置
//源avi视频的路径
String video_path = "D:\\develop\\bigfile_test\\nacos_01.avi";
//转换后mp4文件的名称
String mp4_name = "nacos_01.mp4";
//转换后mp4文件的路径
String mp4_path = "D:\\develop\\bigfile_test\\nacos_01.mp4";
//创建工具类对象
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);
//开始视频转换,成功将返回success
String s = videoUtil.generateMp4();
System.out.println(s);
}
}
③编写视频处理任务VideoTask【线程池】
@Slf4j
@Component
public class VideoTask {
@Autowired
MediaFileProcessService mediaFileProcessService;
@Autowired
MediaFileService mediaFileService;
@Value("${videoprocess.ffmpegpath}")
String ffmpegpath;
/**
* 视频处理任务
*/
@XxlJob("videoJobHander")
public void videoJobHander() throws Exception {
// 分片序号,从0开始
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
//查询待处理任务,一次处理的任务数和cpu核心数一样
List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, 2);
if(mediaProcessList==null || mediaProcessList.size()<=0){
log.debug("查询到的待处理视频任务为0");
return ;
}
//要处理的任务数
int size = mediaProcessList.size();
//创建size个线程数量的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(size);
//计数器
CountDownLatch countDownLatch = new CountDownLatch(size);
//遍历mediaProcessList,将任务放入线程池
mediaProcessList.forEach(mediaProcess -> {
threadPool.execute(()->{
//视频处理状态
String status = mediaProcess.getStatus();
//保证幂等性
if("2".equals(status)){
log.debug("视频已经处理不用再次处理,视频信息:{}",mediaProcess);
countDownLatch.countDown();//计数器减1
return ;
}
//桶
String bucket = mediaProcess.getBucket();
//存储路径
String filePath = mediaProcess.getFilePath();
//原始视频的md5值
String fileId = mediaProcess.getFileId();
//原始文件名称
String filename = mediaProcess.getFilename();
//将要处理的文件下载到服务器上
File originalFile = null;
//处理结束的视频文件
File mp4File = null;
try {
originalFile = File.createTempFile("original", null);
mp4File = File.createTempFile("mp4", ".mp4");
} catch (IOException e) {
log.error("处理视频前创建临时文件失败");
countDownLatch.countDown();//计数器减1
return;
}
try {
//将原始视频从MinIO下载到本地
mediaFileService.downloadFileFromMinIO(originalFile,bucket,filePath);
} catch (Exception e) {
log.error("下载源始文件过程出错:{},文件信息:{}",e.getMessage(),mediaProcess);
countDownLatch.countDown();//计数器减1
return ;
}
//调用工具类将avi转成mp4
//转换后mp4文件的名称
String mp4_name = fileId+".mp4";
//转换后mp4文件的路径
String mp4_path = mp4File.getAbsolutePath();
//创建工具类对象
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath,originalFile.getAbsolutePath(),mp4_name,mp4_path);
//开始视频转换,成功将返回success,失败返回失败原因
String result = videoUtil.generateMp4();
String statusNew ="3";
String url = null;//最终访问路径
if("success".equals(result)){
//转换成功
//上传到minio的路径
String objectName = getFilePath(fileId, ".mp4");
try {
//上传到minIO
mediaFileService.addMediaFilesToMinIO(mp4_path,bucket,objectName);
} catch (Exception e) {
log.debug("上传文件出错:{}",e.getMessage());
countDownLatch.countDown();//计数器减1
return ;
}
statusNew = "2";//处理成功
url = "/"+bucket+"/"+objectName;
}
try {
//记录任务处理结果
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(),statusNew,fileId,url,result);
} catch (Exception e) {
log.debug("保存任务处理结果出错:{}",e.getMessage());
countDownLatch.countDown();//计数器减1
return ;
}
//计数器减去1
countDownLatch.countDown();
});
});
//阻塞到任务执行完成,当countDownLatch计数器归零,这里的阻塞解除
//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
countDownLatch.await(30, TimeUnit.MINUTES);
}
private String getFilePath(String fileMd5,String fileExt){
return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;
}
}
拓展:不建议使用Executors
阿里巴巴开发手册中明确表示不建议使用Executor来创建线程
- Executors.newCachedThreadPool()
最大线程数是Integer.MAX_VALUE,任务队列是SynchronousQueue,对任务来者不拒,创建线程是一个很耗性能的事,创建过多线程易导致OOM
- Executors.newFixedThreadPool(1)
这个线程池倒是没有上个线程池豪横了, 它定死了线程数量, 所以线程数量是不会超出的,但是它的任务队列是无界的LinkedBlockingQueue, 对于加进来的任务处理不过来就会存入任务队列中, 并且无限制的存入队列。 这个线程池感觉就是家里有地, 无论来多少货都往里面装。【这个线程池如果使用不当很容易导致OOM】
- Executors.newSingleThreadExecutor()
这个线程池只有一个线程, 比newFixedThreadPool还穷, 但是任务队列和上面一样, 没有限制, 很容易就使用不当导致OOM
- Executors.newScheduledThreadPool(2)
这个是定时任务的线程池, 没有定义线程创建数量的上线, 同时任务队列也没有定义上限, 如果前一次定时任务还没有完成, 后一个定时任务的运行时间到了, 它也会运行, 线程不够就创建。 这样如果定时任务运行的时间过长, 就会导致前后两个定时任务同时执行,如果他们之间有锁,还有可能出现死锁, 此时灾难就发生了。
参考:https://blog.csdn.net/leisurelen/article/details/107872827