1、DFSClient类简介
DFSClient
是 Hadoop 分布式文件系统(HDFS)中的一个核心类,用于客户端与 HDFS 之间的交互。它提供了一组方法,使客户端应用程序可以方便地与 HDFS 进行通信,包括文件的读取、写入、创建、删除、重命名等操作。DFSClient
封装了与 NameNode 和 DataNode 的通信细节,使得客户端开发者可以通过高级 API 进行文件系统操作,而不必关心底层的实现细节。
2、DFSClient主要功能
2.1、文件读取和写入
- 提供方法用于读取和写入 HDFS 上的文件。
- 例如,
open
方法用于打开文件以读取,create
方法用于创建新文件以写入。
2.2、文件操作
- 支持文件的创建、删除、重命名、追加等操作。
- 例如,
delete
方法用于删除文件或目录,rename
方法用于重命名文件或目录。
2.3、目录操作
- 支持创建、删除和列出目录。
- 例如,
mkdirs
方法用于创建目录,listPaths
方法用于列出目录内容。
2.4、获取文件和目录信息
- 提供方法获取文件和目录的元数据信息。
- 例如,
getFileInfo
方法用于获取文件或目录的详细信息,getLocatedBlocks
方法用于获取文件的块位置。
2.5、与NN、DN通信
- 管理与 NameNode 的通信,用于获取文件的元数据和块位置信息。
- 管理与 DataNode 的通信,用于读取和写入实际的数据块。
3、DFSClient核心源码
DFSClient源码主要包括:创建客户端连接(配置获取、令牌处理、连接地址解析)
3.1、构造方法
3.1.1、代码概述
该构造函数已废弃,接受一个Configuration对象,并调用另一个构造函数获取NameNode地址
@Deprecated
public DFSClient(Configuration conf) throws IOException {
this(DFSUtilClient.getNNAddress(conf), conf);
}
该构造函数接受一个InetSocketAddress对象和一个Configuration对象,并将InetSocketAddress 转换为URI然后调用另一个基于URI的构造函数
public DFSClient(InetSocketAddress address, Configuration conf)
throws IOException {
this(DFSUtilClient.getNNUri(address), conf);
}
该构造函数接受一个URI对象和一个Configuration对象,并将FileSystem.Statistics参数设置为 null,然后调用另一个更完整的构造函数
public DFSClient(URI nameNodeUri, Configuration conf) throws IOException {
this(nameNodeUri, conf, null);
}
该构造函数接受一个URI对象、一个Configuration对象和一个FileSystem.Statistics对象,然后调用最完整的构造函数
public DFSClient(URI nameNodeUri, Configuration conf,
FileSystem.Statistics stats) throws IOException {
this(nameNodeUri, null, conf, stats);
}
最底层构造函数,该方法不建议直接调用。
@VisibleForTesting
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats) throws IOException {
// Copy only the required DFSClient configuration
this.tracer = FsTracer.get(conf);
this.dfsClientConf = new DfsClientConf(conf);
this.conf = conf;
this.stats = stats;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
this.dtpReplaceDatanodeOnFailureReplication = (short) conf
.getInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
MIN_REPLICATION,
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
MIN_REPLICATION_DEFAULT);
LOG.debug("Sets {} to {}",
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
MIN_REPLICATION, dtpReplaceDatanodeOnFailureReplication);
this.ugi = UserGroupInformation.getCurrentUser();
this.namenodeUri = nameNodeUri;
this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
ThreadLocalRandom.current().nextInt() + "_" +
Thread.currentThread().getId();
int numResponseToDrop = conf.getInt(
DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
ProxyAndInfo<ClientProtocol> proxyInfo = null;
AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
if (numResponseToDrop > 0) {
// This case is used for testing.
LOG.warn("{} is set to {} , this hacked client will proactively drop responses",
DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, numResponseToDrop);
proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
nameNodeUri, ClientProtocol.class, numResponseToDrop,
nnFallbackToSimpleAuth);
}
if (proxyInfo != null) {
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
} else if (rpcNamenode != null) {
// This case is used for testing.
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
dtService = null;
} else {
Preconditions.checkArgument(nameNodeUri != null,
"null URI");
proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
nameNodeUri, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
}
String localInterfaces[] =
conf.getTrimmedStrings(DFS_CLIENT_LOCAL_INTERFACES);
localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
LOG.debug("Using local interfaces [{}] with addresses [{}]",
Joiner.on(',').join(localInterfaces),
Joiner.on(',').join(localInterfaceAddrs));
}
Boolean readDropBehind =
(conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
null : conf.getLongBytes(DFS_CLIENT_CACHE_READAHEAD, 0);
this.serverDefaultsValidityPeriod = conf.getTimeDuration(
DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY,
DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT,
TimeUnit.MILLISECONDS);
Boolean writeDropBehind =
(conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
this.defaultReadCachingStrategy =
new CachingStrategy(readDropBehind, readahead);
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
this.clientContext = ClientContext.get(
conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
dfsClientConf, conf);
if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
this.initThreadsNumForHedgedReads(dfsClientConf.
getHedgedReadThreadpoolSize());
}
this.initThreadsNumForStripedReads(dfsClientConf.
getStripedReadThreadpoolSize());
this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
}
3.1.2、重点剖析
DFSClient的核心构建方式是传入namenode节点对应的URI以及配置信息,也是我们构建DFSClient通常使用的方法
public DFSClient(URI nameNodeUri, Configuration conf) throws IOException { this(nameNodeUri, conf, null); }
3.2、委托令牌处理
这段源码是一个用于续约和取消 HDFS 委托令牌(Delegation Token)的 Renewer 类,它继承自 TokenRenewer 类。主要功能是通过与 NameNode 通信,维护和管理委托令牌的生命周期。
3.2.1、代码概述
3.2.2、重点剖析
- static静态代码块为初始化hdfs配置文件;
- handleKind方法用于判断是否处理指定类型的委托令牌,在当前源码中会默认判定是否为HDFS的委托令牌类型;
renew
方法用于续约委托令牌。它通过getNNProxy
方法获取到与委托令牌对应的 NameNode 代理,然后调用renewDelegationToken
方法进行委托令牌的续约操作;cancel
方法用于取消委托令牌。它也通过getNNProxy
方法获取 NameNode 代理,然后调用cancelDelegationToken
方法执行委托令牌的取消操作;getNNProxy
方法根据委托令牌获取对应的 NameNode 代理。它首先根据委托令牌的信息构建 URI,然后通过NameNodeProxiesClient
类的静态方法创建 NameNode 的代理对象,并返回该代理对象。
3.3、getLocalInterfaceAddrs
3.3.1、代码概述
这个方法的作用是接受一个接口名称的数组,并根据每个接口名称解析成对应的本地地址(可以是 IP 地址、子网或域名)。它首先尝试将接口名称视为一个 IP 地址,如果不是,则检查它是否是一个有效的子网,如果仍然不是,则假定它是一个域名,并通过 DNS 解析。最终,所有解析出的地址都被封装为 InetSocketAddress
对象,并返回一个包含这些地址的数组。
private static SocketAddress[] getLocalInterfaceAddrs(
String interfaceNames[]) throws UnknownHostException {
List<SocketAddress> localAddrs = new ArrayList<>();
for (String interfaceName : interfaceNames) {
if (InetAddresses.isInetAddress(interfaceName)) {
localAddrs.add(new InetSocketAddress(interfaceName, 0));
} else if (NetUtils.isValidSubnet(interfaceName)) {
for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
localAddrs.add(new InetSocketAddress(addr, 0));
}
} else {
for (String ip : DNS.getIPs(interfaceName, false)) {
localAddrs.add(new InetSocketAddress(ip, 0));
}
}
}
return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
}
3.3.2、重点剖析
- 该方法首先检查interfaceName是否是一个有效的IP地址:
- 如果不是IP地址,检查interfaceName是否是一个有效的子网:
- 如果是有效的子网,获取该子网中的所有IP地址,并将每个IP地址封装为InetSocketAddress对象,添加到localAddrs列表中。
- 如果既不是IP地址也不是子网,假定它是一个域名:
- 通过DNS解析获取该域名的所有IP地址,并将每个IP地址封装为InetSocketAddress对象,添加到localAddrs列表中。
3.4、getRandomLocalInterfaceAddr
3.4.1、代码概述
这个方法的作用是从一组预先配置的本地接口地址 (localInterfaceAddrs
数组) 中随机选择一个地址并返回。
SocketAddress getRandomLocalInterfaceAddr() {
if (localInterfaceAddrs.length == 0) {
return null;
}
final int idx = r.nextInt(localInterfaceAddrs.length);
final SocketAddress addr = localInterfaceAddrs[idx];
LOG.debug("Using local interface {}", addr);
return addr;
}
3.4.2、重点剖析
- 检查
localInterfaceAddrs
数组是否为空,如果为空则返回null
。 - 使用随机数生成器
r
生成一个随机索引idx
。 - 获取并返回
localInterfaceAddrs
数组中对应索引idx
的SocketAddress
对象。 - 在返回之前,记录调试日志以便于跟踪选中的本地接口地址。
3.5、读写超时时间判定
3.5.1、代码概述
这段代码包含两个方法:getDatanodeWriteTimeout
和 getDatanodeReadTimeout
,它们用于计算数据节点写入和读取的超时时间。每个方法都接收一个参数 numNodes
,表示数据节点的数量。
int getDatanodeWriteTimeout(int numNodes) {
final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
}
int getDatanodeReadTimeout(int numNodes) {
final int t = dfsClientConf.getSocketTimeout();
return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
}
3.5.2、重点剖析
- 通过dfsclientconf获取写入\读取超时时间
t;
如果t大于0则
返回t
加上一个扩展超时时间,这个扩展超时时间是常量HdfsConstants.WRITE_TIMEOUT_EXTENSION
乘以numNodes
(数据节点数量)- 如果t<=0,则返回0
3.6、租约管理
3.6.1、代码概述
这段代码定义了三个方法:getLeaseRenewer
、beginFileLease
和 endFileLease
,用于管理HDFS中的文件租约。文件租约机制确保文件在写入过程中不会被其他客户端修改或删除。
public LeaseRenewer getLeaseRenewer() {
return LeaseRenewer.getInstance(
namenodeUri != null ? namenodeUri.getAuthority() : "null", ugi, this);
}
/** Get a lease and start automatic renewal */
private void beginFileLease(final String key, final DFSOutputStream out) {
synchronized (filesBeingWritten) {
putFileBeingWritten(key, out);
LeaseRenewer renewer = getLeaseRenewer();
boolean result = renewer.put(this);
if (!result) {
// Existing LeaseRenewer cannot add another Daemon, so remove existing
// and add new one.
LeaseRenewer.remove(renewer);
renewer = getLeaseRenewer();
renewer.put(this);
}
}
}
/** Stop renewal of lease for the file. */
void endFileLease(final String renewLeaseKey) {
synchronized (filesBeingWritten) {
removeFileBeingWritten(renewLeaseKey);
// remove client from renewer if no files are open
if (filesBeingWritten.isEmpty()) {
getLeaseRenewer().closeClient(this);
}
}
}
3.6.2、重点剖析
- 获取租约续约器:
getLeaseRenewer
方法返回一个LeaseRenewer
实例,用于管理租约的续约。- 获取租约续约器
- 调用 LeaseRenewer.getInstance 方法获取 LeaseRenewer 实例。
- 如果 namenodeUri 不为空,则使用其权限部分(authority),否则使用 "null"。ugi(用户组信息)和当前 DFSClient 实例(this)作为参数传递给 LeaseRenewer.getInstance
- 开始文件租约:
beginFileLease
方法将文件添加到写入记录中,并确保当前客户端的租约续约器能够处理该文件的续约。- 使用 key 和 out(DFSOutputStream 实例)调用 putFileBeingWritten 方法,记录正在写入的文件;
- 获取 LeaseRenewer 实例;
- 调用 renewer.put(this) 方法将当前客户端添加到租约续约器中;
- 如果返回结果为 false(表示现有的 LeaseRenewer 不能添加新的守护线程),则移除现有的 LeaseRenewer,获取新的 LeaseRenewer 实例,并将当前客户端添加到新的 LeaseRenewer 中;
- 结束文件租约:
endFileLease
方法移除文件写入记录,并在没有文件写入时关闭客户端的租约续约- 使用 renewLeaseKey 调用 removeFileBeingWritten 方法,从记录中移除正在写入的文件
- 如果没有文件在写入(filesBeingWritten 为空),则获取 LeaseRenewer 实例,调用 renewer.closeClient(this) 方法,关闭当前客户端的租约续约。
todo,未完待续