Elasticjob(2.1.4) failover 、misfire及执行线程池分析

news2024/12/25 1:44:26

Failover

当设置failover为true时候,elasticjob 集群通过zookeeper 的event watcher 监听是否有instance 丢失,然后对丢失instance 对应的分片进行立即执行。

重复一下,failover是立即执行,不是按crontab时间来触发,这个触发是不管丢失的分片是否处于运行状态。

在这里插入图片描述

Elasticjob 执行和分配是分离的,上图的node data 是instance 的IP+通信端口(instance client port)决定哪个instance 负责执行,但failover执行除外。

以同一 job 有A,B两个instance ,两个instance 各负责1个分片为例:
1、 instance A 异常退出,instance B在秒级收到zookeeper node remove 事件后 重新设置shard 对应 node data 为自己并设置failover 信息
2、 instance B设置failover后立即触发新分片的执行

核心函数如下:

 class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
	//注意只检查对应instanceid,如果remove的instance 与sharding 的node data不一致,不会触发
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
 
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
// 只要分片是对应被remove的instance 就立即触发
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
}

以上代码要注意只检查对应instanceid,如果remove的instance 与sharding 的node data不一致,不会触发failover。
而根据以下代码

/**
     * 如果需要失效转移, 则执行作业失效转移.
     */
    public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
                return;
            }
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
            log.info("Failover job '{}' begin, crashed item '{}'.................", jobName, crashedItem);
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
            // TODO 不应使用triggerJob, 而是使用executor统一调度
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
            if (null != jobScheduleController) {
                jobScheduleController.triggerJob();
            }
        }

在failover 执行完毕才会去修改sharding 的node data,因此当接管failover的instance 没有执行完failover就shutdown 的话,此时由于node data 与新shutdown instance id不一致 、failover是不会被再次触发的。

因此继续分析以上案例, 以同一 job 有A,B两个instance ,两个instance 各负责1个分片为例:
1、 instance A 异常退出,instance B在秒级收到zookeeper node remove 事件后 重新设置shard 对应 node data 为自己并设置failover 信息
2、 instance B设置failover后立即触发新分片的执行
3、 重新启动一个instance C(PS:进程重新启动由于端口不一样,会认为是新instance)并立即shutdown instance B
4、 如果第1-3步间隔时间短,instance B 还收到zookeeper event,只有1个sharding 在instance C被立即触发
5、 如果1-3 间隔时间长,instance B已经完成instance A 分表执行,instance C会触发instance B 负责2个分片并在执行完毕后接管2个分片直到新instance 加入并重新分片。

Misfire

从代码看在2次正常触发过程中,配置misfire最多执行一次。

Quartz只在执行完毕后执行一次next fire的计算,在计算过程发现misfire 会去调用misfire listener。

调用的stack 信息如下:
Thread [springSimpleJob_QuartzSchedulerThread] (Suspended (breakpoint at line 46 in JobTriggerListener)) 
owns: Object (id=3883) 
JobTriggerListener.triggerMisfired(Trigger) line: 46 
QuartzScheduler.notifyTriggerListenersMisfired(Trigger) line: 1905 
SchedulerSignalerImpl.notifyTriggerListenersMisfired(Trigger) line: 74 
RAMJobStore.applyMisfire(TriggerWrapper) line: 1354 
RAMJobStore.acquireNextTriggers(long, int, long) line: 1412 
QuartzSchedulerThread.run() line: 272 

triggerMisfired 负责设置misfire 标志外,以上堆栈的类都是quartz的类。
Elasticjob triggerMisfired相关代码如下:

 @Override
    public void triggerMisfired(final Trigger trigger) {
        if (null != trigger.getPreviousFireTime()) {
            executionService.setMisfire(shardingService.getLocalShardingItems());
        }
}

而实际执行代码如下:

 if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                    shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    try {
        jobFacade.beforeJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }

可以看出在正常执行完毕后再去触发misfire一次,而且是顺序执行。

执行线程池管理

Elasticjob 默认为每个job 创建CORE*2的线程池队列,核心代码如下

public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler {
    
    @Override
    public ExecutorService createExecutorService(final String jobName) {
        return new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();
    }
}

public ExecutorServiceObject(final String namingPattern, final int threadSize) {
        workQueue = new LinkedBlockingQueue<>();
        threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue, 
                new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
    }
    

每次执行通过以下代码获取对应的执行线程池,没有就去新建:

/**
     * 获取线程池服务.
     * 
     * @param jobName 作业名称
     * @param executorServiceHandler 线程池服务处理器
     * @return 线程池服务
     */
    public static synchronized ExecutorService getExecutorServiceHandler(final String jobName, final ExecutorServiceHandler executorServiceHandler) {
        if (!REGISTRY.containsKey(jobName)) {
            REGISTRY.put(jobName, executorServiceHandler.createExecutorService(jobName));
        }
        return REGISTRY.get(jobName);
    }

因此如果job很多,会创建大量的线程,如果同时运行的job多就会带来频繁的Thread Context 切换。

如果都是短时间可以执行job,可以配置job的
executor-service-handler=“com.dangdang.ddframe.job.custom. DefaultExecutorServiceHandler”
并使用以下代码替换默认线程池

public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler {
	
	final ExecutorService executorService=       new ExecutorServiceObject("innerjob-executepool", Runtime.getRuntime().availableProcessors() * N).createExecutorService();

    
    @Override
    public ExecutorService createExecutorService(final String jobName) {
    	  return executorService;
    }
}

即共用一个线程池,执行效率可能更高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/431455.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于RDF本体模型和图数据库实现知识查询与推理

基于RDF本体模型和图数据库实现知识查询与推理 基于RDF本体模型和图数据库实现知识查询与推理一、案例本体模型解释二、数据构建与查询 Here’s the table of contents: 基于RDF本体模型和图数据库实现知识查询与推理 本文主要使用ONgDB图数据库和Neosemantics组件&#xff0c;…

自建个人音乐播放器Navidrome - 内网穿透实现在外随时访问

文章目录 1. 前言2. Navidrome网站搭建2.1 Navidrome下载和安装2.1.1 安装并添加ffmpeg2.1.2下载并配置Navidrome2.1.3 添加Navidrome到系统服务 2.2. Navidrome网页测试 3. 本地网页发布3.1 cpolar的安装和注册3.2 Cpolar云端设置3.3 Cpolar本地设置 4. 公网访问测试5. 结语 转…

【Android实战开发】flutter实现网络请求的方法示例

Flutter网络请求使用的是Dio。Dio是一个强大易用的dart http请求库&#xff0c;支持Restful API、FormData、拦截器、请求取消、Cookie管理、文件上传/下载……. Flutter json数据解析是使用了json_serializable package包。它是一个自动化源代码生成器&#xff0c;可以为我们…

C++快速幂详解例题

基本概念 什么是快速幂呢&#xff1f;个人理解&#xff0c;就是更快速的计算幂运算。 比如计算a^b 刚学这个算法的时候我也很疑惑&#xff0c;幂运算不是有现成的公式么&#xff0c;直接pow&#xff08;a,b&#xff09;不就好了吗&#xff1f; 后来才明白&#xff0c;pow(a,b)的…

三分钟了解什么是时序数据库

在介绍时序数据库之前&#xff0c;我们先来看看什么是时序数据。时序数据就是基于时间排序的数据&#xff0c;再通过时间坐标将这些数据连接起来&#xff0c;形成一个折线图&#xff0c;直观地展示一个指标在过去一段时间内的走势和规律&#xff0c;帮助定位数据异常点。 时序…

Oracle中Archived redolog的生成

目录 一、问题预览 二、问题解答 一、问题预览 大家都知道 Oracle 中 online redolog切换后会生成 archived redolog&#xff0c;心里默认的就是 online redolog 切换后 archived redolog 已经生成。切换示意图&#xff0c;如下图所示。 但事实真的是这样吗&#xff1f; 二、…

C++ 23 实用工具(一)

C 23 实用工具&#xff08;一&#xff09; 工具函数是非常有价值的工具。它们不仅可以用于特定的领域&#xff0c;还可以应用于任意值和函数&#xff0c;甚至可以创建新的函数并将它们绑定到变量上。 常用函数 你可以使用各种变体的 min、max 和 minmax 函数来对值和初始化列…

【使用ChatGPT自动化】批量转换.xls文件为.xlsx文件

第1次提问&#xff1a; 我&#xff1a;我想使用Python批量转换.xls文件为.xlsx文件&#xff0c;请你提供代码 它&#xff1a; 当涉及到批量处理文件时&#xff0c;我们通常需要使用Python中的os模块和glob模块。os模块用于管理文件和目录&#xff0c;glob模块用于匹配文件路径名…

Visual Studio Code 1.77 发布!

欢迎使用 Visual Studio Code 2023 年3月版。此版本有许多更新&#xff0c;其中一些主要亮点包括&#xff1a; 无障碍改进&#xff1a;新的悬停、通知和 Sticky Scroll 快捷键 复制 GitHub 深度链接&#xff1a;在编辑器内创建永久链接和 HEAD 链接 笔记本保存格式&#xff1…

软件测试流程进阶----四年软件测试总结

工作四年了&#xff0c;我一直希望让自己每年对测试的理解更深入一层。工作一年的时候&#xff0c;我谈轮了自己对各种测试的理解&#xff0c;这一年来&#xff0c;虽然对那些理概念的有所加强&#xff0c;自我感觉没有什么质的变化。前些天听我们公司的一位测试经理讲《敏捷测…

精准抓住核心要点!!!十名面试官总结出这样一份面试通关答案,还不赶紧开始“作弊”通关!!!

金三银四求职季&#xff0c;但最近很多朋友私信说&#xff1a; 熬过了去年的寒冬&#xff0c;却没躲过如今的内卷&#xff1b; 打开Boss直拒&#xff0c;一排已读不回&#xff1b; 大部分回复的都是外包&#xff0c;薪资低于预期&#xff0c;对技术水平要求却远超从前&#x…

大数据和 CRM系统:它们如何帮助中小企业?

作为中小企业主&#xff0c;你可能在想&#xff0c;"大数据与我有什么关系&#xff1f;"但如果你使用某些类型的业务应用&#xff0c;即使预算很少&#xff0c;你也可以从大数据中获益。一个最好的例子是客户关系管理&#xff08;CRM&#xff09;系统&#xff0c;它提…

二极管反向恢复过程详细解析

二极管反向恢复过程&#xff0c;现代脉冲电路中大量使用晶体管或二极管作为开关, 或者使用主要是由它们构成的逻辑集成电路。而作为开关应用的二极管主要是利用了它的通(电阻很小)、断(电阻很大) 特性, 即二极管对正向及反向电流表现出的开关作用。二极管和一般开关的不同在于,…

在线帮助中心对企业的作用及解决方案

帮助中心对于一款互联网产品来说&#xff0c;重要性不言而喻。随着公司客户服务水平的不断提高&#xff0c;越来越多的公司逐渐重视客户服务。一个好的在线帮助中心必定能提高客户的转化率。那么&#xff0c;在线帮助中心对企业的帮助和作用有哪些呢? 在线帮助中心的作用 1.快…

3.10——常类型

常类型的引入&#xff0c;就是为了既保证数据共享又防止数据被改动。常类型是指使用类型修饰符const说明的类型&#xff0c;常类型的变量或对象成员的值在程序运行期间是不可改动的。 常引用 如果在说明引用时用const修饰&#xff0c;则被说明的引用为常引用。如果用常引用作为…

adb环境变量配置

adb环境变量配置Android一. 简介二. 环境变量配置1.JDK安装2.SDK安装3. 资源共享4. 配置环境变量4.1 方式一&#xff1a;4.2 方式二&#xff1a;5. adb常用命令的使用6. 结果Android List of ADB Commands and Fastboot Commands for Android 如果你是一个android用户&#xf…

paddleocr,windows pip 安装巨坑 lanms 库

安装 lanms 最佳参考&#xff1a; paddleocr&#xff0c;windows pip 安装巨坑 lanms 库 防丢失&#xff0c;直接抄录的一份 paddleocr最后几个库一个比一个难装&#xff0c;特别是 lanms 库&#xff0c;巨难装&#xff0c;拒绝任何花里胡哨&#xff0c;十分钟&#xff0c;三步…

HTML5 <menu> 标签

HTML5 <menu> 标签 实例 HTML5 <menu>标签用于定义菜单列表。 两个菜单按钮系列选项实例&#xff08;"File" 和 "Edit"&#xff09;&#xff1a; <menu type"toolbar"> <li> <menu label"File"> &…

Linux-初学者系列——篇幅4_系统运行相关命令

系统运行相关命令-目录一、关机重启注销命令1、重启或者关机命令-shutdown语法格式&#xff1a;常用参数&#xff1a;01 指定多久关闭/重启系统02 指定时间关闭/重启系统03 实现立即关闭/重启系统04 取消关闭/重启系统计划2、重启或者关机命令-halt/poweroff/reboot/systemctl语…

Lucene Solr Elasticsearch三者之间的关系,怎么选?

Lucene简介&#xff1a; Lucene主要用于构建文本搜索应用程序&#xff0c;包括Web搜索引擎、桌面搜索工具和商业应用程序。它提供了诸如单词分析、查询解析、搜索结果排序等功能&#xff0c;可以轻松地在大量文档中快速搜索和查找相关信息。 Lucene具有以下特点&#xff1a; …