深入HDFS——数据上传源码

news2025/1/19 20:33:58

引入

就如RPC篇章里提到的观点一样,任何一种能广为传播的技术,都是通过抽象和封装的思想,屏蔽底层底层复杂实现,提供简单且强大的工具,来降低使用门槛的。

HDFS的风靡自然也是如此。

通过前面深入了NameNode和DataNode的启动源码,我们已经是略有体会,但重启毕竟属于工作时几乎遇不到的场景,所以今天我们从HDFS最常用的上传功能入手,去看看HDFS是如何实现的。

数据上传过程

既然是我们要使用的功能,自然需要我们自己动手编写向HDFS中写入数据的代码啦。

下面我写了一个简单的写入数据代码:

public class WriteDataToHDFS {
    public static void main(String[] args) throws IOException, InterruptedException{
        Configuration conf = new Configuration();
  
        //创建FileSystem对象
        FileSystem fs= FileSystem.get(URI.create("hdfs://hadoop1:8020/"),conf,"root");

        //创建HDFS文件路径
        Path path = new Path("/chaos.txt");
        FSDataOutputStream out = fs.create(path);

        //向HDFS中写出数据
        out.write("hello chaos".getBytes());
    }
}

可以看到,实现的代码是很简单的。

使用确实是很容易的,那么底层实现是怎样的呢?

我们先通过前面的了解,先来梳理一下,客户端向 HDFS写入数据的实现流程:

  1. 客户端与NameNode进行通信,获取数据写入HDFS中对应哪些DataNode节点;
  2. 在客户端将数据划分成packet传输到HDFS各个DataNode节点上。

看起来好像也不难的样子?

但实际上底层实现可没那么简单。下面我们会从以下几个模块去深入源码,一起看看HDFS是如何实现数据上传的,里面又有哪些有意思的细节。

  1. 创建文件系统并初始化DFSClient
  2. 连接NameNode创建目录
  3. 启动DataStreamer线程
  4. 向dataQueue队列中写入packet
  5. 设置副本写入策略源码
  6. 客户端与DataNode建立socket通信
  7. 向Datanode中上传数据

1.创建文件系统并初始化DFSClient

操作HDFS前需要创建文件系统,并初始化DFSClient对象,该对象中持有与NameNode通信的NameNode Rpc Proxy。

而DFSClient对象的创建代码如下:

FileSystem fileSystem = FileSystem.get(URI.create("hdfs://hadoop1:8020/"),conf,"root");

这里也可以通过FileSystem.newInstance(conf)创建,不过殊途同归,底层实现是类似的。

FileSystem.get()具体源码如下:

public static FileSystem get(final URI uri, final Configuration conf,
      final String user) throws IOException, InterruptedException {
  String ticketCachePath =
    conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
  UserGroupInformation ugi =
      UserGroupInformation.getBestUGI(ticketCachePath, user);
  return ugi.doAs(new PrivilegedExceptionAction() {
    @Override
    public FileSystem run() throws IOException {
      //创建分布式文件系统及初始化DFSClient
      return get(uri, conf);
    }
  });
}

继续往里走,来看get(uri, conf)方法源码如下:

public static FileSystem get(URI uri, Configuration conf) throws IOException {
  ... ...
  //创建分布式文件系统及初始化DFSClient
  return CACHE.get(uri, conf);
}

CACHE.get(uri,conf)又调用到如下源码:

FileSystem get(URI uri, Configuration conf) throws IOException{
  Key key = new Key(uri, conf);
  //创建分布式文件系统及初始化DFSClient
  return getInternal(uri, conf, key);
}

getInternal方法源码如下:

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
  FileSystem fs;
  synchronized (this) {
    fs = map.get(key);
  }
  if (fs != null) {
    return fs;
  }
  // 创建分布式文件系统及初始化DFSClient
  fs = createFileSystem(uri, conf);
  ... ...
  return fs;
}

从上面源码可以看到,getInternal方法中,核心就在createFileSystem方法,这个方法会创建分布式文件系统及初始化DFSClient。

关于这个文件系统对应的类是什么,其实直接跟着代码找还是很难找到的,但是熟悉面向对象知识的小伙伴,肯定一看就知道,这个类一定是FileSystem的实现类。

那我们就先来看看FileSystem的注释:

An abstract base class for a fairly generic filesystem. It may be implemented as a distributed filesystem, or as a "local" one that reflects the locally-connected disk. The local version exists for small Hadoop instances and for testing.

All user code that may potentially use the Hadoop Distributed File System should be written to use a FileSystem object. The Hadoop DFS is a multi-machine system that appears as a single disk. It's useful because of its fault tolerance and potentially very large capacity.

The local implementation is {@link LocalFileSystem} and distributed implementation is DistributedFileSystem.

翻译:

这是一个相当通用的文件系统的抽象基类。它可能被实现为分布式文件系统,或者是反映本地连接磁盘的 “本地” 文件系统。本地版本适用于小型 Hadoop 实例和测试。

 

所有可能潜在使用 Hadoop 分布式文件系统的用户代码,都应编写为使用文件系统对象。Hadoop DFS 是一个多机系统,表现得像单个磁盘。它很有用,因为其具有容错性和潜在的非常大的容量。

 

本地实现是 {@link LocalFileSystem} ,分布式实现是 DistributedFileSystem 。

这下子就清晰了,我们这里创建的分布式文件系统类自然就是DistributedFileSystem类。(org.apache.hadoop.hdfs.DistributedFileSystem

我们回来接着看createFileSystem的源码如下:

private static FileSystem createFileSystem(URI uri, Configuration conf
    ) throws IOException {
  //创建的 class 为 org.apache.hadoop.hdfs.DistributedFileSystem
  Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
  //初始化分布式文件系统
  FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
  //调用到 DistributedFileSystem 中的 initialize方法,初始化创建DFSClient
  fs.initialize(uri, conf);
  return fs;
}

可以看到,是通过ReflectionUtils.newInstance(clazz, conf) 获取的这个文件系统类。

但在执行ReflectionUtils.newInstance(clazz, conf) 前,通过getFileSystemClass(uri.getScheme(), conf) 获取clazz才是重点,其对应源码如下:

public static Class<? extends FileSystem> getFileSystemClass(String scheme,
    Configuration conf) throws IOException {
  if (!FILE_SYSTEMS_LOADED) {
    //将 FileSystem 抽象类的所有实现类中的 schema,和对应实现类,放入 SERVICE_FILE_SYSTEMS 对象中
    loadFileSystems();
  }
  Class<? extends FileSystem> clazz = null;
  if (conf != null) {
    // 从配置中获取 fs.hdfs.impl
    // 如果配置文件中没有配置 fs.hdfs.impl 那么获取的clazz 为 null
    // 3.x这里会有些小区别
    clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
  }
  if (clazz == null) {
    //获取的clazz 为 org.apache.hadoop.hdfs.DistributedFileSystem
    clazz = SERVICE_FILE_SYSTEMS.get(scheme);
  }
  if (clazz == null) {
    throw new IOException("No FileSystem for scheme: " + scheme);
  }
  return clazz;
}

以上代码中loadFileSystems(),会将 FileSystem 抽象类的所有实现类中的 schema,和对应实现类,放入 SERVICE_FILE_SYSTEMS 对象中。

loadFileSystems()实现源码如下:

private static void loadFileSystems() {
  synchronized (FileSystem.class) {
    if (!FILE_SYSTEMS_LOADED) {
      //ServiceLoader.load(FileSystem.class)会加载所有 FileSystem 实现类中的schema信息
      ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
      for (FileSystem fs : serviceLoader) {
        //将所有文件系统的 schema信息存入 SERVICE_FILE_SYSTEMS Map中
        SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
      }
      FILE_SYSTEMS_LOADED = true;
    }
  }
}

getFileSystemClass 方法中首先从配置文件中获取 fs.hdfs.impl 配置的HDFS类,默认在HDFS中没有配置该属性,该属性也没有默认值,所以得到clazz为null,进而执行 SERVICE_FILE_SYSTEMS.get(scheme) 得到的clazz为org.apache.hadoop.hdfs.DistributedFileSystem。

FileSystem.createFileSystem() 中执行 fs.initialize(uri, conf) 时,这里的fs就是org.apache.hadoop.hdfs.DistributedFileSystem类,所以相当于执行的是DistributedFileSystem.initialize

对应实现与源码如下:

public void initialize(URI uri, Configuration conf) throws IOException {
  super.initialize(uri, conf);
  setConf(conf);

  String host = uri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: "+ uri);
  }
  homeDirPrefix = conf.get(
      DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
      DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
  //创建DFSClient,传入的URI 为NameNode URI
  this.dfs = new DFSClient(uri, conf, statistics);
  this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
  this.workingDir = getHomeDirectory();
}

通过以上代码,会创建DFSClient对象,并在创建DFSClient对象时创建NameNode Rpc Proxy对象,并赋值给其属性namenode,方便后续客户端和NameNode进行通信。

具体的 new DFSClient构造如下:

public DFSClient(URI nameNodeUri, Configuration conf, FileSystem.Statistics stats) throws IOException {
	//创建DFSClient ,传入了 NameNode的URI
    this(nameNodeUri, null, conf, stats);
}

this调用到DFSClient实现,其中创建了NameNode Rpc Proxy 并赋值给了namenode属性。

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
    Configuration conf, FileSystem.Statistics stats) throws IOException {
    ... ...
    //获取NameNode Rpc Proxy
    proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
        nameNodeUri, ClientProtocol.class, numResponseToDrop,
        nnFallbackToSimpleAuth);
    ... ...
    //给DFSClient中的namenode 赋值 NameNode的Rpc Proxy对象
    this.namenode = proxyInfo.getProxy();
    ... ...
}

后续客户端可以通过DFSClient.namenode获取到NameNode的RPC Proxy对象与NameNode进行通信。

2.连接NameNode创建目录

在我们编写的代码执行到 fs.create(path) 时,会在HDFS中创建目录并准备dataQueue,dataQueue用于客户端数据传输队列,并最后返回 FSDataOutputStream 对象,该对象用于向HDFS中写数据。

跟进fs.create() 源码一层层对象包装,会发现该create方法最终实际调到 DistributedFileSystem.create()方法,其源码如下:

@Override
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  //返回 FSDataOutputStream 对象
  return this.create(f, permission,
      overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
          : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
      blockSize, progress, null);
}

以上create方法会继续调用到DistributedFileSystem.create()方法,只是参数不同,源码如下:

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
  final EnumSet<CreateFlag> cflags, final int bufferSize,
  final short replication, final long blockSize, final Progressable progress,
  final ChecksumOpt checksumOpt) throws IOException {
  ... ...
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
        //创建了一个DFSOutputStream,做了很多初始化操作
      /**
       *  1.往文件目录树里面添加了INodeFile
       *  2.添加了契约管理
       *  3.启动了DataStreamer(写数据流程的关键服务)
       */
      //执行dfs.create方法,最终调用到 DFSClient.create方法
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
              cflags, replication, blockSize, progress, bufferSize,
              checksumOpt);
      //FSDataOutputStream 是DFSOutputStream 进行了再一次的封装。【装饰模式】
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
   ... ...
}

3.x版本,在上面代码最后会 return safelyCreateWrappedOutputStream(dfsos),感兴趣的小伙伴可以深入看看它们的区别。

以上代码执行dfs.create方法,最终调用到 DFSClient.create方法:

public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, short replication,
		long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt) throws IOException {
	return create(src, permission, flag, true, replication, blockSize, progress, buffersize, checksumOpt, null);
}

DFSClient.create方法又经过一些列参数包装,最终调用到如下源码:

public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent,
		short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt,
		InetSocketAddress[] favoredNodes) throws IOException {
	checkOpen();
	if (permission == null) {
		permission = FsPermission.getFileDefault();
	}
	FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
	if (LOG.isDebugEnabled()) {
		LOG.debug(src + ": masked=" + masked);
	}
	// newStreamForCreate中获取到NameNoae Rpc Proxy 代理对象并连接创建目录,然后启动DataStreamer 线程用于接收客户端上传的packet
	/**
	 * 总结:
	 * 1.往文件目录树里面添加了文件
	 * 2.添加了契约
	 * 3.启动了DataStreamer
	 */
	final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent,
			replication, blockSize, progress, buffersize, dfsClientConf.createChecksum(checksumOpt),
			getFavoredNodesStr(favoredNodes));


	// 开启续约(契约)
	beginFileLease(result.getFileId(), result);
	return result;
}

以上代码我补充了很多注释,这里就不赘述了,下面接着看newStreamForCreate实现源码如下:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked,
		EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress,
		int buffersize, DataChecksum checksum, String[] favoredNodes) throws IOException {
    ... ...
    //重试的代码结构
    while (shouldRetry) {
        shouldRetry = false;
        try {
            /**
             * HDFS原理总结:
             * 创建目录:就是在 目录树(元数据)上面添加一个子Node (INodeDirectory)
             * 上传文件:
             *     1.在目录树里面添加一个字Node(InodeFile)
             *     2.再往文件里面写数据
             *     更新了元数据
             *     添加了契约
             *  往目录树里添加InodeFile,记录元数据日志和添加契约
             *  这儿都是需要跟Namenode的服务端进行交互的
             */
            // dfsClient.namenode 就是 NameNode Rpc Proxy 对象,调用的create方法,调用到NameNodeRpcServer.create方法
            stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
                    new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize,
                    SUPPORTED_CRYPTO_VERSIONS);
            break;
        } catch (RemoteException re) {
            IOException e = re.unwrapRemoteException(AccessControlException.class,
                    DSQuotaExceededException.class, FileAlreadyExistsException.class,
                    FileNotFoundException.class, ParentNotDirectoryException.class,
                    NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class,
                    UnresolvedPathException.class, SnapshotAccessControlException.class,
                    UnknownCryptoProtocolVersionException.class);
            if (e instanceof RetryStartFileException) {
                //重试
                if (retryCount > 0) {
                    shouldRetry = true;
                    retryCount--;
                } else {
                    throw new IOException("Too many retries because of encryption" + " zone operations", e);
                }
            } else {
                throw e;
            }
        }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");

    //普通写文件策略,out对象是DFSOutputStream
    //该DFSOutputStream构造中会创建DataStreamer 线程,负责向HDFS中写数据
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum,
            favoredNodes);
    //启动DataStreamer 线程 ,运行run方法
    out.start();
    return out;
	... ...
}

以上代码dfsClient.namenode.create()方法会通过NameNode Rpc Proxy 对象调用到NameNodeRpcServer.create方法,然后在HDFS中经过一些目录和权限判断来创建对应目录。NameNodeRpcServer.create源码如下:

@Override // ClientProtocol 客户端创建文件
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize, 
    CryptoProtocolVersion[] supportedVersions)
    throws IOException {
    //检查namenoe启动状态
    checkNNStartup();
    ... ...
    //创建文件核心代码
    status = namesystem.startFile(src, perm, clientName, clientMachine,
        flag.get(), createParent, replication, blockSize, supportedVersions,
        cacheEntry != null);
    ... ...
    return status;
}

以上startFile源码如下:

HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    boolean createParent, short replication, long blockSize, 
    CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
    throws AccessControlException, SafeModeException,
    FileAlreadyExistsException, UnresolvedLinkException,
    FileNotFoundException, ParentNotDirectoryException, IOException {
    ... ...
    //创建文件目录
    status = startFileInt(src, permissions, holder, clientMachine, flag,
        createParent, replication, blockSize, supportedVersions,
        logRetryCache);
    ... ...
    return status;
}

startFileInt实现源码如下:

private HdfsFileStatus startFileInt(String src,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet flag, boolean createParent, short replication,
    long blockSize, CryptoProtocolVersion[] supportedVersions,
    String ecPolicyName, String storagePolicy, boolean logRetryCache)
    throws IOException {
    ... ...
    //创建文件目录
    stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
        clientMachine, flag, createParent, replication, blockSize, feInfo,
        toRemoveBlocks, shouldReplicate, ecPolicyName, storagePolicy,
        logRetryCache);
    ... ...
}

以上代码中startFile实现如下:

static HdfsFileStatus startFile(
    FSNamesystem fsn, INodesInPath iip,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet flag, boolean createParent,
    short replication, long blockSize,
    FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
    boolean shouldReplicate, String ecPolicyName, String storagePolicy,
    boolean logRetryEntry)
    throws IOException {
    ... ...
    //创建文件
    iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
        replication, blockSize, holder, clientMachine, shouldReplicate,
        ecPolicyName, storagePolicy);
    ... ...
}

addFile中最终会执行 fsd.addINode(existing, newNode, permissions.getPermission()) 向HDFS中添加目录信息。

3.启动DataStreamer线程

顺着流程就走到 DFSOutputStream.newStreamForCreate() 源码的下半部分:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName,
    String storagePolicy)
    throws IOException {
    ... ...
    // 普通写文件策略,out对象是DFSOutputStream
    // 该DFSOutputStream构造中会创建DataStreamer 线程,负责向HDFS中写数据
    out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes, true);
    ... ...
    //启动DataStreamer 线程 ,运行run方法
    out.start();
    return out;
}

当向NameNode连接创建目录后,会执行new DFSOutputStream()创建DFSOutputStream对象并最终返回,在创建该对象的构造中同时创建了DataStreamer对象并赋值给streamer属性,DataStreamer对象负责后续接收客户端上传数据并将数据发送pipeline方式发送到DataNode上,该对象为一个线程,创建DFSOutputStream对象完成后会执行out.start()方法进行启动。

new DFSOutputStream实现源码如下:

private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag,
		Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {
	this(dfsClient, src, progress, stat, checksum);
	this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
	
    //计算写入数据包的大小,默认每个packetSize大小为64kb
	computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
	 
	//创建 DataStreamer 对象负责 向HDFS中写入数据
	streamer = new DataStreamer(stat, null);
	if (favoredNodes != null && favoredNodes.length != 0) {
		streamer.setFavoredNodes(favoredNodes);
	}
}

3.x对应源码有一些小区别

protected DFSOutputStream(DFSClient dfsClient, String src,
    HdfsFileStatus stat, EnumSet flag, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
    ... ...
    computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
        bytesPerChecksum);
    ... ...
    streamer = new DataStreamer(stat, null, dfsClient, src, progress,
        checksum, cachingStrategy, byteArrayManager, favoredNodes,
        addBlockFlags);
    ... ...
}

可以看到,在DFSOutputStream创建同时,获取了后续写入数据时的packet大小(默认为64K),并给其streamer属性初始化了DataStreamer值(DataStreamer是一个线程)。

当创建好 DFSOutputStream对象后赋值给out对象,当执行out.start()方法时,实际上执行的就是streamer.start,由于DataStreamer是一个线程,所以最终调用到其中的run方法。

我们可以先看一下源码注释:

DFSOutputStream creates files from a stream of bytes.

The client application writes data that is cached internally by this stream.Data is broken up into packets, each packet is typically 64K in size. A packet comprises of chunks. Each chunk is typically 512 bytes and has an associated checksum with it.

 
When a client application fills up the currentPacket, it is enqueued into dataQueue. The DataStreamer thread picks up packets from the dataQueue, sends it to the first datanode in the pipeline and moves it from the dataQueue to the ackQueue. The ResponseProcessor receives acks from the datanodes. When an successful ack for a packet is received from all datanodes, the ResponseProcessor removes the corresponding packet from the ackQueue.

In case of error, all outstanding packets and moved from ackQueue. A new pipeline is setup by eliminating the bad datanode from the original pipeline.The DataStreamer now starts sending packets from the dataQueue.

翻译:

DFSOutputStream 从字节流创建文件。

 

客户端应用程序写入的数据由该流在内部进行缓存。数据被分解为数据包,每个数据包通常大小为 64K。一个数据包由数据块组成。每个数据块通常为 512 字节,并带有相关的校验和。

 

当客户端应用程序填满当前数据包时,它会被排入数据队列。数据流式处理线程从数据队列中取出数据包,将其发送到管道中的第一个数据节点,并将其从数据队列移动到确认队列。响应处理器从数据节点接收确认。当从所有数据节点收到一个数据包的成功确认时,响应处理器从确认队列中删除相应的数据包。

 

在出现错误的情况下,所有未完成的数据包都会从确认队列中移出。通过从原始管道中排除有问题的数据节点来设置新的管道。数据流式处理线程现在开始从数据队列发送数据包。

DataStreamer.run方法的源码如下:

/*
 * streamer thread is the only thread that opens streams to datanode, and closes
 * them. Any error recovery is also done by this thread.
 * 数据流式处理线程是唯一打开与数据节点的流并关闭它们的线程。任何错误恢复也由这个线程完成。
 */
@Override
public void run() {
	... ...
    synchronized (dataQueue) {
    // wait for a packet to be sent.
    // 等待packet 放入到  dataQueue,packet当客户端写入数据时才会放入到dataQueue
    long now = Time.monotonicNow();
    //第一次进来的时候,因为没有数据所以代码走的是这儿
    // dataQueue.size() == 0
    while ((!streamerClosed && !hasError && dfsClient.clientRunning && dataQueue.size() == 0
            && (stage != BlockConstructionStage.DATA_STREAMING
                    || stage == BlockConstructionStage.DATA_STREAMING
                            && now - lastPacket < dfsClient.getConf().socketTimeout / 2))
            || doSleep) {
        long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);
        timeout = timeout <= 0 ? 1000 : timeout;
        timeout = (stage == BlockConstructionStage.DATA_STREAMING) ? timeout : 1000;
        try {
            //如果dataQueue里面没有数据,代码就会阻塞在这儿。
            dataQueue.wait(timeout);//notify
        } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
        }
        doSleep = false;
        now = Time.monotonicNow();
    }
    ... ...
    ... ...
        //获取待发送的数据包
        one = dataQueue.getFirst(); // regular data packet
        ... ...
        // get new block from namenode.
        /**
         * 建立数据管道
         * 向NameNode申请Block
         */
        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug("Allocating new block");
            }
            //步骤一:建立数据管道
            /**
             * nextBlockOutputStream 这个方法里面完成了两个事:
             * 1.向Namenode申请block
             * 2.建立数据管道
             */
            setPipeline(nextBlockOutputStream());
            //步骤二:启动了ResponseProcessor 用来监听我们一个packet发送是否成功
            initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
            if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug("Append to block " + block);
            }
            setupPipelineForAppendOrRecovery();
            initDataStreaming();
        }

        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
        if (lastByteOffsetInBlock > blockSize) {
            throw new IOException("BlockSize " + blockSize + " is smaller than data size. "
                    + " Offset of packet in block " + lastByteOffsetInBlock + " Aborting file " + src);
        }

        if (one.isLastPacketInBlock()) {
            // wait for all data packets have been successfully acked
            synchronized (dataQueue) {
                while (!streamerClosed && !hasError && ackQueue.size() != 0 && dfsClient.clientRunning) {
                    try {
                        // wait for acks to arrive from datanodes
                        // dataQueue 中目前没有数据,进入等待状态
                        dataQueue.wait(1000);
                    } catch (InterruptedException e) {
                        DFSClient.LOG.warn("Caught exception ", e);
                    }
                }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
                continue;
            }
            stage = BlockConstructionStage.PIPELINE_CLOSE;
        }

        // send the packet
        Span span = null;
        synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
                span = scope.detach();
                one.setTraceSpan(span);
                //步骤三:从dataQueue把要发送的这个packet移除出去
                dataQueue.removeFirst();
                //步骤四:然后往ackQueue里面添加这个packet
                ackQueue.addLast(one);
                dataQueue.notifyAll();
            }
        }
        ... ...
        //这个就是我们写数据代码
        one.writeTo(blockStream);
        blockStream.flush();
        ... ...
}

以上代码中 dataQueue是一个Linkedlist对象,该对象会一直处于while循环中等待客户端上传文件的packet,当有数据放入该LinkedList后,会从该对象中获取一个个的packet写出到DN中。

4.向dataQueue队列中写入packet

向HDFS写入数据是通过执行自己编写代码out.write("hello chaos".getBytes())实现的。out对象为DFSOutputStream对象,所以write方法优先找该对象中的write方法,但是发现DFSOutputStream对象中没有write方法,所以找到DFSOutputStream对象的父类FSOutputSummer.write方法,因此最终执行到FSOutputSummer.write方法实现,其源码如下:

@Override
public synchronized void write(int b) throws IOException {
  buf[count++] = (byte)b;
  if(count == buf.length) {
    //刷新缓冲区,写出数据
    flushBuffer();
  }
}

以上代码中flushBuffer()实现源码如下:

protected synchronized void flushBuffer() throws IOException {
  //向packet中写入数据
  flushBuffer(false, true);
}

flushBuffer方法实现源码如下:

protected synchronized int flushBuffer(boolean keep,
    boolean flushPartial) throws IOException {
    ... ...
    // 调用writeChecksumChunks方法将缓冲区的数据写入到输出流,并进行校验和
    writeChecksumChunks(buf, 0, lenToFlush);
    ... ...
}

以上writeChecksumChunks()方法主要就是对写入buffer数据进行校验和生成并与数据一并写入packet。

writeChecksumChunks实现源码如下:

// 为给定的数据块生成校验和,并将输出块和校验和写入底层输出流
private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
    ... ...
    //根据数据块的大小,计算数据块的校验和
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    .. ...
    //将当前数据块和对应的校验块写入到底层输出流中
    writeChunk(b, off + i, chunkLen, checksum, ckOffset,
        getChecksumSize());
    ... ...
}

以上代码中writeChunk()方法最终会调用到DFSOutputStream.writeChunk()实现,其源码如下:

protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
    ... ...
    // 将校验和写入当前数据包
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    // 将数据块写入当前数据包
    currentPacket.writeData(b, offset, len);
    ... ...
    // 如果数据包已满,则将其排队等待传输
    enqueueCurrentPacketFull();
    ... ...
}

以上代码中,随着数据写入到packet中数据量达到默认64K时,会将packet写入到对应的dataQueue中。

enqueueCurrentPacketFull()方法实现源码如下:

synchronized void enqueueCurrentPacketFull() throws IOException {
  LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
          + " appendChunk={}, {}", currentPacket, src, getStreamer()
          .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
      getStreamer());
  //当前数据包排队等待传输
  enqueueCurrentPacket();
  adjustChunkBoundary();
  endBlock();
}

以上enqueueCurrentPacket()方法实现原理如下:

void enqueueCurrentPacket() throws IOException {
    //当前数据包排队等待传输
    getStreamer().waitAndQueuePacket(currentPacket);
    currentPacket = null;
}

waitAndQueuePacket()方法实现如下:

void waitAndQueuePacket(DFSPacket packet) throws IOException {
  synchronized (dataQueue) {
    ... ...
    //将当前packet 放入 dataQueue 中
    queuePacket(packet);
    ... ...
}

queuePacket()实现代码如下:

void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpan());
    //将packet 加入到dataQueue LinkedList 中
    dataQueue.addLast(packet);
    lastQueuedSeqno = packet.getSeqno();
    LOG.debug("Queued {}, {}", packet, this);
    //notifyAll()方法通知所有正在等待dataQueue对象锁的线程,告诉它们数据队列已经有数据包放入,可以继续执行
    dataQueue.notifyAll();
  }
}

以上代码中dataQueue.addLast(packet)就是将packet 加入到dataQueue LinkedList 中,当执行到dataQueue.notifyAll()时,会通知所有正在等待dataQueue对象锁的线程,告诉它们数据队列已经有数据包放入,可以继续执行。

5.副本放置策略源码

下面我们回到DataStreamer.run方法源码,该部分代码3.x和2.x有很多区别,上面已经贴过2.x实现,下面我们看看3.x版本是如何实现的:

public void run() {
    ... ...
    synchronized (dataQueue) {
      // wait for a packet to be sent.
      //等待packet 放入到  dataQueue,packet当客户端写入数据时才会放入到dataQueue
      while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) {
    ... ...
    //dataQueue 中目前没有数据,进入等待状态
    dataQueue.wait(timeout);
    ... ...
    }
    ... ...
    //获取待发送的数据包
    one = dataQueue.getFirst(); // regular data packet
    ... ...
    //构建写数据管道,通过管道连接到第一个DataNode,该DN将数据发送到管道的第二个DN,以此类推
    //nextBlockOutputStream 方法中连接NameNode申请写入数据的DataNode节点及副本分布策略,并设置客户端与第一个Block块所在的节点的socket连接
    setPipeline(nextBlockOutputStream());
    ... ...
    //将packet 以流的方式写入到DataNode节点
    sendPacket(one);
    ... ...
    //等待所有ack
    waitForAllAcks();
    ... ...
}

以上代码大体逻辑为:当dataQueue中有packet后,会执行one = dataQueue.getFirst()获取packet包并通过sendPacket(one)将packet数据写出到DataNode节点。

客户端向HDFS DataNode写入数据时,默认有3个副本,并且各个DataNode节点之间写出数据都是以pipeline方式依次传递到各个DataNode节点,所以在执行sendPacket(one)写出数据前,会执行setPipeline(nextBlockOutputStream())方法构建写数据管道,通过管道连接到第一个DataNode,将packet数据写入该节点,然后由第二个DataNode依次再将packet传递到第三个DataNode节点,副本多的依次类推。其中nextBlockOutputStream()方法中会连接NameNode申请写入数据的DataNode节点及副本分布策略,并设置客户端与第一个Block块所在的节点的socket连接,方便后续将数据写入到对应的DataNode节点。

nextBlockOutputStream()方法源码如下:

protected LocatedBlock nextBlockOutputStream() throws IOException {
    ... ...
    //locateFollowingBlock方法中向NameNode申请副本写入的DN节点信息并设置副本策略
    lb = locateFollowingBlock(
        excluded.length ]]> 0 ? excluded : null, oldBlock);
    block.setCurrentBlock(lb.getBlock());
    ... ...
    //获取Block块所在的所有节点信息
    nodes = lb.getLocations();
    ... ...
    //连接到节点列表中的第一个 DataNode 节点并建立客户端与DataNode节点的socket连接,方便后续将数据写入到DataNode
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
        0L, false);
    ... ...
}

以上代码中locateFollowingBlock( excluded.length > 0 ? excluded : null, oldBlock)代码中会向NameNode申请副本写入DN节点的信息并设置副本分布策略。

locateFollowingBlock()源码如下:

private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
    ExtendedBlock oldBlock) throws IOException {
  //向NameNode 添加block 块信息
  return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
      stat.getFileId(), favoredNodes, addBlockFlags);
}

以上代码中addBlock方法中会向NameNode申请block分布策略及写入DN节点信息。DFSOutputStream.addBlock()实现源码如下:

static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
    DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
    String[] favoredNodes, EnumSet allocFlags)
    throws IOException {
    ... ...
    //向NameNode申请block分布策略及写入DN节点信息
    return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
        excludedNodes, fileId, favoredNodes, allocFlags);
    ... ...
}

以上代码中dfsClient.namenode获取到NameNode Rpc Proxy,所以addBlock方法最终会调用到NameNodeRpcServer.addBlock()方法。NameNodeRpcServer.addBlock()源码如下:

//客户端写入数据向NameNode 申请block位置
@Override
public LocatedBlock addBlock(String src, String clientName,
    ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
    String[] favoredNodes, EnumSet addBlockFlags)
    throws IOException {
  //检查NameNode是否启动
  checkNNStartup();
  //getAdditionalBlock方法设置副本存储节点策略,返回的 LocatedBlock 对象中包含 block写入数据的DN节点
  LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
      clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
  if (locatedBlock != null) {
    metrics.incrAddBlockOps();
  }
  return locatedBlock;
}

以上代码namesystem.getAdditionalBlock()源码如下:

LocatedBlock getAdditionalBlock(
    String src, long fileId, String clientName, ExtendedBlock previous,
    DatanodeInfo[] excludedNodes, String[] favoredNodes,
    EnumSet flags) throws IOException {
    ... ...
    //为新数据块选择DataNode 节点,有几个副本选择几个节点
    DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
        blockManager, src, excludedNodes, favoredNodes, flags, r);
    ... ...
}

以上代码中chooseTargetForNewBlock()会为block找到存储DN节点,源码如下:

static DatanodeStorageInfo[] chooseTargetForNewBlock(
    BlockManager bm, String src, DatanodeInfo[] excludedNodes,
    String[] favoredNodes, EnumSet flags,
    ValidateAddBlockResult r) throws IOException {
    ... ...
    // 为新数据块选择目标数据节点
    return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
                                  excludedNodesSet, r.blockSize,
                                  favoredNodesList, r.storagePolicyID,
                                  r.blockType, r.ecPolicy, flags);
}

chooseTarget4NewBlock()中会为block选择目标数据节点:

public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
    final int numOfReplicas, final Node client,
    final Set excludedNodes,
    final long blocksize,
    final List favoredNodes,
    final byte storagePolicyID,
    final BlockType blockType,
    final ErasureCodingPolicy ecPolicy,
    final EnumSet flags) throws IOException {
    ... ...
    //存放数据副本的节点数组
    final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
        numOfReplicas, client, excludedNodes, blocksize, 
        favoredDatanodeDescriptors, storagePolicy, flags);
    ... ...
    //返回数据存放节点数组
    return targets;
}

以上代码blockplacement.chooseTarget()方法经过一层层对象封装,最终调用到BlockPlacementPolicyDefault.chooseTarget方法,该方法实现源码如下:

private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
                                  Node writer,
                                  List chosenStorage,
                                  boolean returnChosenNodes,
                                  Set excludedNodes,
                                  long blocksize,
                                  final BlockStoragePolicy storagePolicy,
                                  EnumSet addBlockFlags,
                                  EnumMap sTypes) {
    ... ...
    // 获取每个机架上的最大节点数
    int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
    ... ...
    List results = null;
    ... ...
    //这里的results 与  chosenStorage 完全相同,但是目前没有数据
    results = new ArrayList<]]>(chosenStorage);
    //设置副本分布并返回第一个副本要写入的DN节点
    localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
        blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
        sTypes);
    ... ...
    return getPipeline(
        (writer != null && writer instanceof DatanodeDescriptor) ? writer
            : localNode,
        results.toArray(new DatanodeStorageInfo[results.size()]));
}

进入以上代码中chooseTarget方法,源码如下:

private Node chooseTarget(final int numOfReplicas,
                          Node writer,
                          final Set excludedNodes,
                          final long blocksize,
                          final int maxNodesPerRack,
                          final List results,
                          final boolean avoidStaleNodes,
                          final BlockStoragePolicy storagePolicy,
                          final EnumSet unavailableStorages,
                          final boolean newBlock,
                          EnumMap storageTypes) {
    ... ...
    //准备多副本写入的DN节点分布,返回的writer为第一个副本要写入的DN节点
    writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);
    ... ...
    return writer;
}

以上代码中chooseTargetInOrder中实现副本分布并返回第一个副本要写入的DN节点。chooseTargetInOrder源码如下:

protected Node chooseTargetInOrder(int numOfReplicas, 
                               Node writer,
                               final Set excludedNodes,
                               final long blocksize,
                               final int maxNodesPerRack,
                               final List results,
                               final boolean avoidStaleNodes,
                               final boolean newBlock,
                               EnumMap storageTypes)
                               throws NotEnoughReplicasException {
  // 计算结果列表的大小,默认初始 results 为0,result集合表示副本所在的节点
  final int numOfResults = results.size();
  // 如果结果列表为空
  if (numOfResults == 0) {
    // 选择本地节点作为第一个副本存储位置,并向result中加入节点
    DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storageTypes, true);

    //writer第一个副本要写出的DataNode节点
    writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
                                   : null;

    //减去一个副本后,如果为0则返回,writer,否则不返回,继续
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  //第一个副本所在DN节点
  final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();

  if (numOfResults <= 1) {
    //选择远程机架存放第二个副本
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
    if (--numOfReplicas == 0) {
      //writer第一个副本要写出的DataNode节点
      return writer;
    }
  }

  if (numOfResults <= 2) {
    //第二个副本所在DN节点
    final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
    if (clusterMap.isOnSameRack(dn0, dn1)) {//如果dn0与dn1是同一机架,第三个副本选择不同机架
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else if (newBlock){//如果是新块,选择与dn1 第二个副本所在节点相同的机架上放第三个副本
      chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else {//随机选择一台节点存储第3个副本
      chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    }
    if (--numOfReplicas == 0) {
      //writer第一个副本要写出的DataNode节点
      return writer;
    }
  }
  //大于3个副本,随机选择节点存放副本
  chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  //writer第一个副本要写出的DataNode节点
  return writer;
}

chooseTargetInOrder方法代码逻辑为block 副本找到存储节点的策略,然后返回block所在的第一个节点,首先第一个block存储在本机,第二个block存储在远程机架,第三个副本存储时先判断是否第一个副本和第二个副本是否在同一机架,如果在同一机架,那么第三个副本选择不同机架进行存储,否则选择与第二个副本相同机架的随机节点进行存储。最终该方法返回存储第一个副本的DataNode节点。

6.客户端与DataNode建立socket通信

在DataNode启动源码部分,DataNode.initDataXceiver()方法进行初始化DataXceiver服务,该服务是 DataNode 接收客户端请求的核心组件,其核心实现源码如下:

private void initDataXceiver() throws IOException {
    ... ...
    //TcpPeerServer 对象用于接收来自客户端的传输流量
    TcpPeerServer tcpPeerServer;
    ... ...
    //DataXceiverServer 是一个线程
    xserver = new DataXceiverServer(tcpPeerServer, getConf(), this);
    //创建DataXceiverServer的后台线程,创建好DataNode后会启动
    this.dataXceiverServer = new Daemon(threadGroup, xserver);
    ... ...
}

DataNode.crateDataNode()方法中,当DataNode对象创建完成后,当执行dn.runDatanodeDaemon()时会运行DataXceiverServer对象的run方法。

DataXceiverServer.run方法实现源码如下:

public void run() {
    ... ...
    // 接受客户端的连接请求
    peer = peerServer.accept();
    ... ...
    //创建线程并传入peer参数,然后并启动,会调用到DataXceiver.run 方法
    new Daemon(datanode.threadGroup,
        DataXceiver.create(peer, datanode, this))
        .start();
    ... ...
}

以上代码中我们可以看到peerServer.accept()一直接受来自客户端传输数据socket通信,并且new Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start()代码中创建了DataXceiver线程并启动,该线程主要从DataXceiverServer中读取socket传入数据并将数据写入到DataNode节点磁盘。

下面继续回到DataStreamer.nextBlockOutputStream()源码中,查看客户端与DataNode节点建立的连接。

DataStreamer.nextBlockOutputStream()方法源码如下:

protected LocatedBlock nextBlockOutputStream() throws IOException {
    ... ...
    //locateFollowingBlock方法中向NameNode申请副本写入的DN节点信息并设置副本策略
    lb = locateFollowingBlock(
        excluded.length ]]> 0 ? excluded : null, oldBlock);
    block.setCurrentBlock(lb.getBlock());
    ... ...
    //获取Block块所在的所有节点信息
    nodes = lb.getLocations();
    ... ...
    //连接到节点列表中的第一个 DataNode 节点并建立客户端与DataNode节点的socket连接,方便后续将数据写入到DataNode
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
        0L, false);
    ... ...
}

前面执行完locateFollowingBlock()方法,获取到了数据应该写往的DataNode节点后,后续会执行createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,0L, false)方法与第一个写出的DataNode节点建立连接。

createBlockOutputStream()实现部分源码如下:

boolean createBlockOutputStream(DatanodeInfo[] nodes,
    StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
    long newGS, boolean recoveryFlag) {
    ... ...
    // 创建客户端用于数据传输管道的Socket,这里传入的nodes[0]就是第一个DataNode节点
    s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
    ... ...
    //当输出流有数据时,通过socket将数据写出到DataNode中
    ... ... 
}

以上createSocketForPipeline(nodes[0], nodes.length, dfsClient)代码就是获取第一个写出数据的block所在的DataNode节点,并建立socket连接。

createSocketForPipeline源码如下:

static Socket createSocketForPipeline(final DatanodeInfo first,
    final int length, final DFSClient client) throws IOException {
    ... ...
    //获取第一个 DataNode节点 socket地址
    final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
    ... ...
    //客户端连接上 DataNode,DataNode 启动着DataXceiverServer 服务,该服务启动后一直会接收客户端scoket 通信
    NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(),
        conf.getSocketTimeout());
    ... ...
     return sock;
}

7.向Datanode中上传数据

回到 DataStreamr.createBlockOutputStream()方法中,核心源码如下:

boolean createBlockOutputStream(DatanodeInfo[] nodes,
    StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
    long newGS, boolean recoveryFlag) {
    ... ...
    // 创建客户端用于数据传输管道的Socket,这里传入的nodes[0]就是第一个DataNode节点
    s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
    ... ...
    // 获取未缓冲的输出流和输入流
    OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
    InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
    ... ...
    //包装 输出流 unbufOut 到 out 对象中
    out = new DataOutputStream(new BufferedOutputStream(unbufOut,
        DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
    ... ...
    //DataNode 启动着DataXceiverServer 服务,该服务启动后一直会接收客户端scoket 通信
    new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
        dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
        nodes.length, block.getNumBytes(), bytesSent, newGS,
        checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
        (targetPinnings != null && targetPinnings[0]), targetPinnings,
        nodeStorageIDs[0], nodeStorageIDs);
    ... ...
}

当写出数据的输出流out中有数据时,会通过new Sender(out).writeBlock()方法将数据发送到DataNode节点,writeBlock()实现具体源码如下:

public void writeBlock(final ExtendedBlock blk,
    final StorageType storageType,
    final Token blockToken,
    final String clientName,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo source,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    DataChecksum requestedChecksum,
    final CachingStrategy cachingStrategy,
    final boolean allowLazyPersist,
    final boolean pinning,
    final boolean[] targetPinnings,
    final String storageId,
    final String[] targetStorageIds) throws IOException {
    ... ...
    //包装socket 流和 操作类型 “WRITE_BLOCK” ,通过socket 发送到DataNode 节点
    send(out, Op.WRITE_BLOCK, proto.build());
}

以上send方法会将数据发送到DataNode 中,DataNode启动的DataXceiverServer 服务会接收客户端socket通信。

再次回到DataXceiverServer.run()方法源码中:

public void run() {
    ... ...
    // 接受客户端的连接请求
    peer = peerServer.accept();
    ... ...
    //创建线程并传入peer参数,然后并启动,会调用到DataXceiver.run 方法
    new Daemon(datanode.threadGroup,
        DataXceiver.create(peer, datanode, this))
        .start();
    ... ...
}

new Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start()代码中会将接受到客户端的连接包装到DataXceiver线程对象中并启动,在DataXceiver.run方法中会对从客户端接收到的数据进行写出到DataNode磁盘处理。

DataXceiver.run方法源码如下:

public void run() {
    ... ...
    // 初始化操作对象
    Op op = null;
    ... ...
    // 初始化输入流
    InputStream input = socketIn;
    ... ...
    //读取客户端传入的数据给输入流赋值
    input = new BufferedInputStream(saslStreams.in,
        smallBufferSize);
    ... ...
    // 初始化DataXceiver的输入流 ,就是将 input 流赋值给了Receiver 中的 in 属性,后续使用
    super.initialize(new DataInputStream(input));
    ... ...
    //读取输入数据
    op = readOp();
    ... ...
    //处理读取过来的数据流
    processOp(op);
    ... ...
}

以上代码会将从客户端中接收过来的数据包装成数据输入流,最终执行processOp(op)写出到DataNode节点磁盘上。

processOp(op)实现源码如下,op默认从客户端传入类型值为 WRITE_BLOCK:

protected final void processOp(Op op) throws IOException {
    ... ...
    //从客户端获取过来的操作属性为 “WRITE_BLOCK”
    case WRITE_BLOCK:
      //向DataNode中写入Block块操作
      opWriteBlock(in);
      break;
    ... ...
}

opWriteBlock(in)实现代码如下:

private void opWriteBlock(DataInputStream in) throws IOException {
    ... ...
    // 调用writeBlock方法处理写入块操作
    writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelperClient.convertStorageType(proto.getStorageType()),
        PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        targets,
        PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
        PBHelperClient.convert(proto.getSource()),
        fromProto(proto.getStage()),
        proto.getPipelineSize(),
        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
        proto.getLatestGenerationStamp(),
        fromProto(proto.getRequestedChecksum()),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
          CachingStrategy.newDefaultStrategy()),
        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
        (proto.hasPinning() ? proto.getPinning(): false),
        (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
        proto.getStorageId(),
        proto.getTargetStorageIdsList().toArray(new String[0]));
    ... ...
}

以上 writeBlock最终调用到DataXceiver.writeBlock()方法,其源码实现如下:

public void writeBlock(...){
    ... ...
    // 创建blockReceiver 并赋值给 DataXceiver.blockReceiver,后续使用到该对象写出数据到磁盘
    setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
        peer.getRemoteAddressString(),
        peer.getLocalAddressString(),
        stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
        clientname, srcDataNode, datanode, requestedChecksum,
        cachingStrategy, allowLazyPersist, pinning, storageId));
    ... ...
    //发送数据到下游DN节点,对于下游DataNode节点,仍然要走一遍当前节点的流程,形成DataNode 依次向后写出数据
    new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
        blockToken, clientname, targets, targetStorageTypes,
        srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
        latestGenerationStamp, requestedChecksum, cachingStrategy,
        allowLazyPersist, targetPinnings[0], targetPinnings,
        targetStorageId, targetStorageIds);
    ... ...
    //receiveBlock 会接收packets 将数据写出到磁盘
    blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr,
        dataXceiverServer.getWriteThrottler(), targets, false);
    ... ...
}

以上代码中new Sender(mirrorOut).writeBlock()这部分代码是将写入到该DataNode节点的packet数据继续写往下个DataNode节点,如果block有多个副本,都是在下一个DataNode节点向后续DN节点发送写出数据。

最终执行blockReceiver.receiveBlock()代码将数据写出到磁盘中,receiverBlock()实现关键源码如下:

void receiveBlock(
    DataOutputStream mirrOut, // output to next datanode
    DataInputStream mirrIn,   // input from next datanode
    DataOutputStream replyOut,  // output to previous datanode
    String mirrAddr, DataTransferThrottler throttlerArg,
    DatanodeInfo[] downstreams,
    boolean isReplaceBlock) throws IOException {
    ... ...
    //receivePacket负责接收上游的packet
    while (receivePacket() ]]>= 0) { /* Receive until the last packet */ }
    ... ...
}

以上代码中receivePacket()会一直接受从客户端发送过来的packet并写入到DataNode节点磁盘,直到客户端数据传输完毕。

reveiverPacket()关键源码实现如下:

private int receivePacket() throws IOException {
    ... ...
    //将数据写出到DataNode节点磁盘
    streams.writeDataToDisk(dataBuf.array(),
        startByteToDisk, numBytesToDisk);
    ... ...
}

总结

我们一步步把数据上传源码梳理了一边,可以看到,HDFS确实帮我们屏蔽了很多底层的复杂实现逻辑。

今天的内容很多,有很多细节我没有去进一步拓展了,感兴趣的小伙伴可以跟着我的思路再深入拓展看看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2279052.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

蓝桥杯备考:堆和priority queue(优先级队列)

堆的定义 heap堆是一种特殊的完全二叉树&#xff0c;对于树中的每个结点&#xff0c;如果该结点的权值大于等于孩子结点的权值&#xff0c;就称它为大根堆&#xff0c;小于等于就叫小根堆&#xff0c;如果是大根堆&#xff0c;每个子树也是符合大根堆的特征的&#xff0c;如果是…

力扣682

from typing import Listclass Solution:def calPoints(self, operations: List[str]) -> int:a [] # 用于存储有效得分的列表for op in operations:if op.isdigit() or (op[0] - and op[1:].isdigit()): # 如果是整数&#xff08;包括负数&#xff09;a.append(int(op)…

考研计算机组成原理——零基础学习的笔记

第一章 研究计算机硬件的学科。 1.计算机系统概述 计算机系统硬件软件&#xff08;系统软件&#xff1a;比如操作系统、数据库管理系统、标准程序库等&#xff0c;应用软件&#xff1a;QQ等&#xff09; 1.2计算机的层次结构 1.2.1计算机硬件的基本组成 冯诺伊曼计算机&a…

海康工业相机的应用部署不是简简单单!?

作者&#xff1a;SkyXZ CSDN&#xff1a;SkyXZ&#xff5e;-CSDN博客 博客园&#xff1a;SkyXZ - 博客园 笔者使用的设备及环境&#xff1a;WSL2-Ubuntu22.04MV-CS016-10UC 不会吧&#xff1f;不会吧&#xff1f;不会还有人拿到海康工业相机还是一脸懵叭&#xff1f;不会还有人…

计算机网络 (49)网络安全问题概述

前言 计算机网络安全问题是一个复杂且多维的领域&#xff0c;它涉及到网络系统的硬件、软件以及数据的安全保护&#xff0c;确保这些元素不因偶然的或恶意的原因而遭到破坏、更改或泄露。 一、计算机网络安全的定义 计算机网络安全是指利用网络管理控制和技术措施&#xff0c;保…

STM32 FreeRTOS中断管理

目录 FreeRTOS的中断管理 1、STM32中断优先级管理 2、FreeRTOS任务优先级管理 3、寄存器和内存映射寄存器 4、BASEPRI寄存器 5、FreeRTOS与STM32中断管理结合使用 vPortRaiseBASEPRI vPortSetBASEPRI 6、FromISR后缀 7、在中断服务函数中调用FreeRTOS的API函数需注意 F…

操作系统 期末重点复习

操作系统 期末重点复习 必会 课后题摘要 第二章&#xff1a; 在操作系统中为什么要引入进程概念&#xff1f;它会产生什么样的影响? 为了使程序在多道程序环境下能并发执行&#xff0c;并对并发执行的程序加以控制和描述&#xff0c;在操作系统中引入了进程概念。影响: 使程…

7.5.4 MVCC优化测试

作者&#xff1a; h5n1 原文来源&#xff1a; https://tidb.net/blog/4e02d900 1. 背景 由于MVCC 版本数量过多导致rocksdb扫描key数量过多影响SQL执行时间是tidb经常出现问的问题&#xff0c;tidb也一直在致力于优化该问题。 一些优化方式包括比&#xff1a; (1) 从传统…

2024年AI与大数据技术趋势洞察:跨领域创新与社会变革

目录 引言 技术洞察 1. 大模型技术的创新与开源推动 2. AI Agent 智能体平台技术 3. 多模态技术的兴起:跨领域应用的新风口 4. 强化学习与推荐系统:智能化决策的底层驱动 5. 开源工具与平台的快速发展:赋能技术创新 6. 技术安全与伦理:AI技术的双刃剑 7. 跨领域技…

vulnhub靶场【Lampiao靶机】,主要考察提权,脏牛提权

前言 靶机&#xff1a;lampiao&#xff0c;IP地址为192.168.10.11 攻击&#xff1a;kali&#xff0c;IP地址为192.168.10.2 都采用虚拟机&#xff0c;网卡为桥接模式 该靶机目前只剩下一个了&#xff0c;之前记得是有两台构成系列的。 文章中涉及的靶机&#xff0c;来源于v…

ASP .NET Core 学习(.NET9)配置接口访问路由

新创建的 ASP .NET Core Web API项目中Controller进行请求时&#xff0c;是在地址:端口/Controller名称进行访问的&#xff0c;这个时候Controller的默认路由配置如下 访问接口时&#xff0c;是通过请求方法&#xff08;GET、Post、Put、Delete&#xff09;进行接口区分的&…

构建core模块

文章目录 1.环境搭建1.sunrays-common下新建core模块2.引入依赖&#xff0c;并设置打包常规配置 2.测试使用1.启动&#xff01;1.创建模块2.引入依赖3.application.yml 配置MySQL和Minio4.创建启动类5.启动测试 2.common-web-starter1.目录2.WebController.java3.结果 3.common…

VRTK4 记录抓取错误

左手原本可以正常抓取&#xff0c;但是当右手拿起一个物体时&#xff0c;左手抓取右手的线性驱动器&#xff0c;只有部分区域可以抓取 原因是左手的判定物体的层级错误 应该在Collections下&#xff0c;之前错误的和Collections同一层级&#xff0c;导致抓取有时可以有时不可以…

游戏画质升级史的思考

画质代入感大众玩家对游戏的第一印象与评判标准 大众玩家还没到靠游戏性等内在因素来评判游戏的程度。 画面的重要性&#xff0c;任何时候都不能轻视。 行业就是靠摩尔定律来推动进步的。 NS2机能达到PS4到PS4PRO之间的水准&#xff0c;5050达到8G显存&#xff0c;都会引发连…

Windows11电脑总是一闪一闪的,黑一下亮一些怎么解决

Windows11电脑总是一闪一闪的&#xff0c;黑一下亮一些怎么解决 1. 打开设备管理器2. 点击显示适配器3. 更新下方两个选项的驱动3.1 更新驱动Inter(R) UHD Graphixs3.2 更新驱动NVIDIA GeForce RTX 4060 Laptop GPU 4. 其他文章快来试试吧&#x1f970; 1. 打开设备管理器 在电…

【RAG落地利器】向量数据库Qdrant使用教程

TrustRAG项目地址&#x1f31f;&#xff1a;https://github.com/gomate-community/TrustRAG 可配置的模块化RAG框架 环境依赖 本教程基于docker安装Qdrant数据库&#xff0c;在此之前请先安装docker. Docker - The easiest way to use Qdrant is to run a pre-built Docker i…

【逆境中绽放:万字回顾2024我在挑战中突破自我】

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” 文章目录 一、引言二、个人成长与盘点情感与心理成长学习与技能提升其它荣誉 三、年度创作历程回顾创作内容概…

HTTP / 2

序言 在之前的文章中我们介绍过了 HTTP/1.1 协议&#xff0c;现在再来认识一下迭代版本 2。了解比起 1.1 版本&#xff0c;后面的版本改进在哪里&#xff0c;特点在哪里&#xff1f;话不多说&#xff0c;开始吧⭐️&#xff01; 一、 HTTP / 1.1 存在的问题 很多时候新的版本的…

于灵动的变量变幻间:函数与计算逻辑的浪漫交织(下)

大家好啊&#xff0c;我是小象٩(๑ω๑)۶ 我的博客&#xff1a;Xiao Xiangζั͡ޓއއ 很高兴见到大家&#xff0c;希望能够和大家一起交流学习&#xff0c;共同进步。 这一节我们主要来学习单个函数的声明与定义&#xff0c;static和extern… 这里写目录标题 一、单个函数…

pthread_create函数

函数原型 pthread_create 是 POSIX 线程&#xff08;pthread&#xff09;库中的一个函数&#xff0c;用于在程序中创建一个新线程。 #include <pthread.h>int pthread_create(pthread_t *thread, const pthread_attr_t *attr,void *(*start_routine) (void *), void *a…