【微服务 | 学成在线】项目易错重难点分析(媒资管理模块篇·下)

news2024/11/23 12:02:03

文章目录

  • 视频处理
    • 视频编码和文件格式
    • 文件格式和视频编码方式区别
    • ProcessBuilder
    • 分布式任务调度
    • XXL-JOB
    • XXL-JOB配置
    • XXL-JOB使用
    • 分片广播
    • 技术方案
    • 视频处理方案及实现思路
    • 分布式锁

视频处理

视频编码和文件格式

什么是视频编码?
在这里插入图片描述

同时我们还要知道我们为什么要对视频进行编码,如果我们不编码我们的视频文件会变得非常庞大,而通过视频编码可以将我们的视频文件大小缩小为原来的十分之一、百分之一,亦或是千分之一。

首先我们要分清文件格式和编码格式:

  • 文件格式:是指.mp4、.avi、.rmvb等 这些不同扩展名的视频文件的文件格式 ,视频文件的内容主要包括视频和音频,其文件格式是按照一 定的编码格式去编码,并且按照该文件所规定的封装格式将视频、音频、字幕等信息封装在一起,播放器会根据它们的封装格式去提取出编码,然后由播放器解码,最终播放音视频。
  • 音视频编码格式:通过音视频的压缩技术,将视频格式转换成另一种视频格式,通过视频编码实现流媒体的传输。比如:一个.avi的视频文件原来的编码是a,通过编码后编码格式变为b,音频原来为c,通过编码后变为d。

音视频编码格式各类繁多,主要有几下几类:

  • MPEG系列:(由ISO[国际标准组织机构]下属的MPEG[运动图象专家组]开发 )视频编码方面主要是Mpeg1(vcd用的就是它)、Mpeg2(DVD使用)、Mpeg4(的DVDRIP使用的都是它的变种,如:divx,xvid等)、Mpeg4 AVC(正热门);音频编码方面主要是MPEG Audio Layer 1/2、MPEG Audio Layer 3(大名鼎鼎的mp3)、MPEG-2 AAC 、MPEG-4 AAC等等。注意:DVD音频没有采用Mpeg的。
  • H.26X系列:(由ITU[国际电传视讯联盟]主导,侧重网络传输,注意:只是视频编码)包括H.261、H.262、H.263、H.263+、H.263++、H.264(就是MPEG4 AVC-合作的结晶)。目前最常用的编码标准是视频H.264,音频AAC。

文件格式和视频编码方式区别

我的MP4分明写着能播放AVI吗?为什么这一个AVI文件就播放不了?
我的MP4支持Mpeg-4啊,为什么Mp4文件不能播放呢?
通过本节你就可以知道这些问题的答案

视频文件格式一般情况下从视频文件的后缀名就能看出来,比如AVI,Mp4,3gp,mov,rmvb等等。这些格式又叫做容器格式(container format),顾名思义就是用来装东西的,你可以把它想象成为一个便当盒,或者野餐篮。

通常我们从网上下载的电影都是有声音的,所以容器格式中一般至少包含有两个数据流(stream),一个视频流,一个音频流,就好比是一个便当盒里装着的配菜和米饭。

视频编码方式则是指容器格式中视频流数据的压缩编码方式,例如Mpeg-4,H.264,H.263,等等。而视频数据采用了何种编码方式是无法单单从文件格式的后缀上看出来的。就是说你无法从一个盖着盖子的便当盒外面看出里面装了什么配菜。

在这里插入图片描述

如果你想播放一个视频文件,第一步你的播放器(不论是软件的还是硬件的)要能够解析相应的容器格式,这一步也叫做解复用(demux),第二步你的播放器要能够解码其中所包含视频流和音频流。这样影片才能播放出来。

打个不太恰当的比方,播放器好比你雇用的一个试菜员,由他来品尝便当(视频文件),然后告诉你便当里装了什么东西。

所以试菜员首先要懂得如何打开便当盒,还要知道吃的出来便当盒里装了什么配菜,这样你才能获得你想要的信息。

回过头来看前面的两个问题,用以上的比喻翻译一下。

  • 问题A,我的试菜员能打开AVI这种便当的,为什么我不能知道里面装了什么?

    • 回答很简单,虽然他能够打开便当,但是吃不出里面的东西是什么。理论上没有一个播放器能够播放所有的AVI格式的电影,因为你不知道我会往里面放什么配菜。
  • 问题B,我的试菜员吃过Mpeg-4这种牛排阿,为什么不能打开Mp4这种便当盒呢?

    • 这个问题通过翻译之后看起来已经不是问题了,Mpeg-4是视频编码方式,而Mp4是容器格式,两者本来就不是一个范畴里的东西。

总结:

  • 视频文件格式是指用于存储和传输视频数据的标准格式
  • 视频编码格式指的是将源视频数据进行压缩和编码的方式和标准

那么这两者有什么对应关系吗?

视频文件格式和视频编码格式之间有一定的对应关系,但并不完全一致。

一般来说,视频文件格式包含视频编码格式的信息。视频文件中存储的视频数据和音频数据通常是压缩编码过的数据,其所使用的编码格式可以从文件本身的信息中得知。例如,MP4格式文件通常采用H.264或VP9编码,AVI格式文件通常采用MPEG-4等编码。通常情况下同一种视频文件格式可以支持多种视频编码格式。

此外,不同的视频文件格式也会有对不同的视频编码格式的兼容性。例如,某些视频播放器可能只支持特定的视频文件格式和特定的视频编码格式,对于不兼容的文件或编码格式则无法进行播放。因此,在选择视频文件格式和视频编码格式时需要关注其兼容性和支持度。

总之,视频文件格式和视频编码格式虽然有一定的对应关系,但两者是不同的概念,需要根据实际需求进行选择。

ProcessBuilder

ProcessBuilder类是J2SE 1.5在java.lang中新添加的一个新类,此类用于创建操作系统进程,它提供一种启动和管理进程(也就是应用程序)的方法。在J2SE 1.5之前,都是由Process类处来实现进程的控制管理

每个 ProcessBuilder 实例管理一个进程属性集。它的start() 方法利用这些属性创建一个新的 Process 实例。start() 方法可以从同一实例重复调用,以利用相同的或相关的属性创建新的子进程。

我们来看一个例子:

下边的代码运行本机安装的QQ软件

ProcessBuilder builder = new ProcessBuilder();
builder.command("C:\\Program Files (x86)\\Tencent\\QQ\\Bin\\QQScLauncher.exe");
//将标准输入流和错误输入流合并,通过标准输入流程读取信息
builder.redirectErrorStream(true);
Process p = builder.start();

分布式任务调度

什么是任务调度?

我们可以先思考一下下面业务场景的解决方案:

  • 某电商系统需要在每天上午 10 点,下午 3 点,晚上 8 点发放一批优惠券。
  • 某银行系统需要在信用卡到期还款日的前三天进行短信提醒。
  • 某财务系统需要在每天凌晨 0:10 结算前一天的财务数据,统计汇总。
  • 12306 会根据车次的不同,而设置某几个时间点进行分批放票。
  • 某网站为了实现天气实时展示,每隔 5 分钟就去天气服务器获取最新的实时天气信息。

以上场景就是任务调度所需要解决的问题。

任务调度是指系统为了自动完成特定任务,在约定的特定时刻去执行任务的过程。有了任务调度即可解放更多的人力由系统自动去执行任务

任务调度如何实现?

多线程方式实现

学过多线程的同学,可能会想到,我们可以开启一个线程,每 sleep 一段时间,就去检查是否已到预期执行时间。

以下代码简单实现了任务调度的功能:

public static void main(String[] args) {    
  	//任务执行间隔时间
    final long timeInterval = 1000;
    Runnable runnable = new Runnable() {
        public void run() {
            while (true) {
                //TODO:something
                try {
                    Thread.sleep(timeInterval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    Thread thread = new Thread(runnable);
    thread.start();
}

上面的代码实现了按一定的间隔时间执行任务调度的功能。

JDK 也为我们提供了相关支持,如 Timer、ScheduledExecutor,下边我们了解下:

Timer 方式实现

public static void main(String[] args){  
   	Timer timer = new Timer();  
   	timer.schedule(new TimerTask(){
        @Override  
        public void run() {  
           //TODO:something
        }  
	}, 1000, 2000);  //1秒后开始调度,每2秒执行一次
}

Timer 的优点在于简单易用,每个 Timer 对应一个线程,因此可以同时启动多个 Timer 并行执行多个任务,同一个 Timer 中的任务是串行执行。

ScheduledExecutor 方式实现

public static void main(String [] agrs){
    ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
    service.scheduleAtFixedRate(
            new Runnable() {
                @Override
                public void run() {
                    //TODO:something
                    System.out.println("todo something");
                }
            }, 1,
            2, TimeUnit.SECONDS);
}

Java 5 推出了基于线程池设计的 ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。

Timer 和 ScheduledExecutor 都仅能提供基于开始时间与重复间隔的任务调度,不能胜任更加复杂的调度需求。比如,设置每月第一天凌晨 1 点执行任务、复杂调度任务的管理、任务间传递数据等等。

Quartz 是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求,Quartz 设计的核心类包括 Scheduler, Job 以及 Trigger。其中,Job 负责定义需要执行的任务,Trigger 负责设置调度策略,Scheduler 将二者组装在一起,并触发任务开始执行。Quartz 支持简单的按时间间隔调度、还支持按日历调度方式,通过设置 CronTrigger 表达式(包括:秒、分、时、日、月、周、年)进行任务调度。

第三方 Quartz 方式实现

public static void main(String [] agrs) throws SchedulerException {
    //创建一个Scheduler
    SchedulerFactory schedulerFactory = new StdSchedulerFactory();
    Scheduler scheduler = schedulerFactory.getScheduler();
    //创建JobDetail
    JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
    jobDetailBuilder.withIdentity("jobName","jobGroupName");
    JobDetail jobDetail = jobDetailBuilder.build();
    //创建触发的CronTrigger 支持按日历调度
        CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("triggerName", "triggerGroupName")
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
                .build();
        //创建触发的SimpleTrigger 简单的间隔调度
        /*SimpleTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("triggerName","triggerGroupName")
                .startNow()
                .withSchedule(SimpleScheduleBuilder
                        .simpleSchedule()
                        .withIntervalInSeconds(2)
                        .repeatForever())
                .build();*/
    scheduler.scheduleJob(jobDetail,trigger);
    scheduler.start();
}

public class MyJob implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext){
        System.out.println("todo something");
    }
}

什么是分布式任务调度

当前软件的架构正在逐步转变为分布式架构,将单体结构分为若干服务,服务之间通过网络交互来完成用户的业务处理,如下图,电商系统为分布式架构,由订单服务、商品服务、用户服务等组成:
在这里插入图片描述

分布式系统具体如下基本特点:

  • 分布性:每个部分都可以独立部署,服务之间交互通过网络进行通信,比如:订单服务、商品服务。
  • 伸缩性:每个部分都可以集群方式部署,并可针对部分结点进行硬件及软件扩容,具有一定的伸缩能力。
  • 高可用:每个部分都可以集群部分,保证高可用。

通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:

在这里插入图片描述

分布式调度要实现的目标:

不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:

  1. 并行任务调度:并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
  2. 高可用:若某一个实例宕机,不影响其他实例来执行任务。
  3. 弹性扩容:当集群中增加实例就可以提高并执行任务的处理效率。
  4. 任务管理与监测:对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
  5. 避免任务重复执行:当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。考虑采用下边的方法:
    • 分布式锁,多个实例在任务执行前首先需要获取锁,如果获取失败那么久证明有其他服务已经再运行,如果获取成功那么证明没有服务在运行定时任务,那么就可以执行。
      在这里插入图片描述
    • ZooKeeper 选举,利用 ZooKeeper 对 Leader 实例执行定时任务,有其他业务已经使用了 ZK,那么执行定时任务的时候判断自己是否是 Leader,如果不是则不执行,如果是则执行业务逻辑,这样也能达到我们的目的。
      在这里插入图片描述

总结:
分布式任务调度是指在分布式系统中,将任务分散到多个计算节点上,根据任务需求和节点状况,合理地调度和管理任务的执行。它负责将一个或多个任务分发到多个执行器上,并支持对任务进行定时执行、多次重试、任务依赖关系、监控和报警等功能。分布式任务调度可以大大提高任务的执行效率和可靠性,适用于大规模数据处理、定时任务调度和任务并发执行等场景。常见的分布式任务调度框架包括Apache Mesos、Kubernetes、Apache Hadoop YARN、Apache Spark等。

XXL-JOB

XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

官网:https://www.xuxueli.com/xxl-job/
文档:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B

XXL-JOB主要有调度中心、执行器、任务:

在这里插入图片描述

  • 调度中心:负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;主要职责为执行器管理、任务管理、监控运维、日志管理等
  • 任务执行器:负责接收调度请求并执行任务逻辑;主要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
  • 任务:负责执行具体的业务处理。

调度中心与执行器之间的工作流程如下:
在这里插入图片描述

执行流程:

  1. 任务执行器根据配置的调度中心的地址,自动注册到调度中心
  2. 达到任务触发条件,调度中心下发任务(下发到任务执行器中待执行任务queue)
  3. 执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
  4. 执行器消费内存队列中的执行结果,主动上报给调度中心
  5. 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

XXL-JOB配置

XXL-JOB的调度中心是一个SpringBoot项目,在本机部署还是虚拟机上部署都可以,这里不多赘述。

我们主要来看看执行器的配置。

首先我们进入调度中心添加执行器

在这里插入图片描述

点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。

在这里插入图片描述
添加成功:

在这里插入图片描述
然后我们在媒资管理模块的service工程添加依赖

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
</dependency>

接下来我们在nacos下的media-service-dev.yaml下配置xxl-job

xxl:
  job:
    admin: 
      addresses: http://192.168.101.65:8088/xxl-job-admin
    executor:
      appname: media-process-service
      address: 
      ip: 
      port: 9999
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
    accessToken: default_token

注意配置中的appname这是执行器的应用名,port是执行器启动的端口,如果本地启动多个执行器注意端口不能重复。

最后我们配置一下xxl-job的执行器:

这个配置文件xxl-job示例工程中有:

在这里插入图片描述

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */


}

XXL-JOB使用

我们接下来编写一个任务:

我们可以参考示例工程中任务类SampleXxlJob的编写方法

@Component
@Slf4j
public class SampleJob {
	 /**
	  * 1、简单任务示例(Bean模式)
	  */
	 @XxlJob("testJob")
	 public void testJob() throws Exception {
	 	 log.info("开始执行.....");
	 }
}

下边在调度中心添加任务,进入任务管理:
在这里插入图片描述
点击新增,填写任务信息:
在这里插入图片描述
配置解析:

  • 调度类型:Cron,通过Cron表达式实现更丰富的定时调度策略。Cron表达式是一个字符串,通过它可以定义调度策略,格式如下:{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}。xxl-job提供图形界面进行配置。一些例子如下:

    • 30 10 1 * * ? 每天1点10分30秒触发
    • 0/30 * * * * ? 每30秒触发一次
    • * 0/10 * * * ? 每10分钟触发一次
  • 运行模式:有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。

  • JobHandler:即任务方法名,填写任务方法上边@XxlJob注解中的名称

  • 路由策略:当执行器集群部署时,调度中心向哪个执行器下发任务,这里选择第一个表示只向第一个执行器下发任务,我们还有如下几种路由策略:

    • LAST(最后一个):固定选择最后一个机器
    • ROUND(轮询)
    • RANDOM(随机):随机选择在线的机器
    • CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
    • LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
    • LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
    • FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度
    • BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度
    • SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

    我们可以发现除了分片广播策略其他的策略都是根据不同方法选取一个执行器下发任务

  • 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。

  • 调度过期策略:已经计划好的任务在某个指定的时间点或时间范围内未能在系统内被执行的情况。当一个任务的触发时间已经到达或超过了其指定的截止时间,但尚未被执行时,就会发生调度过期。通常情况下,这种情况会导致任务被取消或者延迟执行。调度过期可能由于各种原因发生,例如:系统故障、网络延迟、资源稀缺等。

    • 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
    • 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
  • 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略

    • 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行
    • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
    • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务
  • 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;

  • 失败重试次数:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试

添加成功,启动任务
在这里插入图片描述
通过调度日志查看任务执行情况
在这里插入图片描述
如果要停止任务需要在调度中心操作
在这里插入图片描述
任务跑一段时间注意清理日志
在这里插入图片描述

分片广播

分片是指是调度中心以执行器为维度进行分片,将集群中的执行器标上序号:0,1,2,3…,广播是指每次调度会向集群中的所有执行器发送任务调度,请求中携带分片参数:

  • 当前执行器的编号
  • 执行器的数量

在这里插入图片描述

每个执行器收到调度请求同时接收分片参数。

xxl-job支持动态扩容执行器集群从而动态增加分片数量,当有任务量增加可以部署更多的执行器到集群中,调度中心会动态修改分片的数量。

作业分片适用哪些场景呢?

  • 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍
  • 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
    所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。

使用流程

“分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理,可参考Sample示例执行器中的示例任务。

首先我们还是定义一个分片任务:

/**
  * 2、分片广播任务
  */
 @XxlJob("shardingJobHandler")
 public void shardingJobHandler() throws Exception {

 	 // 分片参数
  	int shardIndex = XxlJobHelper.getShardIndex();
  	int shardTotal = XxlJobHelper.getShardTotal();

	log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
	log.info("开始执行第"+shardIndex+"批任务");

 }

然后我们在在调度中心添加任务:
在这里插入图片描述

添加成功:
在这里插入图片描述
启动任务,观察日志:
在这里插入图片描述

下边启动两个执行器实例,观察每个实例的执行情况

首先在nacos中配置media-service的本地优先配置:

#配置本地优先
spring:
 cloud:
  config:
    override-none: true

将media-service启动两个实例

两个实例的在启动时注意端口不能冲突:
实例1 在VM options处添加:-Dserver.port=63051 -Dxxl.job.executor.port=9998
实例2 在VM options处添加:-Dserver.port=63050 -Dxxl.job.executor.port=9999

在这里插入图片描述
启动两个实例

观察任务调度中心,稍等片刻执行器有两个
在这里插入图片描述

如果其中一个执行器挂掉,只剩下一个执行器在工作,稍等片刻调用中心发现少了一个执行器将动态调整总分片数为1。

到此作业分片任务调试完成,此时我们可以思考:
当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?

技术方案

掌握了xxl-job的分片广播调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。

任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会查询到重复的任务呢?

XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号,在向执行器任务调度的同时下发分片总数以及分片序号等参数,执行器收到这些参数根据自己的业务需求去利用这些参数。

下图表示了多个执行器获取视频处理任务的结构:

在这里插入图片描述
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。

上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:

  • 1 % 2 = 1 执行器2执行
  • 2 % 2 = 0 执行器1执行
  • 3 % 2 = 1 执行器2执行
  • 以此类推…

通过作业分片方案保证了执行器之间查询到不重复的任务,如果一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?

  • 调度过期策略选择忽略
  • 阻塞处理策略选择 丢弃后续调度单机串行方式 均可

只做这些配置可以保证任务不会重复执行吗?

做不到,还需要保证任务处理的幂等性,什么是任务的幂等性?任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。在本项目中要实现的是不论多少次任务调度同一个视频只执行一次成功的转码。它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。

解决幂等性常用的方案:

  • 数据库约束,比如:唯一索引,主键。
  • 乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
  • 唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。

基于以上分析,在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性,要有办法去判断该视频是否处理完成,如果正在处理中或处理完则不再处理。这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理

视频处理方案及实现思路

确定了分片方案,下边梳理整个视频上传及处理的业务流程。

在这里插入图片描述

上传视频成功向视频处理待处理表添加记录。
视频处理的详细流程如下:

在这里插入图片描述

  1. 任务调度中心广播作业分片。
  2. 执行器收到广播作业分片,从数据库读取待处理任务,读取未处理及处理失败的任务。
  3. 执行器更新任务为处理中,根据任务内容从MinIO下载要处理的文件。
  4. 执行器启动多线程去处理任务。
  5. 任务处理完成,上传处理后的视频到MinIO。
  6. 更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。

然后我们整体的梳理一下视频上传的大致过程:

在上传视频这一模块我们使用到了断点续传这一技术,其本质就是将文件分块上传,然后在服务器端进行分块的合并。分块这一步骤由前端帮我们做,我们不需要写分块的代码。我们将视频上传到服务器之后,我们要对视频进行转码处理,这样我们才能保证上传之后的视频能直接播放(在这个项目中我们只考虑avi格式视频转mp4格式)。而这个转码的过程我们想要方便高效的实现就要借助任务调度技术,而这里我们使用了分布式架构,所以我们使用的分布式任务调度,也就是将任务分散到多个计算节点上,根据任务需求和节点状况,合理地调度和管理任务的执行。

然后我们详细的说说整个过程

首先用户在前端页面上传自己的视频(.avi),前端对文件进行分块,在上传这个文件前先请求媒资服务判断该文件是否存在,如果存在则进行文件上传步骤。在上传当前分块之前前端还会请求媒资服务判断该分块是否已经存在,如果不存在则上传这个分块。前端上传完分块之后请求媒资服务进行分块的合并,合并完之后在后端进行md5校验,校验通过则将文件信息保存到媒资服务的数据库中,同时还要将文件信息加入到待处理任务表中为后面的转码工作做准备。

接下来就来到了我们的视频处理部分,任务调度中心会向执行器发送任务,执行器收到广播分片会请求媒资服务从其数据库中拿到待处理的任务,根据待处理的任务再去从MinIO中下载要处理的文件,任务处理完成之后将处理好的视频上传到MinIO。最后更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。

实现思路:

  • 查询待处理任务:只处理未提交及处理失败的任务,任务处理失败后进行重试,最多重试3次
  • 开启任务:基于数据库的分布式锁
  • 更新任务状态:更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录
  • 视频处理:视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数。所有视频处理完成结束本次执行,为防止代码异常出现无限期等待则添加超时设置,到达超时时间还没有处理完成仍结束任务。

分布式锁

我们之前为了保证任务不重复执行采取了很多措施,但是仍然不够周密。我们可以试想一种情况,我们有四个执行器每个执行器都拿到自己要完成的任务,这四个执行器与调度中心之间使用网络进行通信,而网络通信有时候可能不稳定,假如这个时候有两个执行器失去了联系,根据xxl-job的弹性扩容机制,调度中心会对剩下的两个执行器重新编号然后发放任务,如此刚才发送给失联执行器的任务这个时候又发送给了剩下的两个存活执行器,这就可能造成多线程对任务的重复执行。

为了避免多线程去争抢同一个任务可以使用锁去解决这个问题

使用synchronized同步锁去解决,如下代码:

synchronized(锁对象){
   执行任务...
}

synchronized只能保证同一个虚拟机中多个线程去争抢锁。

在这里插入图片描述

如果是多个执行器分布式部署,并不能保证同一个视频只有一个执行器去处理。
现在要实现分布式环境下所有虚拟机中的线程去同步执行就需要让多个虚拟机去共用一个锁,虚拟机可以分布式部署,锁也可以分布式部署,如下图:
在这里插入图片描述

虚拟机都去抢占同一个锁,锁是一个单独的程序提供加锁、解锁服务。
该锁已不属于某个虚拟机,而是分布式部署,由多个虚拟机所共享,这种锁叫分布式锁。
实现分布式锁的方案有很多,常用的如下:

  1. 基于数据库实现分布锁
    利用数据库主键唯一性的特点,或利用数据库唯一索引、行级锁的特点,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。
  2. 基于redis实现锁
    redis提供了分布式锁的实现方案,比如:SETNX、set nx、redisson等。
    拿SETNX举例说明,SETNX命令的工作过程是去set一个不存在的key,多个线程去设置同一个key只会有一个线程设置成功,设置成功的的线程拿到锁。
  3. 使用zookeeper实现
    zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。

这里我们使用数据库来实现分布式锁。

下边的sql语句可以实现更新操作:

update media_process m set m.status='4' where  m.id=?

如果是多个线程去执行该sql都将会执行成功,但需求是只能有一个线程抢到锁,所以此sql无法满足需求。

下边使用乐观锁的方式实现更新操作:

update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?

多个线程同时执行上边的sql只会有一个线程执行成功。

什么是乐观锁、悲观锁?
synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。
乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/488919.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

家用洗地机哪款好?2023入门级智能洗地机

现代社会对卫生日益重视&#xff0c;尤其是在工业、商业和公共场所要求越来越高。传统清洁方式不能满足人们的需求&#xff0c;清洁工作效率低且卫生难以保证。而洗地机的出现&#xff0c;正是为了解决这些问题。它能够深入清洁地面&#xff0c;有效防止不必要的污垢、细菌和病…

小满nestjs(第二十八章 nestjs 事务)

事务的四大特性 事务具有4个基本特征&#xff0c;分别是&#xff1a;原子性&#xff08;Atomicity&#xff09;、一致性&#xff08;Consistency&#xff09;、隔离性&#xff08;Isolation&#xff09;、持久性&#xff08;Duration&#xff09;&#xff0c;简称ACID ① 原子…

2023年5月产品经理认证NPDP线上班火热招生中

产品经理国际资格认证NPDP是新产品开发方面的认证&#xff0c;集理论、方法与实践为一体的全方位的知识体系&#xff0c;为公司组织层级进行规划、决策、执行提供良好的方法体系支撑。 【认证机构】 产品开发与管理协会&#xff08;PDMA&#xff09;成立于1979年&#xff0c;是…

23年5月高项备考学习笔记 —— 信息系统治理

治理是管理的控制 IT治理&#xff1a;关注风险 治理的驱动因素&#xff1a; 信息孤岛 资源整合目的空泛&#xff0c;缺少规划 目标价值&#xff1a; 与业务目标一致 有效利用信息资源 风险管理 管理层次&#xff1a; 最高管理层&#xff1a;董事会、证实***、战略 执行管理…

数值分析-埃特金算法

目录 一、前言 二、什么是埃特金算法 三、埃特金算法的原理 四、埃特金算法的步骤 1.确定插值点和半方差函数模型 2.计算插值点与已知点之间的距离和半方差函数值 3.确定权重 4.进行插值计算 5.评估插值结果 五、埃特金算法的优缺点 一、前言 数值分析是数学中的一个…

CUDA Stream, Event 与 NVVP

文章目录 一、CUDA StreamAPI实战CUDA Stream和 Serial执行的对比&#xff1a;PCIE和NVLINKCUDA Stream 多流的收益和上限CUDA Kernel合并CUDA7中的Per-Thread编译选项 二、Event三、NVVP四、知识点四 一、CUDA Stream CUDA Stream是GPU上task的执行队列&#xff0c;所有CUDA操…

Mysql表索引(总结篇)

目录 前言 ✨✨✨大家好&#xff0c;我是会飞的鱼-blog&#xff0c;今天我来给大家介绍一下Mysql&#xff0c;有不足之处&#xff0c;请大家多多指教。感谢大家支持&#xff01;&#xff01;&#xff01; 一、索引的概述 1.索引类型 2.索引存储 3.索引优缺点 4.使用建议…

如何在Windows上搭建NFS服务器实现开发板与Windows之间的文件共享

目录 1 安装nfs.exe 2 mounting 172.31.8.183:/f/nfs on /mnt/nfs failed: No such file or directory 3 mounting 172.31.8.183:/d/nfs on /mnt/nfs failed: Permission denied 1 安装nfs.exe 某项目中需要把程序放到Linux开发板中测试&#xff0c;刚开始使用tftp命令下载…

常见8大排序算法详解

常见8大排序算法 分别是冒泡排序、选择排序、插入排序、希尔排序、快速排序、堆排序、归并排序、基数排序&#xff08;桶排序&#xff09; 冒泡排序 思路 n个数字从小到大排序&#xff0c;每个数和它后面的数比较&#xff0c;小的放前面&#xff0c;大的放后面&#xff0c;…

微信小程序学习实录5(H5嵌入小程序、map组件、地图调起功能、腾讯百度高德导航页、返回web-view页)

H5嵌入微信小程序 一、H5页面地图1.H5地图加载2.标注事件 二、H5返回微信小程序1.H5页面核心代码2.微信小程序接收传参核心代码 三、开发中遇见的坑1.wx.openLocation调起地图后需要点击两次返回才到web-view页面2.H5无法调用百度定位new BMap.Geolocation对象3.安卓某些机型无…

Linux:《tar》归档命令

准备好4个文件然后使用tar命令进行归档 最常用的是 -c, --create&#xff08;小写&#xff09; 建立新的存档 -f, --file [HOSTNAME:]F 指定存档或设备 (缺省为 /dev/rmt0) -z, --gzip, --ungzip 用 gzip 对存档压缩或解压 -j&…

京东数据分析:2023年Q1白酒电商整体动销增长,中低端酒企压力大

早在年初就有白酒市场业内人士表示&#xff0c;2023年春节白酒消费市场整体景气度较高&#xff0c;白酒动销表现较为优秀&#xff0c;预计2023年一季度白酒动销有望实现一定的增长。 对于这一观点&#xff0c;我们在今年电商数据平台鲸参谋出炉的Q1白酒销售数据表现中得到了验证…

【软考高级】2019年系统分析师案例分析

1、阅读以下关于软件系统分析的叙述&#xff0c;在答题纸上回答问题1至问题3。 【说明】 某软件企业为电信公司开发一套网上营业厅系统&#xff0c;以提升服务的质量和效率。项目组经过分析&#xff0c;列出了项目开发过程中的主要任务、持续时间和所依赖的前置任务&#xff…

1.0 Vue的编译和运行

1、编程范式&#xff1a;命令式和声明式 编程范式是指一种程序语言的代码风格、样式&#xff0c;每一种范式都包含了代码特征和结构&#xff0c;以及处理错误的方式。 例如现在需要实现生成一个div模块&#xff0c;其显示的文本内容为hello world&#xff0c;添加一个点击事件…

X3派caffe yolov3 部署demo

yolov3放置在docker中/open_explorer/ddk/samples/ai_toolchain/horizon_model_convert_sample/04_detection/02_yolov3_darknet53/mapper 模型所需要的prototxt和caffe模型yolov3.caffemodel文件放置在docker中的/open_explorer/ddk/samples/ai_toolchain/model_zoo/mapper/de…

78页2023年智慧公安发展构思与建设解决方案(ppt可编辑)

本资料来源公开网络&#xff0c;仅供个人学习&#xff0c;请勿商用&#xff0c;如有侵权请联系删除。 总体架构10 建设方案-网络系统11 物联网15 视频网系统16 视频专网主干网根据运营商链路分为若干个环网部署&#xff0c;市局两台核心交换机位于网络核心层&#xff1b;部分…

SpringCloud 分布式事务组件之Seata

目录 背景介绍什么是分布式事务什么叫做逆向补偿呢互联网最流行的分布式事务组件seata总结 背景 大家好&#xff0c;今天给大家分享一个在2022年出去面试Java几乎必问的一个技术&#xff0c;那就是seata。什么&#xff1f;&#xff1f;你才看了第一句话心里有闪现了无数个问…

024 - C++ 虚函数

本期我们学习的是 C 中的虚函数。 过去的几期&#xff0c;我们一直在讨论类、面向对象编程、继承这些内容&#xff0c;所有的这些内容&#xff0c;包括本期我们将要学习的虚函数&#xff0c;对整个面向对象的概念都非常重要。 虚函数能干什么呢? 虚函数允许我们在子类中重写…

关于C语言的一些笔记

文章目录 May4,2023常量问题基本数据类型补码printf的字符格式控制关于异或、异或的理解赋值运算i和i的区别关系运算符 摘自加工于C技能树 May4,2023 常量问题 //定义常量 const float PI; PI 3.14; //false ,这种声明变量是错误的&#xff0c;常量声明之后就不能修改&…

树脂塞孔有哪些优缺点及应用?

树脂塞孔的概述 树脂塞孔就是利用导电或者非导电树脂&#xff0c;通过印刷&#xff0c;利用一切可能的方式&#xff0c;在机械通孔、机械盲埋孔等各种类型的孔内进行填充&#xff0c;实现塞孔的目的。 树脂塞孔的目的 1 树脂填充各种盲埋孔之后&#xff0c;利于层压的真空下…