文章目录
- 一、源代码
- 1. UserSaleMapper类
- 2. UserSaleReducer类
- 3. UserSaleDriver类
- 4. pom.xml
- 二、执行结果
指导参考图:
一、源代码
1. UserSaleMapper类
package org.example.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.example.model.UserSale;
import java.io.IOException;
/**
 * Map stage of the per-user sales job.
 *
 * <p>Each input value is one comma-separated line whose first five fields are, in order:
 * sale id, user name, goods name, price, sale count. The mapper wraps them in a
 * {@link UserSale}, sets its total price to {@code price * saleCount}, and emits the
 * object keyed by user name.
 *
 * <p>Lines that have fewer than five fields, whose numeric fields cannot be parsed as
 * {@code int}, or whose total price overflows {@code int} are skipped and counted in the
 * {@code UserSaleMapper / MALFORMED_RECORDS} counter instead of failing the task.
 */
public class UserSaleMapper extends Mapper<LongWritable, Text, Text, UserSale> {
    /** Number of comma-separated fields a record must have to be processed. */
    private static final int EXPECTED_FIELD_COUNT = 5;

    /** Counter group and name used to report skipped input lines. */
    private static final String COUNTER_GROUP = "UserSaleMapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    // Key object emitted by the map stage; reused across map() calls.
    private final Text outputKey = new Text();

    /**
     * Parses one input line and emits {@code (userName, UserSale)}.
     *
     * @param key     input key supplied by the framework (not used here)
     * @param value   one line of comma-separated sale data
     * @param context task context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line on the field delimiter.
        String[] fields = value.toString().split(",");
        if (fields.length < EXPECTED_FIELD_COUNT) {
            // Blank or truncated line: record it and move on rather than crash on an index lookup.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        int saleId;
        int price;
        int saleCount;
        int totalPrice;
        try {
            // Parse each numeric field exactly once; surrounding whitespace is tolerated.
            saleId = Integer.parseInt(fields[0].trim());
            price = Integer.parseInt(fields[3].trim());
            saleCount = Integer.parseInt(fields[4].trim());
            // multiplyExact throws instead of silently wrapping around on int overflow.
            totalPrice = Math.multiplyExact(price, saleCount);
        } catch (NumberFormatException | ArithmeticException e) {
            // Non-numeric field (e.g. a header row) or overflowing total: skip and count.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        String userName = fields[1];
        String goodsName = fields[2];

        // Populate the value object.
        UserSale userSale = new UserSale();
        userSale.setSaleId(saleId);
        userSale.setUserName(userName);
        userSale.setGoodsName(goodsName);
        userSale.setPrice(price);
        userSale.setSaleCount(saleCount);
        userSale.setTotalPrice(totalPrice);

        // Key the record by user name and emit it.
        outputKey.set(userName);
        context.write(outputKey, userSale);
    }
}
2. UserSaleReducer类
package org.example.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.example.model.UserSale;
import java.io.IOException;
/**
 * Reduce stage of the per-user sales job.
 *
 * <p>For each key it sums {@code getTotalPrice()} over all {@link UserSale} values and
 * writes the key together with a {@link UserSale} whose total price is that sum.
 */
public class UserSaleReducer extends Reducer<Text, UserSale, Text, UserSale> {
    // Output value object, reused across reduce() calls. Only its total price is set here;
    // every other field keeps whatever the no-arg UserSale constructor leaves it as.
    private final UserSale outputUser = new UserSale();

    /**
     * Sums the total price of every value for one key and emits the result.
     *
     * @param key     the grouping key produced by the map stage
     * @param values  all {@link UserSale} records emitted for {@code key}
     * @param context task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<UserSale> values, Context context) throws IOException, InterruptedException {
        // Primitive accumulator: avoids unboxing and re-boxing an Integer on every iteration.
        int totalPrice = 0;
        for (UserSale userSale : values) {
            totalPrice += userSale.getTotalPrice();
        }

        // Store the sum in the reusable output object and emit it.
        outputUser.setTotalPrice(totalPrice);
        context.write(key, outputUser);
    }
}
3. UserSaleDriver类
package org.example.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.example.model.UserSale;
import java.io.IOException;
/**
 * Entry point that configures and submits the user-sale MapReduce job.
 *
 * <p>Usage: {@code UserSaleDriver [inputPath [outputPath]]}. When an argument is omitted the
 * corresponding default path below is used.
 */
public class UserSaleDriver {
    /** Input file used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "D:\\Documentation\\sale_detail.txt";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "D:\\Documentation\\output3";

    /**
     * Builds the job, runs it to completion, prints the outcome, and exits with status 0 on
     * success or 1 on failure.
     *
     * @param args optional: {@code args[0]} input path, {@code args[1]} output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException propagated from {@code Job.waitForCompletion}
     * @throws InterruptedException   if the wait for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fall back to the built-in paths so running without arguments behaves as before.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        // 1. Create the configuration and the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Associate the driver class with the job jar.
        job.setJarByClass(UserSaleDriver.class);

        // 3. Register the mapper and reducer.
        job.setMapperClass(UserSaleMapper.class);
        job.setReducerClass(UserSaleReducer.class);

        // 4. Key/value types emitted by the map stage.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(UserSale.class);

        // 5. Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(UserSale.class);

        // 6. Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        // Note: the output directory must not exist yet; pointing at an existing one is an error.
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 7. Submit the job and wait for it to finish.
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? "任务执行成功" : "任务执行失败");

        // Surface the job outcome to the calling shell or scheduler.
        System.exit(result ? 0 : 1);
    }
}
4. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the mapreduce_demo project: declares the Hadoop client
     dependency, compiles for Java 1.8, and runs the assembly plugin during packaging. -->
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates. -->
<groupId>org.example</groupId>
<artifactId>mapreduce_demo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>mapreduce_demo</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<!-- Source files are read as UTF-8; compiler source/target level is Java 1.8. -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<!-- The only declared dependency: the Hadoop client library, version 3.1.3. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compiler plugin pinned to 3.6.1; repeats the Java 1.8 source/target set in <properties>. -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Assembly plugin using the predefined "jar-with-dependencies" descriptor.
     NOTE(review): no <version> is declared for this plugin, so the version used is
     whatever Maven resolves by default; consider pinning one. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<!-- Binds the plugin's "single" goal to the "package" phase. -->
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>