datax之channel学习

news2024/11/26 14:28:32

一般来说datax只需要我们设置reader和writer,但是是什么连接了reader和writer呢?

就是channel! 这个有什么用? 慢慢学习。

core.json

[devuser@cdp-node12 /data/DATA_DIR/share/dataingestion/conf]$ cat core.json 

{
    "entry": {
        "jvm": "-Xms1G -Xmx1G",
        "environment": {}
    },
    "common": {
        "column": {
            "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
            "timeFormat": "HH:mm:ss",
            "dateFormat": "yyyy-MM-dd",
            "extraFormats":["yyyyMMdd"],
            "timeZone": "GMT+8",
            "encoding": "utf-8"
        }
    },
    "core": {
        "dataXServer": {
            "address": "http://localhost:7001/api",
            "timeout": 10000,
            "reportDataxLog": false,
            "reportPerfLog": false
        },
        "transport": {
            "channel": {
                "class": "com.tencent.s2.dataingestion.core.transport.channel.memory.MemoryChannel",
                "speed": {
                    "byte": -1,
                    "record": -1
                },
                "flowControlInterval": 20,
                "capacity": 512,
                "byteCapacity": 67108864
            },
            "exchanger": {
                "class": "com.tencent.s2.dataingestion.core.plugin.BufferedRecordExchanger",
                "bufferSize": 32
            }
        },
        "container": {
            "job": {
                "reportInterval": 10000
            },
            "taskGroup": {
                "channel": 5
            },
            "trace": {
                "enable": "false"
            }

        },
        "statistics": {
            "collector": {
                "plugin": {
                    "taskClass": "com.tencent.s2.dataingestion.core.statistics.plugin.task.StdoutPluginCollector",
                    "maxDirtyNumber": 10
                }
            }
        }
    }
}

channel类

    public Channel(final Configuration configuration) {
        //channel的queue里默认record为1万条。原来为512条
        //core.json=512 default=2048
        int capacity = configuration.getInt(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY, 2048);
        //core.json=-1  最后是1M
        long byteSpeed = configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024);
        //core.json=01 最后是10000r
        long recordSpeed = configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000);
        if (capacity <= 0) {
            throw new IllegalArgumentException(String.format(
                    "通道容量[%d]必须大于0.", capacity));
        }

        synchronized (isFirstPrint) {
            if (isFirstPrint) {
                Channel.LOG.info("Channel set byte_speed_limit to " + byteSpeed
                        + (byteSpeed <= 0 ? ", No bps activated." : "."));
                Channel.LOG.info("Channel set record_speed_limit to " + recordSpeed
                        + (recordSpeed <= 0 ? ", No tps activated." : "."));
                isFirstPrint = false;
            }
        }

        this.taskGroupId = configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
        this.capacity = capacity;
        this.byteSpeed = byteSpeed;
        this.recordSpeed = recordSpeed;
        //core.json=20 default=20
        this.flowControlInterval = configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_FLOWCONTROLINTERVAL, 1000);
        //channel的queue默认大小为8M,原来为64M datax自己说的
        //core.json=67108864=64M default=8M
        this.byteCapacity = configuration.getInt(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
        this.configuration = configuration;
    }

再看memoryChannel

public MemoryChannel(final Configuration configuration) {
		super(configuration);
		this.queue = new ArrayBlockingQueue<Record>(this.getCapacity());
		this.bufferSize = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);

		lock = new ReentrantLock();
		notInsufficient = lock.newCondition();
		notEmpty = lock.newCondition();
	}

1.根据capacity 设置queue 大小

2.bufferSize = core.transport.exchanger.bufferSize=32

我们再看 BufferedRecordExchanger.java

这个sendToWriter 是reader每读取一条记录就send一下

	@Override
	public void sendToWriter(Record record) {
		if(shutdown){
			throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
		}

		Validate.notNull(record, "record不能为空.");
		//判断单条记录的大小是否超过64M
		if (record.getMemorySize() > this.byteCapacity) {
			this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
			return;
		}
		//判断是否满了 bufferSize=32  或者 总记录数的字节大小+ 当前的记录>64M
		//也就是说32个记录 作为一个buffer 放到channel里去,并不是一个一个的, 32个记录push 或者64M push到channel
		boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
		if (isFull) {
			// flush到channel 然后buffer和memoryBiteSize重置-0
			flush();
		}

		this.buffer.add(record);
		this.bufferIndex++;
		memoryBytes.addAndGet(record.getMemorySize());
	}

可以看到 reader是一条条发的,但是中间搞了一个buffer,buffer里只能装32条数据或者64M的而数据,到了这个界限,就会flush到channel里去,

到了channel就会被writer消费。

这个时候我们就可以考虑 这个buffer=32 是否可以调大?(bufferSize=64M不需要考虑 一般没这么大)

现在我们知道reader是怎么发数据到channel了的。

继续研究writer是怎么从channel里获取数据的?

 这里的recordReceiver就是BufferedRecordExchanger。来看getFromReader()方法

//buffer里有数据,就直接从buffer里消费
//buffer没数据,我就从channel里获取32个数据到buffer,然后一个个消费
@Override
public Record getFromReader() {
   if(shutdown){
      throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
   }
  //这里的这个主要是给writer用的。
  //判断依据是 下标已经读到最后一位了
   boolean isEmpty = (this.bufferIndex >= this.buffer.size());
   if (isEmpty) {
   //如果buffer里为空 我就从channel里获取数据
      receive();
   }
   //然后我开始从下标0开始慢慢读
   Record record = this.buffer.get(this.bufferIndex++);
   if (record instanceof TerminateRecord) {
      record = null;
   }
   return record;
}
//之前把数据都存到channel里了,现在来从channel里获取数据
//然后bufferIndex=0开始从下标为0开始消费 
private void receive() {
   this.channel.pullAll(this.buffer);
   this.bufferIndex = 0;
   this.bufferSize = this.buffer.size();
}
//这个channel.pullAll没啥看的
public void pullAll(final Collection<Record> rs) {
    Validate.notNull(rs);
    this.doPullAll(rs);
    this.statPull(rs.size(), this.getByteSize(rs));
}

//memeoryChannel.doPullAll

@Override
protected void doPullAll(Collection<Record> rs) {
   assert rs != null;
   rs.clear();
   try {
      long startTime = System.nanoTime();
      lock.lockInterruptibly();
      //从channel的queue里获取32个数据
      while (this.queue.drainTo(rs, bufferSize) <= 0) {
         notEmpty.await(200L, TimeUnit.MILLISECONDS);
      }
      waitReaderTime += System.nanoTime() - startTime;
      int bytes = getRecordBytes(rs);
      memoryBytes.addAndGet(-bytes);
      notInsufficient.signalAll();
   } catch (InterruptedException e) {
      throw DataXException.asDataXException(
            FrameworkErrorCode.RUNTIME_ERROR, e);
   } finally {
      lock.unlock();
   }
}

我们梳理下。

reader读数据(可能一条条读可能一批批读)读到的数据一条条的写到buffer里,当buffer=32的时候,再放到channel里去

writer从channel一次次获取32条数据,然后一条条的去给writer去执行。

注意channel的限制是512条或64M

       buffer的限制是32条 或64M

这里感觉就不对了呀!channel和buffer都是64M,但是代码是ali写的我也没法。

总感觉这个reader和writer过程有点不对劲

1.buffer=32 能不能搞大点,一般数据都是百万千万级别

2.writer一次获取1条是不是也很蠢?

个人觉得32是可以搞大点的

一般来说有如下三种情况

reader比writer快,这个时候我们可以加大channel的size,保存更多数据,但感觉用处也不大

reader比writer慢,这个时候可以减小channel的size,因为根本不需要那么多内存,还不如给其他人用

reader=writer。建议加大buffer的size,因为大家都很快buffer不够用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/88886.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

应急物资仓库可视化管理系统-智慧应急物资装备管理系统

1. 项目概述 应急物资仓库可视化系统(智慧物资管理系统 DW-S300)是一套成熟系统&#xff0c;依托互 3D 技术、云计算、大数据、RFID 技术、数据库技术、AI、视频分析技术对 RFID 智能仓库进行统管理、分析的信息化、智能化、规范化的系统。 应急物资&#xff0c;是每当灾害发…

Cyanine7 NHS|CY7-N-羟基琥珀酰亚胺|CY7-NHS ester

Cyanine7 NHS|CY7-N-羟基琥珀酰亚胺|CY7-NHS ester 中文名称&#xff1a;CY7-N-羟基琥珀酰亚胺 英文名称&#xff1a;Cyanine7 NHS 性 状&#xff1a;深绿色粉末 分子量&#xff1a;827.94 Abs/Em Maxima&#xff1a;675/694nm 分子式&#xff1a;C41H46N3NaO10S2 溶解性…

【网站架构】项目越迭代越难、严重延期?那是没处理好变化

大家好&#xff0c;欢迎来到停止重构的频道。 本期&#xff0c;我们讨论网站系统的扩展性。 扩展性指的是网站系统应该如何更好地处理需求变化、版本迭代。 对于有几个项目经验的人来说&#xff0c;可能对这样的问题不以为然&#xff0c;毕竟devops、CI/CD、git、敏捷开发、…

Pytest测试框架(一):Pytest介绍与安装,Pytest编写规则及pytest.main()的参数

Pytest测试框架(1)&#xff1a;Pytest介绍与安装 pytest简介&#xff1a; pytest是python的第三方单元测试框架&#xff0c;比自带的unittest更简洁和高效&#xff0c;同时兼容unittest框架。 它还有如下优点&#xff1a; 1、简单灵活&#xff0c;容易上手&#xff0c;文档丰…

计算机毕业设计django基于python精品课程在线学习系统

项目介绍 在各学校的教学过程中,租房系统是一项非常重要的事情。随着计算机多媒体技术的发展和网络的普及。采用当前流行的B/S模式以及3层架构的设计思想通过Python技术来开发此系统的目的是建立一个配合网络环境的精品课程系统的平台,这样可以有效地解决课程学习系统混乱的局…

线上卡顿监控

文章目录1. 卡顿与ANR的关系2. 卡顿原理3. 卡顿监控3.1 WatchDog3.2 Looper Printer3.2.1 监控TouchEvent卡顿3.2.2 监控IdleHandler卡顿3.2.3 监控SyncBarrier泄漏4. 小结平时看博客或者学知识&#xff0c;学到的东西比较零散&#xff0c;没有独立的知识模块概念&#xff0c;而…

世界杯期间,抖音的涨粉秘诀是什么?

纵览11月抖音涨粉趋势&#xff0c;生活、体育、美食等领域有不少账号迅速圈粉。据新抖『粉丝飙升榜』TOP30显示&#xff0c;11月上榜达人的更替率高达83.3%&#xff0c;其中&#xff0c;有达人凭3条人物随拍视频涨粉千万&#xff1b;有达人凭硬核美食教程&#xff0c;被网友戏称…

网络工程毕业设计 SSM音乐管理系统(源码+论文)

文章目录1 项目简介2 实现效果2.1 界面展示3 设计方案3.1 概述3.2 系统流程3.2.1 系统开发流程3.3 系统结构设计4 项目获取1 项目简介 Hi&#xff0c;各位同学好呀&#xff0c;这里是M学姐&#xff01; 今天向大家分享一个今年(2022)最新完成的毕业设计项目作品&#xff0c;【…

【Python】基础语法 5(字典和文件)

1. 字典 1.1 字典是什么 字典是一种存储键值对的结构。 键值对是计算机/生活中一个非常广泛使用的概念。 把 键(key) 和 值(value) 进行一个一对一的映射, 然后就可以根据键, 快速找到值。 1.2 创建字典 创建一个空的字典&#xff0c;使用 { } 表示字典 a { } b dict() prin…

RabbitMQ之交换机

Exchanges概念 RabbitMQ消息传递模型的核心思想是&#xff1a;生产者生产的消息从不会直接发送到队列。实际上&#xff0c;通常生产者甚至都不知道这些消息传递到了哪些队列中。 相反&#xff0c;生产者只能将消息发送到交换机&#xff0c;交换机工作的内容很简单&#xff0c;…

【数据结构】详解七大排序算法(有源码)

目录☀️直接插入排序☀️希尔排序☀️直接选择排序☀️堆排序☀️冒泡排序☀️快速排序☀️归并排序☀️排序算法复杂度及稳定性分析☀️直接插入排序 1、基本思想 把待排序的数按其关键码值的大小逐个插入到一个已经排好序的有序序列中&#xff0c;直到所以的记录插入完为止…

【设计模式】装饰者模式Decorator(Java)

文章目录1. 装饰者模式定义2. 类图3.Java实现3.1 饮料Beverage3.2 小料CondimentDecorator3.3 椰果Coconut3.4 红豆RedBean3.5 奶茶MikeTea3.6 果茶JuiceTea3.7 商店主方法StoreMain1. 装饰者模式定义 装饰者模式动态地将责任附加到对象上。若要扩展功能&#xff0c;装饰者提供…

享元模式

文章目录享元模式1.享元模式的本质2.何时选用享元模式3.优缺点4.享元模式的结构5.实现最初实现享元模式初步改造享元模式再改进享元模式再优化享元模式 享元模式最开始看就是类似缓存&#xff0c;缓存一些信息&#xff0c;节约查询时间&#xff0c;以空间换时间 但是再理解后才…

一行代码就能完成的事情,为什么要写两行

今天休息休息&#xff0c;复习一下使用的简洁运算方式以及常用的单行代码 三元运算符 用三元运算符代替简单的if else if (age < 18) {me 小姐姐; } else {me 老阿姨; } 改用三元运算符,一行就能搞定 me age < 18 ? 小姐姐 : 老阿姨; 复杂的判断三元运算符就有点…

这才是最适合新手的python教程(最新版python3.10)

前言 这几年&#xff0c;Python 凭借着语法简洁、跨平台、类库丰富、可扩展、开放源码等特点&#xff0c;成为了 AI 和机器学习时代的第一编程语言。甚至击破铁三角&#xff08; Java、C、C&#xff09;的架构&#xff0c;荣登 TIOBE 榜单的榜首。 身边有不少程序员都选择 Pyt…

LEADTOOLS 入门教程: 检测和提取条形码 - .NET Core

LEADTOOLS是一个综合工具包的集合&#xff0c;用于将识别、文档、医疗、成像和多媒体技术整合到桌面、服务器、平板电脑、网络和移动解决方案中&#xff0c;是一项企业级文档自动化解决方案&#xff0c;有捕捉&#xff0c;OCR&#xff0c;OMR&#xff0c;表单识别和处理&#x…

漏洞丨PDF Explorer 1.5.66.2 - Buffer Overflow

作者&#xff1a;黑蛋 一、漏洞简介 这是一个栈溢出漏洞&#xff0c;一个叫PDF Explorer的软件&#xff08;干嘛的咱没必要知道&#xff09;&#xff0c;他对于用户输入内容长度没有限制造成栈溢出漏洞。 二、漏洞环境 虚拟机 目标程序 调试器 win7x86 PDF Explorer x32…

NeurIPS 2022 | MoVQ: 基于Modulating Quantized Vectors的高保真图像生成

原文标题&#xff1a;MoVQ: Modulating Quantized Vectors for High-Fidelity Image Generation 一、问题提出 虽然两级Vector Quantized (VQ)【指VQVAE-2】生成模型允许合成高保真和高分辨率图像&#xff0c;但它们的量化操作符将图像中的相似patch编码到相同的索引中&#…

2023年MBA联考英语(二)大作文:关于某高校大学生的十大主题

2023年管理类联考倒计时9天&#xff01;在历年的联考英语二作文主题中&#xff0c;大学生群体是时长会出现一期的&#xff0c;这既是对联考中部分专业涉及大学生群体的一种反映&#xff0c;也是因为这个群体的话题对大多数考生来讲都相对熟悉&#xff0c;毕竟都是从这个阶段经历…

世界杯竞猜项目Dapp-第四章(subgraph)

subgraph 是什么 subgraph 索引协议作为 Dapp 领域最重要的基建之一&#xff08;如 uniswap、wave 等都在使用&#xff09;&#xff0c;主要用来做链上数据索引&#xff0c;即在链下对链上事件进行捕捉&#xff08;扫链、计算、存储&#xff09;&#xff0c;然后可对存储下来的…