DataX读取Hive Orc格式表丢失数据处理记录

news2024/11/25 23:28:11

文章目录

    • 问题
      • 问题概述
      • 问题详细描述
    • 原因
    • 解决方法
      • 修改源码
      • 验证

问题

问题概述

DataX读取Hive Orc存储格式表数据丢失

问题详细描述

同步Hive表将数据发送到Kafka,Hive表A数据总量如下

SQL:select count(1) from A;
数量:19397281

使用DataX将表A数据发送到Kafka,最终打印读取数据量为12649450

任务总计耗时                    :               1273s
任务平均流量                    :            2.51MB/s
记录写入速度                    :           9960rec/s
读出记录总数                    :            12649450
读写失败总数                    :                   0

在kafka中查询发送的数据为12649449(有一条垃圾数据被我写自定义KafkaWriter过滤掉了,这里忽略即可)

在这里插入图片描述

原因

DataX读取HDFS Orce文件,代码有bug,当读取Hive文件大于BlockSize时会丢失数据,问题代码如下:

InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);

代码位置:

hdfsreader模块,com.alibaba.datax.plugin.reader.hdfsreader.DFSUtil类334行代码(源码为v202210版本)

这里当文件大于BlockSize大小会将文件分为多个,但是下面只取了第一个文件splits[0],其他数据就会丢失

我们发现问题后,去验证一下,hive表存储目录查询文件存储大小如下

$ hdfs dfs -du -h  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05
....
518.4 K   1.5 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000171_0
669.7 M   2.0 G    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0
205.6 K   616.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000173_0
264.6 K   793.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000174_0
1.4 M     4.3 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000175_0
1.5 M     4.6 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000176_0
....

发现果然有文件669.7 M 2.0 G /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0大于BlockSize大小

解决方法

修改源码

修改后方法源码如下,直接替换DFSUtil.javaorcFileStartRead方法即可

public void orcFileStartRead(
            String sourceOrcFilePath,
            Configuration readerSliceConfig,
            RecordSender recordSender,
            TaskPluginCollector taskPluginCollector) {
        LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
        List<ColumnEntry> column =
                UnstructuredStorageReaderUtil.getListColumnEntry(
                        readerSliceConfig,
                        com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat =
                readerSliceConfig.getString(
                        com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判断是否读取所有列
        if (null == column || column.size() == 0) {
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
                StructObjectInspector inspector =
                        (StructObjectInspector) serde.getObjectInspector();
                InputFormat<?, ?> in = new OrcInputFormat();
                FileInputFormat.setInputPaths(conf, orcFilePath.toString());

                // If the network disconnected, will retry 45 times, each time the retry interval
                // for 20 seconds
                // Each file as a split
                InputSplit[] splits = in.getSplits(conf, -1);
                for (InputSplit split : splits) {
                    {
                        RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
                        Object key = reader.createKey();
                        Object value = reader.createValue();
                        // 获取列信息
                        List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                        List<Object> recordFields;
                        while (reader.next(key, value)) {
                            recordFields = new ArrayList<Object>();

                            for (int i = 0; i <= columnIndexMax; i++) {
                                Object field = inspector.getStructFieldData(value, fields.get(i));
                                recordFields.add(field);
                            }
                            transportOneRecord(
                                    column,
                                    recordFields,
                                    recordSender,
                                    taskPluginCollector,
                                    isReadAllColumns,
                                    nullFormat);
                        }
                        reader.close();
                    }
                    // transportOneRecord(column, recordFields, recordSender,
                    //        taskPluginCollector, isReadAllColumns, nullFormat);
                }
                // reader.close();
            } catch (Exception e) {
                String message =
                        String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。", sourceOrcFilePath);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
            String message =
                    String.format(
                            "请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s",
                            JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }

修改完成后将源码打包,打包后在hdfsreader模块下target目录有target/hdfsreader-0.0.1-SNAPSHOT.jar文件,将文件上传到部署服务器上的目录datax/plugin/reader/hdfsreader下,替换之前的包.


[hadoop@10 /datax/plugin/reader/hdfsreader]$ pwd
/datax/plugin/reader/hdfsreader
[hadoop@10 /datax/plugin/reader/hdfsreader]$ ll
total 52
-rw-rw-r-- 1 hadoop hadoop 28828 May 11 14:08 hdfsreader-0.0.1-SNAPSHOT.jar
drwxrwxr-x 2 hadoop hadoop  8192 Dec  9 15:06 libs
-rw-rw-r-- 1 hadoop hadoop   217 Oct 26  2022 plugin_job_template.json
-rw-rw-r-- 1 hadoop hadoop   302 Oct 26  2022 plugin.json

验证

重新启动DataX同步脚本,发现同步数据与Hive表保存数据一致。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/519381.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTML小游戏25 —— HTML5拉杆子过关小游戏(附完整源码)

本节教程我会带大家使用 HTML 、CSS和 JS 来制作一个HTML5拉杆子过关小游戏 ✨ 前言 &#x1f579;️ 本文已收录于&#x1f396;️100个HTML小游戏专栏&#xff1a;100个H5游戏专栏https://blog.csdn.net/qq_53544522/category_12064846.html&#x1f3ae; 目前已有100小游戏…

交叉编译--build、--host、--target、--prefix

一、编译例子 ./configure --build编译平台 --host运行平台 --target目标平台 [各种编译参数]build&#xff1a;表示目前我们正在运行的平台名称是什么&#xff0c;如&#xff1a;当前我们是在电脑上编译该系统&#xff0c;那么我们的 --build 就可能是 x86&#xff0c;如果…

如何避免因为 Kubernetes 和 Kafka 而被解雇

本文由 Bing AI 生成。Bing AI 真是尽显程序员本色&#xff0c;我等它生成文章的过程中发现出现了 Markdown 语法&#xff0c;结果点复制过来的就是直接 Markdown 文档。 Kubernetes 和 Kafka 是两个非常流行的技术&#xff0c;它们分别用于容器编排和分布式消息传递。它们的优…

XSD2Code++ Crack

XSD2Code Crack XSD2Code是为那些希望在将复杂的XML和JSON模式转换为NetCore时节省时间的开发人员设计的。它使用简单且灵活&#xff0c;可以很容易地集成到任何项目中&#xff0c;并适应开发人员的需求。它通过直观、可定制的用户界面&#xff0c;真正提高了生产力。使用XSD2C…

【SpringCloud】初步认识微服务

文章目录 1.认识微服务1.1微服务由来1.2为什么需要微服务&#xff1f; 2.两种架构2.1.单体架构2.2.分布式架构 3.微服务的特点4.SpringCloud5.总结最后说一句 1.认识微服务 随着互联网行业的发展&#xff0c;对服务的要求也越来越高&#xff0c;服务架构也从单体架构逐渐演变为…

K8s基础10——数据卷、PV和PVC、StorageClass动态补给、StatefulSet控制器

文章目录 一、数据卷类型1.1 临时数据卷&#xff08;节点挂载&#xff09;1.2 节点数据卷&#xff08;节点挂载&#xff09;1.3 网络数据卷NFS1.3.1 效果测试 1.4 持久数据卷&#xff08;PVC/PV&#xff09;1.4.1 效果测试1.4.2 测试结论 二、PV、PVC生命周期2.1 各阶段工作原理…

华为机试真题 数组奇偶排序

人寄语: 准备面试华为外包德科,记录一下一些面试题; 牛客网代码提交的坑,可以看一下下面的第一道题,ide本地编译通过,牛客网死活不通过,提交代码提示:返回非0。原因分析   查询得知,结果非零的意思的代码退出的时候不是以正常的0退出的,而是非0状态,也就是代码出错…

操作系统进程线程(三)—进程状态、同步互斥、锁、死锁

Linux下同步机制 POSIX信号量&#xff1a;可用于进程同步&#xff0c;也可用于线程同步POSIX互斥锁条件变量&#xff1a;只能用于线程同步。 进程同步的四种方法 临界区 对临界资源进行访问。 同步和互斥 同步&#xff1a;多个进程因为合作产生直接制约关系&#xff0c;使…

教你如何正确使用ChatGPT

目录 前言 一、ChatGPT Sidebar 二、免费镜像 三、共享账号 总结 前言 ChatGPT 是一种基于深度学习技术的自然语言处理工具&#xff0c;能够用于文本生成、语言翻译等任务。然而&#xff0c;其使用需要一定的技术基础和相关知识&#xff0c;不少用户可能会遇到一些问题。…

从功能到自动化,4个月时间我是如何从点工进入互联网大厂的

1、知识体系化 不知不觉&#xff0c;入行软件测试也有五个年头。待过创业公司也待过上市公司。做过功能测试、自动化测试也做过性能测试。做过测试新人也做过测试组长。如果要是从这5年中说出最宝贵的经验&#xff0c;我想应该是知识体系化。那么什么是知识体系化&#xff0c;…

SViT 实验记录

目录 一、网络的搭建 1、Conv Stem 2、各阶段的模块 3、3X3卷积 二、前向传播过程 1、Stem 2、各阶段中的基本模块STT Block 1&#xff09;CPE模块 2&#xff09;STA模块 网络结构 一、网络的搭建 论文中的结构原图 基本模块 1、Conv Stem (patch_embed): PatchEmbed…

算法修炼之练气篇——练气十三层

博主&#xff1a;命运之光 专栏&#xff1a;算法修炼之练气篇 目录 题目 1023: [编程入门]选择排序 题目描述 输入格式 输出格式 样例输入 样例输出 题目 1065: 二级C语言-最小绝对值 题目描述 输入格式 输出格式 样例输入 样例输出 题目 1021: [编程入门]迭代法求…

【Selenium上】——全栈开发——如桃花来

目录索引 Selenium是什么&#xff1a;下载和配置环境变量&#xff1a;1. 基本使用&#xff1a;导入五个常用包&#xff1a;基本代码&#xff1a; 实例引入&#xff1a;声明不同浏览器对象&#xff1a;访问页面&#xff1a; Selenium是什么&#xff1a; Selenium是一个用于Web应…

Cesium入门之四:基于Vue3+Vite+Cesium构建三维地球场景

Cesium官网中提供了基于webpack配置Cesium的方法&#xff0c;但是这种方法太繁琐&#xff0c;而且使用webpack时程序启动没有Vite启动快&#xff0c;因此&#xff0c;这里选择vite创建vue3cesium构建项目 创建vue3项目 新建CesiumProject文件夹&#xff0c;在该文件夹上点击右…

clang-format configurator - 交互式创建 clang-format 格式配置文件

clang-format configurator - 交互式创建 clang-format 格式配置文件 clang-format configurator https://zed0.co.uk/clang-format-configurator/ clang-format-configurator https://github.com/zed0/clang-format-configurator Interactively create a clang-format confi…

minikube,搭建+镜像加速,坚持 3 分钟,带你玩的明明白白

一、 安装 cri-docker 下载安装 # 在 https://github.com/Mirantis/ 下载 https://github.com/Mirantis/tar -xvf cri-dockerd-0.3.1.amd64.tgzcp cri-dockerd/cri-dockerd /usr/bin/chmod x /usr/bin/cri-dockerd# 确认已安装版本 cri-dockerd --version配置启动文件 cri-do…

一篇让你精通JWT,妥妥的避坑指南~

视频教程传送门&#xff1a;JWT 两小时极简入门&#xff1a;JWT实战应用与防坑指南~_哔哩哔哩_bilibiliJWT 两小时极简入门&#xff1a;JWT实战应用与防坑指南~共计12条视频&#xff0c;包括&#xff1a;01.课程介绍与前置知识点、02.JWT概念、03.JWT组成等&#xff0c;UP主更多…

一个例子让你彻底弄懂分布式系统的CAP理论

1 推荐的文章 下面这篇知乎文章是我见过的最简单易懂的一篇&#xff0c;把CAP定义以及为什么AP和CP只能二选一以及场景特定下选AP还是CP作为系统目标等讲解明明白白 谈谈分布式系统的CAP 2 个人对上面这篇文章的的一些补充 可用性可以人为设置一个阈值&#xff0c;比如用户体…

openPOWERLINK源码(最新)在stm32单片机上的移植指南

最近着了powerlink的道&#xff0c;连续几晚十二点前没睡过觉。不得不说兴趣这东西劲太大了&#xff0c;让人睡不着。喜欢上研究POWERLINK&#xff0c;最新版的源码结构挺清晰的&#xff0c;移植并测试了嵌入式linux作为从站和电脑主站之间的通信&#xff0c;挺有趣的。接下来想…

路由器配置方法(固定地址)

前言 由于学校给分配了IP地址&#xff0c;因此我们的路由器接入的时候不能选择自动接入方式&#xff0c;而要选择固定地址方式。 step 1 我们首先先将路由器接上网线&#xff0c;这里注意一定要接wan口 因为路由器分为两个口&#xff0c;wan口是入口&#xff0c;lan口是出口…