PolarDB-X源码解读:DDL的一生(下)

news2024/12/24 3:27:05

概述

在《DDL的一生(上)》中,我们以添加全局二级索引为例,从DDL开发者的视角介绍了如何在DDL引擎框架下实现一个逻辑DDL。在本篇,作者将从DDL引擎的视角出发,向读者介绍DDL引擎的架构、实现,以及DDL引擎与DDL Job的交互逻辑。

在阅读本文之前,建议读者先阅读

  • 《DDL的一生(上)》
  • 《PolarDB-X DDL也要追求ACID?》

DDL引擎相关概念

DDL Job

DDL Job是DDL引擎中的概念,它用于描述一个逻辑DDL。DDL引擎中,一个DDL Job对应一个逻辑DDL,DDL Job内部包含了执行一个逻辑DDL需要的一系列动作,因此在DDL引擎框架下,开发者新支持一条逻辑DDL,实质就是定义一个新的DDL Job。

DDL开发者定义的是静态的DDL Job,然而,DDL Job在运行时,还拥有状态属性。这一属性主要由DDL引擎负责管理。当然,用户也可以执行有限的DDL运维指令以管理DDL Job的状态,实现对DDL执行过程的管理。下图是DDL Job的状态转移图,图中黑色加粗线框代表DDL Job执行的初态和终态,每个DDL Job状态之间的连线上的标注了可以执行的运维指令。

DDL Task

DDL Task是对DDL Job内部一系列行为的封装,如读写metaDb、在内存中计算、进程通信、向DN下发需执行的物理DDL等,这些行为都会被分别封装为DDL Task。因此一个DDL Job是由若干DDL Task构成的,这些Task需要按一定顺序被DDL引擎调度执行,DDL开发者可以使用Polardb-X的DDL引擎提供的DAG图框架描述Task之间的依赖关系和执行顺序。在DDL引擎框架下,开发者定义一个新的DDL Job,实质就是定义若干DDL Task,然后用DAG图把它们组合起来。

DDL Task是DDL引擎实现DDL近似原子性的重要工具,而DDL原子性是DDL引擎追求的目标。执行一条逻辑DDL涉及到一系列操作,原子性要求这些操作要么都全部生效,要么全都不生效。具体来说,DDL引擎要求每个DDL Task都是幂等的,每个Task必须有对应的反向幂等方法(此方法在回滚Task时被DDL引擎调用)。DDL引擎执行DDL之前,会为该DDL生成由DDL Task组成的DAG图,并将其持久化到MetaDb,这相当于保证DDL原子性的undo Log。DDL引擎按照DAG图依次执行Task直到整个DDL Job执行成功或者彻底回滚。

Worker和Leader

在DDL引擎的视角下,CN节点被分为Worker节点和Leader节点(在集群中唯一)。Worker节点负责接收用户发来的DDL请求,它将收到的请求进行简单的本地校验,然后把DDL转换成DDL Job并推送至MetaDb,最后通知Leader节点从MetaDb拉取DDL任务。

Leader节点负责DDL的执行,它从MetaDb拉取到DDL Job后,恢复成DAG图的形式,并对Job中的Task进行拓扑排序,然后按照一定的并行度进行调度、执行Task。

DDL 引擎源码目录

为了方面下文描述,本文先向读者说明DDL引擎源码的目录。PolarDB-X的DDL引擎的源码位于com.alibaba.polardbx.executor.ddl.newengine,各模块说明如下:

子目录或关键类功能
jobjob和task对象的定义
dag通用DAG及拓扑排序的实现,包括节点和图的定义、拓扑排序、DAG的维护和更新
meta读写GMS中的持久化对象的接口,持久化对象包括job和task的状态、系统资源(持久化读写锁)
sync提供sync接口实现Leader节点和Follower节点之间的信息同步
utils线程、线程间通信及线程池的封装
serializablejob和task对象的序列化接口
DdlEngineDagExecutorjob的执行器,包含Task调度、Task状态监测、异常处理的主要逻辑
DdlEngineSchedulerjob的调度器,将job置入执行队列并调用job的执行器
DdlEngineRequesterddl引擎处理ddl请求的入口,持久化ddl job并通知Leader节点处理ddl请求。

例子

下面,本文从DDL引擎的视角出发,向读者展示一条逻辑DDL是如何被DDL引擎调度并执行的。

DDL 任务调度

一条DDL语句由用户端的Mysql Client发出后,Worker节点接收到该DDL语句,经过简单的优化器解析后得到LogicalPlan,然后把该LogicalPlan分派到对应的DDL Handler,这个DDL Handler负责生成DDL Job。然后DDL Handler的公共基类的接口com.alibaba.polardbx.executor.handler.ddl.LogicalCommonDdlHandler#handleDdlRequest处理这个DDL请求,该函数调用com.alibaba.polardbx.executor.ddl.newengine.DdlEngineRequester#execute方法将之前生成的DDL Job及执行DDL所需的上下文写入MetaDB,并通知Leader节点处理。至此,Worker节点完成了自己的工作,如果该DDL是阻塞型的,Worker节点会等待Leader执行完DDL后,返回Response给用户端;如果该DDL是非阻塞型的,Worker节点会直接返回。

Leader节点上运行着com.alibaba.polardbx.executor.ddl.newengine.DdlEngineScheduler#ddlDispatcherThreadcom.alibaba.polardbx.executor.ddl.newengine.DdlEngineScheduler#ddlSchedulerThread两个线程,它们分别对应着实例级别的DdlJobDispatcher和Schema级别的DdlJobScheduler。其中DdlJobDispatcher从全局唯一的Ddl Request 队列中取出Ddl Request,然后将其分配到Schema级别的Ddl Job队列。DdlJobScheduler是Schema级别的,它负责从Schema级别的Ddl Job队列中不断消费Ddl Job,这个过程中,DdlJobScheduler利用Schema级别的信号量对并行消费Ddl Job的并行度进行控制(同一Schema上的最大线程数为10)。DdlJobScheduler消费Ddl Job,实质上是从Schema级别的Ddl Job队列中取出Ddl Job,然后分派给DdlJobExecutor(Job级别),DdlJobExecutor负责将DDL Job转交给DdlEngineDagExecutor。至此,DDL Job正式进入DDL引擎中的执行器DdlEngineDagExecutor,由后者接管DDL Job的执行。

需要补充说明的是,从上文可以看出DDL引擎支持多个DDL并发执行,为保证需要相同资源的DDL之间互斥执行,DDL引擎提供了持久化的读写锁机制。作为DDL开发者,只需要在定义DDL Job的时候,提前声明该DDL所需的Schema、Table资源。当执行DDL的时候,DDL引擎会在com.alibaba.polardbx.executor.ddl.newengine.DdlEngineRequester#execute生成DDL Job并保存至MetaDB之前,先根据该DDL Job所需的资源进行读写锁的acquire。

DDL 任务执行

DdlEngineDagExecutor负责DDL任务的执行,它会调用restoreAndRun方法,从MetaDb中拉取并恢复DDL Job为DAG形式。然后调用run方法,根据DDL Job的当前状态执行相应的回调方法。

public class DdlEngineDagExecutor {
 
    public static void restoreAndRun(String schemaName, Long jobId, ExecutionContext executionContext){
        boolean restoreSuccess = DdlEngineDagExecutorMap.restore(schemaName, jobId, executionContext);
        DdlEngineDagExecutor dag = DdlEngineDagExecutorMap.get(schemaName, jobId);
        dag.run();
    }
 
    private void run() {
        // Start the job state machine.
        if (ddlContext.getState() == DdlState.QUEUED) {
            onQueued();
        }
        if (ddlContext.getState() == DdlState.RUNNING) {
            onRunning();
        }
        if (ddlContext.getState() == DdlState.ROLLBACK_RUNNING) {
            onRollingBack();
        }
        // Handle the terminated states.
        switch (ddlContext.getState()) {
        case ROLLBACK_PAUSED:
        case PAUSED:
            onTerminated();
            break;
        case ROLLBACK_COMPLETED:
        case COMPLETED:
            onFinished();
            break;
        default:
            break;
        }
 }
}

com.alibaba.polardbx.executor.ddl.newengine.DdlEngineDagExecutor#run会根据DDL Job当前的状态,执行对应的回调方法,这本质上是一个在DDL Job的状态转移图上游走的过程。

DDL Job的初始状态一般为QUEUED,它表示当前被DDL引擎新调度到Schema级别队列。此时run方法会依据此状态调用onQueued()方法。onQueued()方法的作用是将DDL Job的状态修改为RUNNING。

当DDL Job当前的状态是RUNNING时,run方法就会调用onRunning回调方法,按照DAG图的依赖关系执行DDL Job内部的Task。

private void onRunning() {
    while (true) {
        if (hasFailureOnState(DdlState.RUNNING)) {
            if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {
                LOGGER.info(String.format("JobId:[%s], all tasks stopped", ddlContext.getJobId()));
                return;
            } else {
                continue;
            }
        }
        if (executingTaskScheduler.isAllTaskDone()) {
            updateDdlState(DdlState.RUNNING, DdlState.COMPLETED);
            return;
        }
        if (executingTaskScheduler.hasMoreExecutable()) {
            // fetch & execute next batch
            submitDdlTask(executingTaskScheduler.pollBatch(), true, executingTaskScheduler);
            continue;
        }
        //get some rest
        sleep(50L);
}

onRunning的流程如下:

  • 先检查当前DDL Job的状态是否为RUNNING,如果不是则直接返回。
  • 检查当前DAG图上是否还有待执行的Task节点,如果没有,则更新Job状态为COMPLETED,然后返回。
  • 如果当前DAG图上存在可以执行的Task,则用拓扑排序的方式,从DAG图上取出所有可执行的Task,按照并行度的限制,调用submitDdlTask方法并发执行。注意,Task并不一定能执行成功,如果有Task执行失败,submitDdlTask方法会按照Task的开发者预先定义的失败策略,修改当前DDL Job的状态。最典型的,当有Task失败时,修改当前DDL Job状态为 PAUSED 或 ROLLBACK_RUNNING。详细的错误处理与恢复机制,将在下一小节介绍。

如果有DDL Job的状态为ROLLBACK_RUNNING,run方法就会调用onRollingBack()回调方法,实现DDL的回滚。相关代码如下

private void onRollingBack() {
    if (!allowRollback()) {
        updateDdlState(DdlState.ROLLBACK_RUNNING, DdlState.ROLLBACK_PAUSED);
        return;
    }
 
    reverseTaskDagForRollback();
 
    // Rollback the tasks.
    while (true) {
        if (hasFailureOnState(DdlState.ROLLBACK_RUNNING)) {
            if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {
                LOGGER.info(String.format("JobId:[%s], all tasks stoped", ddlContext.getJobId()));
                return;
            } else {
                continue;
            }
        }
        if (reveredTaskScheduler.isAllTaskDone()) {
            updateDdlState(DdlState.ROLLBACK_RUNNING, DdlState.ROLLBACK_COMPLETED);
            return;
        }
        if (reveredTaskScheduler.hasMoreExecutable()) {
            // fetch & execute next batch
            submitDdlTask(reveredTaskScheduler.pollBatch(), false, reveredTaskScheduler);
            continue;
        }
        //get some rest
        sleep(50L);
    }
}

onRollingBack的流程如下:

  • 首先检查,在当前DAG图的执行进度下,是否允许回滚(一旦越过了fail point task,则不允许回滚)。如果不可回滚,则标记当前DDL Job的状态为PAUSED,然后退出。
  • 当DDL Job的状态为ROLLBACK_RUNNING时,可能还存在其他正在执行中的Task。此时DDL引擎将不再允许新的Task开始执行,并且会等待正在执行中的Task成功或失败,此时该DDL Job就到达了一个一致性的状态。
  • 达了一致性状态后可以开始回滚流程,首先逆转DAG图的所有有向边,使整个DDL Job的执行流程反过来。然后按照逆转后的DAG图进行拓扑排序,取出之前执行完毕或执行过但未完成的Task,执行它们的反向幂等方法。
  • 当DAG图中没有可执行的Task节点时,标记DDL Job状态为ROLLBACK_COMPLETED,回滚成功。

其余状态的回调函数逻辑较为简单,这里不再赘述,请感兴趣的读者自行阅读代码。

错误处理与恢复

DDL引擎追求的目标之一是DDL的原子性,如果在执行DDL的过程中部分Task失败,DDL引擎需要采取适当措施让DDL Job变成完全未执行或执行成功的状态(即状态转移图中的终态)。DDL引擎采取的办法是给Task添加DdlExceptionAction属性,该属性用于指示DDL引擎执行Task出现异常时如何处置。DDL开发者可以在定义DDL Task的时候设置该属性。

DdlExceptionAction一共有4种取值

  • TRY_RECOVERY_THEN_PAUSE:执行该Task出现异常后,重试3次,如果仍失败,则将Task对应的DDL Job状态设置为PAUSED。
  • ROLLBACK:执行Task出现异常后,将该Task所在DDL Job状态设置为ROLLBACK_RUNNING,随后DDL引擎会根据该状态进行回滚DDL。
  • TRY_RECOVERY_THEN_ROLLBACK:执行该Task出现异常后,重试3次,如果仍失败,将该Task所在DDL Job状态设置为ROLLBACK_RUNNING,随后由DDL引擎回滚该DDL。
  • PAUSE:执行该Task出现异常后,将Task对应的DDL Job状态设置为PAUSED。

一般来说,PAUSED状态意味着该DDL Job没有达到终态,需要开发者介入处理,这常用于出现异常后无法恢复的Task,或者对外界产生了影响以致无法回滚的Task。前者举例,如drop table指令,一旦执行了删除元信息或删除物理表的Task,就无法再恢复到删除前的状态了,这时如果某Task失败且重试3次后仍失败,就会导致该DDL Job进入PAUSED状态;后者举例,如Polardb-X中大部分DDL Job都含有一个CDC打标的Task,用于对外生成bin log,该Task执行完成意味着外界已经可以获取相应DDL的bin log,因此无法回滚。

总结

本文从DDL引擎的视角,向读者介绍了DDL引擎的架构、实现,以及DDL引擎与DDL Job的交互逻辑。了解更多关于Polardb-X源码的解析,请持续关注我们后续发布的文章。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/100569.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

应用于高速收发模块的并行光学WDM波分光学技术

光模块的传输距离分为短距、中距、长距。通常短距离传输是指2km以下的传输距离,中距为10-20km。≥30km的则为长距离传输。根据不同的传输距离,光模块类型分为SR(100m)、DR(500m)、FR(2km&#x…

实战三十二:基于knn算法的用户购物消费预测代码+数据

K近邻算法通过计算被分类对象与训练集对象之间的距离,确定其k个临近点,然后使用这k个临近点中最多的分类作为分类结果。 如上图,当K=3时,它会被分类为 Class B。因为K=3时,3个临近点里有2个是B类的。 同理,K=7时它会被分类为 Class A,因为K=7时,7个临近点里4个是A类的…

Clion开发stm32之下载程序记录

Clion开发stm32之下载程序 前提条件 安装openocd安装clion安装arm-gcc环境安装 MinGW(或Mysys2) 注意事项 !!! 开发路径必须要选择英文路径(中文路径会编译不通过的) 说明 这里为了在之后的项目里面使用配置文件。我们需要到openocd提供的board目录下添加自己的配置信息(…

gcc编译

gcc编译可执行程序有4个步骤:预处理、编译、汇编、链接。编译阶段消耗时间、系统资源最多。 从源文件hello.c到目标可执行文件hello,可以按照下面的执行命令,一步一步生成。 gcc -E hello.c -o hello.i gcc -S hello.i -o hello.s gcc -c he…

信息采编功能扩展开发心得

AEAI Portal门户为前端页面集成层而设计,在使用上简单、便捷,即使是非技术人员,通过操作文档也能够很好地将网站配置出来,不需要自身有很强的代码能力。同时门户平台搭配数通畅联的其他产品和组合方案,能够帮助企业快速…

nodejs+vue080大学社团管理系统

本系统主要有社团成员,社团团长和管理人员三个角色。 社团成员可以查看。新闻公告招新信息,并可在招新信息中申请加入喜欢的社团。可以在社团活动中申请自己想要参加的社团活动。 社团团长可以对自己所负责的社团内容进行管理。 管理人员可以对整个系统进…

kafka 的使用原理及通过spring-kafka 自定义封装包的原理

目录: Kafka 封装包接入 1.Kafka 工作原理2.Spring Kafka 介绍3. kafka封装包的设计及使用 Kafka 封装包接入 1.Kafaka 工作原理 1).kafka 的定义: 消息队列的两种模式: 1).点对点模式(一对一,消费者主动拉取数据&…

Arduino框架下联盛德W801开发环境搭建教程

Arduino框架下联盛德W801开发环境搭建教程联盛德W801拥有自己的SDK集成开发工具,能做到这一点非常令人敬佩和了不起。国内好多芯片厂商都需要依托第三方开发工具集来实现对自己产品的开发。多元化开发方式可以满足不同层次开发人员的需求。对于芯片本身来说&#xf…

机器学习100天(十一):011 回归模型评估指标

机器学习100天,今天讲的是:线性回归评估指标! 一、哪个模型更好? 我们之前已经对房价预测的问题构建了线性模型,并对测试集进行了预测。 如图所示,横坐标是地区人口,纵坐标是房价,红色的点是实际样本分布。 使用不同的算法或策略构建了两个线性回归模型,如图,分别是…

Web3 的开发者,如何评估以及选择调用链上数据的解决方案

FP是链上数据分析平台以及数据处理基础设施,使命是让链上数据分析以及使用随手可得。目前,Footprint 从 22 条公链上收集、解析和清理数据,把无语义以及无序的链上数据,转化成让用户能使用无代码拖放界面、SQL等多种形式构建图表以…

TestStand-用户界面

文章目录简易用户界面全功能用户界面除序列编辑器外&#xff0c;TestStand自带的两类用户界面&#xff0c;分别是SimpleUI&#xff08;简易用户界面&#xff09;及Full-Featured UI&#xff08;全功能用户界面&#xff09;。简易用户界面 简易用户界面的源代码位于< TestSt…

[附源码]计算机毕业设计Python的专业技能认证系统(程序+源码+LW文档)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

分享:广电用户服务大数据解决方案

1 方案背景 随着三网融合的深入推进&#xff0c;跨域竞争激烈&#xff0c;电信运营商、互联网企业、硬件设备商以及内容生产商等都将自身定位于视频产业的参与者并获取相应价值&#xff0c;视频服务已经不再成为广电网络运营商的专利&#xff0c;整个视频产业已经开始新一轮颠…

Understanding and Increasing Efficiency of Frank-Wolfe Adversarial Training

AT存在灾难性的过拟合&#xff0c;在训练过程中对抗精度下降&#xff0c;尽管已经提出了改进&#xff0c;但它们增加了训练时间&#xff0c;鲁棒性与多步 AT 相去甚远。我们开发了一个使用 FW 优化 (FW-AT) 进行对抗训练的理论框架&#xff0c;揭示了损失情况与 ℓ∞ FW 攻击的…

vue - vue中的publicPath讲解

vue.config.js里面的publicPath是部署应用包时的基本 URL&#xff1b; 从 Vue CLI 3.3 起baseUrl已被publicPath替代&#xff1b; 如果想要了解vue的环境变量 process.env 可以阅读这篇文章&#xff1a; vue中的process.env.NODE_ENV讲解 1&#xff0c;publicPath publicPath是…

七、ref引用与数组的常用方法

一、ref 引用 1.1、什么是 ref 引用 ref 用来辅助开发者在不依赖于jQuery的情况下&#xff0c;获取 DOM 元素或组件的引用。 每个vue的组件实例上&#xff0c;都包含一个$refs对象&#xff0c;里面存储着对应的 DOM 元素或组件的引用。默认情况下&#xff0c;组件的$refs指向…

语音识别芯片LD3320介绍

语音识别芯片LD3320简介 LD3320 芯片是一款“语音识别”芯片,集成了语音识别处理器和一些外部电路&#xff0c;包括AD、DA 转换器、麦克风接口、声音输出接口等。LD3320不需要外接任何的辅助芯片如Flash、RAM 等&#xff0c;直接集成在LD3320中即可以实现语音识别/声控/人机对…

黑*头条_第6章_kafka及异步通知文章上下架(新版)

黑*头条_第6章_kafka及异步通知文章上下架(新版) 文章目录黑*头条_第6章_kafka及异步通知文章上下架(新版)1)自媒体文章上下架2)kafka概述3)kafka安装配置4)kafka入门5)kafka高可用设计5.1)集群5.2)备份机制(Replication&#xff09;6)kafka生产者详解6.1)发送类型6.2)参数详解…

Linux|Ubuntu-18.04上安装discord(二进制安装)

前言&#xff1a; Discord是由两个沉迷游戏的玩家为了解决游戏种玩家交流问题而开发的一个语音聊天软件&#xff0c;它从游戏社交起家。 起初&#xff0c;Discord被定义为“永远在线的聊天室”&#xff0c;专门为游戏玩家设计&#xff0c;可以简单快捷的加入或退出某个群聊进…

【图像去雾】颜色衰减先验图像去雾【含Matlab源码 2036期】

⛄一、颜色衰减先验去雾算法简介 2015年Zhu等的颜色衰减先验去雾算法利用颜色衰减先验建立有雾图像的景深模型, 采用有监督学习的方式得到模型的参数, 结合大气散射模型得到去雾图像。具体内容如下: 1 大气散射模型 Zhu等的颜色衰减先验去雾算法利用计算机视觉和图形图像领域…