MapReduce 源码分析-1

news2025/1/11 10:04:25

源码追踪

Class Job

作为使用Java语言编写的MapReduce城西,其入口方法位main方法,在MapReduce Main方法中,整个核心都在围绕着job类,中文称之为作业。

public class WordDriver {
    public static void main(String[] args) throws Exception {
        //配置文件对象
        Configuration conf = new Configuration();

        conf.set("mapreduce.framework.name","local");
        // 创建作业实例
        Job job = Job.getInstance(conf,WordDriver.class.getSimpleName());
        //设置作业的驱动
        job.setJarByClass(WordDriver.class);
        //设置Mapper
        job.setMapperClass(WordMapper.class);
        // job.setNumReduceTasks();
        //设置Reduce
        job.setReducerClass(WordReduce.class);
        //设置Mapper阶段输出的Key
        job.setMapOutputKeyClass(Text.class);
        //设置Mapper阶段输出的Value
        job.setMapOutputValueClass(LongWritable.class);
        //设置Reduce阶段输出的Key的类型
        job.setOutputKeyClass(Text.class);
        //设置Reduce阶段输出的Value的类型
        job.setOutputValueClass(LongWritable.class);
        //设置文件的路径 《上级文件夹》
        FileInputFormat.addInputPath(job,new Path("E:\\MapReduceTest\\output"));
        //设置输出结果的文件夹
        FileOutputFormat.setOutputPath(job,new Path("E:\\MapReduceTest\\output"));
        //提交任务
        boolean b = job.waitForCompletion(true);
        //判断任务是否执行成功
        if (b){
            System.out.println("OK");
        }else{
            System.out.println("error");
        }
    }
}

Job类在源码中的注释中,针对Job的作用做了描述:Job类允许用户配置作业、提交作业、控制作业执行以及查询作业状态。

用户创建MapReduce程序,通过Job描述作业的各个方面,然后提交作业并监视其进度。

通常我们把定义描述Job所在的主类(含有main方法的类)称之为MapReduce程序的驱动类

作业提交

submit是作业的提交的一种方式,但是我们通常不使用submit来提交作业,而是使用waitForCompletion方法进行提交,并传递一个值为true布尔值来进行参数的传递

public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // 判断当前作业的状态是不是定义状态,也就是说是不是还没有提交
  if (state == JobState.DEFINE) {
    //没有提交就调用submit()方法,本质上来讲还是submit方法
    submit();
  }
  //这里就需要传入的verbose 值了,表明是否要详细过程
  if (verbose) {
    //如果需要详细过程,就执行monitorAndPrintJob方法,这个方法对执行流程进行监视、打印
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    //在这里来获取查询的时间,在默认情况下是5000ms
    int completionPollIntervalMillis = 
      Job.getCompletionPollInterval(cluster.getConf());
    // 在这里判断作业是否已经完成了
    while (!isComplete()) {
      try {
         //没有完成就休眠,默认5000ms
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  // 返回执行结果
  return isSuccessful();
}


  // submit方法
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    //在这里我们先确保任务是定义状态:也就是说任务还没有提交
    ensureState(JobState.DEFINE);
    //使用新版本的API
    setUseNewAPI();
    //和运行环境创建连接
    connect();
    
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }


// 创建一个连接
  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) { 
      // 如果为null就会新建一个Cluster实例
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
				   //创建一个实例
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

  //对象创建过程
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }
  //  InetSocketAddress 是一个比较重要的参数,在Hadoop1.0之前是负责资源调度,相当于后续的yarn
  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf; // 设置新的conf
    this.ugi = UserGroupInformation.getCurrentUser();  // 获取当前用户
    initialize(jobTrackAddr, conf); // 初始化当前方法
  }

此时这个方法必然是在Cluster类中,这里展示一些相应的方法和关键的参数

  private ClientProtocolProvider clientProtocolProvider; // 客户端通信协议的提供者
  private ClientProtocol client; // 客户端通信协议
  private UserGroupInformation ugi; // 用户组信息
  private Configuration conf;
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null; // job资源暂存路径
  private Path jobHistoryDir = null;  // job历史路径


//查看初始化方法,刚开始
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    initProviderList();
    for (ClientProtocolProvider provider : providerList) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
           // 在这里有个关键点,就是在这里创建了客户端的通信协议,我们在后文中查看详细
          clientProtocol = provider.create(conf);
        } else {
            //如果集群信息不为空,那么久直接
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
          ......
      } catch (Exception e) {
       ......
      }
    }

  }

我们进入到ClientProtocolProvider抽象类中,里面存在两个抽象方法,这两个抽象方法都是建立通信协议的,它们存在两个实现方法:

image-20230428170254816

在这里可以看到,底层两个实现,一个是Local本地,一个是Yarn,也就是在我们的Hadoop集群环境上。再往下看就不难发现,这几个方法本质上就是建立RPC协议,而且从底部屏蔽远程和本地直接的差异。

到后续,会调用submitter来进行设置,最后回通过submitter.submitJobInternal方法进行提交。在这里查看这个方法

JobStatus submitJobInternal(Job job, Cluster cluster) 
throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs 
  //检查输出路径是否配置而且是否存在,在正常情况下应该是已经配置过的且不应该存在
  checkSpecs(job);
    
  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);
  // 获取作业准备区的路径,用于作业及相关资源提交的存放,比如说 jar、且pain信息等
  //默认情况下是/tmp/hadoop-yarn/staging/提交作业用户名/.staging
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  // 记录提交作业的主机IP、主机名
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  }
  // 与运行集群进行通信,将获取的jobID设置入job 中
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // 设置作业提的路径
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    //设置job提交的用户名、
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers", 
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);
    
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
              "data spill is enabled");
    }
      
    //拷贝一些作业相关的资源到submitJobDir 作业准备区,比如说: -files、-archives
    copyAndConfigureFiles(job, submitJobDir);
    //创建job.xml,用来保存作业的配置
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
    
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // 生成本次作业的输入切片,并把切片信息协入作业准备区submitJobDir
    // 同时,切片也是在此时生成的,通过input.getSplits(job)方法生成切片数组,并按照从大到小进行排序
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
        MRJobConfig.DEFAULT_JOB_MAX_MAP);
    if (maxMaps >= 0 && maxMaps < maps) {
      throw new IllegalArgumentException("The number of map tasks " + maps +
          " exceeded limit " + maxMaps);
    }

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    //设置队列信息
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    //在这里进行权限的写入,等于说到现在了文件还没有开始写入,而是处于准备阶段
    writeConf(conf, submitJobFile);
    
    //
    // Now, actually submit the job (using the submit name)
    //
    //到现在才开始真正开始准备提交作业
    printTokens(jobId, job.getCredentials());
    //在这里创建一个客户端提交作业
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);

    }
  }
}



  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    //在这里开始尝试获取文件切片
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }


整体流程

真正提交任务之前:

  1. 检查输出路径:是否正确且不存在
  2. 创建作业准备区(或者说资源暂存区)
    1. Local:本地文件系统
    2. Yarn:HDFS文件系统
  3. 和运行集群交互 获取JobID
  4. 设置Job的参数
  5. 上传作业运行需要的资源到暂存区
    1. 依赖 jar、file、archive
    2. 配置文件 job.xml
    3. 逻辑规划文件->切片

最终提交作业

image-20230429085531649

MapReduce

整体流程图

image-20230429085608996

Map阶段流程简述

  1. 待处理目录下所有文件逻辑上被切分为多个Split文件,一个Split被MapTask处理

  2. 通过InputFormat按行读取内容返回<Key,Value>键值对,调用map(用户自己实现的)方法进行处理

    • InputFormat读一行写一行,将数据写入到Running map task中
  3. 将结果交给OutPutCollector输出收集器,对输出结果key进行分区Partition,然后写入内存缓冲区

    • 如果有Reduce结果,那么就需要先进行一个shuffle操作
    • 如果没有Reduce操作,那么就直接调用OutPutCollector
  4. 当缓冲区快要满的时候,将缓冲区的数据一一个临时文件的方式溢写Spill存放到磁盘,溢写是排序Sort

  5. 最终对这个MapTask产生的所有临时文件合并Merge,生成最终的Map正式输出文件

image-20230429092552800

MapTask

在MapReduce程序中,初登场的阶段叫做Map阶段,Map阶段运行的task叫做maptask,MapTask类作为maptask的载体,带哦用的就是类的run方法,开启Map阶段任务.

第一层调用MapTask.run

在MapTask.run方法的第一层调用中,有下面两个重要的代码段

Map阶段任务划分(根据是否有Reduce阶段来决定如何划分)

	//判断当前的task是不是MapTask
    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      // 判断ReduceTask的个数,如果是0就意味着没有Reduce阶段,那么mapper阶段输出就是最终的数据
      // 这样的话就没有必要进行shuffle了
      if (conf.getNumReduceTasks() == 0) {
        //没有Reduce阶段,那么mapper阶段占用100%
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        // 存在Reduce阶段,那么设置Map阶段66.7%,sort阶段33.3%
        mapPhase = getProgress().addPhase("map", 0.667f);
        //为什么要sort?因为要进行shuffle然后发送给ReduceTask
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
	//创建一个TaskReporter并启动通信线程
    TaskReporter reporter = startReporter(umbilical);
	//是否使用新的API,这个选项在Mapred.mapper.new-api控制,一般不需要用户显示设置
	// 但是在提交任务的时候MR框架会自己进行选择
	//默认情况下,使用新的API	,除非特别指定了使用old API
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

	/**
	 * 再提交任务的时候,MR框架会自动进行选择使用什么API,默认情况下都是newAPI,
	 * 除非特别指定了使用old API
	*/
    if (useNewApi) {// 使用新API
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {// 使用旧API
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);

image-20230504163725374

第二层调用(runNewMapper) – 准备阶段

runNewMapper内第一大部分代码为maptask运行的准备部分,其主要逻辑是创建maptask运行时需要的各种对象

  • Input Split 切片信息
  • InputFormat 读取数据组件,实际上调用的RecordReader类,读取数据大多情况下都是Text形式,这种方式底层调用的是LineRecordReader
  • Mapper 处理数据组件 我们创建的Mapper在这里被加载进来,起到真正的作用
  • OutputCollector 输出收集器
  • taskContext、mapperContext 上下文对象

第二层调用(runNewMapper) – 工作阶段

  1. 如何从切片读取数据?

    initialize核心逻辑:根据切片信息读取数据获得输入流in,判断切片是否压缩、压缩是否可切分,并判断自己是否属于第一个切片,如果不是,舍弃第一行数据不处理,最终读取数据的实现在in.readLine方法中

  2. 如何调用map方法处理数据

  3. 如何调用OutputCollector手机map输出的结果

这三个问题采用源码方式进行分析

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                getTaskID(),
                                                                reporter);
  // make a mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>
      (split, inputFormat, reporter, taskContext);
  
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  
  // get an output object
	// 创建outputCollector 用于mapTask处理结果的输出
  if (job.getNumReduceTasks() == 0) {
    output =  // 如果没有reducetask 直接调用最终的输出其
      new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
      //如果有reducetask 则需要进行shuffle操作
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }
  // mapper上下文对象
  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
  mapContext = 
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
        input, output,  // 此处的input 就是RecordReader (默认lineRecordReader)
        committer, 
        reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
      mapperContext = 
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);

  // maptask工作干活的核心逻辑
  try {
    // mapTask初始化的时候调用一次,默认是显示LineRecordReader 的initialize方法
      // 里面描述了如何从切片中获取数据
    input.initialize(split, mapperContext);
      // 调用mapper的run方法 开始map阶段处理数据
      // 里面描述了每当LineRecordReader读取一行数据返回KV键值对
      // 就调用一次用户编写的map方法进行一次业务逻辑处理
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}


public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit; // 拿到切片
  Configuration job = context.getConfiguration(); 
  this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);// 一行能够处理的最大长度
  start = split.getStart(); // 要处理的切片的第一个字节的位置
  end = start + split.getLength(); // 切片结束的位置
  final Path file = split.getPath(); // 获取切片的存储位置

  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file); // 打开切片文件,开始读取数据,返回的是FSDataInputStream输入流
  
  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); // 获得文件中的编码器
  if (null!=codec) { // 判断是否进行编码压缩,如果不为空,意味着文件被编码了
    isCompressedInput = true;
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) { // 判断文件是否可被切分,如果是可切饭的压缩算法
      final SplitCompressionInputStream cIn =
        ((SplittableCompressionCodec)codec).createInputStream(
          fileIn, decompressor, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job,
          this.recordDelimiterBytes);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn;
    } else { // 这里表示压缩的编码算法是不可以被切分的
      if (start != 0) {
        // So we have a split that is only part of a file stored using
        // a Compression codec that cannot be split.
        throw new IOException("Cannot seek in " +
            codec.getClass().getSimpleName() + " compressed stream");
      }
       // 不可以被切分的压缩文件整体由SplitLineReader来进行处理
      in = new SplitLineReader(codec.createInputStream(fileIn,
          decompressor), job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
  } else {// 来到这里表示文件未进行编码
    fileIn.seek(start);
    in = new UncompressedSplitLineReader(
        fileIn, job, this.recordDelimiterBytes, split.getLength());
    filePosition = fileIn;
  }
  // If this is not the first split, we always throw away first record
  // because we always (except the last split) read one extra line in
  // next() method.
  // 如果当前处理的不是第一个切片,那么就舍弃第一行记录不处理
  if (start != 0) {
    // 读取一行数据 读取的时候会判断用户是否制定了换行符,如果指定使用用户指定的,
    //  如果没有指定就使用默认的
    // 默认的换行符取决于操作系统 Linux: \n  windows \r\n Mac:\r
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start; 
}

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
          // 如果还有下一个输入kv对 继续调用map方法处理,
          // 该map方法通常在用户自定义的mapper中被重写,也就是整个mapper阶段业务逻辑实现的地方
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }



    @SuppressWarnings("unchecked")
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      collector = createSortingCollector(job, reporter); // 创建map输出收集器
      // 获取分区的个数,即reducetask的个数
      partitions = jobContext.getNumReduceTasks(); 
      if (partitions > 1) { // 如果分区数大于1,获取分区实现类 默认HashPartitioner
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        // 进入到这里表示分区只有一个,即不需要进行分区操作,所有数据分区计算返回 reduce个数-1
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }

  // 创建一个collector
  private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector.Context context =
      new MapOutputCollector.Context(this, job, reporter);
	// 获取map输出收集器的实现类,如果没有设置,默认实现是MapOutputBuffer
    Class<?>[] collectorClasses = job.getClasses(
      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
    int remainingCollectors = collectorClasses.length;
    Exception lastException = null;
    for (Class clazz : collectorClasses) {
      try {
        // 判断收集器是否为MapOutPutCollector的实现类
        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
          throw new IOException("Invalid output collector class: " + clazz.getName() +
            " (does not implement MapOutputCollector)");
        }
        // 将收集器转换为MapOutputCollector
        Class<? extends MapOutputCollector> subclazz =
          clazz.asSubclass(MapOutputCollector.class);
        LOG.debug("Trying map output collector class: " + subclazz.getName());
        // 通过反射创建出MapOutputColloctor 的对象实例
        MapOutputCollector<KEY, VALUE> collector =
          ReflectionUtils.newInstance(subclazz, job);
        // 数据收集器启动 默认实现:MapperOutputBuffer.init()方法
        collector.init(context);
        LOG.info("Map output collector class = " + collector.getClass().getName());
        return collector;// 返回收集器
      } catch (Exception e) {
        String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
        if (--remainingCollectors > 0) {
          msg += " (" + remainingCollectors + " more collector(s) to try)";
        }
        lastException = e;
        LOG.warn(msg, e);
      }
    }

    if (lastException != null) {
      throw new IOException("Initialization of all the collectors failed. " +
          "Error in last collector was:" + lastException.toString(),
          lastException);
    } else {
      throw new IOException("Initialization of all the collectors failed.");
    }
  }

思维导图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/489712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决vue中父组件通过props向子组件传递数据,子组件接收不到

问题&#xff1a;父组件在挂载时向后端发起请求获取数据&#xff0c;然后将获取到的数据传递给子组件&#xff0c;子组件想要在挂载时获取数据&#xff0c;获取不到。 代码示例&#xff1a; //父组件 <template><div><HelloWorld :message"message"…

Mysql数据库中的用户管理与授权

1.登录用户的管理 1.1 查看用户密码的信息 用户信息存放在 mysql 数据库下的 user 表&#xff08;MySQL 服务下存在一个系统自带的 mysql 数据库&#xff09;。 经常使用的查看密码信息的命令&#xff1a; 能看到密码信息&#xff1a;是经过加密后的密码信息 select user…

真题详解(关系模型)-软件设计(六十六)

真题详解(ICMP)-软件设计&#xff08;六十五)https://blog.csdn.net/ke1ying/article/details/130475620 2017年下半年 内存按字节编址&#xff0c;若存储容量为32K*8bit的存储芯片构成地址从A0000H到 DFFFFH的内存&#xff0c;至少需要____片芯片。 解析&#xff1a; DFF…

《Netty》从零开始学netty源码(五十二)之PoolThreadCache

PoolThreadCache Netty有一个大的公共内存容器PoolArena&#xff0c;用来管理从操作系统中获得的内存&#xff0c;在高并发下如果所有线程都去这个大容器获取内存它的压力是非常大的&#xff0c;所以Netty为每个线程建立了一个本地缓存&#xff0c;即PoolThreadCache&#xff…

ReentrantLock实现原理-非公平锁

在线程间通信方式2一节中&#xff0c;我们了解了Lock&#xff0c;Condition和ReentrantLock&#xff0c;学习了简单使用Condition和RentrantLock完成线程间通信&#xff0c;从文章中我们了解到ReentrantLock是Lock接口的一个最常用的实现类&#xff0c;ReentrantLock是独占锁&a…

WEBPACK和ROLLUP构建前端工具库

1. WEBPACK webpack 提供了构建和打包不同模块化规则的库&#xff0c;只是需要自己去搭建开发底层架构。vue-cli&#xff0c;基于 webpack &#xff0c; vue-cli 脚手架工具可以快速初始化一个 vue 应用&#xff0c;它也可以初始化一个构建库。 2. ROLLUP rollup 是一个专门…

prusa2.6.0 树形支撑(有机支撑)Organic体验测试 3d打印及下载失败解决

目前官网没有这个2.6版本&#xff0c;只有2.5.2下载&#xff0c;是没有树形支撑的。如果试用2.6版本&#xff0c;需要从GitHub下载。地址为&#xff1a; https://github.com/prusa3d/PrusaSlicer/releases/tag/version_2.6.0-alpha6 或者点击链接&#xff1a; Release PrusaS…

aop切面调用失效问题排查

应用里有较多的地方访问外部供应商接口&#xff0c;由于公网网络不稳定或者外部接口不稳定&#xff08;重启&#xff0c;发版&#xff0c;ip切换&#xff09;的原因&#xff0c;经常报502或者504错误。为了解决HTTP调用的500报错&#xff0c;选择使用spring的retryable注解进行…

Leetcode292. Nim 游戏

Every day a leetcode 题目来源&#xff1a;292. Nim 游戏 解法1&#xff1a;数学推理 让我们考虑一些小例子。 显而易见的是&#xff0c;如果石头堆中只有一块、两块、或是三块石头&#xff0c;那么在你的回合&#xff0c;你就可以把全部石子拿走&#xff0c;从而在游戏中…

李沐动手学深度学习 v2 实战Kaggle比赛:预测房价

前言 最近学习一些深度学习知识&#xff0c;观看了李沐老师的《动手学深度学习》的视频 练习一下 实战Kaggle比赛&#xff1a;预测房价 巩固一下 前面学习的知识&#xff0c; 不coding一下总感觉什么也没学 陆陆续续调了一天 记录一下 导包 %matplotlib inline import numpy…

计算机网络第二章(谢希仁第八版)

作者&#xff1a;爱塔居 专栏&#xff1a;计算机网络 作者简介&#xff1a;大三学生&#xff0c;希望和大家一起进步 文章目录 目录 文章目录 前言 一、物理层的基本概念 1.1 物理层协议的主要任务 1.2 传输媒体&#xff08;了解&#xff09; 二、传输方式 2.1 串行传输…

第二十八章 Unity射线检测

本章节我们介绍一下射线。射线就是从一个固定点向一个方向发射出一条直线&#xff0c;在发射过程中需要判断该射线有没有与游戏物体发送碰撞。射线既可以用来检测射击游戏中武器指向目标&#xff1b;又可以判断鼠标是否指向游戏物体。射线的创建方式&#xff0c;一般使用代码来…

11. Kubernetes 开章

本章讲解知识点 Kubernetes 概念为什么要使用 KubernetesKubernetes 的部署架构Kubernetes 基本命令本章主要是针对 Kubernetes 基本概念为读者讲解,读者能有一个大概印象即可,不需要过于斟酌细节,针对 Kubernetes 的概念将在后面章节中详细讲解。 1. Kubernetes 概念 我们…

学习Transformer前言(Self Attention Multi head self attention)

一、前言 一直在做项目&#xff0c;也比较懒没有挤出时间去学习新的东西&#xff0c;感觉停滞很久了&#xff0c;好长一段时间都没有新的知识输入&#xff0c;早就需要就去学习transformer了&#xff0c;因此先来学习注意力机制&#xff0c;本文为个人的一个笔记总结。主要是基…

Linux系统编程(三)—— 文件编程(3)进程环境

一、main函数 现在的格式&#xff1a;int main(int argc, char *argv[])以前的main函数有三个参数&#xff0c;另一个参数就是环境变量 二、进程的终止&#xff08;两种都要背下来&#xff09; 2.1 正常终止 &#xff08;1&#xff09;从main函数返回 main函数被称为程序的…

第10章:堆

堆是什么&#xff1f; 堆是一种特殊的完全二叉树。 完全二叉树&#xff1a;每层节点都完全填满&#xff0c;最后一层若是没填满&#xff0c;则只缺少右边的节点。所有的节点都大于等于&#xff08;最大堆&#xff09;或小于等于&#xff08;最小堆&#xff09;它的子节点。jav…

软考——数据结构,算法基础,程序设计语言,法律法规,多媒体基础

数据结构与算法基础 数组与矩阵线性表广义表树与二叉树图排序与查找算法基础及常见算法 数组 稀疏矩阵 直接把&#xff08;0&#xff0c;0&#xff09;带入&#xff0c;排除B&#xff0c;C 将&#xff08;1&#xff0c;1&#xff09;带入&#xff0c;排除D&#xff0c; 最终…

Python | 人脸识别系统 — 博客索引

本博客为人脸识别系统的博客索引 工具安装、环境配置&#xff1a;Python | 人脸识别系统 — 简介 1、UI代码 UI界面设计&#xff1a;Python | 人脸识别系统 — UI界面设计UI事件处理&#xff1a;Python | 人脸识别系统 — UI事件处理 2、用户端代码 用户端博客索引&#xff1a;…

Jupyter Notebook入门教程

Jupyter Notebook&#xff08;又称Python Notebook&#xff09;是一个交互式的笔记本&#xff0c;支持运行超过40种编程语言。本文中我们将介绍Jupyter Notebook的主要特点&#xff0c;了解为什么它能成为人们创造优美的可交互式文档和教育资源的一个强大工具。 首先&#xff…

vue diff算法与虚拟dom知识整理(4) h函数虚拟节点嵌套

那么 先补充上文有一个没强调的点 h函数 当你不需要属性时 其实是可以不传的 例如 我们打开案例 打开 src下的index.js 修改代码如下 import {init,classModule,propsModule,styleModule,eventListenersModule,h,} from "snabbdom";//创建patch函数const patch ini…