Read the sequence file produced in Task 1, count each user's number of accesses per day, and write the 2021/1 and 2021/2 results into two separate output files.
I. Project setup:
1. Create the project
2. Modify the pom.xml file
<packaging>jar</packaging>
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <!-- bind the single goal to the package phase so the assembly jar is actually built -->
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
3. Create the log configuration file log4j.properties in the resources directory
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=D:\\countLog.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
II. Writing the program:
1. Define the custom key type MemberLogTime: it holds two fields (memberId and logTime) and implements the WritableComparable interface (a sketch is given below).
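The source of this class is not listed in this write-up, so the following is a minimal sketch that is consistent with the calls made in the Mapper, Partitioner and Reducer below (setMemberId, setLogTime, getLogTime); the exact compareTo, toString, hashCode and equals implementations are assumptions, not the original code.

package com.maidu.countlog;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Custom map output key: member ID plus access date (sketch).
 */
public class MemberLogTime implements WritableComparable<MemberLogTime> {
    private String memberId;
    private String logTime;

    // No-argument constructor so Hadoop can instantiate the key via reflection
    public MemberLogTime() {
    }

    public String getMemberId() {
        return memberId;
    }

    public void setMemberId(String memberId) {
        this.memberId = memberId;
    }

    public String getLogTime() {
        return logTime;
    }

    public void setLogTime(String logTime) {
        this.logTime = logTime;
    }

    // Serialize both fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(memberId);
        out.writeUTF(logTime);
    }

    // Deserialize in the same order as write()
    @Override
    public void readFields(DataInput in) throws IOException {
        memberId = in.readUTF();
        logTime = in.readUTF();
    }

    // Sort by member ID first, then by date, so identical keys group together in the reduce phase
    @Override
    public int compareTo(MemberLogTime other) {
        int cmp = this.memberId.compareTo(other.memberId);
        if (cmp != 0) {
            return cmp;
        }
        return this.logTime.compareTo(other.logTime);
    }

    // TextOutputFormat writes key.toString(), so this controls the key part of each output line
    @Override
    public String toString() {
        return memberId + "," + logTime;
    }

    @Override
    public int hashCode() {
        return memberId.hashCode() * 31 + logTime.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof MemberLogTime)) {
            return false;
        }
        MemberLogTime other = (MemberLogTime) obj;
        return memberId.equals(other.memberId) && logTime.equals(other.logTime);
    }
}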
2. Write the Mapper module (counters are used in the Mapper, implemented with an enum):
package com.maidu.countlog;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author:yt
 * @since:2024-05-15
 * Mapper module
 */
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {
    private MemberLogTime mt = new MemberLogTime();
    private IntWritable one = new IntWritable(1);

    // Enum used for the counters
    enum LogCounter {
        January,
        February
    }

    @Override
    protected void map(Text key, Text value, Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String memberId = key.toString();
        String logTime = value.toString();
        // Counters: the input only contains January and February 2021,
        // so any record not matching "2021/1" is counted as February
        if (logTime.contains("2021/1")) {
            // Increment the January counter
            context.getCounter(LogCounter.January).increment(1);
        } else {
            context.getCounter(LogCounter.February).increment(1);
        }
        // Store the member ID and access time in the MemberLogTime key
        mt.setMemberId(memberId);
        mt.setLogTime(logTime);
        context.write(mt, one);
    }
}
3. Write the Combiner:
package com.maidu.countlog;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author:yt
 * @since:2024-05-15
 * Combiner: pre-aggregates the counts on the map side
 */
public class LogCoutCombiner extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
    @Override
    protected void reduce(MemberLogTime key, Iterable<IntWritable> values,
                          Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
4. Write the Partitioner:
package com.maidu.countlog;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author:yt
 * @since:2024-05-15
 * Partitioner: sends January records to partition 0 and February records to partition 1,
 * so that each month ends up in its own output file
 */
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
    @Override
    public int getPartition(MemberLogTime memberLogTime, IntWritable intWritable, int numPartitions) {
        String date = memberLogTime.getLogTime();
        // The modulo keeps the return value valid even if fewer than two reducers are configured
        if (date.contains("2021/1")) {
            return 0 % numPartitions;
        } else {
            return 1 % numPartitions;
        }
    }
}
5. Write the Reducer module:
package com.maidu.countlog;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author:yt
 * @since:2024-05-15
 * Reducer: sums the access counts per (member, day)
 */
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
    @Override
    protected void reduce(MemberLogTime key, Iterable<IntWritable> values,
                          Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Dynamic counters, identified by group and name instead of an enum
        if (key.getLogTime().contains("2021/1")) {
            context.getCounter("OutputCounter", "JanuaryResult").increment(1);
        } else {
            context.getCounter("OutputCounter", "FebruaryResult").increment(1);
        }
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
6. Write the Driver module:
package com.maidu.countlog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author:yt
 * @since:2024-05-15
 * Driver: configures and submits the job
 */
public class LogCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Log count");
        job.setJarByClass(LogCount.class);
        job.setMapperClass(LogCountMapper.class);
        job.setReducerClass(LogCountReducer.class);
        job.setCombinerClass(LogCoutCombiner.class);
        job.setPartitionerClass(LogCountPartitioner.class);
        // Two reduce tasks, so each month is written to its own output file
        job.setNumReduceTasks(2);
        job.setOutputKeyClass(MemberLogTime.class);
        job.setOutputValueClass(IntWritable.class);
        // The input is the sequence file produced in Task 1, read back as text key/value pairs
        job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input and output paths come from the command line
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
7. Package the project into a jar with Maven, upload it to the master node, and run it
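Before the hadoop jar command below can be run, the jar has to be built and copied to the master node. A possible sequence is sketched here; the destination directory is a placeholder and whether the plain jar or the jar-with-dependencies artifact is used depends on your environment, so treat these as illustrative rather than the exact commands used.

# Build the project (run on the development machine, in the project root)
mvn clean package
# Copy the jar to the master node (destination path is a placeholder)
scp target/countLog-1.0-SNAPSHOT.jar yt@master:~/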
[yt@master ~]$ hadoop jar countLog-1.0-SNAPSHOT.jar com.maidu.countlog.LogCount /ouput-2301-select/part-m-00000 /logcount/2301/
8. Check the results
(1) Counters
(2) Output files
(3) View the contents of one output file
[yt@master ~]$ hdfs dfs -cat /logcount/2301/part-r-00001
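Assuming the toString() method from the MemberLogTime sketch in section II.1, TextOutputFormat writes each record as the key's toString(), a tab, and the count, so a line of part-r-00001 would look roughly like the following (the member ID and date are purely illustrative, not actual results):

13800001,2021/2/1	3

meaning the member with ID 13800001 accessed the site 3 times on 2021/2/1.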