hdfs源码解析之DFSClient

news2024/11/23 19:20:14

1、DFSClient类简介

    DFSClient 是 Hadoop 分布式文件系统(HDFS)中的一个核心类,用于客户端与 HDFS 之间的交互。它提供了一组方法,使客户端应用程序可以方便地与 HDFS 进行通信,包括文件的读取、写入、创建、删除、重命名等操作。DFSClient 封装了与 NameNode 和 DataNode 的通信细节,使得客户端开发者可以通过高级 API 进行文件系统操作,而不必关心底层的实现细节。

2、DFSClient主要功能

2.1、文件读取和写入

  • 提供方法用于读取和写入 HDFS 上的文件。
  • 例如,open 方法用于打开文件以读取,create 方法用于创建新文件以写入。

2.2、文件操作

  • 支持文件的创建、删除、重命名、追加等操作。
  • 例如,delete 方法用于删除文件或目录,rename 方法用于重命名文件或目录。

2.3、目录操作

  • 支持创建、删除和列出目录。
  • 例如,mkdirs 方法用于创建目录,listPaths 方法用于列出目录内容。

2.4、获取文件和目录信息

  • 提供方法获取文件和目录的元数据信息。
  • 例如,getFileInfo 方法用于获取文件或目录的详细信息,getLocatedBlocks 方法用于获取文件的块位置。

2.5、与NN、DN通信

  • 管理与 NameNode 的通信,用于获取文件的元数据和块位置信息。
  • 管理与 DataNode 的通信,用于读取和写入实际的数据块。

3、DFSClient核心源码

    DFSClient源码主要包括:创建客户端连接(配置获取、令牌处理、连接地址解析)

3.1、构造方法

3.1.1、代码概述

该构造函数已废弃,接受一个Configuration对象,并调用另一个构造函数获取NameNode地址

  @Deprecated
  public DFSClient(Configuration conf) throws IOException {
    this(DFSUtilClient.getNNAddress(conf), conf);
  }

该构造函数接受一个InetSocketAddress对象和一个Configuration对象,并将InetSocketAddress 转换为URI然后调用另一个基于URI的构造函数

  public DFSClient(InetSocketAddress address, Configuration conf)
      throws IOException {
    this(DFSUtilClient.getNNUri(address), conf);
  }

该构造函数接受一个URI对象和一个Configuration对象,并将FileSystem.Statistics参数设置为 null,然后调用另一个更完整的构造函数

  public DFSClient(URI nameNodeUri, Configuration conf) throws IOException {
    this(nameNodeUri, conf, null);
  }

该构造函数接受一个URI对象、一个Configuration对象和一个FileSystem.Statistics对象,然后调用最完整的构造函数

  public DFSClient(URI nameNodeUri, Configuration conf,
      FileSystem.Statistics stats) throws IOException {
    this(nameNodeUri, null, conf, stats);
  }

 最底层构造函数,该方法不建议直接调用。

  @VisibleForTesting
  public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats) throws IOException {
    // Copy only the required DFSClient configuration
    this.tracer = FsTracer.get(conf);
    this.dfsClientConf = new DfsClientConf(conf);
    this.conf = conf;
    this.stats = stats;
    this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
    this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
    this.dtpReplaceDatanodeOnFailureReplication = (short) conf
        .getInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
                MIN_REPLICATION,
            HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
                MIN_REPLICATION_DEFAULT);
    LOG.debug("Sets {} to {}",
        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
            MIN_REPLICATION, dtpReplaceDatanodeOnFailureReplication);
    this.ugi = UserGroupInformation.getCurrentUser();

    this.namenodeUri = nameNodeUri;
    this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
        ThreadLocalRandom.current().nextInt()  + "_" +
        Thread.currentThread().getId();
    int numResponseToDrop = conf.getInt(
        DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
        DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
    ProxyAndInfo<ClientProtocol> proxyInfo = null;
    AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);

    if (numResponseToDrop > 0) {
      // This case is used for testing.
      LOG.warn("{} is set to {} , this hacked client will proactively drop responses",
          DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, numResponseToDrop);
      proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
          nameNodeUri, ClientProtocol.class, numResponseToDrop,
          nnFallbackToSimpleAuth);
    }

    if (proxyInfo != null) {
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    } else if (rpcNamenode != null) {
      // This case is used for testing.
      Preconditions.checkArgument(nameNodeUri == null);
      this.namenode = rpcNamenode;
      dtService = null;
    } else {
      Preconditions.checkArgument(nameNodeUri != null,
          "null URI");
      proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
          nameNodeUri, nnFallbackToSimpleAuth);
      this.dtService = proxyInfo.getDelegationTokenService();
      this.namenode = proxyInfo.getProxy();
    }

    String localInterfaces[] =
        conf.getTrimmedStrings(DFS_CLIENT_LOCAL_INTERFACES);
    localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
    if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
      LOG.debug("Using local interfaces [{}] with addresses [{}]",
          Joiner.on(',').join(localInterfaces),
          Joiner.on(',').join(localInterfaceAddrs));
    }

    Boolean readDropBehind =
        (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
            null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
    Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
        null : conf.getLongBytes(DFS_CLIENT_CACHE_READAHEAD, 0);
    this.serverDefaultsValidityPeriod = conf.getTimeDuration(
        DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY,
        DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT,
        TimeUnit.MILLISECONDS);
    Boolean writeDropBehind =
        (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
            null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
    this.defaultReadCachingStrategy =
        new CachingStrategy(readDropBehind, readahead);
    this.defaultWriteCachingStrategy =
        new CachingStrategy(writeDropBehind, readahead);
    this.clientContext = ClientContext.get(
        conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
        dfsClientConf, conf);

    if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
      this.initThreadsNumForHedgedReads(dfsClientConf.
          getHedgedReadThreadpoolSize());
    }

    this.initThreadsNumForStripedReads(dfsClientConf.
        getStripedReadThreadpoolSize());
    this.saslClient = new SaslDataTransferClient(
        conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
        TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
  }

3.1.2、重点剖析

DFSClient的核心构建方式是传入namenode节点对应的URI以及配置信息,也是我们构建DFSClient通常使用的方法

public DFSClient(URI nameNodeUri, Configuration conf) throws IOException {
  this(nameNodeUri, conf, null);
}

3.2、委托令牌处理      

        这段源码是一个用于续约和取消 HDFS 委托令牌(Delegation Token)的 Renewer 类,它继承自 TokenRenewer 类。主要功能是通过与 NameNode 通信,维护和管理委托令牌的生命周期。

3.2.1、代码概述

3.2.2、重点剖析

  • static静态代码块为初始化hdfs配置文件;
  • handleKind方法用于判断是否处理指定类型的委托令牌,在当前源码中会默认判定是否为HDFS的委托令牌类型;
  • renew 方法用于续约委托令牌。它通过 getNNProxy 方法获取到与委托令牌对应的 NameNode 代理,然后调用 renewDelegationToken 方法进行委托令牌的续约操作;
  • cancel 方法用于取消委托令牌。它也通过 getNNProxy 方法获取 NameNode 代理,然后调用 cancelDelegationToken 方法执行委托令牌的取消操作;
  • getNNProxy 方法根据委托令牌获取对应的 NameNode 代理。它首先根据委托令牌的信息构建 URI,然后通过 NameNodeProxiesClient 类的静态方法创建 NameNode 的代理对象,并返回该代理对象。

3.3、getLocalInterfaceAddrs

3.3.1、代码概述

       这个方法的作用是接受一个接口名称的数组,并根据每个接口名称解析成对应的本地地址(可以是 IP 地址、子网或域名)。它首先尝试将接口名称视为一个 IP 地址,如果不是,则检查它是否是一个有效的子网,如果仍然不是,则假定它是一个域名,并通过 DNS 解析。最终,所有解析出的地址都被封装为 InetSocketAddress 对象,并返回一个包含这些地址的数组。

private static SocketAddress[] getLocalInterfaceAddrs(
      String interfaceNames[]) throws UnknownHostException {
    List<SocketAddress> localAddrs = new ArrayList<>();
    for (String interfaceName : interfaceNames) {
      if (InetAddresses.isInetAddress(interfaceName)) {
        localAddrs.add(new InetSocketAddress(interfaceName, 0));
      } else if (NetUtils.isValidSubnet(interfaceName)) {
        for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
          localAddrs.add(new InetSocketAddress(addr, 0));
        }
      } else {
        for (String ip : DNS.getIPs(interfaceName, false)) {
          localAddrs.add(new InetSocketAddress(ip, 0));
        }
      }
    }
    return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
  }

3.3.2、重点剖析

  1. 该方法首先检查interfaceName是否是一个有效的IP地址:
  2. 如果不是IP地址,检查interfaceName是否是一个有效的子网:
  3. 如果是有效的子网,获取该子网中的所有IP地址,并将每个IP地址封装为InetSocketAddress对象,添加到localAddrs列表中。
  4. 如果既不是IP地址也不是子网,假定它是一个域名:
  5. 通过DNS解析获取该域名的所有IP地址,并将每个IP地址封装为InetSocketAddress对象,添加到localAddrs列表中。

3.4、getRandomLocalInterfaceAddr

3.4.1、代码概述

        这个方法的作用是从一组预先配置的本地接口地址 (localInterfaceAddrs 数组) 中随机选择一个地址并返回。

SocketAddress getRandomLocalInterfaceAddr() {
    if (localInterfaceAddrs.length == 0) {
      return null;
    }
    final int idx = r.nextInt(localInterfaceAddrs.length);
    final SocketAddress addr = localInterfaceAddrs[idx];
    LOG.debug("Using local interface {}", addr);
    return addr;
  }

3.4.2、重点剖析

  1. 检查 localInterfaceAddrs 数组是否为空,如果为空则返回 null
  2. 使用随机数生成器 r 生成一个随机索引 idx
  3. 获取并返回 localInterfaceAddrs 数组中对应索引 idxSocketAddress 对象。
  4. 在返回之前,记录调试日志以便于跟踪选中的本地接口地址。

3.5、读写超时时间判定

3.5.1、代码概述

        这段代码包含两个方法:getDatanodeWriteTimeoutgetDatanodeReadTimeout,它们用于计算数据节点写入和读取的超时时间。每个方法都接收一个参数 numNodes,表示数据节点的数量。

int getDatanodeWriteTimeout(int numNodes) {
    final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
    return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
}

int getDatanodeReadTimeout(int numNodes) {
    final int t = dfsClientConf.getSocketTimeout();
    return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
}

3.5.2、重点剖析

  1. 通过dfsclientconf获取写入\读取超时时间t;
  2. 如果t大于0则返回 t 加上一个扩展超时时间,这个扩展超时时间是常量 HdfsConstants.WRITE_TIMEOUT_EXTENSION 乘以 numNodes(数据节点数量)
  3. 如果t<=0,则返回0

3.6、租约管理

3.6.1、代码概述

        这段代码定义了三个方法:getLeaseRenewerbeginFileLeaseendFileLease,用于管理HDFS中的文件租约。文件租约机制确保文件在写入过程中不会被其他客户端修改或删除。

public LeaseRenewer getLeaseRenewer() {
    return LeaseRenewer.getInstance(
        namenodeUri != null ? namenodeUri.getAuthority() : "null", ugi, this);
  }

  /** Get a lease and start automatic renewal */
  private void beginFileLease(final String key, final DFSOutputStream out) {
    synchronized (filesBeingWritten) {
      putFileBeingWritten(key, out);
      LeaseRenewer renewer = getLeaseRenewer();
      boolean result = renewer.put(this);
      if (!result) {
        // Existing LeaseRenewer cannot add another Daemon, so remove existing
        // and add new one.
        LeaseRenewer.remove(renewer);
        renewer = getLeaseRenewer();
        renewer.put(this);
      }
    }
  }

  /** Stop renewal of lease for the file. */
  void endFileLease(final String renewLeaseKey) {
    synchronized (filesBeingWritten) {
      removeFileBeingWritten(renewLeaseKey);
      // remove client from renewer if no files are open
      if (filesBeingWritten.isEmpty()) {
        getLeaseRenewer().closeClient(this);
      }
    }
  }

3.6.2、重点剖析

  • 获取租约续约器getLeaseRenewer 方法返回一个 LeaseRenewer 实例,用于管理租约的续约。        
    • 获取租约续约器
    • 调用 LeaseRenewer.getInstance 方法获取 LeaseRenewer 实例。
    • 如果 namenodeUri 不为空,则使用其权限部分(authority),否则使用 "null"。ugi(用户组信息)和当前 DFSClient 实例(this)作为参数传递给 LeaseRenewer.getInstance
  • 开始文件租约beginFileLease 方法将文件添加到写入记录中,并确保当前客户端的租约续约器能够处理该文件的续约。
    • 使用 key 和 out(DFSOutputStream 实例)调用 putFileBeingWritten 方法,记录正在写入的文件;
    • 获取 LeaseRenewer 实例;
    • 调用 renewer.put(this) 方法将当前客户端添加到租约续约器中;
    • 如果返回结果为 false(表示现有的 LeaseRenewer 不能添加新的守护线程),则移除现有的 LeaseRenewer,获取新的 LeaseRenewer 实例,并将当前客户端添加到新的 LeaseRenewer 中;
  • 结束文件租约endFileLease 方法移除文件写入记录,并在没有文件写入时关闭客户端的租约续约
    • 使用 renewLeaseKey 调用 removeFileBeingWritten 方法,从记录中移除正在写入的文件
    • 如果没有文件在写入(filesBeingWritten 为空),则获取 LeaseRenewer 实例,调用 renewer.closeClient(this) 方法,关闭当前客户端的租约续约。

todo,未完待续

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1840959.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nidhogg:一款专为红队设计的多功能Rootkit

关于Nidhogg Nidhogg是一款专为红队设计的多功能Rootkit&#xff0c;该工具的主要目的是为红队研究人员提供一个多合一的切易于使用的多功能Rootkit&#xff0c;并允许研究人员通过单个头文件来将其引入到自己的C2框架之中。 当前版本的Nidhogg支持任意版本的x64 Windows 10和…

git的Cherry pick

Cherry pick Git Cherry Pick详解 https://blog.csdn.net/jam_yin/article/details/131594716 目标: 将开发分支A中提交的部分内容合并到B分支(可能是测试分支) 步骤: vscode安装 点击下图标进入graph

【golang学习之旅】使用VScode安装配置Go开发环境

1. 下载并安装Go1.1 下载地址1.2 选择版本并下载1.3 安装目录1.4 验证是否安装成功 2. 配置环境变量2.1 配置步骤2.2 GO部分环境变量说明 3. 下载或更新 Vscode3.1 下载地址3.2 安装步骤 4. 为Go开发配置VScode 1. 下载并安装Go 1.1 下载地址 https://studygolang.com/dl 1.…

压缩pdf文件大小,如何压缩pdf

压缩PDF文件是现代办公中常见的需求&#xff0c;因为PDF文件往往包含了大量的图片、文本和格式信息&#xff0c;导致文件体积较大&#xff0c;不利于传输和存储。本文将详细介绍如何压缩PDF文件&#xff0c;我们一起来看一下。 浏览器打开 "轻云处理pdf官网" &#x…

计网重点面试题-TCP三次握手四次挥手

三次握手 第一次握手(syn1) 客户端会随机初始化序号&#xff08;client_isn&#xff09;&#xff0c;将此序号置于 TCP 首部的「序列号」字段中&#xff0c;同时把 SYN 标志位置为 1&#xff0c;表示 SYN 报文。接着把第一个 SYN 报文发送给服务端&#xff0c;表示向服务端发…

微波传感器,人体接近传感器,ATM微波传感器人体存在传感器

史新华 微波传感器&#xff0c;人体接近传感器&#xff0c;ATM微波传感器人体存在传感器 微波传感器&#xff0c;人体接近传感器&#xff0c;ATM微波传感器YTMW8630 1 产品简介 该YTMW8630是根据微波多普勒效应原理&#xff08;也就是雷达基本原理&#xff09;制作成的&…

Graphviz——实现动态更新协议状态机

1、描述 为了实现动态更新协议状态机&#xff0c;首先需要定义类来表示协议状态机。初始化该类后&#xff0c;保存状态机对象。在后续更新过程中&#xff0c;就可以加载保存的状态机对象&#xff0c;添加新的状态或事件。Graphviz的安装过程参考&#xff1a;Graphviz——安装、…

【AI-6】算力和带宽

上述为大模型训练的显卡选项 tensor fp16 算力是什么&#xff1f; Tensor FP16(Float16)算力是指GPU在执行深度学习的张量计算时,使用float16(半精度浮点)数据类型所能达到的性能指标。 为什么要使用Tensor FP16? 提升计算效率: float16数据类型的存储和计算开销比float32…

htb_Editorial

hack the book Editorial 端口扫描 80 22 目录扫描 /upload 是一个上传book information的页面 其中最顶上有一个可以上传书本封面的地方&#xff0c;可以从本地上传&#xff0c;也可以从远程下载 这里可能涉及ssrf和本地文件上传&#xff0c;逐一尝试 随便上传一个图片…

Redis-五种数据结构之列表(ziplist、quicklist)

列表 文章目录 列表压缩列表-ziplistziplist 定义级联更新 快速列表-quicklistquicklistNode 定义quicklist 定义quicklist常用操作其他操作quicklist 相对于普通链表优点quick应用场景在redis 中使用quicklist 列表数据类型可以存储一组按插入顺序排序的字符串&#xff0c;他很…

web前端-CSS

CSS CSS概述: CSS是Cascading Style Sheets&#xff08;级联样式表&#xff09;,是一种样式表语言,用于控制网页布局,外观(比如背景图片,图片高度,文本颜色,文本字体,高级定位等等) 可将页面的内容与样式分离开,样式放于单独的.css文件或者HTML某处 CSS是网页样式,HTML是网页…

什么是进程?

目录 进程 进程的特征, 概念 我们下面先简单介绍一下什么是进程 接下来看看一个程序的运行过程 进程的组成 进程的状态和转换 进程的状态 进程状态的转换 ​编辑 进程的组织方式 进程控制 如何实现进程控制 为什么进程控制的过程需要一气呵成? 进程控制的实现…

操作系统入门 -- 进程的同步与互斥

操作系统入门 – 进程的同步与互斥 在之前的文章中&#xff0c;我们了解了进程是如何被调度的。但在调度之前&#xff0c;进程需要获得资源。而获得这些资源则可能让进程之间陷入冲突。为了高效且平等地调度线程&#xff0c;需要引入同步功能。 1.临界资源 1.1 临界资源的描述…

用友YonSuite打通招银云直联,让企业收付款更便利

在当今数智化浪潮席卷全球的背景下&#xff0c;企业对于高效、便捷的管理系统需求日益增加。作为全球领先的企业云服务与软件提供商&#xff0c;用友始终站在技术前沿&#xff0c;致力于为成长型企业提供全方位的数智化解决方案。 用友网络与招商银行通过联通双方系统&#xf…

Elasticsearch如何聚合查询多个统计值,如何嵌套聚合?并相互引用,统计索引中某一个字段的空值率?语法是怎么样的

文章目录 Elasticsearch聚合查询说明空值率查询DSL Elasticsearch聚合基础知识扩展Elasticsearch聚合概念Script 用法Elasticsearch聚合查询语法指标聚合&#xff08;Metric Aggregations&#xff09;桶聚合&#xff08;Bucket Aggregations&#xff09;矩阵聚合&#xff08;Ma…

《昇思25天学习打卡营第1天 | 认识MindScope AI框架和昇思大模型平台》

活动地址&#xff1a;https://xihe.mindspore.cn/events/mindspore-training-camp 昇思MindSpore学习笔记&#xff1a;探索AI的无限可能 嗨&#xff0c;AI爱好者们&#xff01;今天&#xff0c;我要带你们深入了解一个强大的全场景深度学习框架——昇思MindSpore。 准备好了吗…

信创数据库沙龙 | 全国预告

#数据库沙龙 #国产数据库 #信创数据库

领课教育本地部署教程

一.本地运行环境准备 因node版本需大于18.0所以推荐使用win10及以上系统 1.1MySQL&#xff0c;版本&#xff1a;8.0.x 安装完成导入sql脚本 1.2Redis&#xff0c;版本&#xff1a;3.2&#xff0c;推荐使用最新版本 下载地址&#xff1a;https://github.com/tporadowski/red…

KVB投资安全小知识:如何识别一个货币是避险货币还是风险货币?

摘要 在全球经济不断变化的今天&#xff0c;理解货币的避险属性和风险特征对投资者至关重要。本文将详细探讨如何准确识别一个货币是避险货币还是风险货币&#xff0c;并结合具体的货币案例分析它们的本质差异。通过深入分析不同因素对货币走势的影响&#xff0c;帮助读者在金…

数字孪生技术及其广泛应用场景探讨

通过将实际物理世界中的物体或系统建模、模拟和分析&#xff0c;数字孪生技术可以提供更精确、更可靠、更高效的解决方案。数字孪生技术在智能制造、城市建设、智慧物流等众多领域中得到了广泛的应用。 通过将数据可视化呈现在虚拟环境中&#xff0c;我们可以更清晰地观察和理…