文章目录
- 问题
- 问题概述
- 问题详细描述
- 原因
- 解决方法
- 修改源码
- 验证
问题
问题概述
DataX读取Hive Orc存储格式表数据丢失
问题详细描述
同步Hive表将数据发送到Kafka,Hive表A数据总量如下
SQL:select count(1) from A;
数量:19397281
使用DataX将表A数据发送到Kafka,最终打印读取数据量为12649450
任务总计耗时 : 1273s
任务平均流量 : 2.51MB/s
记录写入速度 : 9960rec/s
读出记录总数 : 12649450
读写失败总数 : 0
在kafka中查询发送的数据为12649449(有一条垃圾数据被我写自定义KafkaWriter过滤掉了,这里忽略即可)
原因
DataX读取HDFS Orce文件,代码有bug,当读取Hive文件大于BlockSize时会丢失数据,问题代码如下:
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
代码位置:
hdfsreader
模块,com.alibaba.datax.plugin.reader.hdfsreader.DFSUtil
类334行代码(源码为v202210版本)
这里当文件大于BlockSize大小会将文件分为多个,但是下面只取了第一个文件splits[0]
,其他数据就会丢失
我们发现问题后,去验证一下,hive表存储目录查询文件存储大小如下
$ hdfs dfs -du -h /usr/hive/warehouse/dwd.db/A/dt=2022-05-05
....
518.4 K 1.5 M /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000171_0
669.7 M 2.0 G /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0
205.6 K 616.9 K /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000173_0
264.6 K 793.9 K /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000174_0
1.4 M 4.3 M /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000175_0
1.5 M 4.6 M /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000176_0
....
发现果然有文件669.7 M 2.0 G /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0
大于BlockSize大小
解决方法
修改源码
修改后方法源码如下,直接替换DFSUtil.java
中orcFileStartRead
方法即可
public void orcFileStartRead(
String sourceOrcFilePath,
Configuration readerSliceConfig,
RecordSender recordSender,
TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
List<ColumnEntry> column =
UnstructuredStorageReaderUtil.getListColumnEntry(
readerSliceConfig,
com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
String nullFormat =
readerSliceConfig.getString(
com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
StringBuilder allColumns = new StringBuilder();
StringBuilder allColumnTypes = new StringBuilder();
boolean isReadAllColumns = false;
int columnIndexMax = -1;
// 判断是否读取所有列
if (null == column || column.size() == 0) {
int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
columnIndexMax = allColumnsCount - 1;
isReadAllColumns = true;
} else {
columnIndexMax = getMaxIndex(column);
}
for (int i = 0; i <= columnIndexMax; i++) {
allColumns.append("col");
allColumnTypes.append("string");
if (i != columnIndexMax) {
allColumns.append(",");
allColumnTypes.append(":");
}
}
if (columnIndexMax >= 0) {
JobConf conf = new JobConf(hadoopConf);
Path orcFilePath = new Path(sourceOrcFilePath);
Properties p = new Properties();
p.setProperty("columns", allColumns.toString());
p.setProperty("columns.types", allColumnTypes.toString());
try {
OrcSerde serde = new OrcSerde();
serde.initialize(conf, p);
StructObjectInspector inspector =
(StructObjectInspector) serde.getObjectInspector();
InputFormat<?, ?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, orcFilePath.toString());
// If the network disconnected, will retry 45 times, each time the retry interval
// for 20 seconds
// Each file as a split
InputSplit[] splits = in.getSplits(conf, -1);
for (InputSplit split : splits) {
{
RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// 获取列信息
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
List<Object> recordFields;
while (reader.next(key, value)) {
recordFields = new ArrayList<Object>();
for (int i = 0; i <= columnIndexMax; i++) {
Object field = inspector.getStructFieldData(value, fields.get(i));
recordFields.add(field);
}
transportOneRecord(
column,
recordFields,
recordSender,
taskPluginCollector,
isReadAllColumns,
nullFormat);
}
reader.close();
}
// transportOneRecord(column, recordFields, recordSender,
// taskPluginCollector, isReadAllColumns, nullFormat);
}
// reader.close();
} catch (Exception e) {
String message =
String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。", sourceOrcFilePath);
LOG.error(message);
throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
}
} else {
String message =
String.format(
"请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s",
JSON.toJSONString(column));
throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
}
}
修改完成后将源码打包,打包后在hdfsreader
模块下target
目录有target/hdfsreader-0.0.1-SNAPSHOT.jar
文件,将文件上传到部署服务器上的目录datax/plugin/reader/hdfsreader
下,替换之前的包.
[hadoop@10 /datax/plugin/reader/hdfsreader]$ pwd
/datax/plugin/reader/hdfsreader
[hadoop@10 /datax/plugin/reader/hdfsreader]$ ll
total 52
-rw-rw-r-- 1 hadoop hadoop 28828 May 11 14:08 hdfsreader-0.0.1-SNAPSHOT.jar
drwxrwxr-x 2 hadoop hadoop 8192 Dec 9 15:06 libs
-rw-rw-r-- 1 hadoop hadoop 217 Oct 26 2022 plugin_job_template.json
-rw-rw-r-- 1 hadoop hadoop 302 Oct 26 2022 plugin.json
验证
重新启动DataX同步脚本,发现同步数据与Hive表保存数据一致。