datax插件开发HdfsReader支持parquet

news2025/1/11 22:58:36

数据仓库HIVE存储数据一般采用parquet格式,但Alibaba datax开源版不支持parquet格式,在网上查了很多资料,写的大多不完整,特此总结出完整版记录一下,供大家参考。

操作步骤

1.从gitee 拉取datax代码,对hdfsreader模块进行改造,主要改造以下4个类。

 pom里面添加parquet支持依赖

 <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-common</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-protobuf</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-protobuf</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.1</version>
        </dependency>

Constant如下

public class Constant {
    public static final String SOURCE_FILES = "sourceFiles";
    public static final String TEXT = "TEXT";
    public static final String ORC = "ORC";
    public static final String CSV = "CSV";
    public static final String SEQ = "SEQ";
    public static final String RC = "RC";
    public static final String PARQUET= "PARQUET";

}

HdfsFileType

public enum HdfsFileType {
    ORC, SEQ, RC, CSV, TEXT,PARQUET,
}

DFSUtil添加读取parquet方法,根据orc读取方法改造

  public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
                                     RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info(String.format("Start Read parquetfile [%s].", sourceParquetFilePath));
        List<ColumnEntry> column = UnstructuredStorageReaderUtil
                .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判断是否读取所有列
        if (null == column || column.size() == 0) {
            int allColumnsCount = getParquetAllColumnsCount(sourceParquetFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            Path parquetFilePath = new Path(sourceParquetFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                // 创建 ParquetReader.Builder 实例
                ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), parquetFilePath);
                // 创建 ParquetReader 实例
                ParquetReader<Group> reader = builder.build();
                // 循环读取 Parquet 文件中的记录
                Group line;
                List<Object> recordFields;
                while ((line = reader.read()) != null) {
                    recordFields = new ArrayList<Object>();
                    //从line中获取每个字段
                    for (int i = 0; i <= columnIndexMax; i++) {
                        Object field = line.getValueToString(i, 0);
                        recordFields.add(field);
                    }
                    transportOneRecord(column, recordFields, recordSender,
                            taskPluginCollector, isReadAllColumns, nullFormat);
                }
            } catch (Exception e) {
                String message = String.format("从parquetfile文件路径[%s]中读取数据发生异常,[%s],请联系系统管理员。"
                        , sourceParquetFilePath, e);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
            String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }




//获取parquet文件总列数
    private int getParquetAllColumnsCount(String filePath) {
        Path path = new Path(filePath);
        try {
            org.apache.parquet.hadoop.metadata.ParquetMetadata metadata = org.apache.parquet.hadoop.ParquetFileReader.readFooter(hadoopConf, path);
            List<ColumnChunkMetaData> columns = metadata.getBlocks().get(0).getColumns();
            int columnCount = columns.size();
            return columnCount;
        } catch (IOException e) {
            String message = "读取Parquetfile column列数失败,请联系系统管理员";
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
        }

    }


   //检查文件类型,添加parquet判断
    public boolean checkHdfsFileType(String filepath, String specifiedFileType) {

        Path file = new Path(filepath);

        try {
            FileSystem fs = FileSystem.get(hadoopConf);
            FSDataInputStream in = fs.open(file);

            if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.CSV)
                    || StringUtils.equalsIgnoreCase(specifiedFileType, Constant.TEXT)) {

                boolean isORC = isORCFile(file, fs, in);// 判断是否是 ORC File
                if (isORC) {
                    return false;
                }
                boolean isRC = isRCFile(filepath, in);// 判断是否是 RC File
                if (isRC) {
                    return false;
                }
                boolean isSEQ = isSequenceFile(filepath, in);// 判断是否是 Sequence File
                if (isSEQ) {
                    return false;
                }
                // 如果不是ORC,RC和SEQ,则默认为是TEXT或CSV类型
                return !isORC && !isRC && !isSEQ;

            } else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.ORC)) {

                return isORCFile(file, fs, in);
            } else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.RC)) {

                return isRCFile(filepath, in);
            } else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.SEQ)) {

                return isSequenceFile(filepath, in);
            } else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.PARQUET)) {
                return isParquetFile(new Path(filepath));
            }
        } catch (Exception e) {
            String message = String.format("检查文件[%s]类型失败,目前支持ORC,SEQUENCE,RCFile,PARQUET,TEXT,CSV六种格式的文件," +
                    "请检查您文件类型和文件是否正确。", filepath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
        }
        return false;
    }


    //判断文件是否是parquet
    private static boolean isParquetFile(Path file) {
        try {
            org.apache.parquet.hadoop.example.GroupReadSupport readSupport = new GroupReadSupport();
            ParquetReader.Builder<org.apache.parquet.example.data.Group> reader = ParquetReader.builder(readSupport, file);
            ParquetReader<Group> build = reader.build();
            if (build.read() != null) {
                return true;
            }
        } catch (IOException e) {

        }
        return false;
    }

最后是HdfsReader,添加parquet

 重新打包hdfsreader,将包替换到datax引擎即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/694181.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一、云尚办公系统:搭建环境

云尚办公系统&#xff1a;搭建环境 B站直达【为尚硅谷点赞】: https://www.bilibili.com/video/BV1Ya411S7aT 本博文以课程相关为主发布&#xff0c;并且融入了自己的一些看法以及对学习过程中遇见的问题给出相关的解决方法。一起学习一起进步&#xff01;&#xff01;&#x…

前端第一期工作梳理总结:实现基础界面

目前前后端总体框架搭建并打通&#xff0c;除了文本检索功能外&#xff0c;均正常实现&#xff0c;后期将进行单元测试、集成测试和功能、性能测试。具体界面展示如下&#xff1a; ①注册、登录、忘记-找回密码 - 注册 - 登录 - 注销&#xff0c;退出当前账号。 - 忘记密码…

word转pdf实现

写一下笔记&#xff0c;以便在以后工作中用到&#xff1a; 导包&#xff1a; <!--word 转 pdf--> <dependency><groupId>com.documents4j</groupId><artifactId>documents4j-local</artifactId><version>1.0.3</version> &l…

油烟机语音方案:NV040D语音芯片,支持MCU输入UART指令

随着人们生活水平的提高和厨房使用频率的增加&#xff0c;油烟机成为现代家庭生活中必不可少的一种家用电器&#xff0c;而语音智能化技术的发展也使得油烟机功能更加智能化。九芯电子的NV040D语音芯片是一种具备MCU输入UART指令功能的专业语音芯片&#xff0c;可以广泛应用于油…

实现微服务中的数据一致性:成功的策略

微服务架构已经彻底改变了我们构建和扩展应用程序的方式&#xff0c;提供了诸多优势&#xff0c;如提高了灵活性、可扩展性和故障隔离性。然而&#xff0c;由于微服务的分散性&#xff0c;跨服务维护数据一致性可能面临重大挑战。在本文中&#xff0c;我们将探讨不同的方法来解…

itextpdf实现word模板生成文件

前言 使用word模板生成文件&#xff0c;如下图&#xff0c;将左侧的模板生成为右侧的填充word文档。 操作方式 引入依赖 <!-- https://mvnrepository.com/artifact/com.itextpdf/itextpdf --><dependency><groupId>com.itextpdf</groupId><arti…

多串口数据采集网关有什么功能

数据采集网关是一种通信终端设备&#xff0c;也称物联网网关&#xff0c;它具备数据采集、存储、传输等功能。物通博联WG系列数据采集网关提供多种数据传输方式和接口通道&#xff0c;包括有线、无线和串口传输等&#xff08;5G、4G、WIFI、以太网&#xff09;&#xff0c;可以…

ModaHub魔搭社区:向量数据库Milvus性能调优教程(二)

目录 索引 其他 存储优化 常见问题 索引 向量索引的基本概念请参考 向量索引概述。 选择合适的索引需要在存储空间、查询性能、查询召回率等多个指标中权衡。 FLAT 索引 FLAT 是对向量的暴力搜索&#xff08;brute-force search&#xff09;&#xff0c;速度最慢&#…

Qt/C++编写监控实时显示和取流回放工具(回放支持切换进度)

一、前言 现在各个监控大厂做的设备&#xff0c;基本上都会支持通过rtsp直接取流显示&#xff0c;而且做的比较好的还支持通过rtsp回放取流&#xff0c;基本上都会约定一个字符串的规则&#xff0c;每个厂家都是不一样的规则&#xff0c;比如回放对应的rtsp地址还要带上时间范…

Spring专家课程Day02_Spring-DI

文章目录 一、依赖注入_Autowired1.配置类中Bean 方式注入1.1&#xff09;注入实例1.2&#xff09;自动注入的匹配原则 2.组件扫描实现自动注入 Autowired3.set方法注入 二、接口解耦_自动注入规则1&#xff09;利用接口解耦2&#xff09;Autowired的注入规则3&#xff09;Qual…

学高性能计算难吗?猿代码科技国内首家专注高性能计算人才培养与推荐 ...

高性能计算&#xff08;HPC&#xff09;作为一门专业领域&#xff0c;涉及到复杂的计算架构、并行计算模型和算法优化等方面的知识。因此&#xff0c;学习高性能计算可能对一些人来说是一项挑战。然而&#xff0c;随着正确的学习方法和适当的资源&#xff0c;掌握高性能计算并不…

【UCOS-III】自我学习笔记→第35讲→软件定时器实验

文章目录 前言实验步骤1.复制消息队列工程&#xff0c;并创建单次定时器和周期定时器&#xff0c;并删除task3及其相关内容2.添加task1按键处理和软件定时器超时函数3.查看串口现象 测试代码工程文件总结 前言 无&#xff0c;仅作记录&#xff0c;不具有参考价值&#xff0c;所…

【MATLAB第43期】基于MATLAB的BO-NAR贝叶斯优化动态神经网络NAR时间序列股票预测模型

【MATLAB第43期】基于MATLAB的BO-NAR贝叶斯优化动态神经网络NAR时间序列股票预测模型 一、效果展示 二、模型介绍 1.数据情况 一列数据&#xff0c;499个值 ratio 0.9;% 训练集比例 MaxEpochs 600;% 最大训练次数 % % 导入股票数据 xall importdata(数据.xlsx);2.优化参…

MySQL MMM 双主架构 主主复制 高可用

MMM&#xff08;Master-Master replication manager for MvSQL&#xff0c;MySQL主主复制管理器&#xff09; 是一套支持双主故障切换和双主日常管理的脚本程序。MMM 使用 Perl 语言开发&#xff0c;主要用来监控和管理 MySQL Master-Master &#xff08;双主&#xff09;复制&…

全志V3S嵌入式驱动开发(windows平台镜像烧入)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 对于很多第一次学习嵌入式linux和开发板的同学来说&#xff0c;如何烧入镜像文件常常成为他们要面对的第一个难题。从拿到板子开始&#xff0c;他们…

每个前端开发者都应知道的25个实用网站

微信搜索 【大迁世界】, 我会第一时间和你分享前端行业趋势&#xff0c;学习途径等等。 本文 GitHub https://github.com/qq449245884/xiaozhi 已收录&#xff0c;有一线大厂面试完整考点、资料以及我的系列文章。 快来免费体验ChatGpt plus版本的&#xff0c;我们出的钱 体验地…

Soybean Admin - 基于 Vue3 / vite3 等最新前端技术栈构建的中后台模板,免费开源、清新优雅,主题丰富

一款专业好看、完成度很高的 admin 开源项目&#xff0c;无论是用于生产还是学习&#xff0c;都非常值得尝试。 关于 Soybean Admin Soybean Admin 是一个基于 Vue3 / Vite3 / TypeScript / NaiveUI / Pinia 和 UnoCSS 的中后台模版&#xff0c;它使用了最新流行的前端技术栈…

poium测试库介绍

poium测试库前身为selenium-page-objects测试库&#xff0c;我在以前的文章中也有介绍过:这可能是最简单的Page Object库&#xff0c;项目的核心是基于Page Objects实现元素定位的封装。该项目由我个人在维护&#xff0c;目前在公司项目中已经得到的应用。 ### poium的优势 Pa…

cmd和android studio同时使用adb,解决冲突的方案

问题&#xff1a; android studio 在Terminal使用adb后&#xff0c;cmd的adb就会掉线&#xff1b;同样cmd的adb使用中&#xff0c;android studio的logcat 的设备就是Offline状态&#xff0c;得重新在Terminal adb connect&#xff0c;并且关闭掉cmd窗口&#xff0c;否则adb反…

Centos ifconfig不显示IP地址解决办法之一

虚拟机使用命令ifconfig不显示IP地址&#xff0c;情况如下 原因&#xff1a;我们的虚拟机网络服务没有设置为开机自启动 解决&#xff1a; 1&#xff0c;进入目录 /etc/sysconfig/network-scripts/ cd /etc/sysconfig/network-scripts/ 2&#xff0c;路径下有一个 ifcfg-en…