Hadoop-MapReduce-源码跟读-MapTask阶段篇

news2024/12/25 13:05:27

一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、Mapper类

我们先看下我们写的map所继承的Mapper类

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * 传递给 Mapper 实现的 Context
   */
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }
  
  /**
   * 在任务开始时调用一次
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * 为输入分片的每个键/值对调用一次。我们的WordCount就是覆盖了这一点,
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * 在任务结束时调用一次
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * 专家用户可以覆盖此方法,以便对Mapper的执行进行更完整的控制
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}

经过该类的注释我们可以得到以下信息:

1、输入的每个 <key,value> 经过Map后会处理成一组<key,value> ,即 <key,values>

2、输入的每个 <key,value> 经过Map后会可以输出 0个、1个、多个 <key,value>

3、Hadoop Map Reduce框架为Job中InputFormat生成的每个InputSplit分配一个MapTask

4、框架首先为InputSplit中的每个 <key,value>调用setup(),然后调用map(),最后调用cleanup()

5、Map输出的key对应的所有value由框架进行分组,并传递给Reduce

6、用户可以通过RawComparator来控制排序和分组

7、Map输出会根据Reduce进行分区,用户可以通过自定义Partitioner来控制哪些key去哪个Reduce

8、用户可以选择设置CombinaterClass指定组合器来聚合Map输出结果,这样会减少Map到Reduce的传送数据量

9、用户可以通过Configuration指定是否压缩、如何压缩中间输出。

10、如果Job只有Map阶段,那么Map会直接写入OutputFormat,且不会按照key排序

11、用户可以覆盖run()来对Map处理进行更大的干预,例如多线程实现

三、MapTask是如何调起的

在上一篇<Hadoop-MapReduce-YarnChild启动篇>博客中已经将了YarnChild的启动,而MapTask就是在它里面被调起的,下面我们来看下YarnChild中的代码

public static void main(String[] args) throws Throwable {
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    LOG.debug("Child starting");

    //根据job的配置文件构建JobConf
    //JobConf是用户描述MapReduce作业到Hadoop框架执行的主要接口。框架试图忠实地执行作业,
    //例如我们在WordCount中的main方法中用Job.set**了很多属性,比如Mapper、Reducer的实现类、输出格式、输入输出目录等等
    final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
    //使用JobConf初始化可以避免两次加载conf
    Limits.init(job);
    UserGroupInformation.setConfiguration(job);
    //MAPREDUCE-6565: 需要设置SecurityUtil的配置
    SecurityUtil.setConfiguration(job);

    String host = args[0];
    int port = Integer.parseInt(args[1]);
    //创建一个Socket地址
    final InetSocketAddress address =
        NetUtils.createSocketAddrForHost(host, port);
    final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
    long jvmIdLong = Long.parseLong(args[3]);
    JVMId jvmId = new JVMId(firstTaskid.getJobID(),
        firstTaskid.getTaskType() == TaskType.MAP, jvmIdLong);
    
    CallerContext.setCurrent(
        new CallerContext.Builder("mr_" + firstTaskid.toString()).build());

    //初始化度量系统
    DefaultMetricsSystem.initialize(
        StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");

    //安全框架已将令牌加载到当前ugi中
    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    LOG.info("Executing with tokens: {}", credentials.getAllTokens());

    //创建TaskUmplicalProtocol作为实际任务所有者
    UserGroupInformation taskOwner =
      UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
    SecurityUtil.setTokenService(jt, address);
    taskOwner.addToken(jt);
    final TaskUmbilicalProtocol umbilical =
      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
      @Override
      public TaskUmbilicalProtocol run() throws Exception {
        return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
            TaskUmbilicalProtocol.versionID, address, job);
      }
    });

    //向ApplicationMaster报告non-pid
    JvmContext context = new JvmContext(jvmId, "-1000");
    LOG.debug("PID: " + System.getenv().get("JVM_PID"));
    Task task = null;
    UserGroupInformation childUGI = null;
    ScheduledExecutorService logSyncer = null;

    try {
      int idleLoopCount = 0;
      JvmTask myTask = null;
      //轮询新任务
      for (int idle = 0; null == myTask; ++idle) {
        long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
        LOG.info("Sleeping for " + sleepTimeMilliSecs
            + "ms before retrying again. Got null now.");
        MILLISECONDS.sleep(sleepTimeMilliSecs);
        myTask = umbilical.getTask(context);
      }
      if (myTask.shouldDie()) {
        return;
      }

      task = myTask.getTask();
      YarnChild.taskid = task.getTaskID();

      //创建作业conf并设置凭据
      configureTask(job, task, credentials, jt);

      //记录系统属性
      String systemPropsToLog = MRApps.getSystemPropertiesToLog(job);
      if (systemPropsToLog != null) {
        LOG.info(systemPropsToLog);
      }

      //启动Java VM指标
      JvmMetrics.initSingleton(jvmId.toString(), job.getSessionId());
      childUGI = UserGroupInformation.createRemoteUser(System
          .getenv(ApplicationConstants.Environment.USER.toString()));
      //向新用户添加令牌,使其能够正确执行任务
      childUGI.addCredentials(credentials);

      //如果在调用任务之前进行了配置,请设置作业类加载器
      MRApps.setJobClassLoader(job);

      logSyncer = TaskLog.createLogSyncer();

      //为doAs块创建对任务的最终引用
      final Task taskFinal = task;
      childUGI.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          //使用作业指定的工作目录
          setEncryptedSpillKeyIfRequired(taskFinal);
          FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
          taskFinal.run(job, umbilical); // 运行Task
          return null;
        }
      });
    } catch (FSError e) {
      LOG.error("FSError from child", e);
      if (!ShutdownHookManager.get().isShutdownInProgress()) {
        umbilical.fsError(taskid, e.getMessage());
      }
    } catch (Exception exception) {
      LOG.warn("Exception running child : "
          + StringUtils.stringifyException(exception));
      try {
        if (task != null) {
          // do cleanup for the task
          if (childUGI == null) { // no need to job into doAs block
            task.taskCleanup(umbilical);
          } else {
            final Task taskFinal = task;
            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() throws Exception {
                taskFinal.taskCleanup(umbilical);
                return null;
              }
            });
          }
        }
      } catch (Exception e) {
        LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
      }
      // Report back any failures, for diagnostic purposes
      if (taskid != null) {
        if (!ShutdownHookManager.get().isShutdownInProgress()) {
          umbilical.fatalError(taskid,
              StringUtils.stringifyException(exception), false);
        }
      }
    } catch (Throwable throwable) {
      LOG.error("Error running child : "
    	        + StringUtils.stringifyException(throwable));
      if (taskid != null) {
        if (!ShutdownHookManager.get().isShutdownInProgress()) {
          Throwable tCause = throwable.getCause();
          String cause =
              tCause == null ? throwable.getMessage() : StringUtils
                  .stringifyException(tCause);
          umbilical.fatalError(taskid, cause, false);
        }
      }
    } finally {
      RPC.stopProxy(umbilical);
      DefaultMetricsSystem.shutdown();
      TaskLog.syncLogsShutdown(logSyncer);
    }
  }

taskFinal.run(job, umbilical) 将此任务作为命名作业的一部分运行。此方法在子进程中执行,并调用用户提供的map、reduce等方法。

四、MapTask运行细节(源码跟读)

1、MapTask

紧跟第三大步的节奏,我们看MapTask.run()

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
      //如果没有ReduceTask,就不会有任何reduce。因此,map阶段将控制整个尝试任务的进度
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
      //如果有ReduceTask,那么整个尝试任务的进度将在map阶段(67%)和sort阶段(33%)之间分配。
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    //创建TaskReporter并启动通信线程
    TaskReporter reporter = startReporter(umbilical);
 
    boolean useNewApi = job.getUseNewMapper();
    //初始化作业:
    //    1、构建Job上下文
    //    2、构建尝试任务上下文
    //    3、将尝试任务的状态从UNASSIGNED改为RUNNING
    //    4、判断是否使用新API
    //    5、设置Job输出提交者
    //    6、设置任务的输出(这是从将输出到HDFS的每个单独任务的进程中调用的,并且它只是为该任务调用的。对于同一任务,但对于不同的任务尝试,可以多次调用此函数)
    //    7、从Job配置中的类名创建根到指定进程的ResourceCalculatorProcessTree并对其进行配置。如果类名为null,此方法将尝试返回可用于此系统的进程树插件。
    //    8、更新进程树
    //    9、获取自创建进程树以来进程树中所有进程使用的CPU时间(以毫秒为单位)
    initialize(job, getJobID(), reporter, useNewApi);

    //检查任务类型:cleanupJobTask、jobSetupTask、taskCleanupTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    //判断是否使用新的API来运行不同的Mapper,这里我们走新的API
    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

下面我们看runNewMapper()

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    //制作一个任务上下文,以便我们可以获得类
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    //制作一个mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    //制作一个InputFormat
    //InputFormat描述Map Reduce作业的输入规范。
    //Map Reduce框架依赖作业的InputFormat做以下操作
    //    1、验证作业的输入规范
    //    2、将输入文件拆分为逻辑的InputSplit,然后将每个InputSplit分配给一个单独的Mapper
    //    3、提供RecordReader}实现,用于从InputSplit中收集输入记录,供Mapper处理
    //InputSplit的上限是输入文件的块大小,下限是mapred-default.xml中配置的mapreduce.input.fileinputformat.split.minsize的值
    //显然,基于输入大小的逻辑拆分对于许多应用程序来说是不够的,因为要尊重记录边界。
    //在这种情况下,应用程序还必须实现一个RecordReader负责尊重记录边界,并向单个任务呈现逻辑InputSplit的面向记录的视图
    //inputFormat 可以通过 mapreduce.job.inputformat.class指定,默认TextInputFormat.class
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    //重新生成InputSplie
    //InputSplit表示要由单个mapper处理的数据
    //它在输入上显示一个面向字节的视图,由作业的RecordReader负责将其处理成一个面向记录的视图。
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    //生成RecordReader
    //将数据分解为<key,value>,以便输入到mapper
    //NewTrackingRecordReader是MapTask的内部类,下面也会用到,在new的时候会设置该类中的RecordReader
    //RecordReader real = inputFormat.createRecordReader(split, taskContext);
    //通过TextInputFormat.createRecordReader()会得到LineRecordReader
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    //判断该任务是否是跳过任务,并设置给job
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    //RecordWriter负责将<key,value>写入到输出文件
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    //创建一个RecordWriter
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      //我们看下NewOutputCollector的实例化都做了什么,看第2步
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    //提供给Mapper的上下文。
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);
    //传递给Mapper实现的Context
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
      //在初始化时调用一次
      //事件调用的是本类中内部类NewTrackingRecordReader的initialize()
      //根据以上的注释,我们可以知道最终走的是LineRecordReader的initialize()
      //我们停下来,先看看其中的实现(跳到第3步)
      input.initialize(split, mapperContext);
      //下面我们看下mapper.run()(跳到第4步)
      mapper.run(mapperContext);
      //完成此节点,将父节点移动到其下一个子节点
      mapPhase.complete();
      //设置任务的当前阶段为SORT阶段
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }

2、NewOutputCollector

private class NewOutputCollector<K,V>
    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
    private final MapOutputCollector<K,V> collector;
    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
    private final int partitions;

    @SuppressWarnings("unchecked")
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      //创建排序缓冲区,我们看看其内部实现
      collector = createSortingCollector(job, reporter);
      //ReduceTask的数量就是分区器的数量
      partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }

    @Override
    public void close(TaskAttemptContext context
                      ) throws IOException,InterruptedException {
      try {
        collector.flush();
      } catch (ClassNotFoundException cnf) {
        throw new IOException("can't find class ", cnf);
      }
      collector.close();
    }
  }

下面看看createSortingCollector()的处理逻辑

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector.Context context =
      new MapOutputCollector.Context(this, job, reporter);

    //可以通过mapreduce.job.map.output.collector.class指定自己的缓冲区实现
    //默认是MapOutputBuffer.class
    Class<?>[] collectorClasses = job.getClasses(
      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
    int remainingCollectors = collectorClasses.length;
    Exception lastException = null;
    for (Class clazz : collectorClasses) {
      try {
        //缓冲区必须是MapOutputCollector的实现类
        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
          throw new IOException("Invalid output collector class: " + clazz.getName() +
            " (does not implement MapOutputCollector)");
        }
        Class<? extends MapOutputCollector> subclazz =
          clazz.asSubclass(MapOutputCollector.class);
        LOG.debug("Trying map output collector class: " + subclazz.getName());
        MapOutputCollector<KEY, VALUE> collector =
          ReflectionUtils.newInstance(subclazz, job);
        //初始化缓冲区,我们看看默认缓冲区MapOutputBuffer的init方法
        collector.init(context);
        LOG.info("Map output collector class = " + collector.getClass().getName());
        return collector;
      } catch (Exception e) {
        String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
        if (--remainingCollectors > 0) {
          msg += " (" + remainingCollectors + " more collector(s) to try)";
        }
        lastException = e;
        LOG.warn(msg, e);
      }
    }

    if (lastException != null) {
      throw new IOException("Initialization of all the collectors failed. " +
          "Error in last collector was:" + lastException.toString(),
          lastException);
    } else {
      throw new IOException("Initialization of all the collectors failed.");
    }
  }

下面我们看下MapOutputBuffer的init方法

 public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {
      job = context.getJobConf();
      reporter = context.getReporter();
      mapTask = context.getMapTask();
      mapOutputFile = mapTask.getMapOutputFile();
      sortPhase = mapTask.getSortPhase();
      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      //健全性检查
      //排序溢写百分比:可以通过mapreduce.map.sort.spill.percent设置,默认0.8
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      //排序缓冲区大小:可以通过mapreduce.task.io.sort.mb设置,默认值为100mb
      final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
          MRJobConfig.DEFAULT_IO_SORT_MB);
      //缓存限制:可以通过mapreduce.task.index.cache.limit.bytes设置,默认1024 * 1024 字节
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);

      //如果溢写百分比必须在0-1之间
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      //0x7FF = 111 1111 1111
      //& 按位与运算:二进制对应位都为1时,结果才为1;否则结果为0
      //说明sortmb的最大值为0x7FF 既 2047M
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
      //spill用于接受IndexedSortable项的排序算法的接口
      //默认采用 QuickSort.class (快排序)
      //用户可以通过 map.sort.class 指定 但要实现IndexedSorter接口
      sorter = ReflectionUtils.newInstance(job.getClass(
                   MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
                   IndexedSorter.class), job);
      // 缓冲和记账
      //<< 左位移运算:符号位始终保持不变 如果右侧空出位置,则自动填充为 0;超出 32 位的值,则自动丢弃
      //比如 默认值 100 左移20 就是 104,857,600 换成16进制就是 0x6400000
      int maxMemUsage = sortmb << 20;
      //METASIZE = 4*4
      //% 求模运算
      //如果 用默认值 100 那么此时   maxMemUsage % METASIZE = 104,857,600 % 16 = 0
      //maxMemUsage = maxMemUsage 本身
      maxMemUsage -= maxMemUsage % METASIZE;
      //主输出缓冲器 字节数组 上面为什么是左位移 20 因为 MB 是 byte 的2的20次方 倍 因此是 位移 20 得到 100MB 换算的 字节长度
      //1MB = 1024KB 
      //1KB = 1024Byte
      //1MB = 1024 * 1024 Byte = 2的10次方 + 2的10次方 = 2的20次方 Byte
      kvbuffer = new byte[maxMemUsage];
      //字节数组的长度 ,标记我们应该在缓冲区末尾停止读取的点
      bufvoid = kvbuffer.length;
      //将字节数组封装到缓冲区中
      //新的缓冲区将由给定的字节数组支持;也就是说,对缓冲区的修改将导致数组被修改,反之亦然。
      //新缓冲区的容量和限制将为array.length,其位置将为零,其标记将未定义。它的后备数组将是给定的数组,其数组偏移量>将为零。
      //java.nio.ByteOrder.nativeOrder()
      //检索基础平台的本机字节顺序。
      //定义此方法是为了使性能敏感的Java代码可以分配与硬件具有相同字节顺序的直接缓冲区。当使用这样的缓冲区时,本机代码库通常更高效。
      //java.nio.ByteBuffer.order()
      //修改此缓冲区的字节顺序。
      //java.nio.ByteBuffer.asIntBuffer()
      //创建此字节缓冲区的视图作为int缓冲区。
      //新缓冲区的内容将从此缓冲区的当前位置开始。对该缓冲区内容的更改将在新缓冲区中可见,反之亦然;两个缓冲区的位置、限制和标记值将是独立的。
      //新缓冲区的位置将为零,其容量和限制将是该缓冲区中剩余的字节数除以4,其标记将未定义。当且仅当此缓冲区为直接缓冲区时,
      //新缓冲区将为直接缓冲区;当且仅当此缓冲区是只读缓冲区时,它将为只读缓冲区。
      //返回一个新的int缓冲区
      kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();
      //Equator 赤道的意思 这里设置赤道为0
      //设置元数据和序列化数据展开的点。元索引与缓冲区对齐,因此元数据永远不会跨越循环缓冲区的末端。
      //缓冲区本质还是线性的int数组,但是有了赤道的概念,我们可以把他抽象成环形的,因为赤道也是可以移动的
      setEquator(0);
      //bufstart 标志着泄漏的开始
	  //bufend   标志着溢出的开始标志着可收藏的开始
	  //bufindex 标记收集的结束
	  //equator  赤道,标记元/序列化的起源
	  //初始值都是0
      bufstart = bufend = bufindex = equator;
      //kvstart	标记泄漏元数据的来源
	  //kvend	标记溢出元数据的结束
	  //kvindex	标记完全序列化记录的结束
      kvstart = kvend = kvindex;
      //maxRec = int数组的长度 / 4 = 16字节数组长度
      maxRec = kvmeta.capacity() / NMETA;
      //软限制 = 容量的 0.8
      softLimit = (int)(kvbuffer.length * spillper);
      //剩余的缓冲区大小,默认是容量的 0.8  也就是 80M
      bufferRemaining = softLimit;
      LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
      LOG.info("soft limit at " + softLimit);
      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

      // k/v serialization 连续
      //RawComparator<K> comparator 直接对字节表进行操作
      //获取用于比较密钥的{@link RawComparator}比较器。
      //用户可以设置 mapreduce.job.output.key.comparator.class 默认值是 null 
      //当为null 会再找 mapreduce.map.output.key.class
      //获取映射输出数据的键类。如果未设置,请使用(最终)输出键类。这允许映射输出键类与最终输出键类不同。
      comparator = job.getOutputKeyComparator();
      keyClass = (Class<K>)job.getMapOutputKeyClass();
      valClass = (Class<V>)job.getMapOutputValueClass();
      //序列化是通过从<code>conf</code>中读取<code>io.Serializations</code>属性来找到的,该属性是一个逗号分隔的类名列表
      serializationFactory = new SerializationFactory(job);
      keySerializer = serializationFactory.getSerializer(keyClass);
      //BlockingBuffer bb = new BlockingBuffer()
      keySerializer.open(bb);
      valSerializer = serializationFactory.getSerializer(valClass);
      valSerializer.open(bb);

      //输出统计信息 比如:mapreduce 跑完都会显示读了多少字节写了多少字节数据
      mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
      mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
      fileOutputByteCounter = reporter
          .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

      //压缩
      if (job.getCompressMapOutput()) {
        //可以通过 mapreduce.map.output.compress.codec 设置 需要实现CompressionCodec接口 默认是 DefaultCodec 
        //只有map时 默认的压缩类是 GzipCodec.class
        Class<? extends CompressionCodec> codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      } else {
        codec = null;
      }

      //合并
      final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
      combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                             combineInputCounter,
                                             reporter, null);
      if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
      } else {
        combineCollector = null;
      }
      spillInProgress = false;
      //最小为组合溢出的次数 可以通过 mapreduce.map.combine.minspills 设置 默认值 3
      //
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      //将此线程标记为守护进程线程或用户线程。当唯一运行的线程都是守护进程线程时,Java虚拟机将退出。
      //必须在启动线程之前调用此方法。
      //Daemon 守护线程、守护进程、守护程序
      spillThread.setDaemon(true);
      //将此线程的名称更改为等于参数名称。参数不能为null
      //首先,调用此线程的checkAccess方法时不带任何参数。这可能导致引发SecurityException。
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        //使此线程开始执行溢写操作
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }
      if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
            sortSpillException);
      }
    }

3、LineRecordReader

public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    //一行的最大长度可以通过mapreduce.input.linerecordreader.line.maxlength设置,默认是Integer.MAX_VALUE也就是2的31次方-1
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    //文件中要处理的第一个字节的位置
    start = split.getStart();
    //end = start  + 文件中要处理的字节数 = 既要处理的字节数的最后一个字节的位置
    end = start + split.getLength();
    //包含此分片数据的文件
    final Path file = split.getPath();

    //打开文件并查找拆分的开始位置
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    
    //根据给定文件的文件名后缀查找相应的压缩编解码器。
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null!=codec) {
      isCompressedInput = true;
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new CompressedSplitLineReader(cIn, job,
            this.recordDelimiterBytes);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        if (start != 0) {
          // 因此,我们有一个分割,它只是使用无法分割的压缩编解码器存储的文件的一部分。
          throw new IOException("Cannot seek in " +
              codec.getClass().getSimpleName() + " compressed stream");
        }

        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, this.recordDelimiterBytes);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new UncompressedSplitLineReader(
          fileIn, job, this.recordDelimiterBytes, split.getLength());
      filePosition = fileIn;
    }
    //如果这不是第一次拆分,我们总是丢弃第一条记录,因为我们总是(除了最后一次拆分)在next()方法中多读取一行。这样就避免了因为根据字节分片导致的数据完整性破坏
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

private LongWritable key;
private Text value;
public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    //我们总是多读一行,它位于拆分上限之外,即(end-1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        //BOM(Byte Order Mark)既字节序标记,这里需要剔除
        //支持UTF-8,我们只需要检查UTF-8 BOM
        //判断的标准是在文本流的开头有(0xEF、0xBB、0xBF)
        //即使我们为第一行额外读取3个字节,我们也不会改变现有的行为(没有向后不兼容的问题)。
        //因为newSize小于maxLineLength,并且复制到Text的字节数始终不大于newSize。
        //如果readLine的返回大小不小于maxLineLength,我们将丢弃当前行并读取下一行
        newSize = skipUtfByteOrderMark();
      } else {
        //将InputStream中的一行读取到给定的Text中
        //返回读取的字节数,包括找到的(最长的)换行符
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        //pos即为下一行数据开头的字节偏移量,也就是key
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      //该行太长,再试一次
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

  //获取当前key:当前行数据开头的字节偏移量
  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  //获取当前value:当前行数据
  @Override
  public Text getCurrentValue() {
    return value;
  }

4、Mapper

public void run(Context context) throws IOException, InterruptedException {
    //可以覆盖该方法在map方法执行前执行一次
    setup(context);
    try {
      while (context.nextKeyValue()) {
        //一般我们的程序会覆盖这个方法,让框架调我们的处理逻辑,这样想想:原来框架做了怎么多,我们只写了一个处理方法而已,就可以处理大数据了,单从运用上讲是不是很简单
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      //可以覆盖该方法在map方法执行后执行一次
      cleanup(context);
    }
  }

context.nextKeyValue()、context.getCurrentKey()、context.getCurrentValue()给我们的是什么数据呢?默认的实现在LineRecordReader中,已经在第2步中的代码中写了

下面我们以源码自带的WordCount为例看看其中的map方法

5、WordCount.map()

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      //为指定的字符串构造一个字符串标记化器。标记化器使用默认的分隔符集,
      //即"\t\n\r\f":空格字符、制表符、换行符、回车符和换行符。分隔符字符本身不会被视为标记
      StringTokenizer itr = new StringTokenizer(value.toString());
      //将一行中的所有单词都输出下,数量都给成1,既:
      //<word1,1>
      //<word2,1>
      //......
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        //下面我们看下write的过程,调的是NewOutputCollector.write()
        //NewOutputCollector.write()中又调的MapTask类中的collect()
        context.write(word, one);
      }
    }
  }

6、MapTask.collect()

collect将<key,value>序列化到中间存储

// k/v 记账信息
    private IntBuffer kvmeta; //后备存储上的元数据覆盖
    int kvstart;            //标记溢写元数据的开始
    int kvend;              //标记溢写元数据的结束
    int kvindex;            //标记完全序列化的记录的结束

    int equator;            //标记元数据和序列化数据的源头,equator是赤道的意思
    int bufstart;           //标记开始溢写
    int bufend;             //标记开始收集
    int bufmark;            //标记记录的结尾
    int bufindex;           //标记收集的结尾
    int bufvoid;            //标记我们应该在缓冲区的末尾停止读取的点

    byte[] kvbuffer;        //主输出缓冲器
    private final byte[] b0 = new byte[0];

    private static final int VALSTART = 0;         //value的偏移
    private static final int KEYSTART = 1;         //key的偏移
    private static final int PARTITION = 2;        //分区偏移
    private static final int VALLEN = 3;           //value长度
    private static final int NMETA = 4;            // num meta ints
    private static final int METASIZE = NMETA * 4; // size in bytes

    //溢写记账信息
    private int maxRec;
    private int softLimit;
    boolean spillInProgress;;
    int bufferRemaining;


public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {
      reporter.progress();
      //判断key的类型是否是job启动时设置的类型
      if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
                              + keyClass.getName() + ", received "
                              + key.getClass().getName());
      }
      //判断value的类型是否是job启动时设置的类型
      if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
                              + valClass.getName() + ", received "
                              + value.getClass().getName());
      }
      //判断分区的范围是否在正常区间
      if (partition < 0 || partition >= partitions) {
        throw new IOException("Illegal partition for " + key + " (" +
            partition + ")");
      }
      checkSpillException();
      bufferRemaining -= METASIZE;
      if (bufferRemaining <= 0) {
        //如果线程未运行并且已达到软限制,则开始溢出
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              //序列化的、未分页的字节总是位于kvindex和bufindex之间,穿过赤道。
              //请注意,重置创建的任何空白空间都必须包含在“已使用”字节中
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                //溢写完成,回收空间
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                //泄漏记录(如有);检查后者,因为元数据对齐可能会影响溢出pcnt
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                //为序列化数据保留至少一半的拆分缓冲区,确保kvindex>=bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                //必须持有锁并检查限制之前剩余的字节是三个弧的最小值:
                //    元数据空间、序列化空间和软限制
                bufferRemaining = Math.min(
                    //元数据最大值
                    distanceTo(bufend, newPos),
                    Math.min(
                      //序列化最大值
                      distanceTo(newPos, serBound),
                      //软限制
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {
          spillLock.unlock();
        }
      }

      try {
        //将密钥字节序列化到缓冲区
        int keystart = bufindex;
        keySerializer.serialize(key);
        if (bufindex < keystart) {
          //包裹钥匙;必须使连续
          bb.shiftBufferedKey();
          keystart = 0;
        }
        //将值字节序列化到缓冲区
        final int valstart = bufindex;
        valSerializer.serialize(value);
        //记录的长度可能为零,即序列化程序将不执行任何写入操作。
        //为了确保检查边界条件并保持kvindex不变,请对缓冲区执行零长度写入。
        //监控这一点的逻辑可以转移到collect中,但这更干净、更便宜。目前,这是可以接受的。
        bb.write(b0, 0, 0);

        //该记录必须标记在前一次写入之后,因为该记录的元数据尚未写入
        int valend = bb.markRecord();

        mapOutputRecordCounter.increment(1);
        mapOutputByteCounter.increment(
            distanceTo(keystart, valend, bufvoid));

        //写入记账信息
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
        //提升 kvindex
        kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
      } catch (MapBufferTooSmallException e) {
        LOG.info("Record too large for in-memory buffer: " + e.getMessage());
        spillSingleRecord(key, value, partition);
        mapOutputRecordCounter.increment(1);
        return;
      }
    }

五、总结

1、YarnChild启动,加载Job相关的配置信息,设置Job工作目录等

2、YarnChild调起MapTask

3、MapTask初始化,比如构建上下文、设置Job输出提交者、设置任务的输出等

4、制作自定义Mapper、InputFormat、RecordReader、RecordWriter(这里会初始化一个环形缓冲区并启动一个线程准备排序、合并、溢写)

5、处理InputFormat(规避因为根据字节分片导致的数据完整性破坏、剔除UTF-8文件的BOM等),处理成map的输入数据<key,value> key为该行数据在文件中的偏移量,value为该行文本数据

6、运行自己编写的Mapper中的map方法

7、溢写线程开始对数据进行排序(默认快排)、合并、溢写

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1420490.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python二维高斯热力图绘制简单的思路代码

import numpy as np import matplotlib.pyplot as plt from scipy.ndimage import gaussian_filter import cv2# 生成一个示例图像 image_size 100 image np.zeros((image_size, image_size))# 在图像中心创建一个高亮区域 center_x, center_y image_size // 2, image_size …

爱可声助听器参与南湖区价值百万公益助残捐赠活动成功举行

“声音大小合适吗&#xff1f;能听清楚吗&#xff1f;”今天下午&#xff0c;一场助残捐赠活动在南湖区凤桥镇悄然举行&#xff0c;杭州爱听科技有限公司带着验配团队和听力检测设备来到活动现场&#xff0c;为南湖区听障残疾人和老人适配助听器。 家住余新镇的75岁的周奶奶身体…

mac截图翻译软件有哪些?五大超实用翻译软件

mac截图翻译软件有哪些&#xff1f;随着全球化的发展&#xff0c;跨语言沟通已成为日常生活和工作中不可或缺的一部分。然而&#xff0c;语言障碍常常让我们在阅读外文资料时感到困惑。为了解决这一问题&#xff0c;Mac用户需要一款强大的截图翻译软件来帮助他们快速理解外文内…

Docker 搭建MySQL主从复制-读写分离

一. 介绍 MySQL主从复制是一种常用的数据库高可用性解决方案&#xff0c;通过在主数据库上记录的数据变更&#xff0c;同步到一个或多个从数据库&#xff0c;实现数据的冗余备份和读写分离。在Docker环境下搭建MySQL主从复制和读写分离&#xff0c;不仅方便管理&#xff0c;还…

MES和QMS怎么选?

MES&#xff0c;即制造执行系统&#xff0c;主要用于监控和控制生产过程&#xff0c;提升生产效率、减少生产成本。万界星空科技MES可以提供实时的生产数据&#xff0c;帮助企业做出更准确的决策&#xff0c;并且能够自动化地执行生产任务&#xff0c;提高生产线的效率和灵活性…

视频转GIF动图实践, 支持长视频转GIF

背景 找了很多GIF动图制作的工具&#xff0c;比如将视频转成GIF, 或者将一系列图片转成GIF, 增加背景文案等等功能。很多收费或者用的一些三方库有点点卡顿&#xff0c;或者需要安装一个软件&#xff0c;所以就自己做一款纯前端页面级别的 视频转 GIF 动图工具。 最开始找到一…

LLM漫谈(四)| ChatDOC:超越ChatPDF性能并支持更多功能的阅读聊天工具

在过去的一年里&#xff0c;ChatGPT的兴起催生了许多基于GPT的人工智能工具&#xff0c;其中Chat PDF工具得到了广泛关注。这些工具对知识密集型专业人员来说尤其有价值&#xff0c;大大提高了生产力。随着Chat PDF工具的激增&#xff0c;选择正确的工具变得至关重要。 接下来&…

vue-computed 计算属性

一、computed 计算属性 在Vue应用中&#xff0c;在模板中双向绑定一些数据或者表达式&#xff0c;但是表达式如果过长&#xff0c;或者逻辑更为复杂 时&#xff0c;就会变得臃肿甚至难以维护和阅读&#xff0c;例如&#xff1a; <div>写在双括号中的表达式太长了,不利于阅…

Apache Commons Collection3.2.1反序列化分析(CC1)

Commons Collections简介 Commons Collections是Apache软件基金会的一个开源项目&#xff0c;它提供了一组可复用的数据结构和算法的实现&#xff0c;旨在扩展和增强Java集合框架&#xff0c;以便更好地满足不同类型应用的需求。该项目包含了多种不同类型的集合类、迭代器、队…

LiveGBS流媒体平台GB/T28181功能-支持配置开启 HTTPS 服务什么时候需要开启HTTPS服务

LiveGBS功能支持配置开启 HTTPS 服务什么时候需要开启HTTPS服务 1、配置开启HTTPS1.1、准备https证书1.1.1、选择Nginx类型证书下载 1.2、配置 LiveCMS 开启 HTTPS1.2.1 web页面配置1.2.2 配置文件配置 2、验证HTTPS服务3、为什么要开启HTTPS3.1、安全性要求3.2、功能需求 4、搭…

2024 中国(南京)国际口腔设备器械博览会

2024 中国&#xff08;南京&#xff09;国际口腔设备器械博览会 时间&#xff1a;2024 年 7 月 18-20 日 地点&#xff1a;南京国际展览中心 WeChat_20230512134641 主办单位: 南京民营口腔医疗协会 北京铭曼国际展览有限公司 承办单位: 北京铭曼国际展览有限公司 展会介绍 随…

SpringCloudStream整合MQ

目录 概念 快速搭建SCS环境 一秒切换MQ 组件 1. Binder 2. Binding 3. Message 分组消费 概念 Spring Cloud Stream&#xff08;SCS&#xff09; 的主要目标是一套代码&#xff0c;兼容所有MQ, 降低MQ的学习成本&#xff0c;提供一致性的编程模型&#xff0c;让开发者能更…

Qt之窗口位置

Qt提供了很多关于获取窗体位置及显示区域大小的函数&#xff0c;如x&#xff08;&#xff09;&#xff0c;y()和pos()&#xff0c;rect()&#xff0c;size()&#xff0c;geometry()等&#xff0c;统称为"位置相关函数"或"位置函数"。几种主要位置函数及其之…

Python爬虫实践指南:利用cpr库爬取技巧

引言 在信息时代&#xff0c;数据是无价之宝。为了获取网络上的丰富数据&#xff0c;网络爬虫成为了不可或缺的工具。在Python这个强大的编程语言中&#xff0c;cpr库崭露头角&#xff0c;为网络爬虫提供了便捷而高效的解决方案。本文将深入探讨如何利用cpr库实现数据爬取的各…

SpringBoot整合EasyCaptcha图形验证码

简介 EasyCaptcha&#xff1a;https://github.com/ele-admin/EasyCaptcha Java图形验证码&#xff0c;支持gif、中文、算术等类型&#xff0c;可用于Java Web、JavaSE等项目。 添加依赖 <dependency><groupId>com.github.whvcse</groupId><artifactId…

2023最新版克魔助手抓包教程(9) - 克魔助手 IOS 数据抓包

引言 在移动应用程序的开发中&#xff0c;了解应用程序的网络通信是至关重要的。数据抓包是一种很好的方法&#xff0c;可以让我们分析应用程序的网络请求和响应&#xff0c;了解应用程序的网络操作情况。克魔助手是一款非常强大的抓包工具&#xff0c;可以帮助我们在 Android…

Shell脚本⑦awk

目录 一.awk概述 1.awk介绍 2.基本格式 3.工作原理 4.常见的内建变量 二.awk基本操作 1.打印文本内容 &#xff08;1&#xff09;打印磁盘使用情况 &#xff08;2&#xff09;打印字符串 &#xff08;3&#xff09;打印字符串确定文件有多少行 2.根据$n以及NR提取字…

http和https的区别是什么?https有什么优缺点?

HTTP&#xff08;Hypertext Transfer Protocol&#xff0c;超文本传输协议&#xff09;是一个简单的请求-响应协议&#xff0c;它通常运行在TCP之上。它指定了客户端可能发送给服务器什么样的消息以及得到什么样的响应。这个简单模型是早期Web成功的有功之臣&#xff0c;因为它…

【JVM】运行时数据区域,内存如何分配和对象在内存中的组成

目录 一.运行时数据区域 1.线程独享 2.线程共享 二.内存如何分配 1.指针碰撞法 2.空闲列表法 3.TLAB 三.对象在内存中的组成 ​编辑1.对象头 2.实例数据 3.对齐填充 一.运行时数据区域 1.线程独享 &#xff08;1&#xff09;栈 虚拟机栈&#xff1a;每个 Java 方法在…

如何在centos云服务器上持续运行

一、直接上命令 cd到jar包所在目录 输入命令运行 nohup java -jar xxx.jar & 退出当前命令 二、云服务器上安装宝塔管理面板 直接用宝塔的进程守护&#xff0c;设置好当前进程输入参数保存就ok