文章目录
一、提出任务 二、任务过程 (一)准备数据 1、在虚拟机上创建文本文件 2、上传文件到HDFS指定目录
(二)实现步骤 1、创建Maven项目 2、添加相关依赖 3、创建日志属性文件 4、创建学生实体类 5、创建学生映射器类 5、创建学生归并器类 6、创建学生驱动器类 7、启动学生驱动器类,查看结果
一、提出任务
二、任务过程
(一)准备数据
启动hadoop服务
1、在虚拟机上创建文本文件
创建sortstudent
目录,在里面创建student.txt
文件
2、上传文件到HDFS指定目录
创建/sortstudent/input
目录 命令:hdfs dfs -mkdir -p /sortstudent/input
将文本文件student.txt
,上传到HDFS的/sortstudent/input
目录
(二)实现步骤
1、创建Maven项目
Maven项目:SortStudent
2、添加相关依赖
在pom.xml
文件里添加hadoop
和junit
依赖
<dependencies>
    <!-- hadoop客户端 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>
    <!-- 单元测试框架 -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
</dependencies>
3、创建日志属性文件
在resources
目录里创建log4j.properties
文件
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/sortstudent.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4、创建学生实体类
在net.xxr.mr
包里创建Student
类 为了让学生按照年龄排序,需要让学生实体类实现一个序列化可比较接口 : WritableComparable
,这个接口有三个抽象方法要我们去实现
package net. xxr. mr ;
import org. apache. hadoop. io. WritableComparable ;
import java. io. DataInput ;
import java. io. DataOutput ;
import java. io. IOException ;
public class Student implements WritableComparable < Student > {
private String name;
private String gender;
private int age;
private String phone;
private String major;
public String getName ( ) {
return name;
}
public void setName ( String name) {
this . name = name;
}
public String getGender ( ) {
return gender;
}
public void setGender ( String gender) {
this . gender = gender;
}
public int getAge ( ) {
return age;
}
public void setAge ( int age) {
this . age = age;
}
public String getPhone ( ) {
return phone;
}
public void setPhone ( String phone) {
this . phone = phone;
}
public String getMajor ( ) {
return major;
}
public void setMajor ( String major) {
this . major = major;
}
@Override
public String toString ( ) {
return "Student{" +
"name='" + name + '\'' +
", gender='" + gender + '\'' +
", age=" + age +
", phone='" + phone + '\'' +
", major='" + major + '\'' +
'}' ;
}
public int compareTo ( Student o) {
return o. getAge ( ) - this . getAge ( ) ;
}
public void write ( DataOutput out) throws IOException {
out. writeUTF ( name) ;
out. writeUTF ( gender) ;
out. writeInt ( age) ;
out. writeUTF ( phone) ;
out. writeUTF ( major) ;
}
public void readFields ( DataInput in) throws IOException {
name = in. readUTF ( ) ;
gender = in. readUTF ( ) ;
age = in. readInt ( ) ;
phone = in. readUTF ( ) ;
major = in. readUTF ( ) ;
}
}
5、创建学生映射器类
在net.xxr.mr
里创建StudentMapper
类
package net.xxr.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses each line of student.txt into a {@link Student} key.
 *
 * <p>Expected record layout (whitespace-separated):
 * {@code name gender age phone major}. The {@link Student} key carries the
 * whole record; sorting by age happens via {@code Student.compareTo} during
 * the shuffle, so the value is just {@link NullWritable}.
 */
public class StudentMapper extends Mapper<LongWritable, Text, Student, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Trim and split on any run of whitespace so that tabs or repeated
        // spaces in the input file do not produce empty fields.
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return; // skip blank lines instead of throwing on fields[0]
        }
        String[] fields = line.split("\\s+");
        if (fields.length < 5) {
            return; // skip malformed records rather than crashing the task
        }
        Student student = new Student();
        student.setName(fields[0]);
        student.setGender(fields[1]);
        student.setAge(Integer.parseInt(fields[2]));
        student.setPhone(fields[3]);
        student.setMajor(fields[4]);
        context.write(student, NullWritable.get());
    }
}
5、创建学生归并器类
在net.xxr.mr
包里创建StudentReducer
类
package net.xxr.mr;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Emits one tab-separated line per student, in the age-descending key order
 * produced by the shuffle.
 *
 * <p>Bug fix: the previous version wrote the key once per <em>group</em>.
 * Because {@code Student.compareTo} compares only by age, students sharing an
 * age arrive in the same group and were silently deduplicated (the post notes
 * 3 records went missing). Iterating the value iterator emits one output line
 * per input record.
 */
public class StudentReducer extends Reducer<Student, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Student key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // One value per original record; write once per value to keep duplicates.
        for (NullWritable ignored : values) {
            // NOTE: with an age-only comparator, Hadoop may update the key's
            // other fields as the iterator advances, so rebuild the line here
            // inside the loop rather than once before it.
            String studentInfo = key.getName() + "\t"
                    + key.getGender() + "\t"
                    + key.getAge() + "\t"
                    + key.getPhone() + "\t"
                    + key.getMajor();
            context.write(new Text(studentInfo), NullWritable.get());
        }
    }
}
6、创建学生驱动器类
在net.xxr.mr
包里创建StudentDriver
类
package net.xxr.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * Configures and runs the sort-students MapReduce job, then prints the
 * result files from HDFS.
 */
public class StudentDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Resolve datanodes by hostname (needed when the client is outside
        // the cluster network, e.g. connecting to a VM).
        conf.set("dfs.client.use.datanode.hostname", "true");

        Job job = Job.getInstance(conf);
        job.setJarByClass(StudentDriver.class);

        job.setMapperClass(StudentMapper.class);
        job.setMapOutputKeyClass(Student.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(StudentReducer.class);
        // Fix: the reducer emits Text keys, not Student — the declared output
        // key class must match what StudentReducer actually writes.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        String uri = "hdfs://master:9000";
        Path inputPath = new Path(uri + "/sortstudent/input");
        Path outputPath = new Path(uri + "/sortstudent/output");

        FileSystem fs = FileSystem.get(new URI(uri), conf);
        // Remove a stale output directory so the job does not abort on start.
        fs.delete(outputPath, true);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Fix: check the job result instead of printing output unconditionally.
        if (!job.waitForCompletion(true)) {
            System.err.println("Job failed — see task logs for details");
            System.exit(1);
        }

        System.out.println("======统计结果======");
        FileStatus[] fileStatuses = fs.listStatus(outputPath);
        // Index 0 is the _SUCCESS marker ("_" sorts before "part-"); skip it.
        for (int i = 1; i < fileStatuses.length; i++) {
            System.out.println(fileStatuses[i].getPath());
            // Fix: close each stream (previously leaked); copyBytes(..., false)
            // deliberately leaves System.out open.
            FSDataInputStream in = fs.open(fileStatuses[i].getPath());
            try {
                IOUtils.copyBytes(in, System.out, 4096, false);
            } finally {
                IOUtils.closeStream(in);
            }
        }
    }
}
7、启动学生驱动器类,查看结果
运行StudentDriver 类 确实学生信息按照年龄降序排列了,但是做了一件我们不需要的去重,少了3条记录 需要修改学生归并器类,遍历值迭代器,这样就不会去重了 再次运行StudentDriver 类