在Hadoop中,判断Job和Map任务的开始和结束可以通过多种方式实现。以下是几种常见的方法:
1. 使用Hadoop命令行工具
Hadoop提供了一些命令行工具来监控和查询Job的状态。
1.1 查看Job状态
hadoop job -status <job_id>
这条命令会显示Job的详细信息,包括Job的状态(如RUNNING, SUCCEEDED, FAILED等)。
1.2 查看Map任务状态
hadoop job -list
这条命令会列出所有正在运行的Job及其任务的状态。你可以通过Job ID进一步查询Map任务的状态。
2. 使用Web界面
Hadoop提供了Web UI来监控Job的状态。
2.1 访问JobTracker/ResourceManager Web UI
http://<jobtracker_or_resourcemanager_host>:50030/jobtracker.jsp
在Web UI中,你可以查看所有Job的列表,点击具体的Job可以查看其详细信息和Map任务的进度。
3. 使用API
如果你正在开发Hadoop应用程序,可以通过Hadoop API来监控Job和Map任务的状态。
3.1 使用JobClient
JobClient jobClient = new JobClient(new JobConf());
JobStatus[] jobStatuses = jobClient.getAllJobs();
for (JobStatus jobStatus : jobStatuses) {
System.out.println("Job ID: " + jobStatus.getJobID() + ", Status: " + jobStatus.getRunState());
}
3.2 使用JobTracker
JobTracker jobTracker = JobTracker.getJobTracker();
JobStatus[] jobStatuses = jobTracker.getAllJobs();
for (JobStatus jobStatus : jobStatuses) {
System.out.println("Job ID: " + jobStatus.getJobID() + ", Status: " + jobStatus.getRunState());
}
4. 使用Streaming或Pipes
如果你使用的是Hadoop Streaming或Pipes,可以通过标准输入输出流来获取Map任务的状态。
4.1 使用Streaming
Hadoop Streaming允许你使用脚本语言(如Python)编写Map和Reduce任务。你可以通过检查标准输入输出来判断任务的状态。
import sys
def mapper(line):
# Map任务的逻辑
pass
def main():
for line in sys.stdin:
mapper(line)
if __name__ == "__main__":
main()
5. 日志文件
Hadoop会在各个节点上生成日志文件,你可以通过查看这些日志文件来判断Job和Map任务的状态。
5.1 访问HDFS上的日志
你可以通过HDFS命令行或HDFS Web UI访问日志文件。
hadoop fs -cat /path/to/logs/*
6. 使用监控工具
除了以上方法,你还可以使用一些第三方的监控工具来监控Hadoop集群的状态,如Ganglia、Nagios、Prometheus等。这些工具能够提供更详细的监控信息,包括Job和Map任务的状态。
7. 总结
- 命令行工具:使用
hadoop job -status
和hadoop job -list
等命令查询Job和Map任务的状态。 - Web UI:访问JobTracker/ResourceManager的Web界面查看Job和Map任务的进度。
- API:在应用程序中使用Hadoop API监控Job和Map任务的状态。
- 日志文件:查看Hadoop生成的日志文件以获取Job和Map任务的状态。
- 监控工具:使用第三方监控工具提供更详细的监控信息。
通过这些方法,你可以有效地监控Hadoop中的Job和Map任务的开始和结束。
8. 源码
package com.mofang.data.hadoop.report;
import java.text.DateFormat;
import java.util.Date;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.data.mongodb.core.MongoTemplate;
import com.mofang.data.hadoop.ReportHadoopMain;
import com.mofang.data.hadoop.domain.SpringContextFactory;
import com.mofang.domain.entity.report.ReportResultChangeLogTrace;
public class ReportJob extends Configured implements Tool {
private AbstractApplicationContext springContenxt = null;
private ReportResultChangeLogTrace reportResultChangeLogTrace = new ReportResultChangeLogTrace();
public int run(String[] arg0) throws Exception {
System.out.println("job运行开始"+DateFormat.getDateTimeInstance().format(new Date()));
springContenxt = SpringContextFactory.initSpringContext();
reportResultChangeLogTrace.setId(UUID.randomUUID().toString());
reportResultChangeLogTrace.setCreateTime(new Date());
reportResultChangeLogTrace.setChangeStart(new Date());
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, arg0).getRemainingArgs();
conf.addResource("hadoop.xml"); // 不需要加classpath
conf.set("reportResultChangeLogTraceId",reportResultChangeLogTrace.getId());
Job job = Job.getInstance(conf);
job.setJarByClass(ReportHadoopMain.class);
job.setMapperClass(ReportMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// job.setReducerClass(TraitRetioReudcer.class);
// job.setNumReduceTasks(8);
job.setInputFormatClass(NLineInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[1])); // 设置输入路径
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(otherArgs[2]), true);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); // 设置输出路径
NLineInputFormat.setNumLinesPerSplit(job, 5000); // 一个分片5000个barCode
job.waitForCompletion(true);
System.out.println("job运行结束"+DateFormat.getDateTimeInstance().format(new Date()));
reportResultChangeLogTrace.setChangeEnd(new Date());
MongoTemplate logMongoTemplate = (MongoTemplate) springContenxt.getBean("logMongoTemplate");
logMongoTemplate.save(reportResultChangeLogTrace);
springContenxt.close();
System.exit(0);
return 0;
}
}
package com.mofang.data.hadoop.report;
import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.data.mongodb.core.MongoTemplate;
import com.alibaba.fastjson.JSONObject;
import com.mofang.data.domain.service.LogMonitorService;
import com.mofang.data.domain.service.ReportDataService;
import com.mofang.data.hadoop.domain.SpringContextFactory;
import com.mofang.data.hadoop.util.Heartbeat;
import com.mofang.domain.entity.report.ReportResultChangeLogTraceBarcode;
public class ReportMapper extends Mapper<LongWritable, Text, Text,Text>{
private AbstractApplicationContext springContenxt=null;
private Heartbeat heartbeat=null;
// private ReportResultChangeLogTrace reportResultChangeLogTrace=new ReportResultChangeLogTrace();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
springContenxt=SpringContextFactory.initSpringContext();
heartbeat=Heartbeat.createHeartbeat(context);
// reportResultChangeLogTrace.setId(UUID.randomUUID().toString());
// reportResultChangeLogTrace.setCreateTime(new Date());
// reportResultChangeLogTrace.setChangeStart(new Date());
}
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String barcode=value.toString().trim();
JSONObject json = new JSONObject();
json.put("barCode", barcode);
ReportDataService reportService = springContenxt.getBean(ReportDataService.class);
LogMonitorService logMonitorService = springContenxt.getBean(LogMonitorService.class);
MongoTemplate logMongoTemplate = (MongoTemplate) springContenxt.getBean("logMongoTemplate");
ReportResultChangeLogTraceBarcode reportResultChangeLogTraceBarcode=new ReportResultChangeLogTraceBarcode();
reportResultChangeLogTraceBarcode.setBarcode(barcode);
reportResultChangeLogTraceBarcode.setCreateTime(new Date());
reportResultChangeLogTraceBarcode.setReportResultChangeLogTraceId(context.getConfiguration().get("reportResultChangeLogTraceId"));
logMongoTemplate.save(reportResultChangeLogTraceBarcode);
JSONObject jsonResult = reportService.parseResultTxt(json, barcode, "17");
logMonitorService.printParseSnpError(jsonResult, barcode); //barCode报告生成状态
context.write(new Text(barcode), new Text(((String)jsonResult.get("okflag"))));
}
//清空spring缓存,停止心跳服务
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
springContenxt.close();
if(heartbeat!=null)
heartbeat.stopBeating();
}
}