org.apache.hadoop.hdfs.DFSInputStream
The key logic behind the read interface is in the following pread method.
private int pread(long position, ByteBuffer buffer)
    throws IOException {
  // sanity checks
  dfsClient.checkOpen();
  if (closed.get()) {
    throw new IOException("Stream closed");
  }
  failures = 0;
  long filelen = getFileLength();
  if ((position < 0) || (position >= filelen)) {
    return -1;
  }
  int length = buffer.remaining();
  int realLen = length;
  if ((position + length) > filelen) {
    realLen = (int) (filelen - position);
  }

  // determine the block and byte range within the block
  // corresponding to position and realLen
  List<LocatedBlock> blockRange = getBlockRange(position, realLen);
  int remaining = realLen;
  CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
  for (LocatedBlock blk : blockRange) {
    long targetStart = position - blk.getStartOffset();
    int bytesToRead = (int) Math.min(remaining,
        blk.getBlockSize() - targetStart);
    long targetEnd = targetStart + bytesToRead - 1;
    try {
      if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
        hedgedFetchBlockByteRange(blk, targetStart,
            targetEnd, buffer, corruptedBlocks);
      } else {
        fetchBlockByteRange(blk, targetStart, targetEnd,
            buffer, corruptedBlocks);
      }
    } finally {
      // Check and report if any block replicas are corrupted.
      // BlockMissingException may be caught if all block replicas are
      // corrupted.
      reportCheckSumFailure(corruptedBlocks, blk.getLocations().length,
          false);
    }
    remaining -= bytesToRead;
    position += bytesToRead;
  }
  assert remaining == 0 : "Wrong number of bytes read.";
  return realLen;
}
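For orientation, pread is what backs positional reads on the client stream. A minimal usage sketch, assuming a Hadoop 3.3+ client where FSDataInputStream exposes read(long, ByteBuffer) through the ByteBufferPositionedReadable interface (the path and offset below are made up):

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PreadDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    ByteBuffer buf = ByteBuffer.allocate(4096);
    try (FSDataInputStream in = fs.open(new Path("/tmp/demo.bin"))) {
      // Positional read: does not move the stream's own cursor.
      // On HDFS this bottoms out in DFSInputStream.pread(position, buffer).
      int n = in.read(128L * 1024, buf);
      System.out.println("read " + n + " bytes at offset 128K");
    }
  }
}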
When a read lands on the end-of-file boundary, it is this clamp that does the real work:
if ((position + length) > filelen) {
  realLen = (int) (filelen - position);
}
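For example (toy numbers): with filelen = 1000, position = 990, and length = buffer.remaining() = 64, position + length = 1054 exceeds the file length, so realLen is clamped to (int)(1000 - 990) = 10 and pread returns the last 10 bytes instead of reading past EOF.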
Parquet's Hadoop stream wrappers: H1SeekableInputStream and H2SeekableInputStream
- H1SeekableInputStream
/**
 * SeekableInputStream implementation that implements read(ByteBuffer) for
 * Hadoop 1 FSDataInputStream.
 */
class H1SeekableInputStream extends DelegatingSeekableInputStream {
H1SeekableInputStream does not override the ByteBuffer read paths itself; it inherits read(ByteBuffer) and readFully(ByteBuffer) from its parent class DelegatingSeekableInputStream, which dispatches on the buffer type:
@Override
public int read(ByteBuffer buf) throws IOException {
  if (buf.hasArray()) {
    return readHeapBuffer(stream, buf);
  } else {
    return readDirectBuffer(stream, buf, temp);
  }
}
For a heap buffer, the readFully(ByteBuffer) variant eventually lands here:
// Visible for testing
static void readFullyHeapBuffer(InputStream f, ByteBuffer buf) throws IOException {
  readFully(f, buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
  buf.position(buf.limit());
}
The trick: a heap ByteBuffer exposes its backing byte array, so the classic readFully(InputStream, byte[], int, int) can write into it directly, and the buffer's position is then advanced by hand. No intermediate copy is needed.
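The same pattern is easy to reproduce in isolation. A minimal standalone sketch of the heap-buffer path (hypothetical class and method names, not Parquet's code):

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class HeapBufferReadDemo {
  // Fill buf completely by reading straight into its backing array,
  // mirroring the arithmetic in readFullyHeapBuffer above.
  static void readFullyHeap(InputStream in, ByteBuffer buf) throws IOException {
    int off = buf.arrayOffset() + buf.position();
    int len = buf.remaining();
    int done = 0;
    while (done < len) {
      int n = in.read(buf.array(), off + done, len - done);
      if (n < 0) {
        throw new EOFException("stream ended with " + (len - done) + " bytes missing");
      }
      done += n;
    }
    buf.position(buf.limit()); // account for the bytes written behind the buffer's back
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(5);
    readFullyHeap(new ByteArrayInputStream("hello, world".getBytes()), buf);
    buf.flip();
    System.out.println(new String(buf.array(), 0, buf.limit())); // prints "hello"
  }
}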
- H2SeekableInputStream
/**
 * SeekableInputStream implementation for FSDataInputStream that implements
 * ByteBufferReadable in Hadoop 2.
 */
class H2SeekableInputStream extends DelegatingSeekableInputStream {
H2SeekableInputStream, in contrast, overrides the readFully method of its parent class DelegatingSeekableInputStream:
@Override
public void readFully(ByteBuffer buf) throws IOException {
  readFully(reader, buf);
}
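The static readFully(reader, buf) it delegates to must loop, because a single Hadoop 2 ByteBufferReadable.read(ByteBuffer) call may fill only part of the buffer. A minimal re-creation of that loop, assuming only the org.apache.hadoop.fs.ByteBufferReadable interface (not a copy of Parquet's code):

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.ByteBufferReadable;

public class ReadFullyLoop {
  // Keep calling read(ByteBuffer) until the buffer is full; read() advances
  // buf.position() itself, so hasRemaining() tracks the outstanding bytes.
  static void readFully(ByteBufferReadable in, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      int n = in.read(buf);
      if (n < 0) {
        throw new EOFException("EOF with " + buf.remaining() + " bytes still expected");
      }
    }
  }
}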
org.apache.parquet.hadoop.util
- org.apache.parquet.hadoop.util.HadoopInputFile
public SeekableInputStream newStream() throws IOException {
  return HadoopStreams.wrap(fs.open(stat.getPath()));
}
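A typical way to obtain such a stream end to end, via HadoopInputFile.fromPath (a sketch; the path is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

public class NewStreamDemo {
  public static void main(String[] args) throws Exception {
    InputFile file = HadoopInputFile.fromPath(
        new Path("hdfs:///tmp/demo.parquet"), new Configuration());
    try (SeekableInputStream in = file.newStream()) {
      // Whether this is an H1SeekableInputStream or an H2SeekableInputStream
      // is decided by HadoopStreams.wrap, shown below.
      System.out.println(in.getClass().getSimpleName());
    }
  }
}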
- org.apache.parquet.hadoop.util.HadoopStreams
The logic that decides which SeekableInputStream to return is here:
public static SeekableInputStream wrap(FSDataInputStream stream) {
  Objects.requireNonNull(stream, "Cannot wrap a null input stream");
  if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
      byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
    try {
      return h2SeekableConstructor.newInstance(stream);
    } catch (InstantiationException | IllegalAccessException e) {
      LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
      return new H1SeekableInputStream(stream);
    } catch (InvocationTargetException e) {
      throw new ParquetDecodingException(
          "Could not instantiate H2SeekableInputStream", e.getTargetException());
    }
  } else {
    return new H1SeekableInputStream(stream);
  }
}
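In short, the branch keys off whether the wrapped stream implements Hadoop 2's ByteBufferReadable; the class and constructor are probed reflectively (hence the null checks) so the same jar still works against Hadoop 1. HDFS's DFSInputStream does implement it, so HDFS reads take the H2 path and fill the ByteBuffer directly, while streams without it fall back to H1 and its byte-array copies. A quick way to see which wrapper you get (a sketch; the path is made up, and which class prints depends on the filesystem):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;

public class WrapDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    try (FSDataInputStream raw = fs.open(new Path("/tmp/demo.parquet"));
         SeekableInputStream in = HadoopStreams.wrap(raw)) {
      // The local checksumming stream is usually not ByteBufferReadable, so
      // this typically prints H1SeekableInputStream; on HDFS it would be H2.
      System.out.println(in.getClass().getSimpleName());
    }
  }
}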