Hadoop Source Code Reading (3): HDFS Upload


Notes:
1. Hadoop version: 3.1.3
2. Reading tool: IDEA 2023.1.2
3. Source download: Index of /dist/hadoop/core/hadoop-3.1.3 (apache.org)
4. Project import: after downloading you get the hadoop-3.1.3-src.tar.gz archive; open PowerShell in that directory and extract it with tar -zxvf, then open the hadoop-3.1.3-src folder in IDEA. Be sure to configure your Maven or Gradle repository first, otherwise importing the jar dependencies will be slow.
5. Reference course: www.bilibili.com/video/BV1Qp…

HDFS Upload

A simple upload example:

public void test() throws IOException {

	// fs is an initialized FileSystem instance; see the full sketch below
	FSDataOutputStream fos = fs.create(new Path("/input"));

	fos.write("hello world".getBytes());
	fos.close();
}

As you can see, the code first creates an FSDataOutputStream and then writes data into it. The source reading below is therefore split into two parts: the create process and the write (upload) process.
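For completeness, here is a minimal self-contained version of the same test. The NameNode URI, user name, and class name are placeholders for this demo; substitute your cluster's values:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsUploadDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hdfs://localhost:9000 and the "hadoop" user are assumptions for this demo
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf, "hadoop");
        try (FSDataOutputStream fos = fs.create(new Path("/input"))) {
            fos.write("hello world".getBytes());
        }
        fs.close();
    }
}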

The create process

1. The client sends a create request to the NameNode (NN)

First, step into the create method; this lands in FileSystem.java:

Find the create method there and keep stepping in until you reach the static create method.

Now go back to the call site of that static method:

Press Ctrl+Alt+B to list the implementations of the method:

Enter DistributedFileSystem:

Keep drilling down:

You can see that an output stream object is created inside the doCall method.

Step into that create call, which brings you to DFSClient.java:

Keep following the calls until you reach the newStreamForCreate method:

Enter newStreamForCreate, which lives in DFSOutputStream.java.

Here the client sends the create request to the NN over RPC for processing, and then starts the streamer thread.
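The heart of newStreamForCreate, abridged from memory of the 3.1.3 source (treat it as a sketch: the real method also carries retry handling, erasure-coding branches, and lease bookkeeping):

// Inside DFSOutputStream#newStreamForCreate (abridged sketch):
// 1. RPC to the NameNode: create the file's metadata entry
HdfsFileStatus stat = dfsClient.namenode.create(src, masked,
    dfsClient.clientName, new EnumSetWritable<>(flag), createParent,
    replication, blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
// 2. build the client-side stream around the returned file status
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
    flag, progress, checksum, favoredNodes, true);
// 3. start() launches the DataStreamer daemon thread (see section 3 below)
out.start();
return out;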

2. The NN handles the client's create request

From newStreamForCreate, step into the create call, which lands in ClientProtocol.java:

Find its implementation classes:

Enter NameNodeRpcServer; its create method is as follows:

 @Override // ClientProtocol
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
      throws IOException {
    checkNNStartup(); // check that the NN has started
    String clientMachine = getClientMachine();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.create: file "
          +src+" for "+clientName+" at "+clientMachine);
    }
    if (!checkPathLength(src)) { // check the path length
      throw new IOException("create: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
    if (cacheEntry != null && cacheEntry.isSuccess()) { // retry-cache check
      return (HdfsFileStatus) cacheEntry.getPayload();
    }

    HdfsFileStatus status = null;
    try {
      PermissionStatus perm = new PermissionStatus(getRemoteUser()
          .getShortUserName(), null, masked);
      // start the file (the important call)
      status = namesystem.startFile(src, perm, clientName, clientMachine,
          flag.get(), createParent, replication, blockSize, supportedVersions,
          ecPolicyName, cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, status != null, status);
    }

    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return status;
  }

Next, step into the startFile method in FSNamesystem.java:

Enter startFileInt:

The src (file path) is wrapped into an INodesInPath object.

The INodesInPath class is documented as: "Contains INodes information resolved from a given path."

First we need to be clear about what the INode class is.

INode is an abstract class; the official Javadoc describes it as follows:

In short, a basic INode is an in-memory representation of the file/block hierarchy, holding the fields common to file and directory inodes.

So INode is the lowest-level class, carrying the attributes that files and directories share, while INodesInPath holds the INode information resolved from a given path.
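Roughly, the class relationships look like this (INodeFile and INodeDirectory are the real subclass names in the HDFS source; the diagram itself is only an illustration):

// INode             - abstract base: metadata common to files and directories
// ├─ INodeFile        - a file (its blocks, replication factor, ...)
// └─ INodeDirectory   - a directory (its child INodes)
// INodesInPath      - the INode array resolved from one path, e.g.
//                     "/user/data/a.txt" -> [root, user, data, a.txt]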

Next, locate startFile:

Step into startFile.

  • First it checks whether the file path already exists:

Enter getLastINode:

Enter getINode:

You can see that when i = -1, it executes return inodes[inodes.length - 1];

that is, it fetches the inode in the last position; if one is present, the file path already exists. A sketch of this accessor follows.
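A minimal sketch of getINode, reconstructed from the behavior just described (not a verbatim copy of the 3.1.3 method):

// Negative indices count from the end of the resolved path, so
// getINode(-1) returns the last INode -- the file itself, if it exists.
INode getINode(int i) {
  return inodes[(i < 0) ? inodes.length + i : i];
}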

Next it checks whether overwriting is allowed:

If overwrite is not allowed, an exception is thrown to report that the path already exists and the file may not be uploaded again.

  • Then it checks whether the parent directory exists:

If the parent directory exists, the file's metadata is added to it (the addFile method).

Enter the addFile method:

Enter addINode:

The data is written into the INode directory tree; at this point, the file's directory entry has been created.

3. DataStreamer startup

After the NN finishes, control returns to the client, which starts the corresponding thread.

Open DFSOutputStream.java and find newStreamForCreate: once the NN has completed the create request, the output stream is constructed:

Locate the DFSOutputStream constructor:

It computes the chunk size. The write hierarchy is Directory => File => Block (128 MB) => packet (64 KB) => chunk (512-byte data + 4-byte checksum).
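A quick back-of-the-envelope for how many chunks fit into one packet, using the default sizes above (this ignores the packet header, so the exact count computed in the source comes out slightly different):

final int CHUNK_DATA   = 512;        // user-data bytes per chunk
final int CHECKSUM     = 4;          // checksum bytes per chunk
final int PACKET_BYTES = 64 * 1024;  // default dfs.client-write-packet-size

int bytesPerChunk   = CHUNK_DATA + CHECKSUM;        // 516
int chunksPerPacket = PACKET_BYTES / bytesPerChunk; // ~126-127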

Return to newStreamForCreate and step into out.start():

Keep going:

Continue into DataStreamer:

Enter Daemon:

You can see that out.start() launches a thread, so go back to DataStreamer and search for its run method:

If there is no data in dataQueue, the code blocks; if dataQueue is not empty, a packet is taken from it.
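In other words, dataQueue is a classic wait/notify producer-consumer handoff between the user's writing thread and the DataStreamer thread. A stripped-down sketch of the consumer side (illustrative only; the full run method is shown in section 2.1 below):

synchronized (dataQueue) {
  while (dataQueue.isEmpty()) {
    dataQueue.wait(1000);               // streamer blocks until a packet arrives
  }
  DFSPacket one = dataQueue.getFirst(); // take the next packet to send
}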

The write process

1. Writing data into the DataStreamer's queue

The create phase started the DataStreamer; the write phase feeds data into it.

Step into the write method, landing in FilterOutputStream.java:

Keep going until you reach the abstract write method.

Press Ctrl+Alt+B to find its implementations:

Enter FSOutputSummer.java and locate the write method:

Step into flushBuffer, which, as the name suggests, flushes the write buffer:

Enter writeChecksumChunks:

Enter writeChunk (which writes a chunk toward the data queue):

It is an abstract method, so find its implementation:

Enter DFSOutputStream.java and look at the concrete writeChunk implementation:

  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    writeChunkPrepare(len, ckoff, cklen);

    currentPacket.writeChecksum(checksum, ckoff, cklen); // write the chunk's 4-byte checksum into the packet
    currentPacket.writeData(b, offset, len); // write one 512-byte chunk into the packet
    // count the chunks written into the packet; after roughly 127 chunks the packet is full
    currentPacket.incNumChunks();
    getStreamer().incBytesCurBlock(len);

    // if the packet (or the block) is full, enqueue it to await transmission
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        getStreamer().getBytesCurBlock() == blockSize) {
      enqueueCurrentPacketFull(); 
    }
  }

Enter enqueueCurrentPacketFull:

Enter enqueueCurrentPacket:

Enter waitAndQueuePacket:

void waitAndQueuePacket(DFSPacket packet) throws IOException {
    synchronized (dataQueue) {
      try {
        // if the queue is full, wait
        boolean firstWait = true;
        try {
          while (!streamerClosed && dataQueue.size() + ackQueue.size() >
              dfsClient.getConf().getWriteMaxPackets()) {
            if (firstWait) {
              Span span = Tracer.getCurrentSpan();
              if (span != null) {
                span.addTimelineAnnotation("dataQueue.wait");
              }
              firstWait = false;
            }
            try {
              dataQueue.wait(); // wait until the queue has enough room
            } catch (InterruptedException e) {
              // If we get interrupted while waiting to queue data, we still need to get rid
              // of the current packet. This is because we have an invariant that if
              // currentPacket gets full, it will get queued before the next writeChunk.
              //
              // Rather than wait around for space in the queue, we should instead try to
              // return to the caller as soon as possible, even though we slightly overrun
              // the MAX_PACKETS length.
              Thread.currentThread().interrupt();
              break;
            }
          }
        } finally {
          Span span = Tracer.getCurrentSpan();
          if ((span != null) && (!firstWait)) {
            span.addTimelineAnnotation("end.wait");
          }
        }
        checkClosed();
        // the queue has room again: add the packet to it
        queuePacket(packet);
      } catch (ClosedChannelException ignored) {
      }
    }
  }

Enter queuePacket (the logic that actually appends to the queue), in DataStreamer.java:
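queuePacket itself is short; here is a sketch paraphrased from the 3.1.3 source (details such as the trace-span bookkeeping may differ):

void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpanId());
    dataQueue.addLast(packet);           // append to the send queue
    lastQueuedSeqno = packet.getSeqno();
    dataQueue.notifyAll();               // wake the DataStreamer thread
  }
}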

2. Building the pipeline

2.1 Rack awareness (choosing where the block's replicas are stored)

Press Ctrl+N to find DataStreamer globally, then search for its run method:

  @Override
  public void run() {
    long lastPacket = Time.monotonicNow();
    TraceScope scope = null;
    while (!streamerClosed && dfsClient.clientRunning) {
      // if the Responder encountered an error, shutdown Responder
      if (errorState.hasError()) {
        closeResponder();
      }

      DFSPacket one;
      try {
        // process datanode IO errors if any
        boolean doSleep = processDatanodeOrExternalError();

        final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
        // step 1: wait for a packet to send
        synchronized (dataQueue) {
          // wait for a packet to be sent.
          long now = Time.monotonicNow();
          while ((!shouldStop() && dataQueue.size() == 0 &&
              (stage != BlockConstructionStage.DATA_STREAMING ||
                  now - lastPacket < halfSocketTimeout)) || doSleep) {
            long timeout = halfSocketTimeout - (now-lastPacket);
            timeout = timeout <= 0 ? 1000 : timeout;
            timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                timeout : 1000;
            try {
              // if dataQueue is empty, the thread blocks here
              dataQueue.wait(timeout);
            } catch (InterruptedException  e) {
              LOG.warn("Caught exception", e);
            }
            doSleep = false;
            now = Time.monotonicNow();
          }
          if (shouldStop()) {
            continue;
          }
          // get the packet to send
          if (dataQueue.isEmpty()) {
            one = createHeartbeatPacket();
          } 
          else {
            try {
              backOffIfNecessary();
            } catch (InterruptedException e) {
              LOG.warn("Caught exception", e);
            }
            // the data queue is not empty: take the first packet
            one = dataQueue.getFirst(); 
            SpanId[] parents = one.getTraceParents();
            if (parents.length > 0) {
              scope = dfsClient.getTracer().
                  newScope("dataStreamer", parents[0]);
              scope.getSpan().setParents(parents);
            }
          }
        }

        // step 2: obtain a new block from the NN
        if (LOG.isDebugEnabled()) {
          LOG.debug("stage=" + stage + ", " + this);
        }
        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          LOG.debug("Allocating new block: {}", this);
          // ask the NN for a block and set up the data pipeline
          setPipeline(nextBlockOutputStream());
          // start the ResponseProcessor to watch whether packets are acked
          initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
          LOG.debug("Append to block {}", block);
          setupPipelineForAppendOrRecovery();
          if (streamerClosed) {
            continue;
          }
          initDataStreaming();
        }

        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
        if (lastByteOffsetInBlock > stat.getBlockSize()) {
          throw new IOException("BlockSize " + stat.getBlockSize() +
              " < lastByteOffsetInBlock, " + this + ", " + one);
        }

        if (one.isLastPacketInBlock()) {
          // wait for all data packets have been successfully acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              try {
                // wait for acks to arrive from datanodes
                dataQueue.wait(1000);
              } catch (InterruptedException  e) {
                LOG.warn("Caught exception", e);
              }
            }
          }
          if (shouldStop()) {
            continue;
          }
          stage = BlockConstructionStage.PIPELINE_CLOSE;
        }

        // step 3: send the packet
        SpanId spanId = SpanId.INVALID;
        synchronized (dataQueue) {
          // move packet from dataQueue to ackQueue
          if (!one.isHeartbeatPacket()) {
            if (scope != null) {
              spanId = scope.getSpanId();
              scope.detach();
              one.setTraceScope(scope);
            }
            scope = null;
            dataQueue.removeFirst(); // remove the outgoing packet from dataQueue
            ackQueue.addLast(one); // and append it to ackQueue pending its ack
            packetSendTime.put(one.getSeqno(), Time.monotonicNow());
            dataQueue.notifyAll();
          }
        }

        LOG.debug("{} sending {}", this, one);

        // step 4: write the data to the DN
        try (TraceScope ignored = dfsClient.getTracer().
            newScope("DataStreamer#writeTo", spanId)) {
          one.writeTo(blockStream); // write the data out
          blockStream.flush();
        } catch (IOException e) {
          // HDFS-3398 treat primary DN is down since client is unable to
          // write to primary DN. If a failed or restarting node has already
          // been recorded by the responder, the following call will have no
          // effect. Pipeline recovery can handle only one node error at a
          // time. If the primary node fails again during the recovery, it
          // will be taken out then.
          errorState.markFirstNodeIfNotMarked();
          throw e;
        }
        lastPacket = Time.monotonicNow();

        // update bytesSent
        long tmpBytesSent = one.getLastByteOffsetBlock();
        if (bytesSent < tmpBytesSent) {
          bytesSent = tmpBytesSent;
        }

        if (shouldStop()) {
          continue;
        }

        // Is this block full?
        if (one.isLastPacketInBlock()) {
          // wait for the close packet has been acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              dataQueue.wait(1000);// wait for acks to arrive from datanodes
            }
          }
          if (shouldStop()) {
            continue;
          }

          endBlock();
        }
        if (progress != null) { progress.progress(); }

        // This is used by unit test to trigger race conditions.
        if (artificialSlowdown != 0 && dfsClient.clientRunning) {
          Thread.sleep(artificialSlowdown);
        }
      } catch (Throwable e) {
        // Log warning if there was a real error.
        if (!errorState.isRestartingNode()) {
          // Since their messages are descriptive enough, do not always
          // log a verbose stack-trace WARN for quota exceptions.
          if (e instanceof QuotaExceededException) {
            LOG.debug("DataStreamer Quota Exception", e);
          } else {
            LOG.warn("DataStreamer Exception", e);
          }
        }
        lastException.set(e);
        assert !(e instanceof NullPointerException);
        errorState.setInternalError();
        if (!errorState.isNodeMarked()) {
          // Not a datanode issue
          streamerClosed = true;
        }
      } finally {
        if (scope != null) {
          scope.close();
          scope = null;
        }
      }
    }
    closeInternal();
  }

Step into nextBlockOutputStream (called from step 2 of the run method above):

Enter locateFollowingBlock:

Enter addBlock:

Step into addBlock again, arriving at the ClientProtocol interface:

From this we can tell that the method is carried out through the client's RPC proxy to the NN.
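For reference, the shape of the ClientProtocol#addBlock declaration, reproduced from memory of the 3.x interface (check ClientProtocol.java for the exact signature):

// The client asks the NN to allocate the next block of the file and gets
// back a LocatedBlock: the new block plus the DataNodes chosen to host it.
LocatedBlock addBlock(String src, String clientName,
    ExtendedBlock previous, DatanodeInfo[] excludeNodes,
    long fileId, String[] favoredNodes,
    EnumSet<AddBlockFlag> addBlockFlags) throws IOException;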

Find its implementation class:

Enter NameNodeRpcServer and locate addBlock:

Enter getAdditionalBlock:

This is where the block's storage locations are chosen.

Enter chooseTargetForNewBlock:

Enter chooseTarget4NewBlock:

Enter chooseTarget:

Continue into chooseTarget:

It is declared on an abstract class, so look up the implementation:

Enter BlockPlacementPolicyDefault.java:

Enter chooseTarget:

Enter the next chooseTarget overload:

Enter chooseTargetInOrder, which is the rack-awareness logic (first replica on the writer's node, second on a remote rack, third depending on where the first two landed):

  protected Node chooseTargetInOrder(int numOfReplicas, 
                                 Node writer,
                                 final Set<Node> excludedNodes,
                                 final long blocksize,
                                 final int maxNodesPerRack,
                                 final List<DatanodeStorageInfo> results,
                                 final boolean avoidStaleNodes,
                                 final boolean newBlock,
                                 EnumMap<StorageType, Integer> storageTypes)
                                 throws NotEnoughReplicasException {
    final int numOfResults = results.size();
    if (numOfResults == 0) {
      // the first replica is stored on the local (writer's) node
      DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
          storageTypes, true);

      writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
                                     : null;

      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
    if (numOfResults <= 1) {
      // the second replica goes on a different rack
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    if (numOfResults <= 2) {
      final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
      if (clusterMap.isOnSameRack(dn0, dn1)) {
        // if the first two are on the same rack, put the third on another rack
        chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      } else if (newBlock){
        // for a new block, put the third replica on the same rack as the second
        chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      } else {
        // otherwise, place it on the writer's local rack
        chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      }
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
    return writer;
  }
2.2 Sending over the socket

Return to nextBlockOutputStream:

Enter createBlockOutputStream:

As its comment says, this method's main job is to establish the connection with the first DN in the pipeline:

  boolean createBlockOutputStream(DatanodeInfo[] nodes,
      StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
      long newGS, boolean recoveryFlag) {
    if (nodes.length == 0) {
      LOG.info("nodes are empty for write pipeline of " + block);
      return false;
    }
    String firstBadLink = "";
    boolean checkRestart = false;
    if (LOG.isDebugEnabled()) {
      LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
    }

    // persist blocks on namenode on next flush
    persistBlocks.set(true);

    int refetchEncryptionKey = 1;
    while (true) {
      boolean result = false;
      DataOutputStream out = null;
      try {
        assert null == s : "Previous socket unclosed";
        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
        // open a socket connection to the DN
        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
        long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);
        // output stream for writing data to the DN
        OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
        // input stream for reading back the DN's response
        InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
        unbufOut = saslStreams.out;
        unbufIn = saslStreams.in;
        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
            DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
        blockReplyStream = new DataInputStream(unbufIn);

        //
        // Xmit header info to datanode
        //

        BlockConstructionStage bcs = recoveryFlag ?
            stage.getRecoveryStage() : stage;

        // We cannot change the block length in 'block' as it counts the number
        // of bytes ack'ed.
        ExtendedBlock blockCopy = block.getCurrentBlock();
        blockCopy.setNumBytes(stat.getBlockSize());

        boolean[] targetPinnings = getPinnings(nodes);
        // send the write-block request
        new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
            nodes.length, block.getNumBytes(), bytesSent, newGS,
            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
            (targetPinnings != null && targetPinnings[0]), targetPinnings,
            nodeStorageIDs[0], nodeStorageIDs);

        // receive ack for connect
        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
            PBHelperClient.vintPrefixed(blockReplyStream));
        Status pipelineStatus = resp.getStatus();
        firstBadLink = resp.getFirstBadLink();

        // Got an restart OOB ack.
        // If a node is already restarting, this status is not likely from
        // the same node. If it is from a different node, it is not
        // from the local datanode. Thus it is safe to treat this as a
        // regular node error.
        if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
            !errorState.isRestartingNode()) {
          checkRestart = true;
          throw new IOException("A datanode is restarting.");
        }

        String logInfo = "ack with firstBadLink as " + firstBadLink;
        DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);

        assert null == blockStream : "Previous blockStream unclosed";
        blockStream = out;
        result =  true; // success
        errorState.resetInternalError();
        lastException.clear();
        // remove all restarting nodes from failed nodes list
        failed.removeAll(restartingNodes);
        restartingNodes.clear();
      } catch (IOException ie) {
        if (!errorState.isRestartingNode()) {
          LOG.info("Exception in createBlockOutputStream " + this, ie);
        }
        if (ie instanceof InvalidEncryptionKeyException &&
            refetchEncryptionKey > 0) {
          LOG.info("Will fetch a new encryption key and retry, "
              + "encryption key was invalid when connecting to "
              + nodes[0] + " : " + ie);
          // The encryption key used is invalid.
          refetchEncryptionKey--;
          dfsClient.clearDataEncryptionKey();
          // Don't close the socket/exclude this node just yet. Try again with
          // a new encryption key.
          continue;
        }

        // find the datanode that matches
        if (firstBadLink.length() != 0) {
          for (int i = 0; i < nodes.length; i++) {
            // NB: Unconditionally using the xfer addr w/o hostname
            if (firstBadLink.equals(nodes[i].getXferAddr())) {
              errorState.setBadNodeIndex(i);
              break;
            }
          }
        } else {
          assert !checkRestart;
          errorState.setBadNodeIndex(0);
        }

        final int i = errorState.getBadNodeIndex();
        // Check whether there is a restart worth waiting for.
        if (checkRestart) {
          errorState.initRestartingNode(i,
              "Datanode " + i + " is restarting: " + nodes[i],
              shouldWaitForRestart(i));
        }
        errorState.setInternalError();
        lastException.set(ie);
        result =  false;  // error
      } finally {
        if (!result) {
          IOUtils.closeSocket(s);
          s = null;
          IOUtils.closeStream(out);
          IOUtils.closeStream(blockReplyStream);
          blockReplyStream = null;
        }
      }
      return result;
    }
  }

Enter writeBlock:

Enter send:

The data is pushed out with flush.
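Sender#send is tiny; a paraphrased sketch (the exact 3.1.3 code may differ slightly):

private static void send(final DataOutputStream out, final Op opcode,
    final Message proto) throws IOException {
  op(out, opcode);             // write the transfer version and the opcode
  proto.writeDelimitedTo(out); // write the protobuf request header
  out.flush();                 // push everything onto the socket
}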

2.3 Receiving on the socket

Receiving the data is the DN's job, so open DataXceiverServer.java and locate its run method:

It accepts incoming socket requests;

for every block the client sends, a DataXceiver is started to handle that block (sketched below).
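The accept loop has roughly this shape (simplified and from memory, not verbatim):

// DataXceiverServer#run, abridged: one DataXceiver daemon per connection
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
  Peer peer = peerServer.accept(); // block until a client (or upstream DN) connects
  new Daemon(datanode.threadGroup,
      DataXceiver.create(peer, datanode, this)).start();
}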

Enter DataXceiver and locate its run method:

It reads the operation type of the request,

then processes the data according to that operation type.

Enter processOp:

You can see the different operation types (sketched below).
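processOp is a dispatch on the opcode read from the stream; abridged, it looks like this (only a few of the cases are shown, and the exact method arguments are from memory):

switch(op) {
case READ_BLOCK:
  opReadBlock();
  break;
case WRITE_BLOCK:
  opWriteBlock(in);
  break;
case TRANSFER_BLOCK:
  opTransferBlock(in);
  break;
// ... remaining ops elided ...
default:
  throw new IOException("Unknown op " + op + " in data stream");
}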

Enter opWriteBlock (the write path):

Press Ctrl+Alt+B to find the implementation of writeBlock, entering DataXceiver.java:

It creates a BlockReceiver

and sends the data on to the downstream socket.

Next, enter getBlockReceiver:

Enter BlockReceiver:

This is where the pipeline is created.

Enter createRbw:

Enter FsDatasetImpl.java:

Enter createRbw:

The on-disk file is created via createRbwFile.
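In 3.1.3 createRbw goes through a ReplicaBuilder, so rather than quote it, here is the gist as comments (a sketch of the idea, not the exact code):

// FsDatasetImpl#createRbw, in outline:
// 1. choose a volume with enough free space for a full block
// 2. create the block file under <volume>/current/<bpid>/rbw/
//    ("rbw" = replica being written)
// 3. register the new replica in volumeMap so the DN tracks its state
//    until the block is finalized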

3. The client receives the DN's acks

Go back to DataStreamer.java and locate run:

The initDataStreaming method starts the ResponseProcessor, which listens for whether each packet was sent successfully:

It creates the ResponseProcessor and starts the thread.

Enter ResponseProcessor and locate its run method:

    @Override
    public void run() {

      setName("ResponseProcessor for block " + block);
      PipelineAck ack = new PipelineAck();

      TraceScope scope = null;
      while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
        // handle acks coming back from the DNs
        try {
          // read one ack from the pipeline
          ack.readFields(blockReplyStream);
          if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
            Long begin = packetSendTime.get(ack.getSeqno());
            if (begin != null) {
              long duration = Time.monotonicNow() - begin;
              if (duration > dfsclientSlowLogThresholdMs) {
                LOG.info("Slow ReadProcessor read fields for block " + block
                    + " took " + duration + "ms (threshold="
                    + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
                    + ", targets: " + Arrays.asList(targets));
              }
            }
          }

          if (LOG.isDebugEnabled()) {
            LOG.debug("DFSClient {}", ack);
          }

          long seqno = ack.getSeqno();
          // processes response status from datanodes.
          ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
          for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
            final Status reply = PipelineAck.getStatusFromHeader(ack
                .getHeaderFlag(i));
            if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
                PipelineAck.ECN.CONGESTED) {
              congestedNodesFromAck.add(targets[i]);
            }
            // Restart will not be treated differently unless it is
            // the local node or the only one in the pipeline.
            if (PipelineAck.isRestartOOBStatus(reply)) {
              final String message = "Datanode " + i + " is restarting: "
                  + targets[i];
              errorState.initRestartingNode(i, message,
                  shouldWaitForRestart(i));
              throw new IOException(message);
            }
            // node error
            if (reply != SUCCESS) {
              errorState.setBadNodeIndex(i); // mark bad datanode
              throw new IOException("Bad response " + reply +
                  " for " + block + " from datanode " + targets[i]);
            }
          }

          if (!congestedNodesFromAck.isEmpty()) {
            synchronized (congestedNodes) {
              congestedNodes.clear();
              congestedNodes.addAll(congestedNodesFromAck);
            }
          } else {
            synchronized (congestedNodes) {
              congestedNodes.clear();
              lastCongestionBackoffTime = 0;
            }
          }

          assert seqno != PipelineAck.UNKOWN_SEQNO :
              "Ack for unknown seqno should be a failed ack: " + ack;
          if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
            continue;
          }

          // an ack marking a successful transfer
          DFSPacket one;
          synchronized (dataQueue) {
            one = ackQueue.getFirst();
          }
          if (one.getSeqno() != seqno) {
            throw new IOException("ResponseProcessor: Expecting seqno " +
                " for block " + block +
                one.getSeqno() + " but received " + seqno);
          }
          isLastPacketInBlock = one.isLastPacketInBlock();

          // Fail the packet write for testing in order to force a
          // pipeline recovery.
          if (DFSClientFaultInjector.get().failPacket() &&
              isLastPacketInBlock) {
            failPacket = true;
            throw new IOException(
                "Failing the last packet for testing.");
          }

          // update bytesAcked
          block.setNumBytes(one.getLastByteOffsetBlock());

          synchronized (dataQueue) {
            scope = one.getTraceScope();
            if (scope != null) {
              scope.reattach();
              one.setTraceScope(null);
            }
            lastAckedSeqno = seqno;
            pipelineRecoveryCount = 0;
            ackQueue.removeFirst(); // remove it from the ack queue
            packetSendTime.remove(seqno);
            dataQueue.notifyAll(); // notify dataQueue that the ack has been processed

            one.releaseBuffer(byteArrayManager);
          }
        } catch (Exception e) {
          if (!responderClosed) {
            lastException.set(e);
            errorState.setInternalError();
            errorState.markFirstNodeIfNotMarked();
            synchronized (dataQueue) {
              dataQueue.notifyAll();
            }
            if (!errorState.isRestartingNode()) {
              LOG.warn("Exception for " + block, e);
            }
            responderClosed = true;
          }
        } finally {
          if (scope != null) {
            scope.close();
          }
          scope = null;
        }
      }
    }

At this point, once the client has successfully received the DNs' acks, the upload is complete.
