一、源码下载
下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧
Index of /dist/hadoop/core
二、Mapper类
我们先看下我们写的map所继承的Mapper类
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* 传递给 Mapper 实现的 Context
*/
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
* 在任务开始时调用一次
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* 为输入分片的每个键/值对调用一次。我们的WordCount就是覆盖了这一点,
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* 在任务结束时调用一次
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* 专家用户可以覆盖此方法,以便对Mapper的执行进行更完整的控制
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
经过该类的注释我们可以得到以下信息:
1、输入的每个 <key,value> 经过Map后会处理成一组<key,value> ,即 <key,values>
2、输入的每个 <key,value> 经过Map后会可以输出 0个、1个、多个 <key,value>
3、Hadoop Map Reduce框架为Job中InputFormat生成的每个InputSplit分配一个MapTask
4、框架首先为InputSplit中的每个 <key,value>调用setup(),然后调用map(),最后调用cleanup()
5、Map输出的key对应的所有value由框架进行分组,并传递给Reduce
6、用户可以通过RawComparator来控制排序和分组
7、Map输出会根据Reduce进行分区,用户可以通过自定义Partitioner来控制哪些key去哪个Reduce
8、用户可以选择设置CombinaterClass指定组合器来聚合Map输出结果,这样会减少Map到Reduce的传送数据量
9、用户可以通过Configuration指定是否压缩、如何压缩中间输出。
10、如果Job只有Map阶段,那么Map会直接写入OutputFormat,且不会按照key排序
11、用户可以覆盖run()来对Map处理进行更大的干预,例如多线程实现
三、MapTask是如何调起的
在上一篇<Hadoop-MapReduce-YarnChild启动篇>博客中已经将了YarnChild的启动,而MapTask就是在它里面被调起的,下面我们来看下YarnChild中的代码
public static void main(String[] args) throws Throwable {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
LOG.debug("Child starting");
//根据job的配置文件构建JobConf
//JobConf是用户描述MapReduce作业到Hadoop框架执行的主要接口。框架试图忠实地执行作业,
//例如我们在WordCount中的main方法中用Job.set**了很多属性,比如Mapper、Reducer的实现类、输出格式、输入输出目录等等
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
//使用JobConf初始化可以避免两次加载conf
Limits.init(job);
UserGroupInformation.setConfiguration(job);
//MAPREDUCE-6565: 需要设置SecurityUtil的配置
SecurityUtil.setConfiguration(job);
String host = args[0];
int port = Integer.parseInt(args[1]);
//创建一个Socket地址
final InetSocketAddress address =
NetUtils.createSocketAddrForHost(host, port);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
long jvmIdLong = Long.parseLong(args[3]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(),
firstTaskid.getTaskType() == TaskType.MAP, jvmIdLong);
CallerContext.setCurrent(
new CallerContext.Builder("mr_" + firstTaskid.toString()).build());
//初始化度量系统
DefaultMetricsSystem.initialize(
StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
//安全框架已将令牌加载到当前ugi中
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
LOG.info("Executing with tokens: {}", credentials.getAllTokens());
//创建TaskUmplicalProtocol作为实际任务所有者
UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
SecurityUtil.setTokenService(jt, address);
taskOwner.addToken(jt);
final TaskUmbilicalProtocol umbilical =
taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
@Override
public TaskUmbilicalProtocol run() throws Exception {
return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID, address, job);
}
});
//向ApplicationMaster报告non-pid
JvmContext context = new JvmContext(jvmId, "-1000");
LOG.debug("PID: " + System.getenv().get("JVM_PID"));
Task task = null;
UserGroupInformation childUGI = null;
ScheduledExecutorService logSyncer = null;
try {
int idleLoopCount = 0;
JvmTask myTask = null;
//轮询新任务
for (int idle = 0; null == myTask; ++idle) {
long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
LOG.info("Sleeping for " + sleepTimeMilliSecs
+ "ms before retrying again. Got null now.");
MILLISECONDS.sleep(sleepTimeMilliSecs);
myTask = umbilical.getTask(context);
}
if (myTask.shouldDie()) {
return;
}
task = myTask.getTask();
YarnChild.taskid = task.getTaskID();
//创建作业conf并设置凭据
configureTask(job, task, credentials, jt);
//记录系统属性
String systemPropsToLog = MRApps.getSystemPropertiesToLog(job);
if (systemPropsToLog != null) {
LOG.info(systemPropsToLog);
}
//启动Java VM指标
JvmMetrics.initSingleton(jvmId.toString(), job.getSessionId());
childUGI = UserGroupInformation.createRemoteUser(System
.getenv(ApplicationConstants.Environment.USER.toString()));
//向新用户添加令牌,使其能够正确执行任务
childUGI.addCredentials(credentials);
//如果在调用任务之前进行了配置,请设置作业类加载器
MRApps.setJobClassLoader(job);
logSyncer = TaskLog.createLogSyncer();
//为doAs块创建对任务的最终引用
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
//使用作业指定的工作目录
setEncryptedSpillKeyIfRequired(taskFinal);
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
taskFinal.run(job, umbilical); // 运行Task
return null;
}
});
} catch (FSError e) {
LOG.error("FSError from child", e);
if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fsError(taskid, e.getMessage());
}
} catch (Exception exception) {
LOG.warn("Exception running child : "
+ StringUtils.stringifyException(exception));
try {
if (task != null) {
// do cleanup for the task
if (childUGI == null) { // no need to job into doAs block
task.taskCleanup(umbilical);
} else {
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
taskFinal.taskCleanup(umbilical);
return null;
}
});
}
}
} catch (Exception e) {
LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
}
// Report back any failures, for diagnostic purposes
if (taskid != null) {
if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fatalError(taskid,
StringUtils.stringifyException(exception), false);
}
}
} catch (Throwable throwable) {
LOG.error("Error running child : "
+ StringUtils.stringifyException(throwable));
if (taskid != null) {
if (!ShutdownHookManager.get().isShutdownInProgress()) {
Throwable tCause = throwable.getCause();
String cause =
tCause == null ? throwable.getMessage() : StringUtils
.stringifyException(tCause);
umbilical.fatalError(taskid, cause, false);
}
}
} finally {
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();
TaskLog.syncLogsShutdown(logSyncer);
}
}
taskFinal.run(job, umbilical) 将此任务作为命名作业的一部分运行。此方法在子进程中执行,并调用用户提供的map、reduce等方法。
四、MapTask运行细节(源码跟读)
1、MapTask
紧跟第三大步的节奏,我们看MapTask.run()
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
if (isMapTask()) {
//如果没有ReduceTask,就不会有任何reduce。因此,map阶段将控制整个尝试任务的进度
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
//如果有ReduceTask,那么整个尝试任务的进度将在map阶段(67%)和sort阶段(33%)之间分配。
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
//创建TaskReporter并启动通信线程
TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
//初始化作业:
// 1、构建Job上下文
// 2、构建尝试任务上下文
// 3、将尝试任务的状态从UNASSIGNED改为RUNNING
// 4、判断是否使用新API
// 5、设置Job输出提交者
// 6、设置任务的输出(这是从将输出到HDFS的每个单独任务的进程中调用的,并且它只是为该任务调用的。对于同一任务,但对于不同的任务尝试,可以多次调用此函数)
// 7、从Job配置中的类名创建根到指定进程的ResourceCalculatorProcessTree并对其进行配置。如果类名为null,此方法将尝试返回可用于此系统的进程树插件。
// 8、更新进程树
// 9、获取自创建进程树以来进程树中所有进程使用的CPU时间(以毫秒为单位)
initialize(job, getJobID(), reporter, useNewApi);
//检查任务类型:cleanupJobTask、jobSetupTask、taskCleanupTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
//判断是否使用新的API来运行不同的Mapper,这里我们走新的API
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
下面我们看runNewMapper()
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
//制作一个任务上下文,以便我们可以获得类
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
//制作一个mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
//制作一个InputFormat
//InputFormat描述Map Reduce作业的输入规范。
//Map Reduce框架依赖作业的InputFormat做以下操作
// 1、验证作业的输入规范
// 2、将输入文件拆分为逻辑的InputSplit,然后将每个InputSplit分配给一个单独的Mapper
// 3、提供RecordReader}实现,用于从InputSplit中收集输入记录,供Mapper处理
//InputSplit的上限是输入文件的块大小,下限是mapred-default.xml中配置的mapreduce.input.fileinputformat.split.minsize的值
//显然,基于输入大小的逻辑拆分对于许多应用程序来说是不够的,因为要尊重记录边界。
//在这种情况下,应用程序还必须实现一个RecordReader负责尊重记录边界,并向单个任务呈现逻辑InputSplit的面向记录的视图
//inputFormat 可以通过 mapreduce.job.inputformat.class指定,默认TextInputFormat.class
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
//重新生成InputSplie
//InputSplit表示要由单个mapper处理的数据
//它在输入上显示一个面向字节的视图,由作业的RecordReader负责将其处理成一个面向记录的视图。
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
//生成RecordReader
//将数据分解为<key,value>,以便输入到mapper
//NewTrackingRecordReader是MapTask的内部类,下面也会用到,在new的时候会设置该类中的RecordReader
//RecordReader real = inputFormat.createRecordReader(split, taskContext);
//通过TextInputFormat.createRecordReader()会得到LineRecordReader
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
//判断该任务是否是跳过任务,并设置给job
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
//RecordWriter负责将<key,value>写入到输出文件
org.apache.hadoop.mapreduce.RecordWriter output = null;
//创建一个RecordWriter
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
//我们看下NewOutputCollector的实例化都做了什么,看第2步
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
//提供给Mapper的上下文。
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
//传递给Mapper实现的Context
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
//在初始化时调用一次
//事件调用的是本类中内部类NewTrackingRecordReader的initialize()
//根据以上的注释,我们可以知道最终走的是LineRecordReader的initialize()
//我们停下来,先看看其中的实现(跳到第3步)
input.initialize(split, mapperContext);
//下面我们看下mapper.run()(跳到第4步)
mapper.run(mapperContext);
//完成此节点,将父节点移动到其下一个子节点
mapPhase.complete();
//设置任务的当前阶段为SORT阶段
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
2、NewOutputCollector
private class NewOutputCollector<K,V>
extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
private final MapOutputCollector<K,V> collector;
private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
private final int partitions;
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
//创建排序缓冲区,我们看看其内部实现
collector = createSortingCollector(job, reporter);
//ReduceTask的数量就是分区器的数量
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
@Override
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush();
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}
}
下面看看createSortingCollector()的处理逻辑
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
MapOutputCollector.Context context =
new MapOutputCollector.Context(this, job, reporter);
//可以通过mapreduce.job.map.output.collector.class指定自己的缓冲区实现
//默认是MapOutputBuffer.class
Class<?>[] collectorClasses = job.getClasses(
JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
int remainingCollectors = collectorClasses.length;
Exception lastException = null;
for (Class clazz : collectorClasses) {
try {
//缓冲区必须是MapOutputCollector的实现类
if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
throw new IOException("Invalid output collector class: " + clazz.getName() +
" (does not implement MapOutputCollector)");
}
Class<? extends MapOutputCollector> subclazz =
clazz.asSubclass(MapOutputCollector.class);
LOG.debug("Trying map output collector class: " + subclazz.getName());
MapOutputCollector<KEY, VALUE> collector =
ReflectionUtils.newInstance(subclazz, job);
//初始化缓冲区,我们看看默认缓冲区MapOutputBuffer的init方法
collector.init(context);
LOG.info("Map output collector class = " + collector.getClass().getName());
return collector;
} catch (Exception e) {
String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
if (--remainingCollectors > 0) {
msg += " (" + remainingCollectors + " more collector(s) to try)";
}
lastException = e;
LOG.warn(msg, e);
}
}
if (lastException != null) {
throw new IOException("Initialization of all the collectors failed. " +
"Error in last collector was:" + lastException.toString(),
lastException);
} else {
throw new IOException("Initialization of all the collectors failed.");
}
}
下面我们看下MapOutputBuffer的init方法
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
job = context.getJobConf();
reporter = context.getReporter();
mapTask = context.getMapTask();
mapOutputFile = mapTask.getMapOutputFile();
sortPhase = mapTask.getSortPhase();
spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
//健全性检查
//排序溢写百分比:可以通过mapreduce.map.sort.spill.percent设置,默认0.8
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
//排序缓冲区大小:可以通过mapreduce.task.io.sort.mb设置,默认值为100mb
final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
MRJobConfig.DEFAULT_IO_SORT_MB);
//缓存限制:可以通过mapreduce.task.index.cache.limit.bytes设置,默认1024 * 1024 字节
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
//如果溢写百分比必须在0-1之间
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
}
//0x7FF = 111 1111 1111
//& 按位与运算:二进制对应位都为1时,结果才为1;否则结果为0
//说明sortmb的最大值为0x7FF 既 2047M
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
//spill用于接受IndexedSortable项的排序算法的接口
//默认采用 QuickSort.class (快排序)
//用户可以通过 map.sort.class 指定 但要实现IndexedSorter接口
sorter = ReflectionUtils.newInstance(job.getClass(
MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
IndexedSorter.class), job);
// 缓冲和记账
//<< 左位移运算:符号位始终保持不变 如果右侧空出位置,则自动填充为 0;超出 32 位的值,则自动丢弃
//比如 默认值 100 左移20 就是 104,857,600 换成16进制就是 0x6400000
int maxMemUsage = sortmb << 20;
//METASIZE = 4*4
//% 求模运算
//如果 用默认值 100 那么此时 maxMemUsage % METASIZE = 104,857,600 % 16 = 0
//maxMemUsage = maxMemUsage 本身
maxMemUsage -= maxMemUsage % METASIZE;
//主输出缓冲器 字节数组 上面为什么是左位移 20 因为 MB 是 byte 的2的20次方 倍 因此是 位移 20 得到 100MB 换算的 字节长度
//1MB = 1024KB
//1KB = 1024Byte
//1MB = 1024 * 1024 Byte = 2的10次方 + 2的10次方 = 2的20次方 Byte
kvbuffer = new byte[maxMemUsage];
//字节数组的长度 ,标记我们应该在缓冲区末尾停止读取的点
bufvoid = kvbuffer.length;
//将字节数组封装到缓冲区中
//新的缓冲区将由给定的字节数组支持;也就是说,对缓冲区的修改将导致数组被修改,反之亦然。
//新缓冲区的容量和限制将为array.length,其位置将为零,其标记将未定义。它的后备数组将是给定的数组,其数组偏移量>将为零。
//java.nio.ByteOrder.nativeOrder()
//检索基础平台的本机字节顺序。
//定义此方法是为了使性能敏感的Java代码可以分配与硬件具有相同字节顺序的直接缓冲区。当使用这样的缓冲区时,本机代码库通常更高效。
//java.nio.ByteBuffer.order()
//修改此缓冲区的字节顺序。
//java.nio.ByteBuffer.asIntBuffer()
//创建此字节缓冲区的视图作为int缓冲区。
//新缓冲区的内容将从此缓冲区的当前位置开始。对该缓冲区内容的更改将在新缓冲区中可见,反之亦然;两个缓冲区的位置、限制和标记值将是独立的。
//新缓冲区的位置将为零,其容量和限制将是该缓冲区中剩余的字节数除以4,其标记将未定义。当且仅当此缓冲区为直接缓冲区时,
//新缓冲区将为直接缓冲区;当且仅当此缓冲区是只读缓冲区时,它将为只读缓冲区。
//返回一个新的int缓冲区
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
//Equator 赤道的意思 这里设置赤道为0
//设置元数据和序列化数据展开的点。元索引与缓冲区对齐,因此元数据永远不会跨越循环缓冲区的末端。
//缓冲区本质还是线性的int数组,但是有了赤道的概念,我们可以把他抽象成环形的,因为赤道也是可以移动的
setEquator(0);
//bufstart 标志着泄漏的开始
//bufend 标志着溢出的开始标志着可收藏的开始
//bufindex 标记收集的结束
//equator 赤道,标记元/序列化的起源
//初始值都是0
bufstart = bufend = bufindex = equator;
//kvstart 标记泄漏元数据的来源
//kvend 标记溢出元数据的结束
//kvindex 标记完全序列化记录的结束
kvstart = kvend = kvindex;
//maxRec = int数组的长度 / 4 = 16字节数组长度
maxRec = kvmeta.capacity() / NMETA;
//软限制 = 容量的 0.8
softLimit = (int)(kvbuffer.length * spillper);
//剩余的缓冲区大小,默认是容量的 0.8 也就是 80M
bufferRemaining = softLimit;
LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
LOG.info("soft limit at " + softLimit);
LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
// k/v serialization 连续
//RawComparator<K> comparator 直接对字节表进行操作
//获取用于比较密钥的{@link RawComparator}比较器。
//用户可以设置 mapreduce.job.output.key.comparator.class 默认值是 null
//当为null 会再找 mapreduce.map.output.key.class
//获取映射输出数据的键类。如果未设置,请使用(最终)输出键类。这允许映射输出键类与最终输出键类不同。
comparator = job.getOutputKeyComparator();
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
//序列化是通过从<code>conf</code>中读取<code>io.Serializations</code>属性来找到的,该属性是一个逗号分隔的类名列表
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
//BlockingBuffer bb = new BlockingBuffer()
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
//输出统计信息 比如:mapreduce 跑完都会显示读了多少字节写了多少字节数据
mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter =
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter = reporter
.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
//压缩
if (job.getCompressMapOutput()) {
//可以通过 mapreduce.map.output.compress.codec 设置 需要实现CompressionCodec接口 默认是 DefaultCodec
//只有map时 默认的压缩类是 GzipCodec.class
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
} else {
codec = null;
}
//合并
final Counters.Counter combineInputCounter =
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
spillInProgress = false;
//最小为组合溢出的次数 可以通过 mapreduce.map.combine.minspills 设置 默认值 3
//
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
//将此线程标记为守护进程线程或用户线程。当唯一运行的线程都是守护进程线程时,Java虚拟机将退出。
//必须在启动线程之前调用此方法。
//Daemon 守护线程、守护进程、守护程序
spillThread.setDaemon(true);
//将此线程的名称更改为等于参数名称。参数不能为null
//首先,调用此线程的checkAccess方法时不带任何参数。这可能导致引发SecurityException。
spillThread.setName("SpillThread");
spillLock.lock();
try {
//使此线程开始执行溢写操作
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw new IOException("Spill thread failed to initialize",
sortSpillException);
}
}
3、LineRecordReader
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
//一行的最大长度可以通过mapreduce.input.linerecordreader.line.maxlength设置,默认是Integer.MAX_VALUE也就是2的31次方-1
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
//文件中要处理的第一个字节的位置
start = split.getStart();
//end = start + 文件中要处理的字节数 = 既要处理的字节数的最后一个字节的位置
end = start + split.getLength();
//包含此分片数据的文件
final Path file = split.getPath();
//打开文件并查找拆分的开始位置
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
//根据给定文件的文件名后缀查找相应的压缩编解码器。
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
if (start != 0) {
// 因此,我们有一个分割,它只是使用无法分割的压缩编解码器存储的文件的一部分。
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
//如果这不是第一次拆分,我们总是丢弃第一条记录,因为我们总是(除了最后一次拆分)在next()方法中多读取一行。这样就避免了因为根据字节分片导致的数据完整性破坏
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
private LongWritable key;
private Text value;
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
//我们总是多读一行,它位于拆分上限之外,即(end-1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
//BOM(Byte Order Mark)既字节序标记,这里需要剔除
//支持UTF-8,我们只需要检查UTF-8 BOM
//判断的标准是在文本流的开头有(0xEF、0xBB、0xBF)
//即使我们为第一行额外读取3个字节,我们也不会改变现有的行为(没有向后不兼容的问题)。
//因为newSize小于maxLineLength,并且复制到Text的字节数始终不大于newSize。
//如果readLine的返回大小不小于maxLineLength,我们将丢弃当前行并读取下一行
newSize = skipUtfByteOrderMark();
} else {
//将InputStream中的一行读取到给定的Text中
//返回读取的字节数,包括找到的(最长的)换行符
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
//pos即为下一行数据开头的字节偏移量,也就是key
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
//该行太长,再试一次
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
//获取当前key:当前行数据开头的字节偏移量
@Override
public LongWritable getCurrentKey() {
return key;
}
//获取当前value:当前行数据
@Override
public Text getCurrentValue() {
return value;
}
4、Mapper
public void run(Context context) throws IOException, InterruptedException {
//可以覆盖该方法在map方法执行前执行一次
setup(context);
try {
while (context.nextKeyValue()) {
//一般我们的程序会覆盖这个方法,让框架调我们的处理逻辑,这样想想:原来框架做了怎么多,我们只写了一个处理方法而已,就可以处理大数据了,单从运用上讲是不是很简单
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
//可以覆盖该方法在map方法执行后执行一次
cleanup(context);
}
}
context.nextKeyValue()、context.getCurrentKey()、context.getCurrentValue()给我们的是什么数据呢?默认的实现在LineRecordReader中,已经在第2步中的代码中写了
下面我们以源码自带的WordCount为例看看其中的map方法
5、WordCount.map()
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
//为指定的字符串构造一个字符串标记化器。标记化器使用默认的分隔符集,
//即"\t\n\r\f":空格字符、制表符、换行符、回车符和换行符。分隔符字符本身不会被视为标记
StringTokenizer itr = new StringTokenizer(value.toString());
//将一行中的所有单词都输出下,数量都给成1,既:
//<word1,1>
//<word2,1>
//......
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
//下面我们看下write的过程,调的是NewOutputCollector.write()
//NewOutputCollector.write()中又调的MapTask类中的collect()
context.write(word, one);
}
}
}
6、MapTask.collect()
collect将<key,value>序列化到中间存储
// k/v 记账信息
private IntBuffer kvmeta; //后备存储上的元数据覆盖
int kvstart; //标记溢写元数据的开始
int kvend; //标记溢写元数据的结束
int kvindex; //标记完全序列化的记录的结束
int equator; //标记元数据和序列化数据的源头,equator是赤道的意思
int bufstart; //标记开始溢写
int bufend; //标记开始收集
int bufmark; //标记记录的结尾
int bufindex; //标记收集的结尾
int bufvoid; //标记我们应该在缓冲区的末尾停止读取的点
byte[] kvbuffer; //主输出缓冲器
private final byte[] b0 = new byte[0];
private static final int VALSTART = 0; //value的偏移
private static final int KEYSTART = 1; //key的偏移
private static final int PARTITION = 2; //分区偏移
private static final int VALLEN = 3; //value长度
private static final int NMETA = 4; // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
//溢写记账信息
private int maxRec;
private int softLimit;
boolean spillInProgress;;
int bufferRemaining;
public synchronized void collect(K key, V value, final int partition
) throws IOException {
reporter.progress();
//判断key的类型是否是job启动时设置的类型
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
+ keyClass.getName() + ", received "
+ key.getClass().getName());
}
//判断value的类型是否是job启动时设置的类型
if (value.getClass() != valClass) {
throw new IOException("Type mismatch in value from map: expected "
+ valClass.getName() + ", received "
+ value.getClass().getName());
}
//判断分区的范围是否在正常区间
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}
checkSpillException();
bufferRemaining -= METASIZE;
if (bufferRemaining <= 0) {
//如果线程未运行并且已达到软限制,则开始溢出
spillLock.lock();
try {
do {
if (!spillInProgress) {
final int kvbidx = 4 * kvindex;
final int kvbend = 4 * kvend;
//序列化的、未分页的字节总是位于kvindex和bufindex之间,穿过赤道。
//请注意,重置创建的任何空白空间都必须包含在“已使用”字节中
final int bUsed = distanceTo(kvbidx, bufindex);
final boolean bufsoftlimit = bUsed >= softLimit;
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
//溢写完成,回收空间
resetSpill();
bufferRemaining = Math.min(
distanceTo(bufindex, kvbidx) - 2 * METASIZE,
softLimit - bUsed) - METASIZE;
continue;
} else if (bufsoftlimit && kvindex != kvend) {
//泄漏记录(如有);检查后者,因为元数据对齐可能会影响溢出pcnt
startSpill();
final int avgRec = (int)
(mapOutputByteCounter.getCounter() /
mapOutputRecordCounter.getCounter());
//为序列化数据保留至少一半的拆分缓冲区,确保kvindex>=bufindex
final int distkvi = distanceTo(bufindex, kvbidx);
final int newPos = (bufindex +
Math.max(2 * METASIZE - 1,
Math.min(distkvi / 2,
distkvi / (METASIZE + avgRec) * METASIZE)))
% kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
//必须持有锁并检查限制之前剩余的字节是三个弧的最小值:
// 元数据空间、序列化空间和软限制
bufferRemaining = Math.min(
//元数据最大值
distanceTo(bufend, newPos),
Math.min(
//序列化最大值
distanceTo(newPos, serBound),
//软限制
softLimit)) - 2 * METASIZE;
}
}
} while (false);
} finally {
spillLock.unlock();
}
}
try {
//将密钥字节序列化到缓冲区
int keystart = bufindex;
keySerializer.serialize(key);
if (bufindex < keystart) {
//包裹钥匙;必须使连续
bb.shiftBufferedKey();
keystart = 0;
}
//将值字节序列化到缓冲区
final int valstart = bufindex;
valSerializer.serialize(value);
//记录的长度可能为零,即序列化程序将不执行任何写入操作。
//为了确保检查边界条件并保持kvindex不变,请对缓冲区执行零长度写入。
//监控这一点的逻辑可以转移到collect中,但这更干净、更便宜。目前,这是可以接受的。
bb.write(b0, 0, 0);
//该记录必须标记在前一次写入之后,因为该记录的元数据尚未写入
int valend = bb.markRecord();
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));
//写入记账信息
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
//提升 kvindex
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
}
五、总结
1、YarnChild启动,加载Job相关的配置信息,设置Job工作目录等
2、YarnChild调起MapTask
3、MapTask初始化,比如构建上下文、设置Job输出提交者、设置任务的输出等
4、制作自定义Mapper、InputFormat、RecordReader、RecordWriter(这里会初始化一个环形缓冲区并启动一个线程准备排序、合并、溢写)
5、处理InputFormat(规避因为根据字节分片导致的数据完整性破坏、剔除UTF-8文件的BOM等),处理成map的输入数据<key,value> key为该行数据在文件中的偏移量,value为该行文本数据
6、运行自己编写的Mapper中的map方法
7、溢写线程开始对数据进行排序(默认快排)、合并、溢写