DATAX hdfsreader orc格式读取数据丢失问题

news2025/1/11 10:14:13

最近做一个数据同步任务,从hive仓库同步数据到pg,Hive有4000w多条数据,但datax只同步了280w就结束了,也没有任何报错。

看了下datax源码,找到HdfsReader模块DFSUtil核心实现源码读取orc格式的文件方法:

public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
                             RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
    LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
    List<ColumnEntry> column = UnstructuredStorageReaderUtil
            .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
    String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
    StringBuilder allColumns = new StringBuilder();
    StringBuilder allColumnTypes = new StringBuilder();
    boolean isReadAllColumns = false;
    int columnIndexMax = -1;
    // 判断是否读取所有列
    if (null == column || column.size() == 0) {
        int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
        columnIndexMax = allColumnsCount - 1;
        isReadAllColumns = true;
    } else {
        columnIndexMax = getMaxIndex(column);
    }
    for (int i = 0; i <= columnIndexMax; i++) {
        allColumns.append("col");
        allColumnTypes.append("string");
        if (i != columnIndexMax) {
            allColumns.append(",");
            allColumnTypes.append(":");
        }
    }
    if (columnIndexMax >= 0) {
        JobConf conf = new JobConf(hadoopConf);
        Path orcFilePath = new Path(sourceOrcFilePath);
        Properties p = new Properties();
        p.setProperty("columns", allColumns.toString());
        p.setProperty("columns.types", allColumnTypes.toString());
        try {
            OrcSerde serde = new OrcSerde();
            serde.initialize(conf, p);
            StructObjectInspector inspector = (StructObjectInspector)                 serde.getObjectInspector();
            InputFormat<?, ?> in = new OrcInputFormat();
            FileInputFormat.setInputPaths(conf, orcFilePath.toString());

            //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
            //Each file as a split
            //TODO multy threads
            InputSplit[] splits = in.getSplits(conf, -1);

            RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
            Object key = reader.createKey();
            Object value = reader.createValue();
            // 获取列信息
            List<? extends StructField> fields = inspector.getAllStructFieldRefs();

            List<Object> recordFields;
            while (reader.next(key, value)) {
                recordFields = new ArrayList<Object>();

                for (int i = 0; i <= columnIndexMax; i++) {
                    Object field = inspector.getStructFieldData(value, fields.get(i));
                    recordFields.add(field);
                }
                transportOneRecord(column, recordFields, recordSender,
                        taskPluginCollector, isReadAllColumns, nullFormat);
            }
            reader.close();
        } catch (Exception e) {
            String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
                    , sourceOrcFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
        }
    } else {
        String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
        throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
    }

}

 读文件时 由于hdfs文件存储 是block 形式的。当单个文件 大于 单个block 的size时,出现一个文件 多个block 存储,仅读取了第一个block,造成了数据的部分丢失。

改成如下,用for循环分割块读取数据

 public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
        List<ColumnEntry> column = UnstructuredStorageReaderUtil
                .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判断是否读取所有列
        if (null == column || column.size() == 0) {
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
                StructObjectInspector inspector = (StructObjectInspector)                 serde.getObjectInspector();
                InputFormat<?, ?> in = new OrcInputFormat();
                FileInputFormat.setInputPaths(conf, orcFilePath.toString());

                //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
                //Each file as a split
                //TODO multy threads
                InputSplit[] splits = in.getSplits(conf, -1);
                for (InputSplit split : splits) {
                    RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
                    Object key = reader.createKey();
                    Object value = reader.createValue();
                    // 获取列信息
                    List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                    List<Object> recordFields;
                    while (reader.next(key, value)) {
                        recordFields = new ArrayList<Object>();

                        for (int i = 0; i <= columnIndexMax; i++) {
                            Object field = inspector.getStructFieldData(value, fields.get(i));
                            recordFields.add(field);
                        }
                        transportOneRecord(column, recordFields, recordSender,
                                taskPluginCollector, isReadAllColumns, nullFormat);
                    }
                    reader.close();
                }
            } catch (Exception e) {
                String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
                        , sourceOrcFilePath);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
            String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }

    }

然后重新打包成jar,替换datax引擎的hdfsreader-0.0.1-SNAPSHOT.jar

注意:打包的时候可能会提示maven仓库找不到plugin-unstructured-storage-util-0.0.1-SNAPSHOT.jar

解决办法:从datax引擎hdfsreader/lib中拷贝出plugin-unstructured-storage-util-0.0.1-SNAPSHOT.jar,在hdfsreader模块下面建一个lib目录放进去,然后在pom中本地引用。

<dependency>
    <groupId>com.unstrucatured</groupId>
    <artifactId>unstrucatured</artifactId>
    <version>200</version>
    <scope>system</scope>
    <systemPath>${basedir}/src/main/lib/plugin-unstructured-storage-util-0.0.1-SNAPSHOT.jar</systemPath>
</dependency>

改完后实测完美解决!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/467261.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

应用运行环境实时洞察,亚马逊云科技Cisco AppDynamics展优势

Cisco AppDynamics(APM)产品&#xff0c;现已正式上线亚马逊云科技Marketplace&#xff08;中国区域&#xff09;。可以通过亚马逊云科技Marketplace&#xff08;中国区域&#xff09;网站&#xff0c;灵活便捷地部署该解决方案&#xff0c;以便充分利用云原生APM(应用性能管理…

(上)苹果有开源,但又怎样呢?

苹果&#xff08;Apple Inc.&#xff09;有多伟大&#xff0c;我相信已经无需赘述了。但是&#xff0c;这里的伟大是指用产品和理念对行业进行的革命性颠覆&#xff0c;而不是对开源而言。 相反&#xff0c;在某种程度上&#xff0c;苹果几乎就是开源的反义词。这种骨子里的 “…

8个Wireshark使用技巧

一&#xff1a;数据包过滤 过滤需要的IP地址 ip.addr 在数据包过滤的基础上过滤协议ip.addrxxx.xxx.xxx.xxx and tcp 过滤端口ip.addrxxx.xxx.xxx.xxx and http and tcp.port80 指定源地址 目的地址ip.srcxxx.xxx.xxx.xxx and ip.dstxxx.xxx.xxx.xxx SEQ字段&#xff08;序列号…

浅谈 git 底层工作原理

浅谈 git 底层工作原理 系统复习到这里也快差不多了&#xff0c;大概就剩下两三个 sections&#xff0c;这里学习一下 git 的 hashing 和对象。 当然&#xff0c;跳过问题也不大。 config 文件 这里还是会用 redux 的项目&#xff0c;先看一下基本信息&#xff1a; ➜ re…

短视频矩阵系统---开发技术源码能力

短视频矩阵系统开发涉及到多个领域的技术&#xff0c;包括视频编解码技术、大数据处理技术、音视频传输技术、电子商务及支付技术等。因此&#xff0c;短视频矩阵系统开发人员需要具备扎实的计算机基础知识、出色的编程能力、熟练掌握多种开发工具和框架&#xff0c;并掌握音视…

制冷暖通工业互联网平台孵化

制冷暖通工业互联网平台孵化可以帮助初创企业或者创新项目快速建立和推广制冷暖通工业互联网平台。以下是一些常见的制冷暖通工业互联网平台孵化服务&#xff1a; 创业辅导&#xff1a;孵化器提供创业辅导服务&#xff0c;帮助企业或者项目找到合适的市场和商业模式&#xff0c…

sd卡中病毒的表现及sd文件消失后的恢复方法

sd卡在日常使用中十分常见&#xff0c;但有时也会发生一些意外情况。例如&#xff0c;不小心意外感染病毒&#xff0c;导致sd卡中存储的文件消失。那么对于丢失的文件&#xff0c;我们该如何恢复呢&#xff1f;下面将带您了解sd卡中病毒的表现以及sd卡文件消失怎么恢复的方法。…

【C语言】学习路线大纲思维导图

思维导图下载地址&#xff1a;点击跳转   配套专栏&#xff1a;【C语言】基础语法 思维导图 1. 基础语法1.1 变量和数据类型1.2 运算符和表达式1.3 控制流程结构1.4 函数和递归1.5 数组和指针1.6 字符串和字符处理1.7 文件操作 2. 高级特性标准库和常用函数动态内存分配多文件…

理解龙格库塔法基本C程序

先学习龙格-库塔法&#xff1b; 龙格-库塔&#xff0c;Runge-Kutta&#xff0c;该方法用于数值求解微分方程&#xff1b; 其中包括著名的欧拉法&#xff1b; 经典四阶法 该方法主要是在已知方程导数和初值信息&#xff0c;利用计算机仿真时应用&#xff0c;省去求解微分方…

【LeetCode】213. 打家劫舍 II

213. 打家劫舍 II&#xff08;中等&#xff09; 思路 这道题是 198.打家劫舍 的拓展版&#xff0c;区别在于&#xff1a;本题的房间是环形排列&#xff0c;而198.题中的房间是单排排列。 将房间环形排列&#xff0c;意味着第一间房间和最后一间房间不能同时盗窃&#xff0c;因…

虹科分享|不再受支持的Windows系统如何免受攻击?| 自动移动目标防御

传统的微软操作系统(OS)可能会一直伴随着我们&#xff0c;操作系统使用统计数据显示&#xff0c;传统操作系统的总市场份额仍在10%以上。Windows的总安装基数为13亿&#xff0c;大约有1.5亿个终端仍在运行旧版操作系统。 数十万组织的终端和服务器采用不受支持的操作系统。如果…

curl方式调用电商API接口示例 详细介绍

cURL是一个利用URL语法在命令行下工作的文件传输工具&#xff0c;1997年首次发行。它支持文件上传和下载&#xff0c;所以是综合传输工具&#xff0c;但按传统&#xff0c;习惯称cURL为下载工具。cURL还包含了用于程序开发的libcurl。 cURL支持的通信协议有FTP、FTPS、HTTP、H…

数字化工厂:虹科Vuzix AR眼镜在工业制造中的革新应用

随着现代科学技术和新兴需求的快速增长&#xff0c;增强现实(AR)、各种“现实”产品与技术不断涌入创新市场&#xff0c;新兴用例数量正在快速增长&#xff0c;可以肯定&#xff0c;在可预见的未来&#xff0c;AR技术将成为各行各业的生产与工作主流。 增强现实&#xff08;AR&…

应用scrapy爬虫框架

Scrapy是一个基于Python的开源网络爬虫框架&#xff0c;它可以帮助我们快速、高效地抓取网页数据&#xff0c;并支持数据的自动化处理、存储和导出。Scrapy提供了丰富的扩展机制&#xff0c;可以轻松地实现各种自定义需求。 Scrapy的基本使用流程&#xff1a; 1、安装Scrapy框…

服务(第十五篇)HAproxy负载+高可用

HAProxy负载均衡的调度算法&#xff08;策略&#xff09;&#xff1a; &#xff08;1&#xff09;roundrobin&#xff0c;表示简单的轮询 &#xff08;2&#xff09;static-rr&#xff0c;表示根据权重 &#xff08;3&#xff09;leastconn&#xff0c;表示最少连接者先处理 &…

RestTemplate使用不当引发的504及连接池耗尽问题分析

背景 系统&#xff1a; SpringBoot开发的Web应用&#xff1b;ORM: JPA(Hibernate)接口功能简述&#xff1a; 根据实体类ID到数据库中查询实体信息&#xff0c;然后使用RestTemplate调用外部系统接口获取数据。 问题现象 浏览器页面有时报504 GateWay Timeout错误&#xff0c…

C语言函数大全-- r 开头的函数

C语言函数大全 本篇介绍C语言函数大全-- r 开头的函数 1. raise 1.1 函数说明 函数声明函数功能int raise(int sig);用于向当前进程发送指定的信号。 参数&#xff1a; sig &#xff1a; 指定要发送的信号编号 返回值&#xff1a; 如果调用成功&#xff0c;raise() 函数将返…

霍兰德人格分析雷达图

雷达图 Radar Chart 雷达图是多特性直观展示的重要方式 问题分析 霍兰德认为&#xff1a;人格兴趣与职业之间应有一种内在的对应关系 人格分类&#xff1a;研究型、艺术型、社会型、企业型、传统型、现实性 职业&#xff1a;工程师、实验员、艺术家、推销员、记事员、社会工…

【AUTOSAR】【信息安全】SecOC

目录 一、概述 二、约束和假设 三、依赖模块 四、功能描述 4.1 安全解决方案的规范 4.1.1 安全解决方案的基本实体 4.1.2 安全的I-PDU构建 4.1.3 安全的I-PDU验证 4.2 与PduR的关系 4.3 初始化 4.4 传出PDU的身份验证 4.5 传入pdu的验证 4.6 网关功能 4.7 多核分…

【java】Java中的锁

文章目录 前言一、悲观锁二、乐观锁三、自旋锁原理自旋锁优缺点优点缺点 自旋锁时间阈值(1.6 引入了适应性自旋锁)自旋锁的开启 四、可重入锁(递归锁)五、读写锁六、公平锁七、非公平锁八、共享锁九、独占锁十、轻量级锁十一、重量级锁十二、偏向锁十三、分段锁十四、互斥锁十五…