DataX 原理解析和性能优化

news2024/11/26 12:44:46

datax简介

datax是阿里开源的用于异构数据源之间的同步工具,由于其精巧的设计和抽象,数据同步效率极高,在很多公司数据部门都有广泛的使用。本司基于datax在阿里云普通版的rds服务器上实现了通过公网,从阿里云杭州到美国西部俄勒冈aws emr集群峰值30M以上带宽的传输效率。全量传输上亿条记录、大小30G的数据,最快不到30分钟。要知道如果拉跨洋专线的话,1M带宽每个月至少需要1千大洋呢。走公网照样能达到类似的稳定性,本文通过原理设计来阐述我们是如何基于datax做到的。

datax工作原理

在讲解datax原理之前,需要明确一些概念:

  • Job: Job是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到hive的一个表的特定分区。

  • Task: Task是为最大化而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,若干个任务并发执行。或者将一个大表按照id拆分成1024个分片,若干个分片任务并发执行。

  • TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup。

  • JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。

  • TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元。

job和task是datax两种维度的抽象,后面源码分析中还会涉及到。

datax的处理过程可描述为:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

上述流程可图像化描述为:


其中Channel是连接Reader和Writer的数据交换通道,所有的数据都会经由Channel进行传输,一个channel代表一个并发传输通道,通过该通道实现传输速率控制。接下来我们通过源码的角度,在抽取其核心逻辑,以mysql到hdfs的传输为例分析其工作流程。通过分析源码将会有以下几点收获:

  • datax 工作流程

  • datax 插件机制

  • datax 同步实现

datax源码分析

datax 工作流程

public class Engine {
    private static final Logger LOG = LoggerFactory.getLogger(Engine.class);

    private static String RUNTIME_MODE;

    public void start(Configuration allConf) {
        boolean isJob = !("taskGroup".equalsIgnoreCase(allConf.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
        //JobContainer会在schedule后再行进行设置和调整值
        int channelNumber =0;
        AbstractContainer container;
        long instanceId;
        int taskGroupId = -1;
            if (isJob) {
            allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
            container = new JobContainer(allConf);
            instanceId = allConf.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

        } else {
            container = new TaskGroupContainer(allConf);
            instanceId = allConf.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
            taskGroupId = allConf.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
            channelNumber = allConf.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
        }
        container.start();
    }

job实例运行在jobContainer容器中,它是所有任务的master,负责初始化、拆分、调度、运行、回收、监控和汇报,但它并不做实际的数据同步操作

public class JobContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory
            .getLogger(JobContainer.class);

    public JobContainer(Configuration configuration) {
        super(configuration);
    }
    /**
     * jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler以及destroy和statistics
     */
    @Override
    public void start() {
        LOG.info("DataX jobContainer starts job.");
        try{
            userConf = configuration.clone();
            this.init();
            this.prepare();
            this.totalStage = this.split();
            this.schedule();
        } catch (Throwable e) {
            Communication communication = super.getContainerCommunicator().collect();
            // 汇报前的状态,不需要手动进行设置
            // communication.setState(State.FAILED);
            communication.setThrowable(e);
            communication.setTimestamp(this.endTimeStamp);

            Communication tempComm = new Communication();
            tempComm.setTimestamp(this.startTransferTimeStamp);

            Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
            super.getContainerCommunicator().report(reportCommunication);

            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    /**
     * reader和writer的初始化
    */
    private void init() {
        Thread.currentThread().setName("job-" + this.jobId);

        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
                this.getContainerCommunicator());
        //必须先Reader ,后Writer
        this.jobReader = this.initJobReader(jobPluginCollector);
        this.jobWriter = this.initJobWriter(jobPluginCollector);
    }

    /**
     *schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中,
     * 同时不同的执行模式调用不同的调度策略,将所有任务调度起来
     */
    private void schedule() {
        /**
         * 这里的全局speed和每个channel的速度设置为B/s
         */
        int channelsPerTaskGroup = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
        int taskNumber = this.configuration.getList(
                CoreConstant.DATAX_JOB_CONTENT).size();

        this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
        /**
         * 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务。
         会考虑 task 中对资源负载作的 load 标识进行更均衡的作业分配操作。
         */
        List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
                this.needChannelNumber, channelsPerTaskGroup);
                        LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());

        AbstractScheduler scheduler;
        try {
            scheduler = initStandaloneScheduler(this.configuration);

            this.startTransferTimeStamp = System.currentTimeMillis();

            scheduler.schedule(taskGroupConfigs);

            this.endTransferTimeStamp = System.currentTimeMillis();
        } catch (Exception e) {
            LOG.error("运行scheduler出错.");
            this.endTransferTimeStamp = System.currentTimeMillis();
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    private AbstractScheduler initStandaloneScheduler(Configuration configuration) {
        AbstractContainerCommunicator containerCommunicator = new StandAloneJobContainerCommunicator(configuration);
        super.setContainerCommunicator(containerCommunicator);

        return new StandAloneScheduler(containerCommunicator);
    }
}    

public abstract class AbstractScheduler {
    private static final Logger LOG = LoggerFactory
            .getLogger(AbstractScheduler.class);
    public void schedule(List<Configuration> configurations) {
        /**
         * 给 taskGroupContainer 的 Communication 注册
         */
        this.containerCommunicator.registerCommunication(configurations);
        int totalTasks = calculateTaskCount(configurations);
        startAllTaskGroup(configurations);
        try {
            while (true) {
                Communication nowJobContainerCommunication = this.containerCommunicator.collect();
                //汇报周期
                long now = System.currentTimeMillis();
                if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {
                    Communication reportCommunication = CommunicationTool
                            .getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);

                    this.containerCommunicator.report(reportCommunication);
                     if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
                    LOG.info("Scheduler accomplished all tasks.");
                    break;
                }
                if (nowJobContainerCommunication.getState() == State.FAILED) {
                    dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
                }

                Thread.sleep(jobSleepIntervalInMillSec);
            }
        } catch (InterruptedException e) {
            // 以 failed 状态退出
            LOG.error("捕获到InterruptedException异常!", e);

            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    @Override
    public void startAllTaskGroup(List<Configuration> configurations) {
        this.taskGroupContainerExecutorService = Executors
                .newFixedThreadPool(configurations.size());

        for (Configuration taskGroupConfiguration : configurations) {
            TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
            this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        }

        this.taskGroupContainerExecutorService.shutdown();
    }


    @Override
    public void dealFailedStat(AbstractContainerCommunicator frameworkCollector, Throwable throwable) {
        this.taskGroupContainerExecutorService.shutdownNow();
    }

}

public class TaskGroupContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory
            .getLogger(TaskGroupContainer.class);
            @Override
    public void start() {
        try {
            while (true) {
                //1.判断task状态
                boolean failedOrKilled = false;
                Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
                for(Map.Entry<Integer, Communication> entry :         communicationMap.entrySet()){
                    Integer taskId = entry.getKey();
                    Communication taskCommunication = entry.getValue();
                    if(!taskCommunication.isFinished()){
                        continue;
                    }
                    TaskExecutor taskExecutor = removeTask(runTasks, taskId);
                    if(taskCommunication.getState() == State.FAILED){
                        failedOrKilled = true;
                            break;
                    }
                    else if(taskCommunication.getState() == State.SUCCEEDED){
                                               Long taskStartTime = taskStartTimeMap.get(taskId);
                        if(taskStartTime != null){
                            Long usedTime = System.currentTimeMillis() - taskStartTime;
                            LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
                                    this.taskGroupId, taskId, usedTime);
                            //usedTime*1000*1000 
                            taskStartTimeMap.remove(taskId);
                            taskConfigMap.remove(taskId);
                        }
                    }
                }
                // 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误
                if (failedOrKilled) {
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

                    throw DataXException.asDataXException(
                            FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
                }
                //3.有任务未执行,且正在运行的任务数小于最大通道限制
                Iterator<Configuration> iterator = taskQueue.iterator();
                while(iterator.hasNext() && runTasks.size() < channelNumber){
                    Configuration taskConfig = iterator.next();
                    Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                    Configuration taskConfigForRun =taskConfig.clone()
                    TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun);
                    taskStartTimeMap.put(taskId, System.currentTimeMillis());
                    taskExecutor.doStart();
                    terator.remove();
                    runTasks.add(taskExecutor);
                    LOG.info("taskGroup[{}] taskId[{}] is started",
                            this.taskGroupId, taskId);
            }
            //4.任务列表为空,executor已结束, 搜集状态为success--->成功
            if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
                // 成功的情况下,也需要汇报一次。否则在任务结束非常快的情况下,采集的信息将会不准确
                lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                        lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

                LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
                break;
            }
        } catch (Throwable e) {
            Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();

            if (nowTaskGroupContainerCommunication.getThrowable() == null) {
                nowTaskGroupContainerCommunication.setThrowable(e);
            }
            nowTaskGroupContainerCommunication.setState(State.FAILED);
            this.containerCommunicator.report(nowTaskGroupContainerCommunication);

            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

}
/**
 * TaskExecutor是一个完整task的执行器
 * 其中包括1:1的reader和writer
 */
class TaskExecutor {

    private Thread readerThread;

    private Thread writerThread;

    private ReaderRunner readerRunner;

    private WriterRunner writerRunner;

    public TaskExecutor(Configuration taskConf, int attemptCount) {
        writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
        //生成writerThread
        this.writerThread = new Thread(writerRunner,
                    String.format("%d-%d-%d-writer",
                            jobId, taskGroupId, this.taskId));
        //生成readerThread
        readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);
        this.readerThread = new Thread(readerRunner,
                    String.format("%d-%d-%d-reader",
                            jobId, taskGroupId, this.taskId));
    }

    public void doStart() {
        this.writerThread.start();
        // reader没有起来,writer不可能结束
        if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }

        this.readerThread.start();

        // 这里reader可能很快结束
        if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
            // 这里有可能出现Reader线上启动即挂情况 对于这类情况 需要立刻抛出异常
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
    }
}

从上面总体流程中可以看到JobContainer通过线程池调度起所有的TaskGroupContainer,然后轮训TaskGroupContainer的运行状态。每个TaskGroupContainer则是根据分配的chanel并发数量依次执行分配的Task实例。

插件机制

在工作流程中的init步骤,我们看到的jobReader和jobWriter的实现就是通过插件动态生成的。jobReader和jobWriter就对应datax中的Job概念模型。而在TaskExecutor中初始化的readerRunner和writerRunner对应的是Task模型。通过插件datax插件机制支持了数十种不同的数据源之间的读写同步,同时也很方便的支持新的数据源接入。

Job初始化过程

public class JobContainer extends AbstractContainer {

    //reader job的初始化,返回Reader.Job
    private Reader.Job initJobReader(
            JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);

        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
                PluginType.READER, this.readerPluginName);

        // 设置reader的jobConfig
        jobReader.setPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        // 设置reader的readerConfig
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

        jobReader.setJobPluginCollector(jobPluginCollector);
        jobReader.init();

        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }
}  

插件加载器,大体上分reader、transformer(还未实现)和writer三中插件类型,
reader和writer在执行时又可能出现Job和Task两种运行时(加载的类不同)

public class LoadUtil {
    //加载JobPlugin,reader、writer都可能要加载
    public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
                                                  String pluginName) {
        Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
                pluginType, pluginName, ContainerType.Job);

        try {
            AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
                    .newInstance();
            jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
            return jobPlugin;
        } catch (Exception e) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    String.format("DataX找到plugin[%s]的Job配置.",
                            pluginName), e);
        }
    }

    //反射出具体plugin实例
    private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
            PluginType pluginType, String pluginName,
            ContainerType pluginRunType) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);
        JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
        try {
            return (Class<? extends AbstractPlugin>) jarLoader
                    .loadClass(pluginConf.getString("class") + "$"
                            + pluginRunType.value());
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    public static synchronized JarLoader getJarLoader(PluginType pluginType,
                                                      String pluginName) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);

        JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
                pluginName));
        if (null == jarLoader) {
            String pluginPath = pluginConf.getString("path");
            if (StringUtils.isBlank(pluginPath)) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        String.format(
                                "%s插件[%s]路径非法!",
                                pluginType, pluginName));
            }
            jarLoader = new JarLoader(new String[]{pluginPath});
            jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
                    jarLoader);
        }

        return jarLoader;
    }

}

//提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。
public class JarLoader extends URLClassLoader {
    public JarLoader(String[] paths) {
        this(paths, JarLoader.class.getClassLoader());
    }

    public JarLoader(String[] paths, ClassLoader parent) {
        super(getURLs(paths), parent);
    }

    private static URL[] getURLs(String[] paths) {
        Validate.isTrue(null != paths && 0 != paths.length,
                "jar包路径不能为空.");

        List<String> dirs = new ArrayList<String>();
        for (String path : paths) {
            dirs.add(path);
            JarLoader.collectDirs(path, dirs);
        }

        List<URL> urls = new ArrayList<URL>();
        for (String path : dirs) {
            urls.addAll(doGetURLs(path));
        }

        return urls.toArray(new URL[0]);
    }

    private static void collectDirs(String path, List<String> collector) {
        if (null == path || StringUtils.isBlank(path)) {
            return;
        }

        File current = new File(path);
        if (!current.exists() || !current.isDirectory()) {
            return;
        }

        for (File child : current.listFiles()) {
            if (!child.isDirectory()) {
                continue;
            }

            collector.add(child.getAbsolutePath());
            collectDirs(child.getAbsolutePath(), collector);
        }
    }
}

Task 初始化过程

class TaskExecutor {
private AbstractRunner generateRunner(PluginType pluginType) {
            return generateRunner(pluginType, null);
        }

        private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExecution> transformerInfoExecs) {
            AbstractRunner newRunner = null;
            TaskPluginCollector pluginCollector;

            switch (pluginType) {
                case READER:
                    newRunner = LoadUtil.loadPluginRunner(pluginType,
                            this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
                    newRunner.setJobConf(this.taskConfig.getConfiguration(
                            CoreConstant.JOB_READER_PARAMETER));

                    pluginCollector = ClassUtil.instantiate(
                            taskCollectorClass, AbstractTaskPluginCollector.class,
                            configuration, this.taskCommunication,
                            PluginType.READER);

                    RecordSender recordSender;
                    if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
                        recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,this.taskCommunication ,pluginCollector, transformerInfoExecs);
                    } else {
                        recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
                    }

                    ((ReaderRunner) newRunner).setRecordSender(recordSender);

                    /**
                     * 设置taskPlugin的collector,用来处理脏数据和job/task通信
                     */
                    newRunner.setTaskPluginCollector(pluginCollector);
                    break;
                case WRITER:
                    newRunner = LoadUtil.loadPluginRunner(pluginType,
                            this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
                    newRunner.setJobConf(this.taskConfig
                            .getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));

                    pluginCollector = ClassUtil.instantiate(
                            taskCollectorClass, AbstractTaskPluginCollector.class,
                            configuration, this.taskCommunication,
                            PluginType.WRITER);
                    ((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
                            this.channel, pluginCollector));
                    /**
                     * 设置taskPlugin的collector,用来处理脏数据和job/task通信
                     */
                    newRunner.setTaskPluginCollector(pluginCollector);
                    break;
                default:
                    throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, "Cant generateRunner for:" + pluginType);
            }

            newRunner.setTaskGroupId(taskGroupId);
            newRunner.setTaskId(this.taskId);
            newRunner.setRunnerCommunication(this.taskCommunication);

            return newRunner;
        }
}

public class LoadUtil {
  /**
     * 根据插件类型、名字和执行时taskGroupId加载对应运行器
     *
     * @param pluginType
     * @param pluginName
     * @return
     */
    public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) {
        AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType,
                pluginName);

        switch (pluginType) {
            case READER:
                return new ReaderRunner(taskPlugin);
            case WRITER:
                return new WriterRunner(taskPlugin);
            default:
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        String.format("插件[%s]的类型必须是[reader]或[writer]!",
                                pluginName));
        }
    }
}

同步实现

这部分就是经过split后的具体的Task的执行逻辑。Task的划分逻辑如下:

public class JobContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory
            .getLogger(JobContainer.class);
    /**
     * 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,
     * 达到切分后数目相等,才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起,
     * 然后,为避免顺序给读写端带来长尾影响,将整合的结果shuffler掉
     */
    private int split() {
        this.adjustChannelNumber();

        if (this.needChannelNumber <= 0) {
            this.needChannelNumber = 1;
        }

        List<Configuration> readerTaskConfigs = this
                .doReaderSplit(this.needChannelNumber);
        int taskNumber = readerTaskConfigs.size();
        List<Configuration> writerTaskConfigs = this
                .doWriterSplit(taskNumber);

        List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

        LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
        /**
         * 输入是reader和writer的parameter list,输出是content下面元素的list
         */
        List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
                readerTaskConfigs, writerTaskConfigs, transformerList);


        LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));

        this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);

        return contentConfig.size();
    }

}

 每个Task都执行相同的逻辑和流程,下面以读mysql和写hdfs为例,展示其读写过程。

 //单个slice的reader执行调用
public class ReaderRunner extends AbstractRunner implements Runnable {
   @Override
    public void run() {
        Reader.Task taskReader = (Reader.Task) this.getPlugin();
        taskReader.init();
        taskReader.prepare();
        taskReader.startRead(recordSender);
        recordSender.terminate();
    }
}

public class MysqlReader extends Reader {
        @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);

            this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
                    super.getTaskPluginCollector(), fetchSize);
        }
}

public class CommonRdbmsReader {
        public static class Task {
        private static final Logger LOG = LoggerFactory
                .getLogger(Task.class);
        public void startRead(Configuration readerSliceConfig,
                              RecordSender recordSender,
                              TaskPluginCollector taskPluginCollector, int fetchSize) {
            String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
            String table = readerSliceConfig.getString(Key.TABLE);

            PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);

            LOG.info("Begin to read record by Sql: [{}\n] {}.",
                    querySql, basicMsg);

            Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
                    username, password);
            int columnNumber = 0;
            ResultSet rs = null;
            try {
                rs = DBUtil.query(conn, querySql, fetchSize);
                while (rs.next()) {
                    //将数据记录放入channel通道,writer从中获取写数据
                    this.transportOneRecord(recordSender, rs,
                            metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
                }
            }catch (Exception e) {
                throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
            } finally {
                DBUtil.closeDBResources(null, conn);
            }
        }
    }
}

 //单个slice的writer执行调用

public class WriterRunner extends AbstractRunner implements Runnable {
    @Override
    public void run() {
        Writer.Task taskWriter = (Writer.Task) this.getPlugin();
        taskWriter.init();
        taskWriter.prepare();
        taskWriter.startWrite(recordReceiver);
    }
}

public class HdfsWriter extends Writer {
    public static class Task extends Writer.Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            LOG.info("begin do write...");
            LOG.info(String.format("write to file : [%s]", this.fileName));
            if(fileType.equalsIgnoreCase("TEXT")){
                //写TEXT FILE
                hdfsHelper.textFileStartWrite(lineReceiver,this.writerSliceConfig, this.fileName,
                        this.getTaskPluginCollector());
            }else if(fileType.equalsIgnoreCase("ORC")){
                //写ORC FILE
                hdfsHelper.orcFileStartWrite(lineReceiver,this.writerSliceConfig, this.fileName,
                        this.getTaskPluginCollector());
            }
            LOG.info("end do write");
        }
    }
}

public  class HdfsHelper {
    public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,TaskPluginCollector taskPluginCollector){
        try {
            RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL);
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {
                MutablePair<Text, Boolean> transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector);
                if (!transportResult.getRight()) {
                    writer.write(NullWritable.get(),transportResult.getLeft());
                }
            }
            writer.close(Reporter.NULL);
        } catch (Exception e) {
            String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
            LOG.error(message);
            Path path = new Path(fileName);
            deleteDir(path.getParent());
            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
        }
    }
}

reader和writer通过BufferedRecordExchanger建立联系,在其内部实现了基于ArrayBlockingQueue的MemoryChannel。

public class BufferedRecordExchanger implements RecordSender, RecordReceiver {
    @Override
    public void sendToWriter(Record record) {
        if(shutdown){
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }

        Validate.notNull(record, "record不能为空.");

        if (record.getMemorySize() > this.byteCapacity) {
            this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
            return;
        }

        boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
        if (isFull) {
            flush();
        }

        this.buffer.add(record);
        this.bufferIndex++;
        memoryBytes.addAndGet(record.getMemorySize());
    }

    @Override
    public void flush() {
        if(shutdown){
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        this.channel.pushAll(this.buffer);
        this.buffer.clear();
        this.bufferIndex = 0;
        this.memoryBytes.set(0);
    }

    @Override
    public Record getFromReader() {
        if(shutdown){
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        boolean isEmpty = (this.bufferIndex >= this.buffer.size());
        if (isEmpty) {
            receive();
        }

        Record record = this.buffer.get(this.bufferIndex++);
        if (record instanceof TerminateRecord) {
            record = null;
        }
        return record;
    }

datax性能优化

通过datax原理和实现的理解,自然可以知道如何提升datax的同步效率。以mysql同步hdfs为例,自然最直接的方式就是提高mysql和hdfs的硬件性能如cpu、内存、IOPS、网络带宽等。当硬件资源受限的情况下,可以有如下几种办法:

  1. 将不同的集群划分到同一个网络或者区域内,减少跨网络的不稳定性,如将阿里云集群迁移到amazon集群,或者同一个amazon集群中不同区域划分到同一个子网络内。

  2. 对数据库按照主键划分。datax对单个表默认一个通道,如果指定拆分主键,将会大大提升同步并发数和吞吐量。

  3. 在cpu、内存以及mysql负载满足的情况下,提升通道并发数。通道并发数意味着更多的内存开销,jvm调优是重中之重。

  4. 当无法提升通道数量时,而且每个拆分依然很大的时候,可以考虑对每个拆分再次拆分。

  5. 设定合适的参数,如mysql超时等。

总结

本文通过原理介绍和源码分析,逐步理清datax的工作流程和实现原理,并结合实际经验给出几点优化建议。然而在实际中涉及到更多的分库分表和特大量级的表,数据库的承载压力也是一大考虑因素,否则遭到dba的吊打肯定会在所难免。尤其是我们涉及到跨大洋数据同步,网络的稳定性也是一大挑战,此时基于增量同步方案或许是更好的选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/90254.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一种基于Spark深度随机森林的网络入侵检测模型

一种基于Spark深度随机森林的网络入侵检测模型学习目标学习内容目前存在的不足为了解决这个问题特征分片深度并行随机森林Deep Parallel Random Forest(DPRF)投票策略Spark 上的并行化高复用缓存计算每个RDD的权重分层替换模型评估局限性参考论文申明&#xff1a; 未经许可&…

ASP.NET Core Web API 学习笔记

目录 一、Demo 1. 创建项目 2. 启动项目 3. 编写 api demo 二、C# .NET WEB 程序结构 一、Demo 1. 创建项目 创建的项目结构如下: Properties 配置文件&#xff0c;存放了一些 .json 文件用于配置 ASP.NET Core 项目 Propertics/launchSettings.json 启动配置文件&…

以岭药业:连花清瘟火爆背后,数字化重塑人力资源管理

近日&#xff0c;随着疫情防控政策“国十条”发布&#xff0c;新冠防疫政策逐步进入后防疫时代&#xff0c;每个人要做自己健康的“第一责任人”。而连花清瘟作为中医药治疗新冠肺炎筛选出的“三药三方”之一&#xff0c;也成为新疫情防控模式下的家庭常备药。连花清瘟自上市以…

怎么将图片内容转换成文字?这两种方法可以轻松实现

如何将图片的内容转换成文字呢&#xff1f;大家在使用图片文件的时候&#xff0c;遇到那种图片中包含一些有用的文字信息时&#xff0c;没有办法直接复制下来使用&#xff0c;只能对照着图片将文字信息给记录下来&#xff0c;这样会很耗费我们的时间。其实是有方法能够直接将图…

Python学习中常见的几个报错,看你踩雷没

前言 嗨嗨 今天给大家统计一下, 在学习Python中遇到的常见报错, 不一定会很全面, 但是应该会持续更新, 有用的话, 记得收藏哦~ 下面我会总结遇到的报错截图, 给出解决办法, 如果有需要补充的报错, 或者你解决不了的报错, 欢迎来文章最下方QQ群里面找我~ 1. 模块未安装 (Mod…

面试中这样介绍自己的项目经验,轻松拿Offer

面试时7分靠能力&#xff0c;3分靠技能&#xff0c;而刚开始时的介绍项目又是技能中的重中之重&#xff0c;所以本文将从“介绍”和“引导”两大层面告诉大家如何准备面试时的项目介绍。 在面试时&#xff0c;经过寒暄后&#xff0c;一般面试官会让介绍项目经验 。常见的问法是…

Go C 编程 第4课 变色魔法(魔法学院的奇幻之旅 有Go C编程绘图)

慧通教育 慧通教育 34.画彩色旗帜 (魔法学院第4课)--2022.12.15 登录 35.画转动的方形 (魔法学院第4课) 登录 36.画wifi信号 (魔法学院第4课) 登录 888.哪个大&#xff1f; (课程6&#xff09; 难度&#xff1a;1 登录 889.余数大小 (课程6) 难度&#xff1a;1 登录 适合…

Java学习—网络编程

网络编程 目的&#xff1a;数据交换、通信 1. 网络通信的要素 通信双方地址&#xff08;IP端口号&#xff09;网络通信协议 Java万物皆对象 2. IP地址 IP地址的类&#xff1a;InetAddress 唯一定位一台网络上的计算机127.0.0.1 本机地址 package com.xiaozhang.lesson01;…

第二证券|钠电池三种技术路线谁更将率先取代锂电池?

锂电网讯&#xff1a;近段时刻&#xff0c;钠离子电池遭到资本商场重视&#xff0c;不少公司公布了在钠电池范畴的最新进展。11月29日&#xff0c;中科海钠(阜阳)全球首条GWh级钠离子电池出产线产品下线。中科海钠总经理李树军透露&#xff0c;阜阳产线计划在下一年扩产至3GWh-…

大数据学习:shell基础(2)

文章目录tail命令选项参数任务一&#xff1a;显示文件最后4行内容任务二&#xff1a;显示文件最后4个字符内容任务三&#xff1a;显示文件修改行sort命令选项参数任务一&#xff1a;对文件按行排序任务二&#xff1a;对文件按第4节排序cut命令参数说明任务一&#xff1a;提取ip…

想要查询数据表中的前几名该怎么实现?看这篇文章吧

一. 需求分析 我们在学习数据库查询时&#xff0c;经常会遇到关于分组和聚合函数的查询&#xff0c;比如查询每门课程的最高分&#xff0c;每位同学的平均分&#xff0c;其实这些都是比较一般的问题。但如果遇到查询每门课程成绩的前几名问题&#xff0c;就会变的很棘手&#…

【深度学习】Tensorflow、MindSpore框架介绍及张量算子操作实战(超详细 附源码)

一、Tensorflow、MindSpore Google公司于2015年开源了深度学习框架TensorFlow&#xff0c;推动了深度学习的发展&#xff0c;得到了广泛应用&#xff0c;用户数量庞大。 华为公司于2020年开源了自己的深度学习框架MindSpore&#xff0c;现处于快速发展中。 TensorFlow2深度学…

直播 | 新一代极速云原生湖仓的技术内核,StarRocks PMC 今天下午为你揭秘!

12 月 14-16 日&#xff0c;第 13 届中国数据库技术大会&#xff08;DTCC 2022&#xff09;将在线上隆重召开。本届大会重点围绕云原生数据库、分布式数据库、时序数据库、图数据技术、实时数仓技术与应用实践、金融业数据库应用实践等内容展开分享和探讨。 12 月 14 日 14:20…

ESP32 与 ESP32-CAM 的关系

ESP32 与 ESP32-CAM 的关系 以下分别介绍 ESP32 与 ESP32-CAM&#xff0c;两者之间的关系可以简单用下表来说明&#xff1a; ESP32ESP32-CAM开发公司乐鑫信息科技安信可科技模块关系ESP32 系列 ( 共有ESP32-S 系列、ESP32-C 系列与ESP32 系列)ESP32 系列中的ESP32-WROVER 模组…

(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽!

引言 现如今的开发环境中&#xff0c;分布式/微服务架构大行其道&#xff0c;而分布式/微服务的根基在于网络编程&#xff0c;而Netty恰恰是Java网络编程领域的无冕之王。Netty这个框架相信大家定然听说过&#xff0c;其在Java网络编程中的地位&#xff0c;好比JavaEE中的Sprin…

Java中的二维数组

一、介绍 应用场景&#xff1a;比如我们要开发一个五子棋游戏&#xff0c;期盼就是需要用二维数组来表示。即一维数组的每一个元素也是数组。 例子&#xff1a;请用二维数组输出如下图形&#xff1a; 000000 001000 020300 000000 package com.hspedu.array;public class Tw…

Java做UI自动化和app自动化中动态代理@FindBy的工作原理【杭州多测师_王sir】【杭州多测师】...

Java做UI自动化和app自动化中动态代理FindBy的工作原理一、背景简介由于Selenium框架采用PageObject设计模式让测试代码与被测页面对象代码分离&#xff0c;因而提供了不少很方便的注解来达到目的&#xff0c;其中有一个注解就是FindBy。在使用中&#xff0c;只要通过在field中…

ruoyi 在页面上增加一个显示字段(数据库增加字段,页面后端处理)

例如&#xff0c;上图所示&#xff0c; 【用户工资表】现有字段为用户id&#xff0c;用户工资&#xff0c;生效时间&#xff0c;备注信息 这些字段也就是你设计数据库时&#xff0c;数据库工资表字段 &#xff0c;现在&#xff0c;要让显示这个页面&#xff0c;增加一个用户姓…

Flutter for Web 首次首屏优化——JS 分片优化

作者&#xff1a;马坤乐(坤吾) Flutter for Web&#xff08;FFW&#xff09;从 2021 年发布至今&#xff0c;在国内外互联网公司已经得到较多的应用。作为 Flutter 技术在 Web 领域的有力扩充&#xff0c;FFW 可以让熟悉 Flutter 的客户端同学直接上手写 H5&#xff0c;复用 A…

2023年有哪些半入耳蓝牙耳机?半入耳式蓝牙耳机排行榜

工作生活中最常用的真无线蓝牙耳机来说&#xff0c;各式各样、价格悬殊的产品&#xff0c;很多人不知道该如何选择&#xff0c;半入耳式的佩戴舒适度一直都是公认的好&#xff0c;下面小编分享几个性能表现、续航时间都非常优秀的半入耳式蓝牙耳机 TOP1:南卡小音舱蓝牙耳机 音…