Hadoop积累---Hadoop判断job和map的开始和结束(带源码)

news2024/11/13 10:14:44

在Hadoop中,判断Job和Map任务的开始和结束可以通过多种方式实现。以下是几种常见的方法:

1. 使用Hadoop命令行工具

Hadoop提供了一些命令行工具来监控和查询Job的状态。

1.1 查看Job状态
hadoop job -status <job_id>

这条命令会显示Job的详细信息,包括Job的状态(如RUNNING, SUCCEEDED, FAILED等)。

1.2 查看Map任务状态
hadoop job -list

这条命令会列出所有正在运行的Job及其任务的状态。你可以通过Job ID进一步查询Map任务的状态。

2. 使用Web界面

Hadoop提供了Web UI来监控Job的状态。

2.1 访问JobTracker/ResourceManager Web UI
http://<jobtracker_or_resourcemanager_host>:50030/jobtracker.jsp

在Web UI中,你可以查看所有Job的列表,点击具体的Job可以查看其详细信息和Map任务的进度。

3. 使用API

如果你正在开发Hadoop应用程序,可以通过Hadoop API来监控Job和Map任务的状态。

3.1 使用JobClient
JobClient jobClient = new JobClient(new JobConf());
JobStatus[] jobStatuses = jobClient.getAllJobs();
for (JobStatus jobStatus : jobStatuses) {
    System.out.println("Job ID: " + jobStatus.getJobID() + ", Status: " + jobStatus.getRunState());
}
3.2 使用JobTracker
JobTracker jobTracker = JobTracker.getJobTracker();
JobStatus[] jobStatuses = jobTracker.getAllJobs();
for (JobStatus jobStatus : jobStatuses) {
    System.out.println("Job ID: " + jobStatus.getJobID() + ", Status: " + jobStatus.getRunState());
}

4. 使用Streaming或Pipes

如果你使用的是Hadoop Streaming或Pipes,可以通过标准输入输出流来获取Map任务的状态。

4.1 使用Streaming

Hadoop Streaming允许你使用脚本语言(如Python)编写Map和Reduce任务。你可以通过检查标准输入输出来判断任务的状态。

import sys

def mapper(line):
    # Map任务的逻辑
    pass

def main():
    for line in sys.stdin:
        mapper(line)

if __name__ == "__main__":
    main()

5. 日志文件

Hadoop会在各个节点上生成日志文件,你可以通过查看这些日志文件来判断Job和Map任务的状态。

5.1 访问HDFS上的日志

你可以通过HDFS命令行或HDFS Web UI访问日志文件。

hadoop fs -cat /path/to/logs/*

6. 使用监控工具

除了以上方法,你还可以使用一些第三方的监控工具来监控Hadoop集群的状态,如Ganglia、Nagios、Prometheus等。这些工具能够提供更详细的监控信息,包括Job和Map任务的状态。

7. 总结

  • 命令行工具:使用hadoop job -statushadoop job -list等命令查询Job和Map任务的状态。
  • Web UI:访问JobTracker/ResourceManager的Web界面查看Job和Map任务的进度。
  • API:在应用程序中使用Hadoop API监控Job和Map任务的状态。
  • 日志文件:查看Hadoop生成的日志文件以获取Job和Map任务的状态。
  • 监控工具:使用第三方监控工具提供更详细的监控信息。

通过这些方法,你可以有效地监控Hadoop中的Job和Map任务的开始和结束。

8. 源码

package com.mofang.data.hadoop.report;

import java.text.DateFormat;
import java.util.Date;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.data.mongodb.core.MongoTemplate;

import com.mofang.data.hadoop.ReportHadoopMain;
import com.mofang.data.hadoop.domain.SpringContextFactory;
import com.mofang.domain.entity.report.ReportResultChangeLogTrace;

public class ReportJob extends Configured implements Tool {

	private AbstractApplicationContext springContenxt = null;
	private ReportResultChangeLogTrace reportResultChangeLogTrace = new ReportResultChangeLogTrace();

	public int run(String[] arg0) throws Exception {
		
		
		System.out.println("job运行开始"+DateFormat.getDateTimeInstance().format(new Date()));
		springContenxt = SpringContextFactory.initSpringContext();
		reportResultChangeLogTrace.setId(UUID.randomUUID().toString());
		reportResultChangeLogTrace.setCreateTime(new Date());
		reportResultChangeLogTrace.setChangeStart(new Date());

		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, arg0).getRemainingArgs();

		conf.addResource("hadoop.xml"); // 不需要加classpath
		
		conf.set("reportResultChangeLogTraceId",reportResultChangeLogTrace.getId());

		Job job = Job.getInstance(conf);
		job.setJarByClass(ReportHadoopMain.class);

		job.setMapperClass(ReportMapper.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// job.setReducerClass(TraitRetioReudcer.class);
		// job.setNumReduceTasks(8);

		job.setInputFormatClass(NLineInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[1])); // 设置输入路径

		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(otherArgs[2]), true);
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); // 设置输出路径

		NLineInputFormat.setNumLinesPerSplit(job, 5000); // 一个分片5000个barCode
		job.waitForCompletion(true);
		System.out.println("job运行结束"+DateFormat.getDateTimeInstance().format(new Date()));
    	reportResultChangeLogTrace.setChangeEnd(new Date());
    	MongoTemplate logMongoTemplate = (MongoTemplate) springContenxt.getBean("logMongoTemplate");
    	logMongoTemplate.save(reportResultChangeLogTrace);
        springContenxt.close();
		System.exit(0);
		return 0;
	}

}

package com.mofang.data.hadoop.report;


import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.data.mongodb.core.MongoTemplate;

import com.alibaba.fastjson.JSONObject;
import com.mofang.data.domain.service.LogMonitorService;
import com.mofang.data.domain.service.ReportDataService;
import com.mofang.data.hadoop.domain.SpringContextFactory;
import com.mofang.data.hadoop.util.Heartbeat;
import com.mofang.domain.entity.report.ReportResultChangeLogTraceBarcode;

public class ReportMapper extends Mapper<LongWritable, Text, Text,Text>{
    
    private AbstractApplicationContext  springContenxt=null;
    
    private  Heartbeat heartbeat=null;
    
//    private ReportResultChangeLogTrace reportResultChangeLogTrace=new ReportResultChangeLogTrace();
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        springContenxt=SpringContextFactory.initSpringContext();
        heartbeat=Heartbeat.createHeartbeat(context);
//        reportResultChangeLogTrace.setId(UUID.randomUUID().toString());
//        reportResultChangeLogTrace.setCreateTime(new Date());
//        reportResultChangeLogTrace.setChangeStart(new Date());
    }
    
    @Override
    protected void map(LongWritable key, Text value,Context context)
        throws IOException, InterruptedException { 
        
        String barcode=value.toString().trim();
        JSONObject json = new JSONObject();
        
        json.put("barCode", barcode);
        ReportDataService reportService = springContenxt.getBean(ReportDataService.class);
        LogMonitorService logMonitorService = springContenxt.getBean(LogMonitorService.class);
        MongoTemplate logMongoTemplate = (MongoTemplate) springContenxt.getBean("logMongoTemplate");
        ReportResultChangeLogTraceBarcode reportResultChangeLogTraceBarcode=new ReportResultChangeLogTraceBarcode();
        reportResultChangeLogTraceBarcode.setBarcode(barcode);
        reportResultChangeLogTraceBarcode.setCreateTime(new Date());
        reportResultChangeLogTraceBarcode.setReportResultChangeLogTraceId(context.getConfiguration().get("reportResultChangeLogTraceId"));
        logMongoTemplate.save(reportResultChangeLogTraceBarcode);
        JSONObject jsonResult = reportService.parseResultTxt(json, barcode, "17");
        logMonitorService.printParseSnpError(jsonResult, barcode); //barCode报告生成状态
        context.write(new Text(barcode), new Text(((String)jsonResult.get("okflag"))));
        
    }
    
    //清空spring缓存,停止心跳服务
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        springContenxt.close();
        if(heartbeat!=null)
            heartbeat.stopBeating();
    }
    
    
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2237847.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MySQL】MySQL基础知识复习(上)

前言 本篇博客将复习MySQL的基础知识&#xff0c;及着重复习CRUD&#xff08;增删查改&#xff09;操作。 目录 一.MySQL数据库基础知识 1.数据库操作 1.1显示当前的数据库 1.2 创建数据库 1.3 使用数据库 1.4 删除数据库 2.数据类型 2.1.数字类型 2.2字符串类型 2.3…

华为大变革?仓颉编程语言会代替ArkTS吗?

在华为鸿蒙生态系统中&#xff0c;编程语言的选择一直是开发者关注的焦点。近期&#xff0c;华为推出了自研的通用编程语言——仓颉编程语言&#xff0c;这引发了关于仓颉是否会取代ArkTS的讨论。本文将从多个角度分析这两种语言的特点、应用场景及未来趋势&#xff0c;探讨仓颉…

稀硫酸介质中 V 型球阀的材质选择与选型要点-耀圣

稀硫酸介质中 V 型球阀的材质选择与选型要点 在工业生产中&#xff0c;稀硫酸是一种常见的化学介质&#xff0c;对于输送和控制稀硫酸的阀门&#xff0c;正确的材质选择和选型至关重要。本文将介绍稀硫酸介质中 V 型球阀的材质选择&#xff0c;并提供一些选型的要点。 一、稀硫…

昇思大模型平台打卡体验活动:项目3基于MindSpore的GPT2文本摘要

昇思大模型平台打卡体验活动&#xff1a;项目3基于MindSpore的GPT2文本摘要 1. 环境设置 本项目可以沿用前两个项目的相关环境设置。首先&#xff0c;登陆昇思大模型平台&#xff0c;并进入对应的开发环境&#xff1a; https://xihe.mindspore.cn/my/clouddev 接着&#xff0…

定时器输入捕获实验配置

首先&#xff0c;第一个时基工作参数配置 HAL_TIM_IC_Init( ) 还是一样的套路&#xff0c;传参是一个句柄&#xff0c;先定义一个结构体 Instance&#xff1a;指向TIM_TypeDef的指针&#xff0c;表示定时器的实例。TIM_TypeDef是一个包含了定时器寄存器的结构体&#xff0c;用…

计算机视觉读书系列(1)——基本知识与深度学习基础

研三即将毕业&#xff0c;后续的工作可能会偏AI方向的计算机视觉方面&#xff0c;因此准备了两条线来巩固计算机视觉基础。 一个是本系列&#xff0c;阅读经典《Deep Learning for Vision System》&#xff0c;做一些总结跑一些例子&#xff0c;也对应本系列文章 二是OpenCV实…

运维智能化转型:AIOps引领IT运维新浪潮

1. AIOps是什么&#xff1f; AIOps&#xff08;Artificial Intelligence for IT Operations&#xff09;&#xff0c;即人工智能在IT运维中的应用&#xff0c;通过机器学习技术处理运维数据&#xff08;如日志、监控信息和应用数据&#xff09;&#xff0c;解决传统自动化运维…

SkyNet嵌入式系统目标检测实践测试分析

目标检测和跟踪对于资源受限的嵌入式系统来说是具有挑战性的任务。尽管这些任务是人工智能领域中计算量最大的任务之一&#xff0c;但它们在嵌入式设备上只能使用有限的计算和内存资源。与此同时&#xff0c;这种资源受限的实现通常需要满足额外的苛刻要求&#xff0c;如实时响…

「OC」SDWebimage的学习

「OC」SDWebimage的学习 前言 在知乎日报这个项目之中&#xff0c;我在很多情况下都会进行图片资源的网络申请。通过上网搜索我了解到了SDWebimage这个功能丰富的第三方库&#xff0c;进行了较为浅层的学习。因为SDWebimage这个库之中的相关内容还是较为多且复杂的&#xff0…

SIwave:释放 SIwizard 求解器的强大功能

SIwave 是一种电源完整性和信号完整性工具。SIwizard 是 SIwave 中 SI 分析的主要工具&#xff0c;也是本博客的主题。 SIwizard 用于研究 RF、clock 和 control traces 的信号完整性。该工具允许用户进行瞬态分析、眼图分析和 BER 计算。用户可以将 IBIS 和 IBIS-AMI 模型添加…

Kafka 可观测性最佳实践

Kafka 概述 Kafka 是由 LinkedIn 开发一个分布式的基于发布订阅模式的消息队列&#xff0c;是一个实时数据处理系统&#xff0c;可以横向扩展。与 RabbitMQ、RockerMQ 等中间件一样拥有几大特点&#xff1a; 异步处理服务解耦流量削峰 监控 Kafka 是非常重要的&#xff0c;因…

342--358作业整理(错误 + 重点)

目录 1. 在需要运行的类中 定义 main 方法 2. this 。访问逻辑&#xff1a;先访问本类中&#xff0c;再访问父类中可以访问的成员&#xff08;不包括和本类中重名的成员&#xff09; 3. super 。访问逻辑&#xff1a;super&#xff08;父类对象&#xff09;直接访问父类及以…

Android自启动管控

1. 自启动管控需求来源 自启动、关联启动、交叉启动、推送启动等现象的泛滥除了对个人信息保护带来隐患外&#xff0c;还会导致占用过多的系统CPU和内存资源&#xff0c;造成系统卡顿、发热、电池消耗过快&#xff1b;还可能引入一些包含“恶意代码”的进程在后台隐蔽启动&…

智能的编织:C++中auto的编织艺术

在C的世界里&#xff0c;auto这个关键字就像是一个聪明的助手&#xff0c;它能够自动帮你识别变量的类型&#xff0c;让你的代码更加简洁和清晰。下面&#xff0c;我们就来聊聊auto这个关键字的前世今生&#xff0c;以及它在C11标准中的新用法。 auto的前世 在C11之前&#x…

函数式编程Stream流(通俗易懂!!!)

重点&#xff1a;只关注传入的参数列表和方法体&#xff08;数据操作&#xff09; 1.Lambda表达式 本质是匿名内部类的优化&#xff0c;先写匿名内部类 1.1 基本用法 public class lambdaTest {public static void main(String[] args) { // int i calculateNum((…

C#里对数组的排序操作

一般情况下是采用 Array.Sort(a) 来进行排序。 例子代码如下: /** C# Program to Sort a String using Predefined Function*/ using System; class linSearch {public static void Main(){Console.WriteLine("Enter Number of Elements you Want to Hold in the Arra…

算法每日双题精讲——双指针(移动零,复写零)

&#x1f31f;快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 &#x1f31f; 别再犹豫了&#xff01;快来订阅我们的算法每日双题精讲专栏&#xff0c;一起踏上算法学习的精彩之旅吧&#xff01;&#x1f4aa;…

【Android】View—基础知识,滑动,弹性滑动

基础知识 什么是View 在 Android 中&#xff0c;View 是用户界面&#xff08;UI&#xff09;中的基本组件&#xff0c;用于绘制图形和处理用户交互。所有的 UI 组件&#xff08;如按钮、文本框、图片等&#xff09;都是 View 的子类。可以说&#xff0c;View 是构建 Android …

【Unity】Game Framework框架学习使用

前言 之前用过一段时间的Game Framework框架&#xff0c;后来有那么一段时间都做定制小软件&#xff0c;框架就没再怎么使用了。 现在要做大型项目了&#xff0c;感觉还是用框架好一些。于是又把Game Framework拾起来了。 这篇文章主要是讲Game Framework这个框架是怎么用的…