5.MapReduce
5.1Linux中安装IDEA
IDEA官网:https://www.jetbrains.com.cn/idea/
点击右上角的下载
选择Linux进行下载压缩包
下载完成后找到压缩包并解压压缩包到当前目录下
tar -zxvf ideaIC-2024.1.4.tar.gz
运行idea
./idea.sh
5.2配置开发环境
1.mapreduce简介
mapreduce 的主要组成部分:Mapper 和 Reducer。
Mapper: 负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。
Reducer: 负责对map阶段的结果进行汇总。
2.开发环境准备
1)打开idea通过NewProject创建maven工程
点击create等待创建完成
新建Module
点击create创建
导入相关依赖包
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
刷新可以看到有相关依赖
删除mapreduce中默认的包然后重新创建
导入集群的配置文件
在main目录下创建一个resources目录,并设置为资源目录,将集群的core-site.xml和hdfs-site.xml放到resources目录下,目的是为了能识别hadoop集群
点击resources
copy配置文件到resources下
#将集群两个文件,下载到远程桌面服务器所在的桌面上
scp /usr/local/hadoop/etc/hadoop/hdfs-site.xml root@11.106.67.7:/headless/Desktop
scp /usr/local/hadoop/etc/hadoop/core-site.xml root@11.106.67.7:/headless/Desktop
copy到idea的resources下
5.3编写单词统计案例
1)自定义的mapreduce类
2)继承Mapper类,实现map函数
3)继承Reducer类,实现reduce函数
4)main()中设置Job相关信息 和 提交Job运行
在wordcount包下新建class类
需要分别编写下面三个class
1.Mapper代码
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
System.out.println("map task start ...");
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word),new IntWritable(1));
}
}
}
2.Reducer代码
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
System.out.println("reducer task start ...");
int sum = 0;
for (IntWritable value : values) {
int count = value.get();
sum+=count;
}
context.write(key, new IntWritable(sum));
}
}
3.Driver类代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordcount");
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path("hdfs://ns1/word/words.txt"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://ns1/wcresult"));
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
然后执行Driver类的main方法
返回0表示执行成功
每读取1行数据执行一次map方法,一共是8行,所以执行8次map方法
执行完map方法后框架会自动进行聚合操作,也就是会进行如hello [1 1 1 1 1 1]、hadoop[1 1 1 1 1]...这样的聚合操作,聚合后再进行reduce处理,由于一共有7个不同的单词,所以reduce方法调用7次
最后查看hdfs中存放的结果
5.4单词统计案例代码完善
1.设置运行参数
设置程序执行参数,输入输出路径不用在代码中写死。
run ---> Edit Configurations. 进入设置参数界面
2.增加自动删除目录方法
由于在mapredue任务执行前需要删除输出目录。否则再次执行MR任务,造成输出目录重复则会报错
添加如下方法
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
3.增加输出日志详细信息
添加log4j所需的jar包到pom.xml中
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
resources资源文件夹中新建log4j.properties文件添加如下内容
#设定控制台和指定目录日志都按照info级别来打印输出
log4j.rootLogger=info,console,HFILE
#日志输出到指定目录
log4j.appender.HFILE=org.apache.log4j.RollingFileAppender
#输出路径自己设置
log4j.appender.HFILE.File=/opt/hadoop_logs/log.log
log4j.appender.HFILE.MaxFileSize=30mb
log4j.appender.HFILE.MaxBackupIndex=20
log4j.appender.HFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.HFILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %l %t %r %c: %m%n
#日志输出到控制台
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n
5.5Mapreduce 原理
1.切块和切片
切块是存储在hdfs中的数据块,是真实存在的;而切片是逻辑层面的,是为了计算maptask个数的,有多少个切片就有多少个maptask任务,之所以要有切片的概念,主要是为了确定maptask任务个数,节约资源
切块≠切片,比如现在有一个数据块的大小是260M,hdfs默认每128M切割,也就是会切割产生128M,128M,4M三个切块,但是切片实际只有两个,分别是128M,132M,切片的产生是通过框架自动计算出来的,由于第三个数据块只有4M,太小了,所以会和其中一个128M合并一起由一个maptask进行处理,这样做的目得是为了节约资源,如果4M单独开一个maptask进行处理的话,会额外消耗很多cpu和内存,很不划算,所以会把这4M的数据远程copy一份给maptask所在的服务器和另外的128M交给同一个maptask一起处理,这样虽然产生了一部分额外的远程I/O,但是比起来多起一个maptask要划算很多,hadoop集群为了节省开销,会这样来处理。集群的默认配置是1.1,所以如果按照128M来切割的话,如果最后一份数据的大小<12.8M的话,会把这部分数据跟其中的一份128M进行合并由一个maptask进行处理;如果数据的大小>12.8M的话,这部分数据会单独进行处理,也就是会单独开一个maptask来处理这部分数据
2.MapReduce灵魂shuffle流程
一个Reducer的情况
多个Reducer的情况(2个为例)
有多少个reducer将来就有多少个输出文件,一般reducer的数量和分区的数量是一致的。
笔记参考:http://hainiubl.com/topics/75967