HDFS 之 文件流

news2024/9/24 15:21:08
org.apache.hadoop.hdfs.DFSInputStream

read 接口的关键逻辑在以下 pread 接口。

  private int pread(long position, ByteBuffer buffer)
      throws IOException {
    // sanity checks
    dfsClient.checkOpen();
    if (closed.get()) {
      throw new IOException("Stream closed");
    }
    failures = 0;
    long filelen = getFileLength();
    if ((position < 0) || (position >= filelen)) {
      return -1;
    }
    int length = buffer.remaining();
    int realLen = length;
    if ((position + length) > filelen) {
      realLen = (int)(filelen - position);
    }

    // determine the block and byte range within the block
    // corresponding to position and realLen
    List<LocatedBlock> blockRange = getBlockRange(position, realLen);
    int remaining = realLen;
    CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
    for (LocatedBlock blk : blockRange) {
      long targetStart = position - blk.getStartOffset();
      int bytesToRead = (int) Math.min(remaining,
          blk.getBlockSize() - targetStart);
      long targetEnd = targetStart + bytesToRead - 1;
      try {
        if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
          hedgedFetchBlockByteRange(blk, targetStart,
              targetEnd, buffer, corruptedBlocks);
        } else {
          fetchBlockByteRange(blk, targetStart, targetEnd,
              buffer, corruptedBlocks);
        }
      } finally {
        // Check and report if any block replicas are corrupted.
        // BlockMissingException may be caught if all block replicas are
        // corrupted.
        reportCheckSumFailure(corruptedBlocks, blk.getLocations().length,
            false);
      }

      remaining -= bytesToRead;
      position += bytesToRead;
    }
    assert remaining == 0 : "Wrong number of bytes read.";
    return realLen;
  }

遇到临界位置, 这边起主要作用

    if ((position + length) > filelen) {
      realLen = (int)(filelen - position);
    }
hadoop 中 H1SeekableInputStream 和 H2SeekableInputStream
  • H1SeekableInputStream
/**
 * SeekableInputStream implementation that implements read(ByteBuffer) for
 * Hadoop 1 FSDataInputStream.
 */
class H1SeekableInputStream extends DelegatingSeekableInputStream {

H1SeekableInputStream 直接使用父类 DelegatingSeekableInputStream 中的 readFully 方法。

@Override
  public int read(ByteBuffer buf) throws IOException {
    if (buf.hasArray()) {
      return readHeapBuffer(stream, buf);
    } else {
      return readDirectBuffer(stream, buf, temp);
    }
  }

在这里插入图片描述

最后到这里:

  // Visible for testing
  static void readFullyHeapBuffer(InputStream f, ByteBuffer buf) throws IOException {
    readFully(f, buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
    buf.position(buf.limit());
  }

巧妙的转成了 bytes 数组,进行读写。

  • H2SeekableInputStream
/**
 * SeekableInputStream implementation for FSDataInputStream that implements
 * ByteBufferReadable in Hadoop 2.
 */
class H2SeekableInputStream extends DelegatingSeekableInputStream {

H2SeekableInputStream 覆写了 父类 DelegatingSeekableInputStream 中的 readFully 方法。

@Override
  public void readFully(ByteBuffer buf) throws IOException {
    readFully(reader, buf);
  }

org.apache.parquet.hadoop.util
  • org.apache.parquet.hadoop.util.HadoopInputFile
  public SeekableInputStream newStream() throws IOException {
    return HadoopStreams.wrap(fs.open(stat.getPath()));
  }

  • org.apache.parquet.hadoop.util.HadoopStreams

相关返回哪个 SeekableInputStream, 逻辑在这里。

  public static SeekableInputStream wrap(FSDataInputStream stream) {
    Objects.requireNonNull(stream, "Cannot wrap a null input stream");
    if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
        byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
      try {
        return h2SeekableConstructor.newInstance(stream);
      } catch (InstantiationException | IllegalAccessException e) {
        LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
        return new H1SeekableInputStream(stream);
      } catch (InvocationTargetException e) {
        throw new ParquetDecodingException(
            "Could not instantiate H2SeekableInputStream", e.getTargetException());
      }
    } else {
      return new H1SeekableInputStream(stream);
    }
  }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1993290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

24/8/8算法笔记 决策树构建鸢尾花

决策树是一种由算法自动设计的模型。在机器学习中&#xff0c;构建决策树的过程通常遵循以下步骤&#xff1a; 特征选择&#xff1a;算法会评估每个特征&#xff0c;并选择一个特征作为节点分裂的依据。这个选择基于某种准则&#xff0c;如信息增益&#xff08;ID3算法&#xf…

手把手教你去掉WinRAR中的广告?

你是否在使用WinRAR的时候&#xff0c;打开压缩包的时候&#xff0c;它就会给你弹出一个广告窗口&#xff0c;是不是很烦人。本章教程&#xff0c;教你如何将它去除掉。 1、下载所需软件 通过百度网盘分享的文件&#xff1a;reshacker 链接&#xff1a;https://pan.baidu.com/s…

基础复习(前端部分)

MVVM(Model-View-ViewModel)的前端开发思想 Model: 数据模型&#xff0c;特指前端中通过请求从后台获取的数据 View: 视图&#xff0c;用于展示数据的页面&#xff0c;可以理解成我们的htmlcss搭建的页面&#xff0c;但是没有数据 ViewModel: 数据绑定到视图&#xff0c;负责…

SQL Server端口设置完整详细步骤

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言&#xff1a; 前面是对SQLserver服务器一些介绍&#xff0c;不想了解的可直接点击目录跳入正题&#xff0c;谢谢&#xff01;&#xff01;&#xff01; SQL Server 是由微软公司开发的关系数据库管理系统 (RDBMS)。它主要…

伪原创改写软件,最便捷的改文章选择

说到改文章&#xff0c;很多人的直接想法就是自己动手去修改&#xff0c;但自己动手改文章的过程中是需要花大量时间阅读并理解透文章写的意思&#xff0c;然后才便于修改&#xff0c;然而伪原创改写软件的出现却在修改文章的工作中提供了非常大多的作用&#xff0c;不管是节省…

揭秘人工智能三大基石:数据、算法与算力的深度融合

在科技日新月异的今天&#xff0c;人工智能&#xff08;AI&#xff09;作为引领未来科技浪潮的核心力量&#xff0c;正以前所未有的速度改变着我们的生活、工作乃至整个社会的面貌。人工智能的快速发展并非偶然&#xff0c;而是建立在三大坚实基石之上&#xff1a;数据、算法与…

Nacos-配置中心

1.为什么要使用配置中心&#xff1f; 2.常用的配置中心组件&#xff1f; 3.如何使用&#xff1f; 在配置中心创建配置文件 启动一个单列的nacos服务 点击发布 在微服务中使用 添加依赖 <!--nacso配置中心的依赖--><dependency><groupId>com.alibaba.cloud&l…

zdppy+vue3+onllyoffice开发文档管理系统项目实战 20240808 上课笔记

遗留的问题 1、实现删除的功能 2、分享的功能暂时往后放&#xff0c;因为目前没有用户&#xff0c;等有了用户之后再考虑做 3、增加新建和导入按钮 zdppy的学习计划 机器学习平台&#xff0c;QQ音乐的开源项目&#xff0c;https://github.com/tencentmusic/cube-studio&#…

Python语法基础常识

01 #中英文格式问题 我们用Python编程时用到的所有字母、符号、函数格式等都应当使用英文格式。 不少同学在刚入门的时候&#xff0c;可能会因为用错格式而频频报错、运行失败&#xff0c;这就需要我们时刻留意啦。 02 #print函数的使用 print函数会是我们接触Python时第一个…

万字解析文件fd,深刻理解:fd文件描述符、位图、标准输入、标准输出、标准错误、文件打开、文件关闭、Linux一切皆文件理解、进程和文件的关系、虚拟软件系统

建议全文阅读&#xff01;&#xff01;&#xff01; 建议全文阅读&#xff01;&#xff01;&#xff01; 建议全文阅读&#xff01;&#xff01;&#xff01; 目录 文章概述 一、文件操作 1、什么叫当前路径 2、常见文件操作 &#xff08;1&#xff09;fopen函数 &…

YASKAWA安川直驱电机DD马达Σ-7系列介绍

随着智能制造的兴起&#xff0c;对设备精度、效率及可靠性的要求日益提升&#xff0c;安川Σ-7系列无需减速机即可直接驱动负载的“直驱伺服电机”&#xff0c;以其独特的优势正逐步成为众多高精度、高效率应用场景下的运动控制解决方案。 一、直驱技术的革命性突破 传统伺服…

When can a sum and integral be interchanged?

https://math.stackexchange.com/questions/83721/when-can-a-sum-and-integral-be-interchanged https://math.stackexchange.com/questions/1334907/reversing-the-order-of-integration-and-summation

【大模型从入门到精通12】openAI API 提示链的力量3

这里写目录标题 实践问题 实践部分场景概述场景步骤初始产品询问故障排除请求保修问题额外产品推荐 示例实现 实践问题 编写一个名为retrieve_model_response的Python函数&#xff0c;该函数接受一个消息序列作为输入&#xff0c;并根据给定参数返回模型的响应。包括模型、温度…

如何去掉el-input自带边框

<style lang"scss" scoped>::v-deep .inputDeep .el-input__inner {border: none !important;box-shadow: none !important;padding: 0px; }</style> //先定义一个类名 <el-input v-model"form.name" class"inputDeep"><…

7.3.1.算法设计与分析-总结及真题讲解

总结 分治法特征&#xff1a;把一个问题拆分成多个小规模的相同子问题&#xff0c;一般可用递归解决。 经典问题&#xff1a;斐波那契数列、归并排序、快速排序、矩阵乘法、二分搜索、大整数乘法、汉诺塔 回溯法特征&#xff1a;系统的搜索一个问题的所有解或任一解。 经典问题…

10个理由告诉你,为什么鸿蒙是下一个职业风口!

在当今科技飞速发展的时代&#xff0c;新的技术和趋势不断涌现&#xff0c;为人们带来了前所未有的机遇和挑战。鸿蒙操作系统作为我国自主研发的创新成果&#xff0c;正逐渐成为科技领域的焦点&#xff0c;被认为是下一个职业风口。 10个理由告诉你&#xff0c;为什么鸿蒙是下一…

C++(week15): C++提高:(五)Redis数据库

文章目录 零、Redis的安装、API1.redis、hiredis、redis-plus-plus安装2.HiRedis的API 一、Redis数据库的基本概念1.关系型数据库与非关系型数据库的区别2.非关系型数据库的分离3.Redis的概念4.Redis的特性5.Redis的优点 二、Redis常用命令三、Redis的五大数据类型及其命令1.st…

清除 Nuxt 状态缓存:clearNuxtState

title: 清除 Nuxt 状态缓存&#xff1a;clearNuxtState date: 2024/8/7 updated: 2024/8/7 author: cmdragon excerpt: 摘要&#xff1a;本文介绍了Nuxt.js框架中clearNuxtState方法的使用&#xff0c;该方法用于清除useState管理的状态缓存&#xff0c;确保应用状态的有效性…

Apache POI 实现 Excel 表格下载

这里以苍穹外卖中数据导出功能为例&#xff0c;记录下 Apache POI 导出 Excel 表格的过程。 首先在 pom.xml 中导入相关依赖 <!-- poi 用于操作 excel 表格--> <dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId&…

详细LVS实验配置

一、LVS的NAT模式 1、实验环境 主机名ipVIP角色lvs192.168.0.100172.25.254.100调度器webserver1192.168.0.10&#xff0c;网关192.168.0.100null 真实服务器&#xff08; RS &#xff09; webserver2192.168.0.20&#xff0c;网关192.168.0.100null 真实服务器&#xff08; R…