Hadoop的MapReduce计算框架
概述
- MapReduce计算框架是一种计算框架,用于计算处理大规模的数据集,他将数据分成小块,然后在集群中的多个节点上并行处理这些块
- MapReduce框架是由两个组件组成:Map和Reduce
- Map任务将输入数据分解成键值对,然后将这些键值对传递给Reduce任务进行处理
- Reduce任务将相同的所有值组合在一起,并将它们转换为输出键值对
- 这种分布式计算框架可以在大规模数据集上高效地运行,因为它可以利用集群中的多个节点并行处理数据。
MapReduce是一种分布式计算模型,用于处理大规模数据集。它将数据分成多个小块,然后在多个计算节点上并行处理这些小块。MapReduce的工作原理如下:
-
Map阶段:将输入数据分成多个小块,然后在多个计算节点上并行处理这些小块。每个计算节点都会执行Map函数,将输入数据转换成键值对。
-
Shuffle阶段:将Map函数的输出结果按照键进行分组,然后将同一组的键值对发送到同一个Reduce节点上。
-
Reduce阶段:将同一组的键值对发送到同一个Reduce节点上,然后在该节点上执行Reduce函数,将同一组的键值对合并成一个结果。
-
输出结果:将所有Reduce节点的输出结果合并成一个最终结果。
特点
- 可扩展性
- MapReduce框架可以在大规模数据集上高效地运行,因为它可以利用集群中的多个节点并行处理数据。
- 容错性
- MapReduce框架具有容错机制,可以处理节点和数据丢失等问题
- 简单性
- MapReduce框架的编程模型相对简单,可以轻松地编写Map和Reduce任务
- 适用性
- MapReduce框架适用于各种类型地数据处理任务,包含文本处理,图像处理,机器学习等。
优缺点
优点:
- 可以处理大规模的数据集,具有良好的可扩展的容错性,
- 可以在分布式的环境下运行,可以利用集群中的多台计算机进行计算,提高计算效率
- 可以通过MapReduce编程模型进行编程,简化了分布式计算的开发难度
- 可以与HDFS文件系统和YARN资源管理系统配合使用,构建完整的分布式计算平台
缺点:
- MapReduce编程模型对于一些复杂的计算任务来说,可能不太适合
- MapReduce编程模型需要将计算任务分成Map阶段和Reduce阶段,可能会导致计算效率的降低
- MapReduce编程模型需要将数据写入磁盘,可能会导致I/O开销较大
进程
MapReduce进程包括两个主要的进程:JobTracker进程和TaskTracker进程,
- JobTracker进程负责接收客户端提交的MapReduce任务,并将任务分配给TaskTracker进程执行;
- TaskTracker进程负责执行MapReduce任务,并将执行结果返回给JobTracker进程。
- JobTracker进程和TaskTracker进程都运行在Hadoop集群中的计算节点上,它们之间通过心跳机制进行通信,以保证任务的正确执行。
JobTracker进程:
- JobTracker进程是MapReduce进程中的主进程,负责接收客户端提交的MapReduce任务,并将任务分配给TaskTracker进程执行。
- JobTracker进程运行在Hadoop集群中的一台计算节点上,它通过心跳机制与TaskTracker进程进行通信,以监控任务的执行情况。
- JobTracker进程还负责维护任务的元数据信息,包括任务的状态、进度、日志等。
TaskTracker进程
- TaskTracker进程是MapReduce进程中的工作进程,负责执行MapReduce任务,并将执行结果返回给JobTracker进程。
- TaskTracker进程运行在Hadoop集群中的计算节点上,它通过心跳机制与JobTracker进程进行通信,以接收任务的分配和监控任务的执行情况。
- TaskTracker进程还负责维护任务的本地数据,包括输入数据、输出数据、中间结果等。
常见的配置
- 输入格式:MapReduce框架支持多种输入格式,包括文本、序列化、Hadoop Archives等。可以根据数据类型选择合适的输入格式。
- 输出格式:MapReduce框架支持多种输出格式,包括文本、序列化、Hadoop Archives等。可以根据需要选择合适的输出格式。
- Map任务数:MapReduce框架默认将输入数据分成64MB的块,并为每个块分配一个Map任务。可以根据数据大小和集群规模调整Map任务数。
- Reduce任务数:Reduce任务数的数量通常与集群中的节点数相同。可以根据集群规模和数据大小调整Reduce任务数。
- Combiner函数:Combiner函数是一种在Map任务输出之前对Map任务输出进行合并的函数。可以使用Combiner函数来减少Map任务输出的数据量,从而提高MapReduce框架的效率。
- Partitioner函数:Partitioner函数是一种将Map任务输出分配到Reduce任务的函数。可以根据键的哈希值将Map任务输出分配到不同的Reduce任务中。
- 缓存文件:MapReduce框架支持将文件缓存在内存中,以便在Map和Reduce任务中使用。可以使用DistributedCache类将文件缓存到集群中的节点上。
- 压缩:MapReduce框架支持对输入和输出数据进行压缩,以减少数据传输的大小。可以使用Gzip、Bzip2等压缩算法对数据进行压缩。
- 计数器:MapReduce框架支持计数器,用于记录任务的进度和状态。可以使用计数器来监视任务的执行情况。
WordCount编程
本地测试
需求:在给定的文件中统计输出的每个单词出现的总次数
package com.sin.mapRedice;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @CreateName SIN
* @CreateDate 2023/05/26 11:33
* @description
*/
public class WordCount {
/**
* Mapper:将输入键/值映射到一组中间键/值对中
*/
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//创建IntWritable对象
private final static IntWritable one = new IntWritable(1);
//创建Text对象
private Text word = new Text();
/**
* 对输入分割中的每一个键/值调用一次
* @param key 键
* @param value 值
* @param context 上下文
* @throws IOException
* @throws InterruptedException
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将输入的文本数据转换为StringToKenizer对象
StringTokenizer itr = new StringTokenizer(value.toString());
//遍历StringTokenizer对象中的每一个单词
while (itr.hasMoreTokens()) {
//将当前单词设置为Text类型的值
word.set(itr.nextToken());
//将当前单词和IntWritable对象写入上下文中
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
/**
* 接收键值对列表,将相同键的值相加并输出
* @param key 键
* @param values 值的迭代器
* @param context 上下文
* @throws IOException
* @throws InterruptedException
*/
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
//定义一个变量sum,用来存储values中的所有元素的和
int sum = 0;
//遍历values中的所有元素
for (IntWritable val : values) {
//将元素的值添加到sum中
sum += val.get();
}
//将相加后的结果设置为IntWritable对象的值
result.set(sum);
// 将 key 和 sum 作为输出写入 context
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
// 创建Configuration对象
// 配置由资源指定,资源包含一组名称/值对作为xml数据,
// 每个资源都是由字符串或者路径命名
// 如果使用String命名,则检查类路径以查找具有改名称的文件
// 如果以Path命名,则直接检查本地文件系统,而不引用类路径
Configuration conf = new Configuration();
/**
* 创建job对象
* 任务提交者对任务的视图
* 它允许用户配置任务,提交任务,控制其执行和查询状态,
*/
//获取实例
Job job = Job.getInstance(conf, "word count");
//设置jar包
job.setJarByClass(WordCount.class);
//关联mapper和reducer
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
//设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入路径
FileInputFormat.addInputPath(job, new Path("F:\\input"));
//设置输出路径
FileOutputFormat.setOutputPath(job, new Path("F:\\output\\outputword"));
//提交任务并等待完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
测试异常
解决java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
解决Exception in thread “main” java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
return后面删了。然后改为true,但是这里是只读文件不能修改,所以创建一个类就行
复制包路径
创建NativeIO类并修改access方法