五、DataX源码分析、性能参数优化

news2025/1/15 17:46:04

DataX源码分析

  • 一、总体流程
  • 二、程序入口
    • 1.datax.py
    • 2.com.alibaba.datax.core.Engine.java
    • 3.切分的逻辑
      • 并发数的确认
    • 3.调度
      • 3.1 确定组数和分组算法
      • 3.2 数据传输
  • 三、DataX性能优化
    • 1.关键参数
    • 2.优化:提升每个 channel 的速度
    • 3.优化:提升 DataX Job 内 Channel 并发数
      • 3.1 配置全局 Byte 限速以及单 Channel Byte 限速
      • 3.2 配置全局 Record 限速以及单 Channel Record 限速
      • 3.3 直接配置 Channel 个数
    • 3.提高 JVM 堆内存

一、总体流程

在这里插入图片描述

  • 黄色: Job 部分的执行阶段
  • 蓝色: Task 部分的执行阶段
  • 绿色:框架执行阶段。

二、程序入口

1.datax.py

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)
  • 从这里看出来,java的入口是:com.alibaba.datax.core.Engine

2.com.alibaba.datax.core.Engine.java

  • 路径是:datax/core/src/main/java/com/alibaba/datax/core/Engine.java

参数加载、容器创建、容器启动

public void start(Configuration allConf) {

      // 绑定column转换信息
      ColumnCast.bind(allConf);

      /**
       * 初始化PluginLoader,可以获取各种插件配置
       */
      LoadUtil.bind(allConf);

      boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
              .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
      //JobContainer会在schedule后再行进行设置和调整值
      int channelNumber =0;
      AbstractContainer container;
      long instanceId;
      int taskGroupId = -1;
      if (isJob) {
          // 如果是作业,创建作业容器
          allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
          container = new JobContainer(allConf);
          instanceId = allConf.getLong(
                  CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

      } else {
      	   // 如果不是作业容器,创建作业组容器
          container = new TaskGroupContainer(allConf);
          instanceId = allConf.getLong(
                  CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
          taskGroupId = allConf.getInt(
                  CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
          channelNumber = allConf.getInt(
                  CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
      }

      //缺省打开perfTrace
      boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
      boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);

      //standalone模式的 datax shell任务不进行汇报
      if(instanceId == -1){
          perfReportEnable = false;
      }

      Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
      //初始化PerfTrace
      PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable);
      perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
      container.start();
 }

容器启动的执行过程

@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");

    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if(isDryRun) {
            LOG.info("jobContainer starts to do preCheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            //Job 前置操作
            this.preHandle();

            LOG.debug("jobContainer starts to do init ...");
            //初始化 reader 和 writer
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            //全局准备工作,比如 odpswriter 清空目标表
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            // 拆分task是重点要看的
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            // 调度是重点要看的
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();

            LOG.debug("jobContainer starts to do postHandle ...");
            this.postHandle();
            LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

            this.invokeHooks();
        }
    } catch (Throwable e) {
        ...
    } finally {
        ...
    }
}

3.切分的逻辑

private int split() {
	// 调整channel数量
       this.adjustChannelNumber();

       if (this.needChannelNumber <= 0) {
           this.needChannelNumber = 1;
       }

       // 切分逻辑:读和写的数量必须要对应。自己写插件的时候需要注意
       List<Configuration> readerTaskConfigs = this
               .doReaderSplit(this.needChannelNumber);
       int taskNumber = readerTaskConfigs.size();
       List<Configuration> writerTaskConfigs = this
               .doWriterSplit(taskNumber);

       List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

       LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
       /**
        * 输入是reader和writer的parameter list,输出是content下面元素的list
        */
       List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
               readerTaskConfigs, writerTaskConfigs, transformerList);


       LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));

       this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);

       return contentConfig.size();
   }

并发数的确认

private void adjustChannelNumber() {
        int needChannelNumberByByte = Integer.MAX_VALUE;
        int needChannelNumberByRecord = Integer.MAX_VALUE;
        
        // 每秒传输的字节数的上限
        // 配置在json文件的 job.setting.speed.byte
        boolean isByteLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
        if (isByteLimit) {
            long globalLimitedByteSpeed = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);

            // 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
            Long channelLimitedByteSpeed = this.configuration
                    .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
            if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.CONFIG_ERROR,
                        "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
            }
			
            needChannelNumberByByte =
                    (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
            needChannelNumberByByte =
                    needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
            LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
        }
		
		//这个参数用于设置总TPS(记录每秒)限速。
        // 配置在json文件的 job.setting.speed.record
        boolean isRecordLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
        if (isRecordLimit) {
            long globalLimitedRecordSpeed = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);

            Long channelLimitedRecordSpeed = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
            if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                        "在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
            }

            needChannelNumberByRecord =
                    (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
            needChannelNumberByRecord =
                    needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
            LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
        }

        // 取较小值
        this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
                needChannelNumberByByte : needChannelNumberByRecord;

        // 如果从byte或record上设置了needChannelNumber则退出
        if (this.needChannelNumber < Integer.MAX_VALUE) {
            return;
        }

                 
        // 这个参数用于设置DataX Job内Channel的并发数。
        // 配置在json文件的 job.setting.speed.channel
        boolean isChannelLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
        if (isChannelLimit) {
            this.needChannelNumber = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);

            LOG.info("Job set Channel-Number to " + this.needChannelNumber
                    + " channels.");

            return;
        }

        throw DataXException.asDataXException(
                FrameworkErrorCode.CONFIG_ERROR,
                "Job运行速度必须设置");
    }

3.调度

3.1 确定组数和分组算法

  • datax/core/src/main/java/com/alibaba/datax/core/job/JobContainer.java
private void schedule() {
     /**
      * 这里的全局speed和每个channel的速度设置为B/s
      */
     int channelsPerTaskGroup = this.configuration.getInt(
             CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
     int taskNumber = this.configuration.getList(
             CoreConstant.DATAX_JOB_CONTENT).size();
	
	// this.needChannelNumber参数在split里面计算出来的,和taskNumber任务数 取最小值
     this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
     PerfTrace.getInstance().setChannelNumber(needChannelNumber);

     /**
      * 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务
      */
	// 公平的分配task到对应的taskGroup中
     List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
             this.needChannelNumber, channelsPerTaskGroup);

     LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());

     ExecuteMode executeMode = null;
     AbstractScheduler scheduler;
     try {
     	executeMode = ExecuteMode.STANDALONE;
         scheduler = initStandaloneScheduler(this.configuration);

         //设置 executeMode
         for (Configuration taskGroupConfig : taskGroupConfigs) {
             taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
         }

         if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
             if (this.jobId <= 0) {
                 throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                         "在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
             }
         }

         LOG.info("Running by {} Mode.", executeMode);

         this.startTransferTimeStamp = System.currentTimeMillis();
         
         // 这里调用了schedule
         scheduler.schedule(taskGroupConfigs);

         this.endTransferTimeStamp = System.currentTimeMillis();
     } catch (Exception e) {
         LOG.error("运行scheduler 模式[{}]出错.", executeMode);
         this.endTransferTimeStamp = System.currentTimeMillis();
         throw DataXException.asDataXException(
                 FrameworkErrorCode.RUNTIME_ERROR, e);
     }

     /**
      * 检查任务执行情况
      */
     this.checkLimit();
 }
  • 如果100个Task和20个Channel,需要几个TaskGroup?
    • 每个TaskGroup默认5个channel,那么需要4个组
public final class JobAssignUtil {
    private JobAssignUtil() {
    }

    public static List<Configuration> assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) {
        ...
        // 计算需要多少个TaskGrouop组的核心逻辑
        int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
        ...
        // 计算N个Task如何分到这些TaskGroup组
        List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);
    }
}

分组算法

	/**
     * /**
     * 需要实现的效果通过例子来说是:
     * <pre>
     * a 库上有表:0, 1, 2
     * b 库上有表:3, 4
     * c 库上有表:5, 6, 7
     *
     * 如果有 4个 taskGroup
     * 则 assign 后的结果为:
     * taskGroup-0: 0,  4,
     * taskGroup-1: 3,  6,
     * taskGroup-2: 5,  2,
     * taskGroup-3: 1,  7
     *
     * </pre>
     */
    private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {
        List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);

        Configuration taskGroupTemplate = jobConfiguration.clone();
        taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);

        List<Configuration> result = new LinkedList<Configuration>();

        List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);
        for (int i = 0; i < taskGroupNumber; i++) {
            taskGroupConfigList.add(new LinkedList<Configuration>());
        }

        int mapValueMaxLength = -1;

        List<String> resourceMarks = new ArrayList<String>();
        for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) {
            resourceMarks.add(entry.getKey());
            if (entry.getValue().size() > mapValueMaxLength) {
                mapValueMaxLength = entry.getValue().size();
            }
        }

        int taskGroupIndex = 0;
        for (int i = 0; i < mapValueMaxLength; i++) {
            for (String resourceMark : resourceMarks) {
                if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
                    int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
                    taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
                    taskGroupIndex++;

                    resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
                }
            }
        }

        Configuration tempTaskGroupConfig;
        for (int i = 0; i < taskGroupNumber; i++) {
            tempTaskGroupConfig = taskGroupTemplate.clone();
            tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
            tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);

            result.add(tempTaskGroupConfig);
        }

        return result;
    }

调度核心代码实现

  • 多线程池
  • datax/core/src/main/java/com/alibaba/datax/core/job/scheduler/AbstractScheduler.java
public abstract class AbstractScheduler {
    private static final Logger LOG = LoggerFactory
            .getLogger(AbstractScheduler.class);

    private ErrorRecordChecker errorLimit;

    private AbstractContainerCommunicator containerCommunicator;

    private Long jobId;

    public Long getJobId() {
        return jobId;
    }

    public AbstractScheduler(AbstractContainerCommunicator containerCommunicator) {
        this.containerCommunicator = containerCommunicator;
    }

    public void schedule(List<Configuration> configurations) {
        ...
        // 核心代码
        startAllTaskGroup(configurations);
        ...
    }
}
  • datax/core/src/main/java/com/alibaba/datax/core/job/scheduler/processinner/ProcessInnerScheduler.java
public abstract class ProcessInnerScheduler extends AbstractScheduler {

    private ExecutorService taskGroupContainerExecutorService;

    public ProcessInnerScheduler(AbstractContainerCommunicator containerCommunicator) {
        super(containerCommunicator);
    }

    @Override
    public void startAllTaskGroup(List<Configuration> configurations) {
        // 使用了线程池
        this.taskGroupContainerExecutorService = Executors
                .newFixedThreadPool(configurations.size());

        for (Configuration taskGroupConfiguration : configurations) {
            TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
            this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        }

        this.taskGroupContainerExecutorService.shutdown();
    }
}

3.2 数据传输

  • 线程池执行了TaskGroupContainerRunner对象
  • datax/core/src/main/java/com/alibaba/datax/core/taskgroup/runner/TaskGroupContainerRunner.java
public class TaskGroupContainerRunner implements Runnable {
	@Override
	public void run() {
		try {
            ...
            // 启动了这个组容器
            this.taskGroupContainer.start();
			...
		} catch (Throwable e) {
			...
		}
	}
}
  • 查看一下实现了什么run方法
  • datax/core/src/main/java/com/alibaba/datax/core/taskgroup/TaskGroupContainer.java
public class TaskGroupContainer extends AbstractContainer {
@Override
    public void start() {
        try {
            ...
            while (true) {
            	...
            	
                while(iterator.hasNext() && runTasks.size() < channelNumber){
                    ...
                	TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                    taskStartTimeMap.put(taskId, System.currentTimeMillis());
                    // 这里是真正的执行逻辑
                	taskExecutor.doStart();
                    ...
                }
                ...
            }
            ...
        } catch (Throwable e) {
            ...
        }finally {
            ...
        }
    }

	public void doStart() {
	        // 写的线程启动
            this.writerThread.start();

            // reader没有起来,writer不可能结束
            if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        this.taskCommunication.getThrowable());
            }
			// 读的线程启动
            this.readerThread.start();

            // 这里reader可能很快结束
            if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
                // 这里有可能出现Reader线上启动即挂情况 对于这类情况 需要立刻抛出异常
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        this.taskCommunication.getThrowable());
            }

        }
}
  • writerThread来自WriterRunner,readerThread来自ReaderRunner
    • ReaderRunner跟限速有关系
  • datax/core/src/main/java/com/alibaba/datax/core/taskgroup/runner/ReaderRunner.java
public class ReaderRunner extends AbstractRunner implements Runnable {

   private static final Logger LOG = LoggerFactory
           .getLogger(ReaderRunner.class);

   private RecordSender recordSender;

   public void setRecordSender(RecordSender recordSender) {
       this.recordSender = recordSender;
   }

   public ReaderRunner(AbstractTaskPlugin abstractTaskPlugin) {
       super(abstractTaskPlugin);
   }

   @Override
   public void run() {
       assert null != this.recordSender;

       Reader.Task taskReader = (Reader.Task) this.getPlugin();

       //统计waitWriterTime,并且在finally才end。
       PerfRecord channelWaitWrite = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_WRITE_TIME);
       try {
           channelWaitWrite.start();

           LOG.debug("task reader starts to do init ...");
           PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_INIT);
           initPerfRecord.start();
           taskReader.init();
           initPerfRecord.end();

           LOG.debug("task reader starts to do prepare ...");
           PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_PREPARE);
           preparePerfRecord.start();
           taskReader.prepare();
           preparePerfRecord.end();

           LOG.debug("task reader starts to read ...");
           PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DATA);
           // 最核心
           dataPerfRecord.start();
           taskReader.startRead(recordSender);
           recordSender.terminate();

           dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
           dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
           dataPerfRecord.end();

           LOG.debug("task reader starts to do post ...");
           PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_POST);
           postPerfRecord.start();
           taskReader.post();
           postPerfRecord.end();
           // automatic flush
           // super.markSuccess(); 这里不能标记为成功,成功的标志由 writerRunner 来标志(否则可能导致 reader 先结束,而 writer 还没有结束的严重 bug)
       } catch (Throwable e) {
           LOG.error("Reader runner Received Exceptions:", e);
           super.markFail(e);
       } finally {
           LOG.debug("task reader starts to do destroy ...");
           PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY);
           desPerfRecord.start();
           super.destroy();
           desPerfRecord.end();

           channelWaitWrite.end(super.getRunnerCommunication().getLongCounter(CommunicationTool.WAIT_WRITER_TIME));

           long transformerUsedTime = super.getRunnerCommunication().getLongCounter(CommunicationTool.TRANSFORMER_USED_TIME);
           if (transformerUsedTime > 0) {
               PerfRecord transformerRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.TRANSFORMER_TIME);
               transformerRecord.start();
               transformerRecord.end(transformerUsedTime);
           }
       }
   }

   public void shutdown(){
       recordSender.shutdown();
   }
}
  • 最核心的是 dataPerfRecord.start()
  • 拿一个MySQL的reader案例看:MysqlReader.java
public class MysqlReader extends Reader {

    @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
			
		    // 核心方法
            this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
                    super.getTaskPluginCollector(), fetchSize);
        }
}
  • 查看 CommonRdbmsReaderTask的startRead函数
public class CommonRdbmsReader {
	public static class Task {
        ...
        public void startRead(Configuration readerSliceConfig,
                              RecordSender recordSender,
                              TaskPluginCollector taskPluginCollector, int fetchSize) {
            ...
            try {
                ...
                while (rs.next()) {
                    rsNextUsedTime += (System.nanoTime() - lastTime);
                    // 核心逻辑:对单个数据的处理逻辑
                    this.transportOneRecord(recordSender, rs,
                            metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
                    lastTime = System.nanoTime();
                }
                ...
            }catch (Exception e) {
                throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
            } finally {
                DBUtil.closeDBResources(null, conn);
            }
        }
    }

	protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, 
                ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, 
                TaskPluginCollector taskPluginCollector) {
            Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); 
            // 每次处理完成,都会发送到writer
            recordSender.sendToWriter(record);
            return record;
        }
}
  • 如何将处理好的数据发送到writer的?
  • datax/core/src/main/java/com/alibaba/datax/core/transport/exchanger/BufferedRecordExchanger.java
public class BufferedRecordExchanger implements RecordSender, RecordReceiver {
	@Override
	public void sendToWriter(Record record) {
		....
		if (isFull) {
		    // 核心代码
			flush();
		}
        ...
	}
   
    @Override
	public void flush() {
		if(shutdown){
			throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
		}
		// 核心代码,将数据通过channel推送给writer
		this.channel.pushAll(this.buffer);
		this.buffer.clear();
		this.bufferIndex = 0;
		this.memoryBytes.set(0);
	}
}
  • 查看channel的pushAll
  • datax/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java
public abstract class Channel {
	public void pushAll(final Collection<Record> rs) {
        Validate.notNull(rs);
        Validate.noNullElements(rs);
        this.doPushAll(rs);
        // rs.size():数据条数
        // this.getByteSize(rs):数据量
        this.statPush(rs.size(), this.getByteSize(rs));
    }

	private void statPush(long recordSize, long byteSize) {
        ...
        if (interval - this.flowControlInterval >= 0) {
            ...

            // 如果是通过速率数限速的
            if (isChannelRecordSpeedLimit) {
                long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
                        CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
                // 当前的速率大于限速的速率,会指定一个睡眠时间
                if (currentRecordSpeed > this.recordSpeed) {
                    // 计算根据recordLimit得到的休眠时间
                    recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
                            - interval;
                }
            }

            // 休眠时间取较大值
            long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
                    recordLimitSleepTime : byteLimitSleepTime;
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            ...
        }
    }
}

三、DataX性能优化

1.关键参数

  • job.setting.speed.channel : channel 并发数
  • job.setting.speed.record : 全局配置 channel 的 record 限速
    • 最终的单个channel的限速 = 全局配置channel的record限速 / channel并发数
  • job.setting.speed.byte:全局配置 channel 的 byte 限速
    • 最终的单个channel的限速 = 全局配置channel的byte限速 / channel并发数
  • core.transport.channel.speed.record:单个 channel 的 record 限速
  • core.transport.channel.speed.byte:单个 channel 的 byte 限速

2.优化:提升每个 channel 的速度

  • 在 DataX 内部对每个 Channel 会有严格的速度控制,分两种,一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是 1MB/s,可以根据具体硬件情况设置这个 byte 速度或者 record 速度,一般设置 byte 速度,比如:我们可以把单个 Channel 的速度上限配置为 5MB

3.优化:提升 DataX Job 内 Channel 并发数

  • 并发数 = taskGroup 的数量 * 每个 TaskGroup 并发执行的 Task 数 (默认为 5)。

3.1 配置全局 Byte 限速以及单 Channel Byte 限速

  • Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速
{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte": 5242880
            }
        },
	 ...
    }
}
  • core.transport.channel.speed.byte=1048576,job.setting.speed.byte=5242880,所以 Channel个数 = 全局 Byte 限速 / 单 Channel Byte 限速=5242880/1048576=5 个

3.2 配置全局 Record 限速以及单 Channel Record 限速

  • Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速
{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "record": 100
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "record": 500
            }
        },
 	...
    }
}
  • core.transport.channel.speed.record=100 , job.setting.speed.record=500, 所 以 配 置 全 局Record 限速以及单 Channel Record 限速,Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速=500/100=5

3.3 直接配置 Channel 个数

  • 只有在上面两种未设置才生效,上面两个同时设置是取值小的作为最终的 channel 数。
{
    "job": {
        "setting": {
            "speed": {
                "channel": 5
            }
        },
 	...
    }
}
  • 直接配置 job.setting.speed.channel=5,所以 job 内 Channel 并发=5 个

3.提高 JVM 堆内存

  • 当提升 DataX Job 内 Channel 并发数时,内存的占用会显著增加,因为 DataX 作为数据交换通道,在内存中会缓存较多的数据。例如 Channel 中会有一个 Buffer,作为临时的数据交换的缓冲区,而在部分 Reader 和 Writer 的中,也会存在一些 Buffer,为了防止 OOM 等错误,调大 JVM 的堆内存。
  • 建议将内存设置为 4G 或者 8G,这个也可以根据实际情况来调整。
  • 调整 JVM xms xmx 参数的两种方式:一种是直接更改 datax.py 脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1450598.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot3 + Vue3 由浅入深的交互 基础交互教学

说明&#xff1a;这篇文章是适用于已经学过SpringBoot3和Vue3理论知识&#xff0c;但不会具体如何实操的过程的朋友&#xff0c;那么我将手把手从教大家从后端与前端交互的过程教学。 目录 一、创建一个SpringBoot3项目的和Vue3项目并进行配置 1.1后端配置: 1.1.1applicatio…

php基础学习之作用域和静态变量

作用域 变量&#xff08;常量&#xff09;能够被访问的区域&#xff0c;变量可以在常规代码中定义&#xff0c;也可以在函数内部定义 变量的作用域 在 PHP 中作用域严格来说分为两种&#xff0c;但是 PHP内部还定义一些在严格意义之外的一种&#xff0c;所以总共算三种—— 局部…

紫微斗数双星组合:廉贞天相在子午

文章目录 前言内容总结 前言 紫微斗数双星组合&#xff1a;廉贞天相在子午 内容 紫微斗数双星组合&#xff1a;廉贞天相在子午 性格分析 廉贞星最喜天相星同度来制其恶&#xff0c;因天相星之水可剋制廉贞星之火。廉贞星最喜天相星同度来制其恶&#xff0c;使廉贞星变为较温…

MySQL 基础知识(六)之数据查询(二)

目录 6 数值型函数 7 字符串函数 8 流程控制函数 9 聚合函数 10 分组查询 (group by) 11 分组过滤 (having) 12 限定查询 (limit) 13 多表查询 13.1 连接条件关键词 (on、using) 13.2 连接算法 13.3 交叉连接 (cross join) 13.4 内连接 (inner join) 13.5 外连接 …

【制作100个unity游戏之25】3D背包、库存、制作、快捷栏、存储系统、砍伐树木获取资源、随机战利品宝箱6(附带项目源码)

效果演示 文章目录 效果演示系列目录前言存储加载物品信息源码完结 系列目录 前言 欢迎来到【制作100个Unity游戏】系列&#xff01;本系列将引导您一步步学习如何使用Unity开发各种类型的游戏。在这第25篇中&#xff0c;我们将探索如何用unity制作一个3D背包、库存、制作、快…

【leetcode994】腐烂的橘子(BFS)

文章目录 一、题目二、思路三、代码 一、题目 二、思路 首先将所有烂橘子入队&#xff0c;然后常规BFS遍历&#xff0c;注意while的截止条件除了队列为空&#xff0c;新鲜橘子数量大于0&#xff08;没新鲜橘子也没必要继续遍历&#xff0c;保证时间计算的正确性&#xff09;&a…

Compose自定义动画API指南

很多动画API都可以自定义其参数达到不同的效果&#xff0c;Compose也提供了相应的API供开发者进行自定义动画规范。 AnimationSpec 主要用存储动画规格&#xff0c;可以自定义动画的行为&#xff0c;在animate*AsState和updateTransition函数中&#xff0c;此函数默认参数为s…

机器学习——聚类问题

&#x1f4d5;参考&#xff1a;西瓜书ysu老师课件博客&#xff08;3&#xff09;聚类算法之DBSCAN算法 - 知乎 (zhihu.com) 目录 1.聚类任务 2.聚类算法的实现 2.1 划分式聚类方法 2.1.1 k均值算法 k均值算法基本原理&#xff1a; k均值算法算法流程&#xff1a; 2.2 基于…

语言与真实世界的关系(超级语言生成能力将促进世界深刻变化)

语言与真实世界之间存在着紧密且复杂的关系。在人类社会中&#xff0c;语言是认知、表达和交流现实世界的主要工具&#xff0c;它帮助我们构建并理解周围环境&#xff0c;并将我们的思维和经验概念化。 1. 符号与指代&#xff1a; 语言是一种符号系统&#xff0c;通过词汇、句…

SG3225EEN晶体振荡器规格书

SG3225EEN 晶振是EPSON/爱普生的一款额定频率25 MHz至500 MHz的石英晶体振荡器&#xff0c;6脚贴片&#xff0c;LV-PECL输出&#xff0c;3225封装常规有源晶振&#xff0c;具有小尺寸&#xff0c;轻薄型&#xff0c;高稳定性&#xff0c;低相位抖动&#xff0c;低电源电压&…

17.3.1.2 曝光

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 基本算法&#xff1a;先定义一个阈值&#xff0c;通常取得是128 原图像&#xff1a;颜色值color&#xff08;R&#xff0c;G&#…

OpenTitan- 开源安全芯片横空出世

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

【前端web入门第六天】02 flex布局

文章目录 Flex布局1.Flex组成2.主轴与侧轴对齐方式2.1 主轴对齐方式(横轴)2.2 侧轴对齐方式 (纵轴)2.3 修改主轴方向 3.弹性伸缩比4.弹性换行与行对齐方式4.1 弹性换行4.2 行对齐方式 Flex布局 1.Flex组成 设置方式:给父元素设置display: flex&#xff0c;子元素可以自动挤压…

找负环(图论基础)

文章目录 负环spfa找负环方法一方法二实际效果 负环 环内路径上的权值和为负。 spfa找负环 两种基本的方法 统计每一个点的入队次数&#xff0c;如果一个点入队了n次&#xff0c;则说明存在负环统计当前每个点中的最短路中所包含的边数&#xff0c;如果当前某个点的最短路所…

【C++】---类和对象(上)入门

一、类的定义 1.那么众所周知&#xff0c;C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解的步骤&#xff0c;通过函数的调用来逐步解决问题 2.而C是基于面向对象的&#xff0c;关注的是对象&#xff0c;将一件事情拆分成不同的对象&#xff0c;靠对象之间交…

【自然语言处理】:实验1布置,Word2VecTranE的实现

清华大学驭风计划 因为篇幅原因实验答案分开上传&#xff0c;答案链接http://t.csdnimg.cn/5cyMG 如果需要详细的实验报告或者代码可以私聊博主 有任何疑问或者问题&#xff0c;也欢迎私信博主&#xff0c;大家可以相互讨论交流哟~~ 实验1&#xff1a; Word2Vec&TranE的…

使用HTML、CSS和JavaScript构建响应式企业官网

使用HTML、CSS和JavaScript构建响应式企业官网&#xff0c;web网页设计与制作-htmlcssjs实现企业官网展示。 页面效果展示 pc端和移动端 动态演示 文件目录 assets文件夹&#xff1a;静态资源目录&#xff0c;主要存放css、fonts、images、js等静态资源文件&#xff1b; fa…

Unity如何修改预制体(预制件)?

文章目录 19 复制复制复制&#xff0c;预制体与变体 19 复制复制复制&#xff0c;预制体与变体 【预制件】 预制件作用&#xff1a;方便复用 【预制件】的制作 直接拖拽&#xff0c;从层级面板 -> 项目面板。层级面板中当前图标会变蓝&#xff0c;子物体名字变蓝色。预制件…

《PCI Express体系结构导读》随记 —— 第II篇 第13章 PCI总线与虚拟化技术(5)

接前一篇文章&#xff1a;《PCI Express体系结构导读》随记 —— 第II篇 第13章 PCI总线与虚拟化技术&#xff08;4&#xff09; 13.1 IOMMU 13.1.3 AMD处理器的IOMMU AMD处理器的IOMMU技术与Intel的VT-d技术类似&#xff0c;其完成的主要功能也类似。AMD率先提出了IOMMU的概念…

(11)Hive调优——explain执行计划

一、explain查询计划概述 explain将Hive SQL 语句的实现步骤、依赖关系进行解析&#xff0c;帮助用户理解一条HQL 语句在底层是如何实现数据的查询及处理&#xff0c;通过分析执行计划来达到Hive 调优&#xff0c;数据倾斜排查等目的。 官网指路&#xff1a; https://cwiki.ap…