Hadoop3.x源码解析

news2025/1/11 9:01:21

文章目录

  • 一、RPC通信原理解析
    • 1、概要
    • 2、代码demo
  • 二、NameNode启动源码解析
    • 1、概述
    • 2、启动9870端口服务
    • 3、加载镜像文件和编辑日志
    • 4、初始化NN的RPC服务端
    • 5、NN启动资源检查
    • 6、NN对心跳超时判断
    • 7、安全模式
  • 三、DataNode启动源码解析
    • 1、概述
    • 2、初始化DataXceiverServer
    • 3、初始化HTTP服务
    • 4、初始化DN的RPC服务端
    • 5、DN向NN注册
    • 6、向NN发送心跳
  • 四、HDFS上传源码解析
    • 1、概述
    • 2、create创建过程
      • 2.1 DN向NN发起创建请求
      • 2.2 NN处理DN的创建请求
      • 2.3 DataStreamer启动流程
    • 3、write上传过程
      • 3.1 向DataStreamer的队列里面写数据
      • 3.2 建立管道之机架感知(块存储位置)
      • 3.3 建立管道之Socket发送
      • 3.4 建立管道之Socket接收
      • 3.5 客户端接收DN写数据应答Response
  • 五、Yarn源码解析
    • 1、概述
    • 2、Yarn客户端向RM提交作业
    • 3、RM启动MRAppMaster
    • 4、调度器任务执行(YarnChild)
  • 六、MapReduce源码解析
    • 1、Job提交流程源码和切片源码详解
    • 2、MapTask & ReduceTask 源码解析
  • 七、Hadoop源码编译
    • 1、环境准备
    • 2、工具包安装
    • 3、编译源码

一、RPC通信原理解析

1、概要

模拟RPC的客户端、服务端、通信协议三者如何工作的

2、代码demo

在HDFSClient项目基础上创建包名com.atguigu.rpc,创建RPC协议

public interface RPCProtocol {

    long versionID = 666;

    void mkdirs(String path);
}

创建RPC服务端

public class NNServer implements RPCProtocol{

    @Override
    public void mkdirs(String path) {
        System.out.println("服务端,创建路径" + path);
    }

    public static void main(String[] args) throws IOException {

        Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(8888)
                .setProtocol(RPCProtocol.class)
                .setInstance(new NNServer())
                .build();

        System.out.println("服务器开始工作");

        server.start();
    }
}

创建RPC客户端

public class HDFSClient {

    public static void main(String[] args) throws IOException {
        RPCProtocol client = RPC.getProxy(
                RPCProtocol.class,
                RPCProtocol.versionID,
                new InetSocketAddress("localhost", 8888),
                new Configuration());

        System.out.println("我是客户端");

        client.mkdirs("/input");
    }
}

测试,启动服务端,观察控制台打印:服务器开始工作,在控制台Terminal窗口输入,jps,查看到NNServer服务

启动客户端,观察客户端控制台打印:我是客户端,观察服务端控制台打印:服务端,创建路径/input

二、NameNode启动源码解析

1、概述

然后首先需要环境准备,导入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
  </dependency>

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.1.3</version>
  </dependency>

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
    <version>3.1.3</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

ctrl+h或者双击shift全局查找namenode,进入NameNode.java,然后ctrl + f,查找main方法,点击createNameNode,点击最后default返回的NameNode,点击initialize初始化,核心方法就在里面

protected void initialize(Configuration conf) throws IOException {
  ... ...

  if (NamenodeRole.NAMENODE == role) {
  // 启动HTTP服务端(9870)
    startHttpServer(conf);
  }

  // 加载镜像文件和编辑日志到内存
  loadNamesystem(conf);
  startAliasMapServerIfNecessary(conf);

  // 创建NN的RPC服务端
  rpcServer = createRpcServer(conf);

  initReconfigurableBackoffKey();

  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using 
    // the RPC server's bind address.
    clientNamenodeAddress = 
        NetUtils.getHostPortString(getNameNodeAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  // NN启动资源检查
  startCommonServices(conf);
  startMetricsLogger(conf);
}

2、启动9870端口服务

点击startHttpServer

private void startHttpServer(final Configuration conf) throws IOException {
  httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  httpServer.start();
  httpServer.setStartupProgress(startupProgress);
}

protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
  InetSocketAddress bindAddress = getHttpServerAddress(conf);

  ... ...
  return bindAddress;
}

protected InetSocketAddress getHttpServerAddress(Configuration conf) {
  return getHttpAddress(conf);
}

public static InetSocketAddress getHttpAddress(Configuration conf) {
  return  NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}

public static final String  DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;

public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT =
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;

int  DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;

点击startHttpServer方法中的httpServer.start();

void start() throws IOException {
  ... ...
  // Hadoop自己封装了HttpServer,形成自己的HttpServer2
  HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
      httpAddr, httpsAddr, "hdfs",
      DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
  ... ...

  httpServer = builder.build();

  ... ...

  httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
  httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  setupServlets(httpServer, conf);
  httpServer.start();

  ... ...
}

点击setupServlets,这里就是一些控制台的各个功能页跳转

private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
  httpServer.addInternalServlet("startupProgress",
    StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
  httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
    true);
  httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
      ImageServlet.class, true);
}

3、加载镜像文件和编辑日志

点击loadNamesystem

protected void loadNamesystem(Configuration conf) throws IOException {
  this.namesystem = FSNamesystem.loadFromDisk(conf);
}

static FSNamesystem loadFromDisk(Configuration conf) throws IOException {

  checkConfiguration(conf);

  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));

  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  if (startOpt == StartupOption.RECOVER) {
    namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }

  long loadStart = monotonicNow();
  try {
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  long timeTakenToLoadFSImage = monotonicNow() - loadStart;
  LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
  NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
  if (nnMetrics != null) {
    nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
  }
  namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
  return namesystem;
}

4、初始化NN的RPC服务端

点击createRpcServer,如第一章的服务端RPC开启,为客户端提供服务支持,客户端可以通过rpc协议发送指令

protected NameNodeRpcServer createRpcServer(Configuration conf)
    throws IOException {
  return new NameNodeRpcServer(conf, this);
}

public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
  ... ....  
    serviceRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(serviceRpcAddr.getPort())
        .setNumHandlers(serviceHandlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();
  ... ....  
}

5、NN启动资源检查

点击startCommonServices

private void startCommonServices(Configuration conf) throws IOException {

  namesystem.startCommonServices(conf, haContext);

  registerNNSMXBean();
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  rpcServer.start();
  try {
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
  } catch (RuntimeException e) {
    String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
    LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
        pluginsValue, e);
    throw e;
  }
  ......
}

点击startCommonServices

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // 检查是否有足够的磁盘存储元数据(fsimage(默认100m) editLog(默认100m))
    checkAvailableResources();

    assert !blockManager.isPopulatingReplQueues();
    StartupProgress prog = NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);
long completeBlocksTotal = getCompleteBlocksTotal();

    // 安全模式
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);

    // 启动块服务
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }
  
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}

点击NameNodeResourceChecker

public NameNodeResourceChecker(Configuration conf) throws IOException {
  this.conf = conf;
  volumes = new HashMap<String, CheckedVolume>();
  
  // dfs.namenode.resource.du.reserved默认值 1024 * 1024 * 100 =》100m
  duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
      DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
  
  Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
      .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
  
  Collection<URI> localEditDirs = Collections2.filter(
      FSNamesystem.getNamespaceEditsDirs(conf),
      new Predicate<URI>() {
        @Override
        public boolean apply(URI input) {
          if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
            return true;
          }
          return false;
        }
      });

  // 对所有路径进行资源检查
  for (URI editsDirToCheck : localEditDirs) {
    addDirToCheck(editsDirToCheck,
        FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
            editsDirToCheck));
  }

  // All extra checked volumes are marked "required"
  for (URI extraDirToCheck : extraCheckedVolumes) {
    addDirToCheck(extraDirToCheck, true);
  }
  
  minimumRedundantVolumes = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
}

点击checkAvailableResources

void checkAvailableResources() {
  long resourceCheckTime = monotonicNow();
  Preconditions.checkState(nnResourceChecker != null,
    "nnResourceChecker not initialized");

  // 判断资源是否足够,不够返回false
  hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();

  resourceCheckTime = monotonicNow() - resourceCheckTime;
  NameNode.getNameNodeMetrics().addResourceCheckTime(resourceCheckTime);
}

public boolean hasAvailableDiskSpace() {
  return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
      minimumRedundantVolumes);
}

static boolean areResourcesAvailable(
    Collection<? extends CheckableNameNodeResource> resources,
    int minimumRedundantResources) {

  // TODO: workaround:
  // - during startup, if there are no edits dirs on disk, then there is
  // a call to areResourcesAvailable() with no dirs at all, which was
  // previously causing the NN to enter safemode
  if (resources.isEmpty()) {
    return true;
  }
  
  int requiredResourceCount = 0;
  int redundantResourceCount = 0;
  int disabledRedundantResourceCount = 0;

  // 判断资源是否充足
  for (CheckableNameNodeResource resource : resources) {
    if (!resource.isRequired()) {
      redundantResourceCount++;
      if (!resource.isResourceAvailable()) {
        disabledRedundantResourceCount++;
      }
    } else {
      requiredResourceCount++;
      if (!resource.isResourceAvailable()) {
        // Short circuit - a required resource is not available. 不充足返回false
        return false;
      }
    }
  }
  
  if (redundantResourceCount == 0) {
    // If there are no redundant resources, return true if there are any
    // required resources available.
    return requiredResourceCount > 0;
  } else {
    return redundantResourceCount - disabledRedundantResourceCount >=
        minimumRedundantResources;
  }
}

interface CheckableNameNodeResource {
  
  public boolean isResourceAvailable();
  
  public boolean isRequired();
}


if (!resource.isResourceAvailable()),ctrl+alt+B可以查看其实现类

public boolean isResourceAvailable() {

  // 获取当前目录的空间大小
  long availableSpace = df.getAvailable();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Space available on volume '" + volume + "' is "
        + availableSpace);
  }

  // 如果当前空间大小,小于100m,返回false
  if (availableSpace < duReserved) {
    LOG.warn("Space available on volume '" + volume + "' is "
        + availableSpace +
        ", which is below the configured reserved amount " + duReserved);
    return false;
  } else {
    return true;
  }
}

6、NN对心跳超时判断

Ctrl + n 搜索namenode,ctrl + f搜索startCommonServices,点击namesystem.startCommonServices(conf, haContext);点击blockManager.activate(conf, completeBlocksTotal);点击datanodeManager.activate(conf);

void activate(final Configuration conf) {
  datanodeAdminManager.activate(conf);
  heartbeatManager.activate();
}

void activate() {
  // 启动的线程,搜索run方法
  heartbeatThread.start();
}

public void run() {
  while(namesystem.isRunning()) {
    restartHeartbeatStopWatch();
    try {
      final long now = Time.monotonicNow();
      if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
    // 心跳检查
        heartbeatCheck();
        lastHeartbeatCheck = now;
      }
      if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
        synchronized(HeartbeatManager.this) {
          for(DatanodeDescriptor d : datanodes) {
            d.setNeedKeyUpdate(true);
          }
        }
        lastBlockKeyUpdate = now;
      }
    } catch (Exception e) {
      LOG.error("Exception while checking heartbeat", e);
    }
    try {
      Thread.sleep(5000);  // 5 seconds
    } catch (InterruptedException ignored) {
    }
    // avoid declaring nodes dead for another cycle if a GC pause lasts
    // longer than the node recheck interval
    if (shouldAbortHeartbeatCheck(-5000)) {
      LOG.warn("Skipping next heartbeat scan due to excessive pause");
      lastHeartbeatCheck = Time.monotonicNow();
    }
  }
}

void heartbeatCheck() {
  final DatanodeManager dm = blockManager.getDatanodeManager();

  boolean allAlive = false;
  while (!allAlive) {
    // locate the first dead node.
    DatanodeDescriptor dead = null;

    // locate the first failed storage that isn't on a dead node.
    DatanodeStorageInfo failedStorage = null;

    // check the number of stale nodes
    int numOfStaleNodes = 0;
    int numOfStaleStorages = 0;
    synchronized(this) {
      for (DatanodeDescriptor d : datanodes) {
        // check if an excessive GC pause has occurred
        if (shouldAbortHeartbeatCheck(0)) {
          return;
        }
    // 判断DN节点是否挂断
        if (dead == null && dm.isDatanodeDead(d)) {
          stats.incrExpiredHeartbeats();
          dead = d;
        }
        if (d.isStale(dm.getStaleInterval())) {
          numOfStaleNodes++;
        }
        DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
        for(DatanodeStorageInfo storageInfo : storageInfos) {
          if (storageInfo.areBlockContentsStale()) {
            numOfStaleStorages++;
          }

          if (failedStorage == null &&
              storageInfo.areBlocksOnFailedStorage() &&
              d != dead) {
            failedStorage = storageInfo;
          }
        }
      }
      
      // Set the number of stale nodes in the DatanodeManager
      dm.setNumStaleNodes(numOfStaleNodes);
      dm.setNumStaleStorages(numOfStaleStorages);
    }
    ... ...
  }
}

boolean isDatanodeDead(DatanodeDescriptor node) {
  return (node.getLastUpdateMonotonic() <
          (monotonicNow() - heartbeatExpireInterval));
}

private long heartbeatExpireInterval;
// 10分钟 + 30秒
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;

private volatile int heartbeatRecheckInterval;
heartbeatRecheckInterval = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes

private volatile long heartbeatIntervalSeconds;
heartbeatIntervalSeconds = conf.getTimeDuration(
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
public static final long    DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;

7、安全模式

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // 检查是否有足够的磁盘存储元数据(fsimage(默认100m) editLog(默认100m))
    checkAvailableResources();

    assert !blockManager.isPopulatingReplQueues();
    StartupProgress prog = NameNode.getStartupProgress();

    // 开始进入安全模式
    prog.beginPhase(Phase.SAFEMODE);

    // 获取所有可以正常使用的block
long completeBlocksTotal = getCompleteBlocksTotal();

    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);

    // 启动块服务
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }
  
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}

点击getCompleteBlocksTotal

public long getCompleteBlocksTotal() {
  // Calculate number of blocks under construction
  long numUCBlocks = 0;
  readLock();
  try {
    // 获取正在构建的block
    numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
  // 获取所有的块 - 正在构建的block = 可以正常使用的block
    return getBlocksTotal() - numUCBlocks;
  } finally {
    readUnlock("getCompleteBlocksTotal");
  }
}

点击activate

public void activate(Configuration conf, long blockTotal) {
  pendingReconstruction.start();
  datanodeManager.activate(conf);

  this.redundancyThread.setName("RedundancyMonitor");
  this.redundancyThread.start();

  storageInfoDefragmenterThread.setName("StorageInfoMonitor");
  storageInfoDefragmenterThread.start();
  this.blockReportThread.start();

  mxBeanName = MBeans.register("NameNode", "BlockStats", this);

  bmSafeMode.activate(blockTotal);
}

void activate(long total) {
  assert namesystem.hasWriteLock();
  assert status == BMSafeModeStatus.OFF;

  startTime = monotonicNow();

  // 计算是否满足块个数的阈值
  setBlockTotal(total);

  // 判断DataNode节点和块信息是否达到退出安全模式标准
  if (areThresholdsMet()) {
    boolean exitResult = leaveSafeMode(false);
    Preconditions.checkState(exitResult, "Failed to leave safe mode.");
  } else {
    // enter safe mode
status = BMSafeModeStatus.PENDING_THRESHOLD;

initializeReplQueuesIfNecessary();

    reportStatus("STATE* Safe mode ON.", true);
    lastStatusReport = monotonicNow();
  }
}

点击setBlockTotal

void setBlockTotal(long total) {
  assert namesystem.hasWriteLock();
  synchronized (this) {
    this.blockTotal = total;
  // 计算阈值:例如:1000个正常的块 * 0.999 = 999
    this.blockThreshold = (long) (total * threshold);
  }
  
  this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
}

this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
        DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);

public static final float   DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;

点击areThresholdsMet

private boolean areThresholdsMet() {
  assert namesystem.hasWriteLock();
  // Calculating the number of live datanodes is time-consuming
  // in large clusters. Skip it when datanodeThreshold is zero.
  int datanodeNum = 0;

  if (datanodeThreshold > 0) {
    datanodeNum = blockManager.getDatanodeManager().getNumLiveDataNodes();
  }
  synchronized (this) {
  // 已经正常注册的块数 》= 块的最小阈值 》=最小可用DataNode
    return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;
  }
}

三、DataNode启动源码解析

1、概述

工作机制

启动源码流程

查找DataNode.class

public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  secureMain(args, null);
}

public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);

    DataNode datanode = createDataNode(args, null, resources);

    ......
  } catch (Throwable e) {
    LOG.error("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}

public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // 初始化DN
  DataNode dn = instantiateDataNode(args, conf, resources);

  if (dn != null) {
    // 启动DN进程
    dn.runDatanodeDaemon();
  }
  return dn;
}

public static DataNode instantiateDataNode(String args [], Configuration conf,
    SecureResources resources) throws IOException {
  ... ...
  
  return makeInstance(dataLocations, conf, resources);
}

static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  ... ...
  return new DataNode(conf, locations, storageLocationChecker, resources);
}

DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final StorageLocationChecker storageLocationChecker,
         final SecureResources resources) throws IOException {
  super(conf);
  ... ...

  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is {}", hostName);
  // 启动DN
    startDataNode(dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  ... ...
}

void startDataNode(List<StorageLocation> dataDirectories,
                   SecureResources resources
                   ) throws IOException {
  ... ...
  // 创建数据存储对象
  storage = new DataStorage();
  
  // global DN settings
  registerMXBean();
  // 初始化DataXceiver
  initDataXceiver();
  
  // 启动HttpServer
  startInfoServer();

  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = {}", dnUserName);
  LOG.info("supergroup = {}", supergroup);
  
  // 初始化RPC服务
  initIpcServer();

  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  ecWorker = new ErasureCodingWorker(getConf(), this);
  blockRecoveryWorker = new BlockRecoveryWorker(this);
  
  // 创建BlockPoolManager
  blockPoolManager = new BlockPoolManager(this);
  // 心跳管理
  blockPoolManager.refreshNamenodes(getConf());

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();

  if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
}

2、初始化DataXceiverServer

点击initDataXceiver

private void initDataXceiver() throws IOException {
// dataXceiverServer是一个服务,DN用来接收客户端和其他DN发送过来的数据服务
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty

  ... ...
}

3、初始化HTTP服务

点击startInfoServer();

private void startInfoServer()
  throws IOException {
  // SecureDataNodeStarter will bind the privileged port to the channel if
  // the DN is started by JSVC, pass it along.
  ServerSocketChannel httpServerChannel = secureResources != null ?
      secureResources.getHttpServerChannel() : null;

  httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
  httpServer.start();
  if (httpServer.getHttpAddress() != null) {
    infoPort = httpServer.getHttpAddress().getPort();
  }
  if (httpServer.getHttpsAddress() != null) {
    infoSecurePort = httpServer.getHttpsAddress().getPort();
  }
}

public DatanodeHttpServer(final Configuration conf,
    final DataNode datanode,
    final ServerSocketChannel externalHttpChannel)
  throws IOException {
  
  ... ...
  HttpServer2.Builder builder = new HttpServer2.Builder()
      .setName("datanode")
      .setConf(confForInfoServer)
      .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
      .hostName(getHostnameForSpnegoPrincipal(confForInfoServer))
      .addEndpoint(URI.create("http://localhost:" + proxyPort))
      .setFindPort(true);
  ... ...
}

4、初始化DN的RPC服务端

点击initIpcServer

private void initIpcServer() throws IOException {
  InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
      getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
  
  ... ...
  ipcServer = new RPC.Builder(getConf())
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager).build();
  ... ...
}

5、DN向NN注册

点击refreshNamenodes

void refreshNamenodes(Configuration conf)
    throws IOException {
  ... ...

  synchronized (refreshNamenodesLock) {
    doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
  }
}

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
  ......
  
  synchronized (this) {
   ......

    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {

      for (String nsToAdd : toAdd) {
        … …
        BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    startAll();
  }

 ......
}

protected BPOfferService createBPOS(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs) {
  // 根据NameNode个数创建对应的服务
  return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}

点击startAll()

synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            for (BPOfferService bpos : offerServices) {
        // 启动服务
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    ... ...
  }
}

void start() {
  for (BPServiceActor actor : bpServices) {
    actor.start();
  }
}

void start() {
  ... ...
  bpThread = new Thread(this);
  bpThread.setDaemon(true); // needed for JUnit testing
// 表示开启一个线程,所有查找该线程的run方法
  bpThread.start();

  if (lifelineSender != null) {
    lifelineSender.start();
  }
}

ctrl + f 搜索run方法

public void run() {
  LOG.info(this + " starting to offer service");

  try {
    while (true) {
      // init stuff
      try {
        // setup storage
    // 向NN 注册
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // Initial handshake, storage recovery or registration failed
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // Retry until all namenode's of BPOS failed initialization
          LOG.error("Initialization failed for " + this + " "
              + ioe.getLocalizedMessage());
      // 注册失败,5s后重试
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
          return;
        }
      }
    }
    … …
    while (shouldRun()) {
      try {
        // 发送心跳
        offerService();
      } catch (Exception ex) {
        ... ...
      }
    }
}

private void connectToNNAndHandshake() throws IOException {
  // get NN proxy 获取NN的RPC客户端对象
  bpNamenode = dn.connectToNN(nnAddr);

  // First phase of the handshake with NN - get the namespace
  // info.
  NamespaceInfo nsInfo = retrieveNamespaceInfo();

  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  bpos.verifyAndSetNamespaceInfo(this, nsInfo);

  /* set thread name again to include NamespaceInfo when it's available. */
  this.bpThread.setName(formatThreadName("heartbeating", nnAddr));

  // 注册
  register(nsInfo);
}

DatanodeProtocolClientSideTranslatorPB connectToNN(
    InetSocketAddress nnAddr) throws IOException {
  return new DatanodeProtocolClientSideTranslatorPB(nnAddr, getConf());
}

public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
    Configuration conf) throws IOException {
  RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
      ProtobufRpcEngine.class);
  UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
}

private static DatanodeProtocolPB createNamenode(
    InetSocketAddress nameNodeAddr, Configuration conf,
    UserGroupInformation ugi) throws IOException {
  return RPC.getProxy(DatanodeProtocolPB.class,
      RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
      conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
}

返回,点击register

void register(NamespaceInfo nsInfo) throws IOException {
  // 创建注册信息
  DatanodeRegistration newBpRegistration = bpos.createRegistration();

  LOG.info(this + " beginning handshake with NN");

  while (shouldRun()) {
    try {
      // Use returned registration from namenode with updated fields
    // 把注册信息发送给NN(DN调用接口方法,执行在NN)
      newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
      newBpRegistration.setNamespaceInfo(nsInfo);
      bpRegistration = newBpRegistration;
      break;
    } catch(EOFException e) {  // namenode might have just restarted
      LOG.info("Problem connecting to server: " + nnAddr + " :"
          + e.getLocalizedMessage());
      sleepAndLogInterrupts(1000, "connecting to server");
    } catch(SocketTimeoutException e) {  // namenode is busy
      LOG.info("Problem connecting to server: " + nnAddr);
      sleepAndLogInterrupts(1000, "connecting to server");
    }
  }
  … …
}

回到NN,搜索NameNodeRpcServer

public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  // 注册DN
  namesystem.registerDatanode(nodeReg);

  return nodeReg;
}

void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
  writeLock();
  try {
    blockManager.registerDatanode(nodeReg);
  } finally {
    writeUnlock("registerDatanode");
  }
}

public void registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  assert namesystem.hasWriteLock();
  datanodeManager.registerDatanode(nodeReg);
  bmSafeMode.checkSafeMode();
}

public void registerDatanode(DatanodeRegistration nodeReg)
    throws DisallowedDatanodeException, UnresolvedTopologyException {
  ... ...
  // register new datanode 注册DN
    addDatanode(nodeDescr);
    blockManager.getBlockReportLeaseManager().register(nodeDescr);
    // also treat the registration message as a heartbeat
    // no need to update its timestamp
    // because its is done when the descriptor is created
  // 将DN添加到心跳管理
    heartbeatManager.addDatanode(nodeDescr);
    heartbeatManager.updateDnStat(nodeDescr);
    incrementVersionCount(nodeReg.getSoftwareVersion());
    startAdminOperationIfNecessary(nodeDescr);
    success = true;
  ... ...
}

void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove  from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  synchronized(this) {
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }

  networktopology.add(node); // may throw InvalidTopologyException
  host2DatanodeMap.add(node);
  checkIfClusterIsNowMultiRack(node);
  resolveUpgradeDomain(node);
  ......
}

6、向NN发送心跳

点击BPServiceActor.java中的run方法中的offerService方法

private void offerService() throws Exception {

  while (shouldRun()) {
        ... ...
        HeartbeatResponse resp = null;
        if (sendHeartbeat) {

          boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                  scheduler.isBlockReportDue(startTime);
          if (!dn.areHeartbeatsDisabledForTests()) {
        // 发送心跳信息
            resp = sendHeartBeat(requestBlockReportLease);
            assert resp != null;
            if (resp.getFullBlockReportLeaseId() != 0) {
              if (fullBlockReportLeaseId != 0) {
        ... ...
              }
              fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
            }
            ... ...
          }
        }
    ... ...
  }
}

HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
    throws IOException {
  ... ...
  // 通过NN的RPC客户端发送给NN
  HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary,
        requestBlockReportLease,
        slowPeers,
        slowDisks);
  ... ...
}

回到NN,搜索NameNodeRpcServer类,ctrl + f 在NameNodeRpcServer.java中搜索sendHeartbeat

public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {

  checkNNStartup();
  verifyRequest(nodeReg);

  // 处理DN发送的心跳
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
      slowPeers, slowDisks);
}

HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  readLock();
  try {
    //get datanode commands
    final int maxTransfer = blockManager.getMaxReplicationStreams()
        - xmitsInProgress;
  // 处理DN发送过来的心跳
    DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
        nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
        xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
        slowPeers, slowDisks);

    long blockReportLeaseId = 0;
    if (requestFullBlockReportLease) {
      blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
    }
    //create ha status
    final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
        haContext.getState().getServiceState(),
        getFSImage().getCorrectLastAppliedOrWrittenTxId());

  // 响应DN的心跳
    return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
        blockReportLeaseId);
  } finally {
    readUnlock("handleHeartbeat");
  }
}


public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount, 
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  ... ...
  heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
      cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
  ... ...  
}

synchronized void updateHeartbeat(final DatanodeDescriptor node,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) {
  stats.subtract(node);
  blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
      xceiverCount, failedVolumes, volumeFailureSummary);
  stats.add(node);
}


void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
    long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) {

  for (StorageReport report: reports) {
    providedStorageMap.updateStorage(node, report.getStorage());
  }
  node.updateHeartbeat(reports, cacheCapacity, cacheUsed, xceiverCount,
      failedVolumes, volumeFailureSummary);
}


void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
      volFailures, volumeFailureSummary);
  heartbeatedSinceRegistration = true;
}

void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  // 更新存储
  updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
      volFailures, volumeFailureSummary);
  // 更新心跳时间
  setLastUpdate(Time.now());
  setLastUpdateMonotonic(Time.monotonicNow());
  rollBlocksScheduled(getLastUpdateMonotonic());
}

private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  long totalCapacity = 0;
  long totalRemaining = 0;
  long totalBlockPoolUsed = 0;
  long totalDfsUsed = 0;
  long totalNonDfsUsed = 0;
  … …

  setCacheCapacity(cacheCapacity);
  setCacheUsed(cacheUsed);
  setXceiverCount(xceiverCount);
  this.volumeFailures = volFailures;
  this.volumeFailureSummary = volumeFailureSummary;
  for (StorageReport report : reports) {

    DatanodeStorageInfo storage =
        storageMap.get(report.getStorage().getStorageID());
    if (checkFailedStorages) {
      failedStorageInfos.remove(storage);
    }

    storage.receivedHeartbeat(report);
    // skip accounting for capacity of PROVIDED storages!
    if (StorageType.PROVIDED.equals(storage.getStorageType())) {
      continue;
    }

    totalCapacity += report.getCapacity();
    totalRemaining += report.getRemaining();
    totalBlockPoolUsed += report.getBlockPoolUsed();
    totalDfsUsed += report.getDfsUsed();
    totalNonDfsUsed += report.getNonDfsUsed();
  }

  // Update total metrics for the node.
  // 更新存储相关信息
  setCapacity(totalCapacity);
  setRemaining(totalRemaining);
  setBlockPoolUsed(totalBlockPoolUsed);
  setDfsUsed(totalDfsUsed);
  setNonDfsUsed(totalNonDfsUsed);
  if (checkFailedStorages) {
    updateFailedStorage(failedStorageInfos);
  }
  long storageMapSize;
  synchronized (storageMap) {
    storageMapSize = storageMap.size();
  }
  if (storageMapSize != reports.length) {
    pruneStorageMap(reports);
  }
}

四、HDFS上传源码解析

1、概述

HDFS的写数据流程

HDFS上传源码解析

2、create创建过程

2.1 DN向NN发起创建请求

@Test
public void testPut2() throws IOException {
  FSDataOutputStream fos = fs.create(new Path("/input"));

  fos.write("hello world".getBytes());
}

//点击create,一直到抽象方法
public abstract FSDataOutputStream create(Path f,
  FsPermission permission,
  boolean overwrite,
  int bufferSize,
  short replication,
  long blockSize,
  Progressable progress) throws IOException;

ctrl+alt+B选择DistributedFileSystem实现方法

@Override
public FSDataOutputStream create(Path f, FsPermission permission,
  boolean overwrite, int bufferSize, short replication, long blockSize,
  Progressable progress) throws IOException {
  
  return this.create(f, permission,
  overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
    : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
  blockSize, progress, null);
}

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
  final EnumSet<CreateFlag> cflags, final int bufferSize,
  final short replication, final long blockSize,
  final Progressable progress, final ChecksumOpt checksumOpt)
  throws IOException {
  
  statistics.incrementWriteOps(1);
  storageStatistics.incrementOpCounter(OpType.CREATE);
  Path absF = fixRelativePart(f);
  
  return new FileSystemLinkResolver<FSDataOutputStream>() {

    @Override
    public FSDataOutputStream doCall(final Path p) throws IOException {

    // 创建获取了一个输出流对象
    final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
      cflags, replication, blockSize, progress, bufferSize,
      checksumOpt);
    // 这里将上面创建的dfsos进行包装并返回
    return dfs.createWrappedOutputStream(dfsos, statistics);
    }

    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
      throws IOException {
    return fs.create(p, permission, cflags, bufferSize,
      replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}

public DFSOutputStream create(String src, FsPermission permission,
  EnumSet<CreateFlag> flag, short replication, long blockSize,
  Progressable progress, int buffersize, ChecksumOpt checksumOpt)
  throws IOException {
  
  return create(src, permission, flag, true,
  replication, blockSize, progress, buffersize, checksumOpt, null);
}

public DFSOutputStream create(String src, FsPermission permission,
  EnumSet<CreateFlag> flag, boolean createParent, short replication,
  long blockSize, Progressable progress, int buffersize,
  ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
  throws IOException {
  
  return create(src, permission, flag, createParent, replication, blockSize,
  progress, buffersize, checksumOpt, favoredNodes, null);
}

public DFSOutputStream create(String src, FsPermission permission,
  EnumSet<CreateFlag> flag, boolean createParent, short replication,
  long blockSize, Progressable progress, int buffersize,
  ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
  String ecPolicyName) throws IOException {
  
  checkOpen();
  
  final FsPermission masked = applyUMask(permission);
  LOG.debug("{}: masked={}", src, masked);
  
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
    src, masked, flag, createParent, replication, blockSize, progress,
    dfsClientConf.createChecksum(checksumOpt),
    getFavoredNodesStr(favoredNodes), ecPolicyName);
    
  beginFileLease(result.getFileId(), result);
  
  return result;
}

点击newStreamForCreate,进入DFSOutputStream.java

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
  FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
  short replication, long blockSize, Progressable progress,
  DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
  throws IOException {
  
  try (TraceScope ignored =
       dfsClient.newPathTraceScope("newStreamForCreate", src)) {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;

    while (shouldRetry) {
    shouldRetry = false;
    try {
      // DN将创建请求发送给NN(RPC)
      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
        new EnumSetWritable<>(flag), createParent, replication,
        blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
      break;
    } catch (RemoteException re) {
      … ….
    }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out;

    if(stat.getErasureCodingPolicy() != null) {
    out = new DFSStripedOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
    } else {
    out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes, true);
    }

    // 开启线程run,DataStreamer extends Daemon extends Thread
    out.start();

    return out;
  }
}

2.2 NN处理DN的创建请求

点击create

HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException;

查找create实现类,点击NameNodeRpcServer,在NameNodeRpcServer.java中搜索create

public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException {
  // 检查NN启动
  checkNNStartup();
  ... ...

  HdfsFileStatus status = null;
  try {
    PermissionStatus perm = new PermissionStatus(getRemoteUser()
        .getShortUserName(), null, masked);
  // 重要
    status = namesystem.startFile(src, perm, clientName, clientMachine,
        flag.get(), createParent, replication, blockSize, supportedVersions,
        ecPolicyName, cacheEntry != null);
  } finally {
    RetryCache.setState(cacheEntry, status != null, status);
  }

  metrics.incrFilesCreated();
  metrics.incrCreateFileOps();
  return status;
}

HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    boolean logRetryCache) throws IOException {

  HdfsFileStatus status;
  try {
    status = startFileInt(src, permissions, holder, clientMachine, flag,
        createParent, replication, blockSize, supportedVersions, ecPolicyName,
        logRetryCache);
  } catch (AccessControlException e) {
    logAuditEvent(false, "create", src);
    throw e;
  }
  logAuditEvent(true, "create", src, status);
  return status;
}

private HdfsFileStatus startFileInt(String src,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, CryptoProtocolVersion[] supportedVersions,
    String ecPolicyName, boolean logRetryCache) throws IOException {       
  ... ...
  stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
        clientMachine, flag, createParent, replication, blockSize, feInfo,
        toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
  ... ...
}

static HdfsFileStatus startFile(
    ... ...)
    throws IOException {
  
  ... ...
  FSDirectory fsd = fsn.getFSDirectory();

  // 文件路径是否存在校验
  if (iip.getLastINode() != null) {
    if (overwrite) {
      List<INode> toRemoveINodes = new ChunkedArrayList<>();
      List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
      long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
                                      toRemoveINodes, toRemoveUCFiles, now());
      if (ret >= 0) {
        iip = INodesInPath.replace(iip, iip.length() - 1, null);
        FSDirDeleteOp.incrDeletedFileCount(ret);
        fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
      }
    } else {
      // If lease soft limit time is expired, recover the lease
      fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
                               src, holder, clientMachine, false);
      throw new FileAlreadyExistsException(src + " for client " +
          clientMachine + " already exists");
    }
  }
  fsn.checkFsObjectLimit();
  INodeFile newNode = null;
  INodesInPath parent = FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
  if (parent != null) {
    // 添加文件元数据信息
    iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
        replication, blockSize, holder, clientMachine, shouldReplicate,
        ecPolicyName);
    newNode = iip != null ? iip.getLastINode().asFile() : null;
  }
  ... ...
  setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
  fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
        src + " inode " + newNode.getId() + " " + holder);
  }
  return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
}

private static INodesInPath addFile(
    FSDirectory fsd, INodesInPath existing, byte[] localName,
    PermissionStatus permissions, short replication, long preferredBlockSize,
    String clientName, String clientMachine, boolean shouldReplicate,
    String ecPolicyName) throws IOException {

  Preconditions.checkNotNull(existing);
  long modTime = now();
  INodesInPath newiip;
  fsd.writeLock();
  try {
    … …

    newiip = fsd.addINode(existing, newNode, permissions.getPermission());
  } finally {
    fsd.writeUnlock();
  }
  ... ...
  return newiip;
}

INodesInPath addINode(INodesInPath existing, INode child,
                      FsPermission modes)
    throws QuotaExceededException, UnresolvedLinkException {
  cacheName(child);
  writeLock();
  try {
    // 将数据写入到INode的目录树中
    return addLastINode(existing, child, modes, true);
  } finally {
    writeUnlock();
  }
}

2.3 DataStreamer启动流程

NN处理完DN请求后,再次回到DN端,启动对应的线程

//DFSOutputStream.java
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
  FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
  short replication, long blockSize, Progressable progress,
  DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
  throws IOException {
  ... ...
  // DN将创建请求发送给NN(RPC)
  stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
    new EnumSetWritable<>(flag), createParent, replication,
    blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
  ... ...
  
  // 创建输出流
  out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
  // 开启线程run,DataStreamer extends Daemon extends Thread
  out.start();

  return out;
}
//点击DFSOutputStream
protected DFSOutputStream(DFSClient dfsClient, String src,
    HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
  this(dfsClient, src, flag, progress, stat, checksum);
  this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);

  // Directory => File => Block(128M) => packet(64K) => chunk(chunk 512byte + chunksum 4byte)
  computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
      bytesPerChecksum);

  if (createStreamer) {
    streamer = new DataStreamer(stat, null, dfsClient, src, progress,
        checksum, cachingStrategy, byteArrayManager, favoredNodes,
        addBlockFlags);
  }
}

点击newStreamForCreate方法中的out.start(),进入DFSOutputStream.java

protected synchronized void start() {
  getStreamer().start();
}

protected DataStreamer getStreamer() {
  return streamer;
}

//点击DataStreamer,进入DataStreamer.java
//点击Daemon,进入Daemon.java
//说明:out.start();实际是开启线程,点击DataStreamer,搜索run方法

@Override
public void run() {

  long lastPacket = Time.monotonicNow();
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
    closeResponder();
    }

    DFSPacket one;
    try {
    // process datanode IO errors if any
    boolean doSleep = processDatanodeOrExternalError();

    final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
    synchronized (dataQueue) {
      // wait for a packet to be sent.
      … …
      try {
        // 如果dataQueue里面没有数据,代码会阻塞在这儿
        dataQueue.wait(timeout);
      } catch (InterruptedException  e) {
        LOG.warn("Caught exception", e);
      }
      doSleep = false;
      now = Time.monotonicNow();
      }
      … …
      //  队列不为空,从队列中取出packet
      one = dataQueue.getFirst(); // regular data packet
      SpanId[] parents = one.getTraceParents();
      if (parents.length > 0) {
        scope = dfsClient.getTracer().
          newScope("dataStreamer", parents[0]);
        scope.getSpan().setParents(parents);
      }
      }
    }
    … …
}

3、write上传过程

3.1 向DataStreamer的队列里面写数据

@Test
public void testPut2() throws IOException {
    FSDataOutputStream fos = fs.create(new Path("/input"));

    fos.write("hello world".getBytes());
}

// 一路点击write,直到抽象方法
public abstract void write(int b) throws IOException;
//ctrl+alt+b查看实现类,选择FSOutputSummer.java
public synchronized void write(int b) throws IOException {
  buf[count++] = (byte)b;
  if(count == buf.length) {
    flushBuffer();
  }
}

protected synchronized void flushBuffer() throws IOException {
  flushBuffer(false, true);
}

protected synchronized int flushBuffer(boolean keep,
    boolean flushPartial) throws IOException {
  int bufLen = count;
  int partialLen = bufLen % sum.getBytesPerChecksum();
  int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;

  if (lenToFlush != 0) {
// 向队列中写数据   
// Directory => File => Block(128M) => package(64K) => chunk(chunk 512byte + chunksum 4byte)
writeChecksumChunks(buf, 0, lenToFlush);

    if (!flushPartial || keep) {
      count = partialLen;
      System.arraycopy(buf, bufLen - count, buf, 0, count);
    } else {
      count = 0;
    }
  }

  // total bytes left minus unflushed bytes left
  return count - (bufLen - lenToFlush);
}

private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {

  // 计算chunk的校验和
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  TraceScope scope = createWriteTraceScope();

  // 按照chunk的大小遍历数据
  try {
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();

    // 一个chunk一个chunk的将数据写入队列
      writeChunk(b, off + i, chunkLen, checksum, ckOffset,
          getChecksumSize());
    }
  } finally {
    if (scope != null) {
      scope.close();
    }
  }
}

protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
   byte[] checksum, int checksumOffset, int checksumLen) throws IOException;

//同理查找实现类DFSOutputStream
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  
  writeChunkPrepare(len, ckoff, cklen);

  // 往packet里面写chunk的校验和 4byte
  currentPacket.writeChecksum(checksum, ckoff, cklen);

  // 往packet里面写一个chunk 512 byte
  currentPacket.writeData(b, offset, len);

  // 记录写入packet中的chunk个数,累计到127个chuck,这个packet就满了
  currentPacket.incNumChunks();
  getStreamer().incBytesCurBlock(len);

  // If packet is full, enqueue it for transmission
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      getStreamer().getBytesCurBlock() == blockSize) {
    enqueueCurrentPacketFull();
  }
}


synchronized void enqueueCurrentPacketFull() throws IOException {
  LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
          + " appendChunk={}, {}", currentPacket, src, getStreamer()
          .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
      getStreamer());

  enqueueCurrentPacket();

  adjustChunkBoundary();

  endBlock();
}

void enqueueCurrentPacket() throws IOException {
  getStreamer().waitAndQueuePacket(currentPacket);
  currentPacket = null;
}

void waitAndQueuePacket(DFSPacket packet) throws IOException {
  synchronized (dataQueue) {
    try {
    // 如果队列满了,等待
      // If queue is full, then wait till we have enough space
      boolean firstWait = true;
      try {
        while (!streamerClosed && dataQueue.size() + ackQueue.size() >
            dfsClient.getConf().getWriteMaxPackets()) {
          if (firstWait) {
            Span span = Tracer.getCurrentSpan();
            if (span != null) {
              span.addTimelineAnnotation("dataQueue.wait");
            }
            firstWait = false;
          }
          try {
            dataQueue.wait();
          } catch (InterruptedException e) {
            ... ...
          }
        }
      } finally {
        Span span = Tracer.getCurrentSpan();
        if ((span != null) && (!firstWait)) {
          span.addTimelineAnnotation("end.wait");
        }
      }
      checkClosed();
    // 如果队列没满,向队列中添加数据
      queuePacket(packet);
    } catch (ClosedChannelException ignored) {
    }
  }
}


DataStreamer.java

void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpanId());

  // 向队列中添加数据
    dataQueue.addLast(packet);

    lastQueuedSeqno = packet.getSeqno();
    LOG.debug("Queued {}, {}", packet, this);

  // 通知队列添加数据完成
    dataQueue.notifyAll();
  }
}

3.2 建立管道之机架感知(块存储位置)

全局查找DataStreamer,搜索run方法

@Override
public void run() {

  long lastPacket = Time.monotonicNow();
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
    closeResponder();
    }

    DFSPacket one;
    try {
    // process datanode IO errors if any
    boolean doSleep = processDatanodeOrExternalError();

    final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
    synchronized (dataQueue) {
      // wait for a packet to be sent.
      long now = Time.monotonicNow();
      while ((!shouldStop() && dataQueue.size() == 0 &&
        (stage != BlockConstructionStage.DATA_STREAMING ||
          now - lastPacket < halfSocketTimeout)) || doSleep) {
      long timeout = halfSocketTimeout - (now-lastPacket);
      timeout = timeout <= 0 ? 1000 : timeout;
      timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
        timeout : 1000;
      try {
        // 如果dataQueue里面没有数据,代码会阻塞在这儿
        dataQueue.wait(timeout); // 接收到notify消息
      } catch (InterruptedException  e) {
        LOG.warn("Caught exception", e);
      }
      doSleep = false;
      now = Time.monotonicNow();
      }
      if (shouldStop()) {
      continue;
      }
      // get packet to be sent.
      if (dataQueue.isEmpty()) {
      one = createHeartbeatPacket();
      } else {
      try {
        backOffIfNecessary();
      } catch (InterruptedException e) {
        LOG.warn("Caught exception", e);
      }
      //  队列不为空,从队列中取出packet
      one = dataQueue.getFirst(); // regular data packet
      SpanId[] parents = one.getTraceParents();
      if (parents.length > 0) {
        scope = dfsClient.getTracer().
          newScope("dataStreamer", parents[0]);
        scope.getSpan().setParents(parents);
      }
      }
    }

    // get new block from namenode.
    if (LOG.isDebugEnabled()) {
      LOG.debug("stage=" + stage + ", " + this);
    }
    if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
      LOG.debug("Allocating new block: {}", this);
      // 步骤一:向NameNode 申请block 并建立数据管道
      setPipeline(nextBlockOutputStream());
      // 步骤二:启动ResponseProcessor用来监听packet发送是否成功
      initDataStreaming();
    } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
      setupPipelineForAppendOrRecovery();
      if (streamerClosed) {
      continue;
      }
      initDataStreaming();
    }

    long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
    if (lastByteOffsetInBlock > stat.getBlockSize()) {
      throw new IOException("BlockSize " + stat.getBlockSize() +
        " < lastByteOffsetInBlock, " + this + ", " + one);
    }
    … …
    // send the packet
    SpanId spanId = SpanId.INVALID;
    synchronized (dataQueue) {

      // move packet from dataQueue to ackQueue
      if (!one.isHeartbeatPacket()) {
      if (scope != null) {
        spanId = scope.getSpanId();
        scope.detach();

        one.setTraceScope(scope);
      }
      scope = null;
      // 步骤三:从dataQueue 把要发送的这个packet 移除出去
      dataQueue.removeFirst();
      // 步骤四:然后往ackQueue 里面添加这个packet
      ackQueue.addLast(one);
      packetSendTime.put(one.getSeqno(), Time.monotonicNow());
      dataQueue.notifyAll();
      }
    }

    LOG.debug("{} sending {}", this, one);

    // write out data to remote datanode
    try (TraceScope ignored = dfsClient.getTracer().
      newScope("DataStreamer#writeTo", spanId)) {
      //  将数据写出去
      one.writeTo(blockStream);
      blockStream.flush();
    } catch (IOException e) {
      errorState.markFirstNodeIfNotMarked();
      throw e;
    }
    … …
}


//点击nextBlockOutputStream
protected LocatedBlock nextBlockOutputStream() throws IOException {
  LocatedBlock lb;
  DatanodeInfo[] nodes;
  StorageType[] nextStorageTypes;
  String[] nextStorageIDs;
  int count = dfsClient.getConf().getNumBlockWriteRetry();
  boolean success;
  final ExtendedBlock oldBlock = block.getCurrentBlock();
  do {
    errorState.resetInternalError();
    lastException.clear();

    DatanodeInfo[] excluded = getExcludedNodes();
  // 向NN获取向哪个DN写数据
    lb = locateFollowingBlock(
        excluded.length > 0 ? excluded : null, oldBlock);

    // 创建管道
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
          0L, false);
    ......
  } while (!success && --count >= 0);

  if (!success) {
    throw new IOException("Unable to create new block.");
  }
  return lb;
}

private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
    ExtendedBlock oldBlock) throws IOException {
  return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
      stat.getFileId(), favoredNodes, addBlockFlags);
}

static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
      DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
      String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
      throws IOException {
    ... ...
    // 向NN获取向哪个DN写数据
    return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
            excludedNodes, fileId, favoredNodes, allocFlags);
    ... ...
}

LocatedBlock addBlock(String src, String clientName,
      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
      String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
      throws IOException;

回到namenode,点击NameNodeRpcServer,在该类中搜索addBlock

public LocatedBlock addBlock(String src, String clientName,
    ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
    String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
    throws IOException {
  checkNNStartup();
  LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
      clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
  if (locatedBlock != null) {
    metrics.incrAddBlockOps();
  }
  return locatedBlock;
}

LocatedBlock getAdditionalBlock(
    String src, long fileId, String clientName, ExtendedBlock previous,
    DatanodeInfo[] excludedNodes, String[] favoredNodes,
    EnumSet<AddBlockFlag> flags) throws IOException {
  final String operationName = "getAdditionalBlock";
  NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {}  inodeId {}" +
      " for {}", src, fileId, clientName);

  ... ...
  // 选择块存储位置
  DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
      blockManager, src, excludedNodes, favoredNodes, flags, r);

  ... ...
  return lb;
}

static DatanodeStorageInfo[] chooseTargetForNewBlock(
    BlockManager bm, String src, DatanodeInfo[] excludedNodes,
    String[] favoredNodes, EnumSet<AddBlockFlag> flags,
    ValidateAddBlockResult r) throws IOException {
  ... ...
  return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
                                  excludedNodesSet, r.blockSize,
                                  favoredNodesList, r.storagePolicyID,
                                  r.blockType, r.ecPolicy, flags);
}

public DatanodeStorageInfo[] chooseTarget4NewBlock(... ...
  ) throws IOException {
  ... ...
    
  final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
      numOfReplicas, client, excludedNodes, blocksize, 
      favoredDatanodeDescriptors, storagePolicy, flags);

  ... ...
  return targets;
}

DatanodeStorageInfo[] chooseTarget(String src,
    int numOfReplicas, Node writer,
    Set<Node> excludedNodes,
    long blocksize,
    List<DatanodeDescriptor> favoredNodes,
    BlockStoragePolicy storagePolicy,
    EnumSet<AddBlockFlag> flags) {
  
  return chooseTarget(src, numOfReplicas, writer, 
      new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
      excludedNodes, blocksize, storagePolicy, flags);
}

public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
    int numOfReplicas,
    Node writer,
    List<DatanodeStorageInfo> chosen,
    boolean returnChosenNodes,
    Set<Node> excludedNodes,
    long blocksize,
    BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> flags);

// 查找chooseTarget实现类BlockPlacementPolicyDefault.java
public DatanodeStorageInfo[] chooseTarget(String srcPath,
    int numOfReplicas,
    Node writer,
    List<DatanodeStorageInfo> chosenNodes,
    boolean returnChosenNodes,
    Set<Node> excludedNodes,
    long blocksize,
    final BlockStoragePolicy storagePolicy,
    EnumSet<AddBlockFlag> flags) {
  
  return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
      excludedNodes, blocksize, storagePolicy, flags, null);
}

private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
  Node writer,
  List<DatanodeStorageInfo> chosenStorage,
  boolean returnChosenNodes,
  Set<Node> excludedNodes,
  long blocksize,
  final BlockStoragePolicy storagePolicy,
  EnumSet<AddBlockFlag> addBlockFlags,
  EnumMap<StorageType, Integer> sTypes) {
  … …
   
  int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
  numOfReplicas = result[0];
  int maxNodesPerRack = result[1];
    
  for (DatanodeStorageInfo storage : chosenStorage) {
    // add localMachine and related nodes to excludedNodes
  // 获取不可用的DN
    addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
  }

  List<DatanodeStorageInfo> results = null;
  Node localNode = null;
  boolean avoidStaleNodes = (stats != null
      && stats.isAvoidingStaleDataNodesForWrite());
  //   
  boolean avoidLocalNode = (addBlockFlags != null
      && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
      && writer != null
      && !excludedNodes.contains(writer));
  // Attempt to exclude local node if the client suggests so. If no enough
  // nodes can be obtained, it falls back to the default block placement
  // policy.

  // 有数据正在写,避免都写入本地
  if (avoidLocalNode) {
    results = new ArrayList<>(chosenStorage);
    Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
    if (writer != null) {
      excludedNodeCopy.add(writer);
    }
    localNode = chooseTarget(numOfReplicas, writer,
        excludedNodeCopy, blocksize, maxNodesPerRack, results,
        avoidStaleNodes, storagePolicy,
        EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
    if (results.size() < numOfReplicas) {
      // not enough nodes; discard results and fall back
      results = null;
    }
  }
  if (results == null) {
    results = new ArrayList<>(chosenStorage);
  // 真正的选择DN节点
    localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
        blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
        sTypes);
  }

  if (!returnChosenNodes) {  
    results.removeAll(chosenStorage);
  }
    
  // sorting nodes to form a pipeline
  return getPipeline(
      (writer != null && writer instanceof DatanodeDescriptor) ? writer
          : localNode,
      results.toArray(new DatanodeStorageInfo[results.size()]));
}

private Node chooseTarget(int numOfReplicas,
   ... ...) {
  
   writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);
   ... ...
}

protected Node chooseTargetInOrder(int numOfReplicas, 
                               Node writer,
                               final Set<Node> excludedNodes,
                               final long blocksize,
                               final int maxNodesPerRack,
                               final List<DatanodeStorageInfo> results,
                               final boolean avoidStaleNodes,
                               final boolean newBlock,
                               EnumMap<StorageType, Integer> storageTypes)
                               throws NotEnoughReplicasException {
  final int numOfResults = results.size();
  if (numOfResults == 0) {
  // 第一个块存储在当前节点
    DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storageTypes, true);

    writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
                                   : null;

    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
  // 第二个块存储在另外一个机架
  if (numOfResults <= 1) {
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  if (numOfResults <= 2) {
    final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
  // 如果第一个和第二个在同一个机架,那么第三个放在其他机架
    if (clusterMap.isOnSameRack(dn0, dn1)) {
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else if (newBlock){
    // 如果是新块,和第二个块存储在同一个机架
      chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else {
    // 如果不是新块,放在当前机架
      chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    }
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  return writer;
}

3.3 建立管道之Socket发送

点击DataStreamer的nextBlockOutputStream

protected LocatedBlock nextBlockOutputStream() throws IOException {
  LocatedBlock lb;
  DatanodeInfo[] nodes;
  StorageType[] nextStorageTypes;
  String[] nextStorageIDs;
  int count = dfsClient.getConf().getNumBlockWriteRetry();
  boolean success;
  final ExtendedBlock oldBlock = block.getCurrentBlock();
  do {
    errorState.resetInternalError();
    lastException.clear();

    DatanodeInfo[] excluded = getExcludedNodes();
  // 向NN获取向哪个DN写数据
    lb = locateFollowingBlock(
        excluded.length > 0 ? excluded : null, oldBlock);

    // 创建管道
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
          0L, false);
    … …
  } while (!success && --count >= 0);

  if (!success) {
    throw new IOException("Unable to create new block.");
  }
  return lb;
}

boolean createBlockOutputStream(DatanodeInfo[] nodes,
      StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
      long newGS, boolean recoveryFlag) {
    ... ...
  // 和DN创建socket
  s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
  
  // 获取输出流,用于写数据到DN
  OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
  // 获取输入流,用于读取写数据到DN的结果
    InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
  
    IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
        unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
    unbufOut = saslStreams.out;
    unbufIn = saslStreams.in;
    out = new DataOutputStream(new BufferedOutputStream(unbufOut,
        DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
    blockReplyStream = new DataInputStream(unbufIn);
  
  // 发送数据
  new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
            nodes.length, block.getNumBytes(), bytesSent, newGS,
            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
            (targetPinnings != null && targetPinnings[0]), targetPinnings,
            nodeStorageIDs[0], nodeStorageIDs);
  ... ...
}

public void writeBlock(... ...) throws IOException {
  ... ...

  send(out, Op.WRITE_BLOCK, proto.build());
}

3.4 建立管道之Socket接收

全局查找DataXceiverServer.java,在该类中查找run方法

public void run() {
  Peer peer = null;
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
    // 接收socket的请求
      peer = peerServer.accept();

      // Make sure the xceiver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > maxXceiverCount) {
        throw new IOException("Xceiver count " + curXceiverCount
            + " exceeds the limit of concurrent xcievers: "
            + maxXceiverCount);
      }
    // 客户端每发送一个block,都启动一个DataXceiver去处理block
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this))
          .start();
    } catch (SocketTimeoutException ignored) {
      ... ...
    }
  }

  ... ...
}

点击DataXceiver(线程),查找run方法

public void run() {
  int opsProcessed = 0;
  Op op = null;

  try {
    synchronized(this) {
      xceiver = Thread.currentThread();
    }
    dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
    InputStream input = socketIn;
    try {
      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
        socketIn, datanode.getXferAddress().getPort(),

      return;
    }
    
    super.initialize(new DataInputStream(input));
    
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

      try {
        if (opsProcessed != 0) {
          assert dnConf.socketKeepaliveTimeout > 0;
          peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
        } else {
          peer.setReadTimeout(dnConf.socketTimeout);
        }
    // 读取这次数据的请求类型
        op = readOp();
      } catch (InterruptedIOException ignored) {
        // Time out while we wait for client rpc
        break;
      } catch (EOFException | ClosedChannelException e) {
        // Since we optimistically expect the next op, it's quite normal to
        // get EOF here.
        LOG.debug("Cached {} closing after {} ops.  " +
            "This message is usually benign.", peer, opsProcessed);
        break;
      } catch (IOException err) {
        incrDatanodeNetworkErrors();
        throw err;
      }

      // restore normal timeout
      if (opsProcessed != 0) {
        peer.setReadTimeout(dnConf.socketTimeout);
      }

      opStartTime = monotonicNow();
    // 根据操作类型处理我们的数据
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    ... ... 
  } 
}

protected final void processOp(Op op) throws IOException {
  switch(op) {
  ... ...
  case WRITE_BLOCK:
    opWriteBlock(in);
    break;
  ... ...
  default:
    throw new IOException("Unknown op " + op + " in data stream");
  }
}

private void opWriteBlock(DataInputStream in) throws IOException {
  final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
  final DatanodeInfo[] targets = PBHelperClient.convert(proto.getTargetsList());
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelperClient.convertStorageType(proto.getStorageType()),
        PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        targets,
        PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
        PBHelperClient.convert(proto.getSource()),
        fromProto(proto.getStage()),
        proto.getPipelineSize(),
        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
        proto.getLatestGenerationStamp(),
        fromProto(proto.getRequestedChecksum()),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
          CachingStrategy.newDefaultStrategy()),
        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
        (proto.hasPinning() ? proto.getPinning(): false),
        (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
        proto.getStorageId(),
        proto.getTargetStorageIdsList().toArray(new String[0]));
  } finally {
   if (traceScope != null) traceScope.close();
  }
}

ctrl +alt +b 查找writeBlock的实现类DataXceiver.java

public void writeBlock(... ...) throws IOException {
  ... ...
  try {
    final Replica replica;
    if (isDatanode || 
        stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
      // open a block receiver
    // 创建一个BlockReceiver
      setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
          peer.getRemoteAddressString(),
          peer.getLocalAddressString(),
          stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, datanode, requestedChecksum,
          cachingStrategy, allowLazyPersist, pinning, storageId));
      replica = blockReceiver.getReplica();
    } else {
      replica = datanode.data.recoverClose(
          block, latestGenerationStamp, minBytesRcvd);
    }
    storageUuid = replica.getStorageUuid();
    isOnTransientStorage = replica.isOnTransientStorage();

    //
    // Connect to downstream machine, if appropriate
    // 继续连接下游的机器
    if (targets.length > 0) {
      InetSocketAddress mirrorTarget = null;
      // Connect to backup machine
      mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
      LOG.debug("Connecting to datanode {}", mirrorNode);
      mirrorTarget = NetUtils.createSocketAddr(mirrorNode);

    // 向新的副本发送socket
      mirrorSock = datanode.newSocket();
      try {

        ... ...
        if (targetPinnings != null && targetPinnings.length > 0) {
      // 往下游socket发送数据
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes,
              srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy,
              allowLazyPersist, targetPinnings[0], targetPinnings,
              targetStorageId, targetStorageIds);
        } else {
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes,
              srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy,
              allowLazyPersist, false, targetPinnings,
              targetStorageId, targetStorageIds);
        }

        mirrorOut.flush();

        DataNodeFaultInjector.get().writeBlockAfterFlush();

        // read connect ack (only for clients, not for replication req)
        if (isClient) {
          BlockOpResponseProto connectAck =
            BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
          mirrorInStatus = connectAck.getStatus();
          firstBadLink = connectAck.getFirstBadLink();
          if (mirrorInStatus != SUCCESS) {
            LOG.debug("Datanode {} got response for connect" +
                "ack  from downstream datanode with firstbadlink as {}",
                targets.length, firstBadLink);
          }
        }

      … …

  //update metrics
  datanode.getMetrics().addWriteBlockOp(elapsed());
  datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
}

BlockReceiver getBlockReceiver(
    final ExtendedBlock block, final StorageType storageType,
    final DataInputStream in,
    final String inAddr, final String myAddr,
    final BlockConstructionStage stage,
    final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
    final String clientname, final DatanodeInfo srcDataNode,
    final DataNode dn, DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist,
    final boolean pinning,
    final String storageId) throws IOException {
  return new BlockReceiver(block, storageType, in,
      inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
      clientname, srcDataNode, dn, requestedChecksum,
      cachingStrategy, allowLazyPersist, pinning, storageId);
}

BlockReceiver(final ExtendedBlock block, final StorageType storageType,
  final DataInputStream in,
  final String inAddr, final String myAddr,
  final BlockConstructionStage stage, 
  final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
  final String clientname, final DatanodeInfo srcDataNode,
  final DataNode datanode, DataChecksum requestedChecksum,
  CachingStrategy cachingStrategy,
  final boolean allowLazyPersist,
  final boolean pinning,
  final String storageId) throws IOException {
  ... ...
  if (isDatanode) { //replication or move
    replicaHandler =
        datanode.data.createTemporary(storageType, storageId, block, false);
  } else {
    switch (stage) {
    case PIPELINE_SETUP_CREATE:
    // 创建管道
      replicaHandler = datanode.data.createRbw(storageType, storageId,
          block, allowLazyPersist);
      datanode.notifyNamenodeReceivingBlock(
          block, replicaHandler.getReplica().getStorageUuid());
      break;
    ... ...
    default: throw new IOException("Unsupported stage " + stage + 
          " while receiving block " + block + " from " + inAddr);
    }
  }
  ... ...
}

public ReplicaHandler createRbw(
    StorageType storageType, String storageId, ExtendedBlock b,
    boolean allowLazyPersist) throws IOException {
  try (AutoCloseableLock lock = datasetLock.acquire()) {
    ... ...

    if (ref == null) {
      ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
    }

    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
    // create an rbw file to hold block in the designated volume

    if (allowLazyPersist && !v.isTransientStorage()) {
      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
    }

    ReplicaInPipeline newReplicaInfo;
    try {
    // 创建输出流的临时写文件 
      newReplicaInfo = v.createRbw(b);
      if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
        throw new IOException("CreateRBW returned a replica of state "
            + newReplicaInfo.getReplicaInfo().getState()
            + " for block " + b.getBlockId());
      }
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }

    volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
    return new ReplicaHandler(newReplicaInfo, ref);
  }
}

public ReplicaHandler createRbw(
    StorageType storageType, String storageId, ExtendedBlock b,
    boolean allowLazyPersist) throws IOException {
  try (AutoCloseableLock lock = datasetLock.acquire()) {
    ... ...

    if (ref == null) {
    // 有可能有多个临时写文件
      ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
    }

    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
    // create an rbw file to hold block in the designated volume

    if (allowLazyPersist && !v.isTransientStorage()) {
      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
    }

    ReplicaInPipeline newReplicaInfo;
    try {
    // 创建输出流的临时写文件 
      newReplicaInfo = v.createRbw(b);
      if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
        throw new IOException("CreateRBW returned a replica of state "
            + newReplicaInfo.getReplicaInfo().getState()
            + " for block " + b.getBlockId());
      }
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }

    volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
    return new ReplicaHandler(newReplicaInfo, ref);
  }
}

public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {

  File f = createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
  LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
      .setBlockId(b.getBlockId())
      .setGenerationStamp(b.getGenerationStamp())
      .setFsVolume(this)
      .setDirectoryToUse(f.getParentFile())
      .setBytesToReserve(b.getNumBytes())
      .buildLocalReplicaInPipeline();
  return newReplicaInfo;
}

3.5 客户端接收DN写数据应答Response

全局查找DataStreamer,搜索run方法

@Override
public void run() {

  long lastPacket = Time.monotonicNow();
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
    closeResponder();
    }

    DFSPacket one;
    try {
    // process datanode IO errors if any
    boolean doSleep = processDatanodeOrExternalError();

    final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
    synchronized (dataQueue) {
      // wait for a packet to be sent.
      long now = Time.monotonicNow();
      while ((!shouldStop() && dataQueue.size() == 0 &&
        (stage != BlockConstructionStage.DATA_STREAMING ||
          now - lastPacket < halfSocketTimeout)) || doSleep) {
      long timeout = halfSocketTimeout - (now-lastPacket);
      timeout = timeout <= 0 ? 1000 : timeout;
      timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
        timeout : 1000;
      try {
        // 如果dataQueue里面没有数据,代码会阻塞在这儿
        dataQueue.wait(timeout); // 接收到notify消息
      } catch (InterruptedException  e) {
        LOG.warn("Caught exception", e);
      }
      doSleep = false;
      now = Time.monotonicNow();
      }
      if (shouldStop()) {
      continue;
      }
      // get packet to be sent.
      if (dataQueue.isEmpty()) {
      one = createHeartbeatPacket();
      } else {
      try {
        backOffIfNecessary();
      } catch (InterruptedException e) {
        LOG.warn("Caught exception", e);
      }
      //  队列不为空,从队列中取出packet
      one = dataQueue.getFirst(); // regular data packet
      SpanId[] parents = one.getTraceParents();
      if (parents.length > 0) {
        scope = dfsClient.getTracer().
          newScope("dataStreamer", parents[0]);
        scope.getSpan().setParents(parents);
      }
      }
    }

    // get new block from namenode.
    if (LOG.isDebugEnabled()) {
      LOG.debug("stage=" + stage + ", " + this);
    }
    if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
      LOG.debug("Allocating new block: {}", this);
      // 步骤一:向NameNode 申请block 并建立数据管道
      setPipeline(nextBlockOutputStream());
      // 步骤二:启动ResponseProcessor用来监听packet发送是否成功
      initDataStreaming();
    } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
      LOG.debug("Append to block {}", block);
      setupPipelineForAppendOrRecovery();
      if (streamerClosed) {
      continue;
      }
      initDataStreaming();
    }

    long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
    if (lastByteOffsetInBlock > stat.getBlockSize()) {
      throw new IOException("BlockSize " + stat.getBlockSize() +
        " < lastByteOffsetInBlock, " + this + ", " + one);
    }

    if (one.isLastPacketInBlock()) {
      // wait for all data packets have been successfully acked
      synchronized (dataQueue) {
      while (!shouldStop() && ackQueue.size() != 0) {
        try {
        // wait for acks to arrive from datanodes
        dataQueue.wait(1000);
        } catch (InterruptedException  e) {
        LOG.warn("Caught exception", e);
        }
      }
      }
      if (shouldStop()) {
      continue;
      }
      stage = BlockConstructionStage.PIPELINE_CLOSE;
    }

    // send the packet
    SpanId spanId = SpanId.INVALID;
    synchronized (dataQueue) {

      // move packet from dataQueue to ackQueue
      if (!one.isHeartbeatPacket()) {
      if (scope != null) {
        spanId = scope.getSpanId();
        scope.detach();

        one.setTraceScope(scope);
      }
      scope = null;
      // 步骤三:从dataQueue 把要发送的这个packet 移除出去
      dataQueue.removeFirst();
      // 步骤四:然后往ackQueue 里面添加这个packet
      ackQueue.addLast(one);
      packetSendTime.put(one.getSeqno(), Time.monotonicNow());
      dataQueue.notifyAll();
      }
    }

    LOG.debug("{} sending {}", this, one);

    // write out data to remote datanode
    try (TraceScope ignored = dfsClient.getTracer().
      newScope("DataStreamer#writeTo", spanId)) {
      //  将数据写出去
      one.writeTo(blockStream);
      blockStream.flush();
    } catch (IOException e) {
      errorState.markFirstNodeIfNotMarked();
      throw e;
    }
    lastPacket = Time.monotonicNow();

    // update bytesSent
    long tmpBytesSent = one.getLastByteOffsetBlock();
    if (bytesSent < tmpBytesSent) {
      bytesSent = tmpBytesSent;
    }

    if (shouldStop()) {
      continue;
    }

    // Is this block full?
    if (one.isLastPacketInBlock()) {
      // wait for the close packet has been acked
      synchronized (dataQueue) {
      while (!shouldStop() && ackQueue.size() != 0) {
        dataQueue.wait(1000);// wait for acks to arrive from datanodes
      }
      }
      if (shouldStop()) {
      continue;
      }

      endBlock();
    }
    if (progress != null) { progress.progress(); }

    // This is used by unit test to trigger race conditions.
    if (artificialSlowdown != 0 && dfsClient.clientRunning) {
      Thread.sleep(artificialSlowdown);
    }
    } catch (Throwable e) {
    ... ...
    } finally {
    if (scope != null) {
      scope.close();
      scope = null;
    }
    }
  }
  closeInternal();
}

private void initDataStreaming() {
  this.setName("DataStreamer for file " + src +
      " block " + block);
  ... ...
  response = new ResponseProcessor(nodes);
  response.start();
  stage = BlockConstructionStage.DATA_STREAMING;
}

点击response再点击ResponseProcessor,ctrl + f 查找run方法

public void run() {
    ... ...
  ackQueue.removeFirst();
  packetSendTime.remove(seqno);
  dataQueue.notifyAll();
  ... ...
}

五、Yarn源码解析

1、概述

YARN工作机制

yarn源码解析

2、Yarn客户端向RM提交作业

在wordcount程序的驱动类中点击

boolean result = job.waitForCompletion(true);

public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis = 
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}

public void submit() 
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter = 
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, 
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
 }
 
 JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {
  ... ...
  status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials()); 
  ... ...
}

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException;

创建提交环境,ctrl + alt +B 查找submitJob实现类,YARNRunner.java

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
  
  addHistoryToken(ts);
  // 创建提交环境:
  ApplicationSubmissionContext appContext =
    createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    // 向RM提交一个应用程序,appContext里面封装了启动mrappMaster和运行container的命令
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);
    
    // 获取提交响应
    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}

public ApplicationSubmissionContext createApplicationSubmissionContext(
    Configuration jobConf, String jobSubmitDir, Credentials ts)
    throws IOException {
  ApplicationId applicationId = resMgrDelegate.getApplicationId();

  // Setup LocalResources
  // 封装了本地资源相关路径
  Map<String, LocalResource> localResources =
      setupLocalResources(jobConf, jobSubmitDir);

  // Setup security tokens
  DataOutputBuffer dob = new DataOutputBuffer();
  ts.writeTokenStorageToStream(dob);
  ByteBuffer securityTokens =
      ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

  // Setup ContainerLaunchContext for AM container
  // 封装了启动mrappMaster和运行container的命令
  List<String> vargs = setupAMCommand(jobConf);
  ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
      jobConf, localResources, securityTokens, vargs);

  ... ...

  return appContext;
}

private List<String> setupAMCommand(Configuration jobConf) {
  List<String> vargs = new ArrayList<>(8);
  // Java进程启动命令开始
  vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
      + "/bin/java");

  Path amTmpDir =
      new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
          YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
  vargs.add("-Djava.io.tmpdir=" + amTmpDir);
  MRApps.addLog4jSystemProperties(null, vargs, conf);

  // Check for Java Lib Path usage in MAP and REDUCE configs
  warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
      "map",
      MRJobConfig.MAP_JAVA_OPTS,
      MRJobConfig.MAP_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
      "map",
      MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
      MRJobConfig.MAPRED_ADMIN_USER_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
      "reduce",
      MRJobConfig.REDUCE_JAVA_OPTS,
      MRJobConfig.REDUCE_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
      "reduce",
      MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
      MRJobConfig.MAPRED_ADMIN_USER_ENV);

  // Add AM admin command opts before user command opts
  // so that it can be overridden by user
  String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
      MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
  warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
      MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
  vargs.add(mrAppMasterAdminOptions);

  // Add AM user command opts 用户命令参数
  String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
      MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
  warnForJavaLibPath(mrAppMasterUserOptions, "app master",
      MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
  vargs.add(mrAppMasterUserOptions);

  if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
      MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
    final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
        MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
    if (profileParams != null) {
      vargs.add(String.format(profileParams,
          ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
              + TaskLog.LogName.PROFILE));
    }
  }

  // 封装了要启动的mrappmaster全类名 
  // org.apache.hadoop.mapreduce.v2.app.MRAppMaster
  vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
      Path.SEPARATOR + ApplicationConstants.STDOUT);
  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
      Path.SEPARATOR + ApplicationConstants.STDERR);
  return vargs;
}

向Yarn提交,点击submitJob方法中的submitApplication(),ctrl + alt +B 查找submitApplication实现类,YarnClientImpl.java

public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  // 创建一个提交请求
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  ... ...

  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  // 继续提交,实现类是ApplicationClientProtocolPBClientImpl
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  EnumSet<YarnApplicationState> waitingStates = 
                               EnumSet.of(YarnApplicationState.NEW,
                               YarnApplicationState.NEW_SAVING,
                               YarnApplicationState.SUBMITTED);
  EnumSet<YarnApplicationState> failToSubmitStates = 
                                EnumSet.of(YarnApplicationState.FAILED,
                                YarnApplicationState.KILLED);    
  while (true) {
    try {
    // 获取提交给Yarn的反馈
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      ... ...
    } catch (ApplicationNotFoundException ex) {
      // FailOver or RM restart happens before RMStateStore saves
      // ApplicationState
      LOG.info("Re-submit application " + applicationId + "with the " +
          "same ApplicationSubmissionContext");
    // 如果提交失败,则再次提交
      rmClient.submitApplication(request);
    }
  }

  return applicationId;
}

3、RM启动MRAppMaster

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-app</artifactId>
    <version>3.1.3</version>
</dependency>

查找MRAppMaster,搜索main方法

public static void main(String[] args) {
  try {
    ... ...

  // 初始化一个container
    ContainerId containerId = ContainerId.fromString(containerIdStr);
    ApplicationAttemptId applicationAttemptId =
        containerId.getApplicationAttemptId();
    if (applicationAttemptId != null) {
      CallerContext.setCurrent(new CallerContext.Builder(
          "mr_appmaster_" + applicationAttemptId.toString()).build());
    }
    long appSubmitTime = Long.parseLong(appSubmitTimeStr);
    
    // 创建appMaster对象
    MRAppMaster appMaster =
        new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
            Integer.parseInt(nodePortString),
            Integer.parseInt(nodeHttpPortString), appSubmitTime);
    ... ...
  
  // 初始化并启动AppMaster
    initAndStartAppMaster(appMaster, conf, jobUserName);
  } catch (Throwable t) {
    LOG.error("Error starting MRAppMaster", t);
    ExitUtil.terminate(1, t);
  }
}

protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName) throws IOException,
    InterruptedException {
  ... ...
  conf.getCredentials().addAll(credentials);
  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
    // 初始化
      appMaster.init(conf);
    // 启动
      appMaster.start();
      if(appMaster.errorHappenedShutDown) {
        throw new IOException("Was asked to shut down.");
      }
      return null;
    }
  });
}

public void init(Configuration conf) {
  ... ...
  synchronized (stateChangeLock) {
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);
      try {
      // 调用MRAppMaster中的serviceInit()方法
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          //if the service ended up here during init,
          //notify the listeners
      // 如果初始化完成,通知监听器
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

ctrl + alt +B 查找serviceInit实现类,MRAppMaster.java

protected void serviceInit(final Configuration conf) throws Exception {
  ... ...
  // 创建提交路径
  clientService = createClientService(context);
  
  // 创建调度器
  clientService.init(conf);
  
  // 创建job提交RPC客户端
  containerAllocator = createContainerAllocator(clientService, context);
  ... ...
}

点击MRAppMaster.java 中的initAndStartAppMaster 方法中的appMaster.start();

public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
    // 调用MRAppMaster中的serviceStart()方法
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          LOG.debug("Service {} is started", getName());
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

protected void serviceStart() throws Exception {
  ... ...
  if (initFailed) {
    JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
    jobEventDispatcher.handle(initFailedEvent);
  } else {
    // All components have started, start the job.
  // 初始化成功后,提交Job到队列中
    startJobs();
  }
}

protected void startJobs() {
  /** create a job-start event to get this ball rolling */
  JobEvent startJobEvent = new JobStartEvent(job.getID(),
      recoveredJobStartTime);
  /** send the job-start event. this triggers the job execution. */
  // 这里将job存放到yarn队列
  // dispatcher = AsyncDispatcher
  // getEventHandler()返回的是GenericEventHandler
  dispatcher.getEventHandler().handle(startJobEvent);
}

ctrl + alt +B 查找handle实现类,GenericEventHandler.java

class GenericEventHandler implements EventHandler<Event> {
  public void handle(Event event) {
    ... ...
    try {
    // 将job存储到yarn队列中
      eventQueue.put(event);
    } catch (InterruptedException e) {
      ... ...
    }
  };
}

4、调度器任务执行(YarnChild)

查找YarnChild,搜索main方法

public static void main(String[] args) throws Throwable {
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  LOG.debug("Child starting");

  ... ...

  task = myTask.getTask();
  YarnChild.taskid = task.getTaskID();
  ... ...

  // Create a final reference to the task for the doAs block
  final Task taskFinal = task;
  childUGI.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // use job-specified working directory
        setEncryptedSpillKeyIfRequired(taskFinal);
        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
    // 调用task执行(maptask或者reducetask)
        taskFinal.run(job, umbilical); // run the task
        return null;
      }
    });
  } 
  ... ...
}

ctrl + alt +B 查找run实现类,maptask.java

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  // 判断是否是MapTask
  if (isMapTask()) {
    // If there are no reducers then there won't be any sort. Hence the map 
    // phase will govern the entire attempt's progress.
  // 如果reducetask个数为零,maptask占用整个任务的100%
    if (conf.getNumReduceTasks() == 0) {
      mapPhase = getProgress().addPhase("map", 1.0f);
    } else {
      // If there are reducers then the entire attempt's progress will be 
      // split between the map phase (67%) and the sort phase (33%).
    // 如果reduceTask个数不为零,MapTask占用整个任务的66.7% sort阶段占比
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase  = getProgress().addPhase("sort", 0.333f);
    }
  }
  ... ...
  if (useNewApi) {
    // 调用新的API执行maptask
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}

void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  ... ...

  try {
    input.initialize(split, mapperContext);
  // 运行maptask
    mapper.run(mapperContext);
  
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

//Mapper.java(和Map联系在一起)
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}

启动ReduceTask,在YarnChild.java类中的main方法中ctrl + alt +B 查找run实现类,reducetask.java

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  ... ...

  if (useNewApi) {
  // 调用新API执行reduce
    runNewReducer(job, umbilical, reporter, rIter, comparator, 
                  keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter, rIter, comparator, 
                  keyClass, valueClass);
  }

  shuffleConsumerPlugin.close();
  done(umbilical, reporter);
}

void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException,InterruptedException, 
                            ClassNotFoundException {
  ... ...
  try {
    // 调用reducetask的run方法
    reducer.run(reducerContext);
  } finally {
    trackedRW.close(reducerContext);
  }
}

// Reduce.java
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      // If a back up store is used, reset it
      Iterator<VALUEIN> iter = context.getValues().iterator();
      if(iter instanceof ReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
      }
    }
  } finally {
    cleanup(context);
  }
}

六、MapReduce源码解析

之前有介绍

1、Job提交流程源码和切片源码详解

//Job提交流程源码详解
waitForCompletion()

submit();

// 1建立连接
  connect();  
    // 1)创建提交Job的代理
    new Cluster(getConfiguration());
      // (1)判断是本地运行环境还是yarn集群运行环境
      initialize(jobTrackAddr, conf); 

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)

  // 1)创建给集群提交数据的Stag路径
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

  // 2)获取jobid ,并创建Job路径
  JobID jobId = submitClient.getNewJobID();

  // 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);  
  rUploader.uploadFiles(job, jobSubmitDir);

  // 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
    maps = writeNewSplits(job, jobSubmitDir);
    input.getSplits(job);

  // 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
  conf.writeXml(out);

  // 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

FileInputFormat 切片源码解析(input.getSplits(job))

2、MapTask & ReduceTask 源码解析

//MapTask源码解析流程
=================== MapTask ===================
context.write(k, NullWritable.get());   //自定义的map方法的写出,进入
  output.write(key, value);  
  //MapTask727行,收集方法,进入两次 
    collector.collect(key, value,partitioner.getPartition(key, value, partitions));
      HashPartitioner(); //默认分区器
    collect()  //MapTask1082行 map端所有的kv全部写出后会走下面的close方法
      close() //MapTask732行
      collector.flush() // 溢出刷写方法,MapTask735行,提前打个断点,进入
        sortAndSpill() //溢写排序,MapTask1505行,进入
          sorter.sort()   QuickSort //溢写排序方法,MapTask1625行,进入
        mergeParts(); //合并文件,MapTask1527行,进入
      
        collector.close(); //MapTask739行,收集器关闭,即将进入ReduceTask
//ReduceTask源码解析流程
=================== ReduceTask ===================
if (isMapOrReduce())  //reduceTask324行,提前打断点
initialize()   // reduceTask333行,进入
init(shuffleContext);  // reduceTask375行,走到这需要先给下面的打断点
        totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl第120行,提前打断点
         merger = createMergeManager(context); //合并方法,Shuffle第80行
          // MergeManagerImpl第232 235行,提前打断点
          this.inMemoryMerger = createInMemoryMerger(); //内存合并
          this.onDiskMerger = new OnDiskMerger(this); //磁盘合并
        rIter = shuffleConsumerPlugin.run();
            eventFetcher.start();  //开始抓取数据,Shuffle第107行,提前打断点
            eventFetcher.shutDown();  //抓取结束,Shuffle第141行,提前打断点
            copyPhase.complete();   //copy阶段完成,Shuffle第151行
            taskStatus.setPhase(TaskStatus.Phase.SORT);  //开始排序阶段,Shuffle第152行
          sortPhase.complete();   //排序阶段完成,即将进入reduce阶段 reduceTask382行
        reduce();  //reduce阶段调用的就是我们自定义的reduce方法,会被调用多次
          cleanup(context); //reduce完成之前,会最后调用一次Reducer里面的cleanup方法

七、Hadoop源码编译

1、环境准备

源码地址:https://hadoop.apache.org/release/3.1.3.html

具体可以看build.txt文件,修改源码中的HDFS副本数的设置

回到Centos系统,Jar包准备(Hadoop源码、JDK8、Maven、Ant 、Protobuf)

  • hadoop-3.1.3-src.tar.gz
  • jdk-8u212-linux-x64.tar.gz
  • apache-maven-3.6.3-bin.tar.gz
  • protobuf-2.5.0.tar.gz(序列化的框架)
  • cmake-3.17.0.tar.gz

2、工具包安装

注意:所有操作必须在root用户下完成

# 分别创建/opt/software/hadoop_source和/opt/module/hadoop_source路径
# 上传软件包到指定的目录,例如 /opt/software/hadoop_source
# 解压软件包指定的目录,例如: /opt/module/hadoop_source
tar -zxvf apache-maven-3.6.3-bin.tar.gz -C  /opt/module/hadoop_source/
tar -zxvf cmake-3.17.0.tar.gz -C  /opt/module/hadoop_source/
tar -zxvf hadoop-3.1.3-src.tar.gz -C  /opt/module/hadoop_source/
tar -zxvf protobuf-2.5.0.tar.gz -C  /opt/module/hadoop_source/

# 安装JDK
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/hadoop_source/
vim /etc/profile.d/my_env.sh
# 输入如下内容:
#JAVA_HOME
export JAVA_HOME=/opt/module/hadoop_source/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

# 刷新JDK环境变量
source /etc/profile
java -version

# 配置maven环境变量,maven镜像,并验证
vim /etc/profile.d/my_env.sh
#MAVEN_HOME
MAVEN_HOME=/opt/module/hadoop_source/apache-maven-3.6.3
PATH=$PATH:$JAVA_HOME/bin:$MAVEN_HOME/bin

source /etc/profile
# 修改maven的镜像
vi conf/settings.xml
# 在 mirrors节点中添加阿里云镜像
<mirrors>
    <mirror>
         <id>nexus-aliyun</id>
         <mirrorOf>central</mirrorOf>
         <name>Nexus aliyun</name>
              <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
</mirrors>

# 验证maven安装是否成功
mvn -version 

# 安装相关的依赖(注意安装顺序不可乱,可能会出现依赖找不到问题)
# 安装gcc make
yum install -y gcc* make
# 安装压缩工具
yum -y install snappy*  bzip2* lzo* zlib*  lz4* gzip*
# 安装一些基本工具
yum -y install openssl* svn ncurses* autoconf automake libtool
# 安装扩展源,才可安装zstd
yum -y install epel-release
# 安装zstd
yum -y install *zstd*

# 手动安装cmake
# 在解压好的cmake目录下,执行./bootstrap进行编译,此过程需一小时请耐心等待
./bootstrap
# 执行安装
make && make install 
cmake -version

# 安装protobuf,进入到解压后的protobuf目录 
# 依次执行下列命令 --prefix 指定安装到当前目录
./configure --prefix=/opt/module/hadoop_source/protobuf-2.5.0
make && make install
# 配置环境变量
vim /etc/profile.d/my_env.sh
# 输入如下内容
PROTOC_HOME=/opt/module/hadoop_source/protobuf-2.5.0
PATH=$PATH:$JAVA_HOME/bin:$MAVEN_HOME/bin:$PROTOC_HOME/bin

source /etc/profile
protoc --version

3、编译源码

# 进入解压后的Hadoop源码目录下
#开始编译
mvn clean package -DskipTests -Pdist,native -Dtar
# 注意:第一次编译需要下载很多依赖jar包,编译时间会很久,预计1小时左右,最终成功是全部SUCCESS

# 成功的64位hadoop包在/opt/module/hadoop_source/hadoop-3.1.3-src/hadoop-dist/target下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1412536.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

day3C++

设计一个Per类&#xff0c;类中包含私有成员:姓名、年龄、指针成员身高、体重&#xff0c;再设计一个Stu类&#xff0c;类中包含私有成员:成绩、Per类对象p1&#xff0c;设计这两个类的构造函数、析构函数和拷贝构造函数。 #include <iostream>using namespace std;clas…

Oracle篇—分区表和分区索引的介绍和分类(第一篇,总共五篇)

☘️博主介绍☘️&#xff1a; ✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ ✌✌️擅长Oracle、MySQL、SQLserver、Linux&#xff0c;也在积极的扩展IT方向的其他知识面✌✌️ ❣️❣️❣️大佬们都喜欢静静的看文章&#xff0c;并且也会默默的点赞收藏加关注❣…

前端canvas项目实战——简历制作网站(二)——右侧属性栏(颜色)

目录 前言一、效果展示二、实现步骤1. 实现一个自定义的选色板2. 创建属性工厂&#xff0c;为每个对象定制属性3. 为canvas对象注册监听器&#xff0c;点击不同对象时更新属性列表 三、Show u the code后记 前言 上一篇博文中&#xff0c;我们实现了左侧工具栏&#xff0c;通过…

数字安全网:深入解析服务容错的三大绝招“

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 数字安全网&#xff1a;深入解析服务容错的三大绝招" 前言什么是服务雪崩降级&#xff1a;优雅的后退熔断&#xff1a;保卫系统的守护者流量整形&#xff1a;平衡与优化 前言 想象一下&#xff…

【java】常见的面试问题

目录 一、异常 1、 throw 和 throws 的区别&#xff1f; 2、 final、finally、finalize 有什么区别&#xff1f; 3、try-catch-finally 中哪个部分可以省略&#xff1f; 4、try-catch-finally 中&#xff0c;如果 catch 中 return 了&#xff0c;finally 还会执行吗&#…

Tosei 自助网络店铺管理系统network_test.php_RCE漏洞复现

简介 Tosei 自助洗衣机是日本一家公司的产品,在 network_test.php 文件存在命令执行 漏洞复现 FOFA语法: body="tosei_login_check.php" 主要是日本 访问界面如下所示: 验证POC: /cgi-bin/network_test.php 拼接访问url: https://ip:port/cgi-bin/network_tes…

(四)ros中ros::init(argc,argv,”节点名称”)。中的节点名称和launch文件中的节点名称关系。

1、使用rosrun 命令执行ros程序: Rosrun <功能包名称> <节点名称>。其中”节点名称”为ros::init中的ros节点名称。 2、使用launch 文件 name 参数为ros节点名称。 如果采样launch启动ros程序&#xff0c;launch文件中的ros节点名称会替换ros::init中的节点名称…

深入理解sysbench工具

文章目录 一、概述二、安装2.1、源码编译安装2.2、命令行安装2.3、安装确认 三、重要参数详解3.1、查询支持的参数3.2、重要参数说明 四、实例4.1、CPU性能测试4.2、内存性能测试4.3、IO性能测试4.4、POSIX线程性能测试4.5、多线程调度测试 团队博客: 汽车电子社区 一、概述 sy…

华为路由器IPv6基础配置

1. R2的两个接口均采用静态IPv6地址配置方法 2. R1的GigabitEthernet0/0/3接口采用无状态 地址配置 3. R3的GigabitEthernet0/0/3接口采用DHCPv6 的方式配置IPv6地址 R1配置 ipv6 #全局使能IPv6 interface GigabitEthernet0/0/0ipv6 enable ipv6 address auto link-local #为…

学习JavaEE的日子 Day17 面向对象版学生管理系统

Day17 面向对象版学生管理系统 代码已放在资源里&#xff0c;有需要可自取&#xff01;&#xff01;&#xff01; 1.需求分析 管理的是一个一个的学生对象 学生类&#xff1a; public class Student{String name&#xff1b;char sex;int age;String classId;//班级号String …

1.26 day3 C++

设计一个Per类&#xff0c;类中包含私有成员:姓名、年龄、指针成员身高、体重&#xff0c;再设计一个Stu类&#xff0c;类中包含私有成员:成绩、Per类对象p1&#xff0c;设计这两个类的构造函数、析构函数和拷贝构造函数。 #include <iostream>using namespace std; cla…

大文件传输之以太网UDP传输延迟解决方案

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业最宝贵的资产之一。随着企业规模的扩大和业务的全球化&#xff0c;大文件传输的需求日益增长&#xff0c;它不仅关系到企业内部数据的高效管理&#xff0c;也是与外部合作伙伴进行有效沟通的关键。然而&#xff0c;大文件…

aardio - 调用C编写的dll时的不同参数类型处理方法

import console; //生成 DLL import tcc; var c tcc(); c.code /** #include <stdio.h> #include <stdlib.h> #include <stdbool.h>#ifdef __cplusplus #define EXTERN_C extern "C" __declspec(dllexport) #else #define EXTERN_C __declsp…

23111 C++ day3

思维导图 设计一个Per类&#xff0c;类中包含私有成员:姓名、年龄、指针成员身高、体重&#xff0c;再设计一个Stu类&#xff0c;类中包含私有成员:成绩、Per类对象p1&#xff0c;设计这两个类的构造函数、析构函数和拷贝构造函数。 #include <iostream>using namespac…

零基础学编程工具简介,中文编程开发工具

零基础学编程工具简介&#xff0c;中文编程开发工具 一、前言 零基础自学编程&#xff0c;中文编程工具下载&#xff0c;中文编程工具构件之扩展系统菜单构件教程 编程系统化教程链接https://jywxz.blog.csdn.net/article/details/134073098?spm1001.2014.3001.5502 给大家…

Ansys APDL如何查看已经施加的约束和载荷

目录 查看当前已经施加的载荷和约束 查看具体的值 查看已经定义的参数 查看当前已经施加的载荷和约束 在菜单栏选择&#xff1a; 通常在有限元单元的视图下&#xff1a; 选择SOLID MODEL LOADS&#xff08;如下&#xff09;可查看当前已经施加的载荷和约束。 也可以看所有…

Redis核心技术与实战【学习笔记】 - 1.Redis为什么高性能

作为键值数据库&#xff0c;Redis 的应用非常广泛&#xff0c;如果你是后端工程师&#xff0c;我猜你出去面试&#xff0c;八成都会被问到与它相关的性能问题。比如说&#xff0c;为了保证数据的可靠性&#xff0c;Redis 需要在磁盘上读写 AOF 和 RDB&#xff0c;但在高并发场景…

使用Halcon匹配助手进行模板匹配

使用Halcon匹配助手进行模板匹配 文章目录 使用Halcon匹配助手进行模板匹配1. 选择匹配方法2. 创建模板3. 检测模板4. 优化匹配速度 使用Halcon匹配助手&#xff0c;可以很方便地选择模板图像&#xff0c;设置匹配参数&#xff0c;并测试匹配结果.Halcon匹配助手支持下面几种匹…

#常见问题总结#在docker中跑前端vue项目

目录 前言一、no such file or directory, open...总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 记录在docker中跑前端项目过程中&#xff0c;我遇到的问题以及解决方法 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一…

2022年至2023年广东省职业院校技能大赛高职组“信息安全管理与评估”赛项样题

2022 年至 2023 年广东省职业院校技能大赛高职组“信息安全管理与评估”赛项样题 一、 第一阶段竞赛项目试题 本文件为信息安全管理与评估项目竞赛第一阶段试题&#xff0c;第一阶段内容包 括&#xff1a;网络平台搭建、网络安全设备配置与防护。 本阶段比赛时间为 180 分钟…