分布式之任务调度Elastic-Job学习一

news2024/11/28 0:59:28

1 E-Job

1.1 任务调度高级需求

Quartz 的不足:
1、 作业只能通过 DB 抢占随机负载,无法协调
2、 任务不能分片——单个任务数据太多了跑不完,消耗线程,负载不均
3、 作业日志可视化监控、统计

1.2 发展历史

E-Job 是怎么来的?
在当当的 ddframe 框架中,需要一个任务调度系统(作业系统)。
在这里插入图片描述
实现的话有两种思路,一个是修改开源产品,一种是基于开源产品搭建(封装),当当选择了后者,最开始这个调度系统叫做 dd-job。它是一个无中心化的分布式调度框架。因为数据库缺少分布式协调功能(比如选主),替换为 Zookeeper 后,增加了弹性扩容和数据分片的功能。
Elastic-Job 是 ddframe 中的 dd-job 作业模块分离出来的作业框架,基于 Quartz和 Curator 开发,在 2015 年开源。
轻量级,无中心化解决方案。
为什么说是去中心化呢?因为没有统一的调度中心。集群的每个节点都是对等的,节点之间通过注册中心进行分布式协调。E-Job 存在主节点的概念,但是主节点没有调度的功能,而是用于处理一些集中式任务,如分片,清理运行时信息等。
思考:如果 ZK 挂了怎么办
每个任务有独立的线程池。
在这里插入图片描述
从官网开始
http://elasticjob.io/docs/elastic-job-lite/00-overview/
https://github.com/elasticjob
Elastic-Job 最开始只有一个 elastic-job-core 的项目,在 2.X 版本以后主要分为Elastic-Job-Lite 和 Elastic-Job-Cloud 两个子项目。其中,Elastic-Job-Lite 定位为轻量级 无 中 心 化 解 决 方 案 , 使 用 jar 包 的 形 式 提 供 分 布 式 任 务 的 协 调 服 务 。 而Elastic-Job-Cloud 使用 Mesos + Docker 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务(跟 Lite 的区别只是部署方式不同,他们使用相同的 API,只要开发一次)。

1.3 功能特性

 分布式调度协调:用 ZK 实现注册中心
 错过执行作业重触发(Misfire)
 支持并行调度(任务分片)
 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
 弹性扩容缩容:将任务拆分为 n 个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job 将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。
 失效转移 failover:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部
分性能。
 支持作业生命周期操作(Listener)
 丰富的作业类型(Simple、DataFlow、Script)
 Spring 整合以及命名空间提供
 运维平台

1.4 项目架构

应用在各自的节点执行任务,通过 ZK 注册中心协调。节点注册、节点选举、任务分片、监听都在 E-Job 的代码中完成。
在这里插入图片描述

2 Java 开发

工程:ejob-standalone

2.1 pom 依赖

<dependency>
	<groupId>com.dangdang</groupId>
	<artifactId>elastic-job-lite-core</artifactId>
	<version>2.1.5</version>
</dependency>

2.2 任务类型

standalone 工程
任务类型有三种:
2.2.1 SimpleJob
SimpleJob: 简单实现,未经任何封装的类型。需实现 SimpleJob 接口
ejob-standalone MySimpleJob.java
public class MyElasticJob implements SimpleJob {
	public void execute(ShardingContext context) {
	System.out.println(String.format("Item: %s | Time: %s | Thread: %s ", context.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId()));
	}
}
2.2.2 DataFlowJob
DataFlowJob:Dataflow 类型用于处理数据流,必须实现 fetchData()和processData()的方法,一个用来获取数据,一个用来处理获取到的数据。
ejob-standalone MyDataFlowJob.java
public class MyDataFlowJob implements DataflowJob<String> {
	@Override
	public List<String> fetchData(ShardingContext shardingContext) {
		// 获取到了数据
		return Arrays.asList("qingshan","jack","seven");
	}
	@Override
	public void processData(ShardingContext shardingContext, List<String> data) {
		data.forEach(x-> System.out.println("开始处理数据:"+x));
	}
}
2.2.3 ScriptJob
Script:Script 类型作业意为脚本类型作业,支持 shell,python,perl 等所有类型脚本。D 盘下新建 1.bat,内容
@echo ------【脚本任务】Sharding Context: %*
ejob-standalone script.ScriptJobTest
只要指定脚本的内容或者位置

2.3 E-Job 配置

2.3.1 配置步骤
配置手册:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/
1、ZK 注册中心配置(后面继续分析)
2、作业配置(从底层往上层:Core——Type——Lite)
配置级别配置类配置内容
CoreJobCoreConfiguration用于提供作业核心配置信息,如:作业名称、CRON 表达式、分片总数等。
TypeJobTypeConfiguration有 3 个子类分别对应 SIMPLE, DATAFLOW 和 SCRIPT 类型作业,提供 3 种作业需要的不同配置,如:DATAFLOW 类型是否流式处理或 SCRIPT 类型的命
令行等。Simple 和 DataFlow 需要指定任务类的路径。
RootJobRootConfiguration有 2 个子类分别对应 Lite 和 Cloud 部署类型,提供不同部署类型所需的配置,如:Lite 类型的是否需要覆盖本地配置或 Cloud 占用 CPU 或 Memory数量等。可以定义分片策略。http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/
public class SimpleJobTest {
	public static void main(String[] args) {
		// ZK 注册中心
		CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new
		ZookeeperConfiguration("localhost:2181", "elastic-job-demo"));
		regCenter.init();
		// 定义作业核心配置
		JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("MyElasticJob", "0/2 * * * * ?", 1).build();
		// 定义 SIMPLE 类型配置
		SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
		// 定义 Lite 作业根配置
		LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
		// 构建 Job
		new JobScheduler(regCenter, simpleJobRootConfig).init();
	}
}
作业配置分为 3 级,分别是 JobCoreConfiguration,JobTypeConfiguration 和LiteJobConfiguration 。 LiteJobConfiguration 使 用 JobTypeConfiguration ,JobTypeConfiguration 使用 JobCoreConfiguration,层层嵌套。

在这里插入图片描述
JobTypeConfiguration 根 据 不 同 实 现 类 型 分 为 SimpleJobConfiguration ,DataflowJobConfiguration 和 ScriptJobConfiguration。E-Job 使用 ZK 来做分布式协调,所有的配置都会写入到 ZK 节点。

2.3.2 ZK 注册中心数据结构
一个任务一个二级节点。
这里面有些节点是临时节点,只有任务运行的时候才能看到。
注意:修改了任务重新运行任务不生效,是因为 ZK 的信息不会更新, 除非把overwrite 修改成 true
config 节点
JSON 格式存储。
存储任务的配置信息,包含执行类,cron 表达式,分片算法类,分片数量,分片参数等等。
{
"jobName": "MySimpleJob", "jobClass": "job.MySimpleJob", "jobType": "SIMPLE", "cron": "0/2 * * * * ?", "shardingTotalCount": 1, "shardingItemParameters": "", "jobParameter": "", "failover": false, "misfire": true, "description": "", "jobProperties": {
"job_exception_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler", "executor_service_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler" }, "monitorExecution": true, "maxTimeDiffSeconds": -1, "monitorPort": -1, "jobShardingStrategyClass": "", "reconcileIntervalMinutes": 10, "disabled": false, "overwrite": false
}
config 节点的数据是通过ConfigService 持久化到 zookeeper中去的。默认状态下,如果你修改了 Job 的配置比如 cron 表达式、分片数量等是不会更新到 zookeeper 上去的,除非你在 Lite 级别的配置把参数 overwrite 修改成 true。
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();

instances 节点
同一个 Job 下的 elastic-job 的部署实例。一台机器上可以启动多个 Job 实例,也就是 Jar 包。instances 的命名是 

在这里插入图片描述
leader 节点
在这里插入图片描述
任务实例的主节点信息,通过 zookeeper 的主节点选举,选出来的主节点信息。在elastic job 中,任务的执行可以分布在不同的实例(节点)中,但任务分片等核心控制,需要由主节点完成。因此,任务执行前,需要选举出主节点。
下面有三个子节点:
election:主节点选举
sharding:分片
failover:失效转移
election 下面的 instance 节点显示了当前主节点的实例 ID:jobInstanceId。
election 下面的 latch 节点也是一个永久节点用于选举时候的实现分布式锁。
sharding 节点下面有一个临时节点,necessary,是否需要重新分片的标记。如果分片总数变化,或任务实例节点上下线或启用/禁用,以及主节点选举,都会触发设置重分片标记,主节点会进行分片计算。

servers 节点
在这里插入图片描述
任务实例的信息,主要是 IP 地址,任务实例的 IP 地址。跟 instances 不同,如果多个任务实例在同一台机器上运行则只会出现一个 IP 子节点。可在 IP 地址节点写入DISABLED 表示该任务实例禁用。
sharding 节点
在这里插入图片描述
任务的分片信息,子节点是分片项序号,从 0 开始。分片个数是在任务配置中设置的。分片项序号的子节点存储详细信息。每个分片项下的子节点用于控制和记录分片运行状态。最主要的子节点就是 instance。

子节点名是否临时节点描述
instance执行该分片项的作业运行实例主键
running分片项正在运行的状态 仅配置 monitorExecution 时有效
failover如果该分片项被失效转移分配给其他作业服务器,则此节点值记录执行此分片的作业服务器 IP
misfire是否开启错过任务重新执行
disabled是否禁用此分片项

3 运维平台

3.1 下载解压运行

git 下载源码 https://github.com/elasticjob/elastic-job-lite
对 elastic-job-lite-console 打包得到安装包(网盘已提供现成的 console 包)。解压缩 elastic-job-lite-console-${version}.tar.gz 并执行 bin\start.sh(Windows运行.bat)。打开浏览器访问 http://localhost:8899/即可访问控制台。
8899 为默认端口号,可通过启动脚本输入-p 自定义端口号。
默认管理员用户名和密码是 root/root。右上角可以切换语言。

3.2 添加 ZK 注册中心

第一步,添加注册中心,输入 ZK
运维平台和 elastic-job-lite 并无直接关系,是通过读取作业注册中心数据展现作业状态,或更新注册中心数据修改全局配置。
控制台只能控制作业本身是否运行,但不能控制作业进程的启动,因为控制台和作业本身服务器是完全分离的,控制台并不能控制作业服务器。
可以对作业进行操作

在这里插入图片描述
在这里插入图片描述

3.3 事件追踪

http://elasticjob.io/docs/elastic-job-lite/02-guide/event-trace/
Elastic-Job 提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。
Elastic-Job-Lite 在配置中提供了 JobEventConfiguration,目前支持数据库方式配置。
ejob-standalone:simple.SimpleJobTest
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost:3306/elastic_job_log");
dataSource.setUsername("root");
dataSource.setPassword("123456");
JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource); …………
new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init();
事件追踪的 event_trace_rdb_url 属性对应库自动创建 JOB_EXECUTION_LOG 和JOB_STATUS_TRACE_LOG 两张表以及若干索引。

在这里插入图片描述
需要在运维平台中添加数据源信息,并且连接:
在这里插入图片描述
在作业历史中查询:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1357005.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第11课 实现桌面与摄像头叠加

在上一节&#xff0c;我们实现了桌面捕获功能&#xff0c;并成功把桌面图像和麦克风声音发送给对方。在实际应用中&#xff0c;有时候会需要把桌面与摄像头图像叠加在一起发送&#xff0c;这节课我们就来看下如何实现这一功能。 1.备份与修改 备份demo10并修改demo10为demo11…

Python 快速合并PDF表格转换输出CSV文件

单位的刷脸考勤机后台系统做得比较差&#xff0c;只能导出每个部门的出勤统计表pdf&#xff0c;格式如下&#xff1a; 近期领导要看所有部门的考勤数据&#xff0c;于是动手快速写了个合并pdf并输出csv文件的脚本。 安装模块 pypdf2&#xff0c;pdfplumber&#xff0c;前者用…

切换node.js不同版本

切换node.js不同版本 因新项目用到vite4创建项目&#xff0c;输入命令后报错&#xff0c;经查询得知是node版本过低导致&#xff0c;所以需要升级node版本&#xff0c;但是又有老的项目需要维护&#xff0c;因此需要多个版本的node使用需求。 流程&#xff1a; 卸载原有的node…

金和OA c6 uploadfileeditorsave接口存在任意文件上传漏洞

产品简介 金和网络是专业信息化服务商&#xff0c;为城市监管部门提供了互联网监管解决方案&#xff0c;为企事业单位提供组织协同OA系统升开发平台&#xff0c;电子政务一体化平台智慧电商平合等服务 漏洞概述 金和-c6 uploadfileeditorsave 任意文件上传&#xff0c;攻击者…

MySQL基础篇(四)多表查询

一、多表关系 项目开发中&#xff0c;在进行数据库表结构设计时&#xff0c;会根据业务需求及业务模块之间的关系&#xff0c;分析并设计表结构&#xff0c;由于业务之间相互关联&#xff0c;所以各个表结构之间也存在着各种联系&#xff0c;基本上分为三种&#xff1a; &#…

Hi5 2.0 虚拟手与追踪器(Tracker)的位置修正

问题描述 使用环境与工具&#xff1a;Unity 2022.3.4fc1&#xff0c;steam VR(2.7.3)&#xff0c;steamvrSDK&#xff08;1.14.15&#xff09;&#xff0c;HTC vive pro专业版&#xff0c;Hi5 2.0数据手套 首先按照Hi5 2.0的使用说明&#xff08;可参考&#xff1a;HI5 2.0 交…

【数据结构—二叉树的链式结构实现】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言 一、二叉树的存储结构 二、二叉树链式结构的实现 2.1手动构建一课树 2.2二叉树的遍历 三、二叉树链式结构的实现 3.1前序遍历(递归) 3.2中序遍历(递归) 3.3后序…

多线程常用信号:ManualResetEvent,AutoResetEvent

1.了解信号 在这两个信号中: 1.Set方法可以将信号置为发送状态&#xff1b; 释放信号&#xff0c;所有等待信号的线程都将获得信号&#xff0c;开始执行WaitOne()后面的语句&#xff1b; 将事件状态设置为中&#xff0c;终止状态许一个或多个的等待线程继续 2.Reset方法将信号置…

【MFC】计算机图形学实验:熟悉开发环境及工具(代码)

实验内容&#xff1a; 【MFC】计算机图形学实验1&#xff1a;熟悉开发环境及工具_绘制多义线mfc-CSDN博客 画笔和字体只给出两处代码&#xff1a; //创建刷子&#xff0c;设置填充色为黑色 CBrush NewBrush; NewBrush.CreateSolidBrush(RGB(0, 0, 0)); pDC->SelectObjec…

静态网页设计——校园官网(HTML+CSS+JavaScript)

前言 声明&#xff1a;该文章只是做技术分享&#xff0c;若侵权请联系我删除。&#xff01;&#xff01; 使用技术&#xff1a;HTMLCSSJS 主要内容&#xff1a;对学校官网的结构进行模仿&#xff0c;对布局进行模仿。 主要内容 1、首页 首页以多个div对页面进行分割和布局…

Apache Camel笔记

Apache Camel笔记 1. Apache Camel概念 Apache Camel是一个轻量级的应用集成开发框架&#xff0c;专注于简化集成应用的开发。它基于Enterprise Integration Patterns&#xff08;企业集成模式&#xff0c;简称EIP&#xff09;的设计理念&#xff0c;提供了灵活的路由和中介机制…

03、Kafka ------ CMAK(Kafka 图形界面管理工具) 下载、安装、启动

目录 CMAK&#xff08;Kafka 图形界面管理工具&#xff09;下载安装启动打开 cmak 图形界面 CMAK&#xff08;Kafka 图形界面管理工具&#xff09; Kafka本身并没有提供Web管理工具&#xff0c;而是推荐使用bin目录下各种工具命令来管理Kafka&#xff0c; 这些工具命令其实用起…

玻色量子正式获准设立博士后科研工作站!即日起正式开启招聘!

​2023年11月21日&#xff0c;北京市人力资源和社会保障局在亦庄成功举办了“凝聚中国式现代化进程中的博士后力量推进会暨第二届全国博士后创新创业大赛北京赛区总结”。为加强博士后工作对提高企业创新能力的支持力度&#xff0c;推动产学研深度融合&#xff0c;经专家评议、…

Flyweight享元/共享模式(对象性能)

Flyweight 链接&#xff1a;享元模式实例代码 解析 目的 在软件系统采用纯粹对象方案的问题在于大量细粒度的对象会很快充斥在系统中&#xff0c;从而带来很高的运行时代价——主要指内存需求方面的代价。如何在避免大量细粒度对象问题的同时&#xff0c;让外部客户程序仍然…

全国计算机等级考试| 二级Python | 真题及解析(7)

一、选择题 1.python中,表达式5%2 = ( )。 A.2.5 B.2 C.1 D.0 2.已知字符串a="python",则a[ 1 : 3 ]的值为( ) A."pyth" B."pyt" C."py" D…

Java-replaceAll()同时替换多个字符

今天复现了raplaceAll&#xff08;&#xff09;的用法&#xff0c;但是通常都是对一种字符进行替换&#xff0c;我就在想有没有操作可以一次性替换多个不同的字符&#xff0c;百度一搜&#xff0c;果然有。具体情况如下 首先是替换字的 String str1 "小明&#xff0c;小…

JRT实现表格元素

数据结构决定算法基础&#xff0c;良好的设计可以极大的减轻上层的复杂度。以前由于受限M没画笔&#xff0c;都得通过Webservice代理出去&#xff0c;所以实现一些效果比较难。用M控制打印绘制表格就很费劲。但是打印报告结果、药敏等很多都是列表排版。用TextLength控制换行或…

wy的leetcode刷题记录_Day72

wy的leetcode刷题记录_Day72 声明 本文章的所有题目信息都来源于leetcode 如有侵权请联系我删掉! 时间&#xff1a; 前言 目录 wy的leetcode刷题记录_Day72声明前言2397. 被列覆盖的最多行数题目介绍思路代码收获 1137. 第 N 个泰波那契数题目介绍思路代码收获 2397. 被列覆…

MATLAB全局最优搜索函数:GlobalSearch函数

摘要&#xff1a;本文介绍了 GlobalSearch 函数的使用句式&#xff08;一&#xff09;、三个运行案例&#xff08;二&#xff09;、以及 GlobalSearch 函数的参数设置&#xff08;三、四&#xff09;。详细介绍如下&#xff1a; 一、函数句法 Syntax gs GlobalSearch gs Glo…

java 6种深拷贝集合方式及其性能差异对比

文章目录 ArrayList 构造方法拷贝运行1000次耗时 1ms for循环拷贝运行1000次耗时 14ms Stream流 collect实现拷贝运行1000次耗时 54ms Stream流spring的BeanUtils实现拷贝运行1000次耗时 2468 ms Hutool工具实现拷贝Hutool 5.7.13版本运行1000次耗时 64674 msHutool 5.8.24版本…