Spark 对hadoopnamenode-log文件进行数据清洗并存入mysql数据库

news2025/1/15 7:11:46

一.查找需要清洗的文件

1.1查看hadoopnamenode-log文件位置

 1.2 开启Hadoop集群和Hive元数据、Hive远程连接

具体如何开启可以看我之前的文章:(10条消息) SparkSQL-liunx系统Spark连接Hive_难以言喻wyy的博客-CSDN博客

 1.3 将这个文件传入到hdfs中:

hdfs dfs -put hadoop-root-namenode-gree2.log   /tmp/hadoopNamenodeLogs/hadooplogs/hadoop-root-namenode-gree2.log

二.日志分析

将里面部分字段拿出来分析:

 2023-02-10 16:55:33,123 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: registered UNIX signal handlers for [TERM, HUP, INT]
2023-02-10 16:55:33,195 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: createNameNode []
2023-02-10 16:55:33,296 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2023-02-10 16:55:33,409 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).

可以看出其可以以INFO来作为中间字段,用indexof读取出该位置索引,以截取字符段的方式来将清洗的数据拿出。

三.代码实现

3.1 对数据进行清洗

object hadoopDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("HadoopLogsEtlDemo").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    import spark.implicits._
    import org.apache.spark.sql.functions._
//    TODO 根据INFO这个字段来对数据进行封装到Row中。
    val row: RDD[Row] = sc.textFile("hdfs://192.168.61.146:9000/tmp/hadoopNamenodeLogs/hadooplogs/hadoop-root-namenode-gree2.log")
      .filter(x => {
        x.startsWith("2023")
      })
      .map(x => {
        val strings: Array[String] = x.split(",")
        val num1: Int = strings(1).indexOf(" INFO ")
        val num2: Int = strings(1).indexOf(":")

        if(num1!=(-1)){
          val str1: String = strings(1).substring(0, num1)
          val str2: String = strings(1).substring(num1 + 5, num2)
          val str3: String = strings(1).substring(num2 + 1, strings(1).length)
          Row(strings(0), str1, "INFO",str2, str3)
        }
        else {
          val num3: Int = strings(1).indexOf(" WARN ")
          val num4: Int = strings(1).indexOf(" ERROR ")
          if(num3!=(-1)&&num4==(-1)){
            val str1: String = strings(1).substring(0, num3)
            val str2: String = strings(1).substring(num3 + 5, num2)
            val str3: String = strings(1).substring(num2 + 1, strings(1).length)
            Row(strings(0), str1,"WARN", str2, str3)}else{
            val str1: String = strings(1).substring(0, num4)
            val str2: String = strings(1).substring(num4 + 6, num2)
            val str3: String = strings(1).substring(num2 + 1, strings(1).length)
            Row(strings(0), str1,"ERROR", str2, str3)
          }
        }
      })

    val schema: StructType = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("number", StringType),
        StructField("status", StringType),
        StructField("util", StringType),
        StructField("info", StringType),
      )
    )
    val frame: DataFrame = spark.createDataFrame(row, schema)
    frame.show(80,false)
  }


}

清洗后的效果图:

 

3.2  创建jdbcUtils来将其数据导入到数据库:

object jdbcUtils {
  val url = "jdbc:mysql://192.168.61.141:3306/jsondemo?createDatabaseIfNotExist=true"
  val driver = "com.mysql.cj.jdbc.Driver"
  val user = "root"
  val password = "root"

  val table_access_logs: String = "access_logs"
  val table_full_access_logs: String = "full_access_logs"
  val table_day_active:String="table_day_active"
  val table_retention:String="retention"

  val table_loading_json="loading_json"
  val table_ad_json="ad_json"
  val table_notification_json="notification_json"
  val table_active_background_json="active_background_json"
  val table_comment_json="comment_json"
  val table_praise_json="praise_json"

  val table_teacher_json="teacher_json"

  val properties = new Properties()
  properties.setProperty("user", jdbcUtils.user)
  properties.setProperty("password", jdbcUtils.password)
  properties.setProperty("driver", jdbcUtils.driver)

  def dataFrameToMysql(df: DataFrame, table: String, op: Int = 1): Unit = {
    if (op == 0) {
      df.write.mode(SaveMode.Append).jdbc(jdbcUtils.url, table, properties)
    } else {
      df.write.mode(SaveMode.Overwrite).jdbc(jdbcUtils.url, table, properties)
    }
  }

  def getDataFtameByTableName(spark:SparkSession,table:String):DataFrame={
    val frame: DataFrame = spark.read.jdbc(jdbcUtils.url, table, jdbcUtils.properties)
    frame
  }

}

3.3 数据导入

jdbcUtils.dataFrameToMysql(frame,jdbcUtils.table_day_active,1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/415884.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenAI Translator | 基于ChatGPT API全局翻译润色解析及ORC上传图像翻译插件

简介 OpenAI Translator,一款基于 ChatGPT API 的划词翻译的浏览器插件和跨平台桌面端应用,使用 ChatGPT API 进行划词翻译和文本润色,借助了 ChatGPT 强大的翻译能力,帮助用户更流畅地阅读外语和编辑外语,允许跨 55 …

Qt音视频开发35-左右通道音量计算和音量不同范围值的转换

一、前言 视频文件一般会有两个声音通道及左右声道,值有时候一样有时候不一样,很多场景下我们需要对其分开计算不同的音量值,在QAudioFormat中可以获取具体有几个通道,如果是一个通道,则左右通道值设定一样&#xff0…

【时序数据库】时间序列数据和MongoDB第三部分-查询、分析和呈现时间序列数据...

在《时间序列数据和MongoDB:第1部分-简介》「时序数据库」时间序列数据与MongoDB:第一部分-简介中,我们回顾了理解数据库的查询访问模式需要询问的关键问题。在《时间序列数据和MongoDB:第2部分-模式设计最佳实践》「时序数据库」时序数据库和MongoDB第二…

Java实验课的学习笔记(二)类的简单使用

本文章就讲的是很基础的类的使用 重点大概就是类的构造函数以及一些很基础的东西。 实验内容是些老生常谈的东西,Complex类,在当初学C面向对象的时候也是这个样子展开的。 内容如以下: public class Complex {float real;float imag;public…

APK瘦身

先看下APK打包流程:APK打包流程_贺兰猪的博客-CSDN博客 知道了APK打包流程后想要瘦身,其实无非就是把整个APK的一些文件进行一个瘦身。 看下apk的这个文件。包括class、资源,资源生成arsc(资源映射表),manifest清单,…

快排(非递归)及计数排序算法

都学了递归版的快速排序为何还要再学非递归实现?由于在递归过程中,如果数据量过大,那么实现时容易导致栈溢出,虽然代码没有问题,但是就是会崩,因此要将其改为非递归来实现 文章目录一、快速排序&#xff08…

如何使用Mac远程控制Windows电脑?

在你开始之前,设置您要远程处理的 Windows 计算机。 先安装 Microsoft Remote Desktop。 您可以在“应用程序”文件夹中检查它。 如果在个人计算机上安装,请转到 Apple App Store 并下载 Microsoft Remote Desktop。 如果在 TXST 计算机上安装&#xff0…

【C语言】递归解决经典题目(汉诺塔问题、青蛙跳台阶问题)

简单不先于复杂,而是在复杂之后。 目录 1. 汉诺塔问题 1.1 简介及思路 1.2 代码实现 2. 青蛙跳台阶问题 2.1 简介及思路 2.2 代码实现 1. 汉诺塔问题 1.1 简介及思路 汉诺塔问题是一种经典的递归问题,起源于印度传说中的塔 of Brahma。问题描…

手把手教你学习IEC104协议和编程实现 十 故障事件与复位进程

故障事件 目的 在IEC104普遍应用之前,据我了解多个协议,再综合自动化协议中,有这么一个概念叫“事故追忆”,意思是当变电站出现事故的时候,不但要记录事故的时间,还需记录事故前后模拟量的数据,从而能从一定程度上分析事故产生的原因,这个模拟量就是和今天讲解的故障…

silvaco 仿真BJT

本次实验为利用silvaco仿真BJT器件,分析不同p区厚度以及p区不同掺杂浓度研究其电流增益的变化。 一、器件要求 区域 掺杂方式 掺杂浓度或 峰值浓度(/cm3) 厚度(um) 宽度(um) N-漂移区 均匀…

微服务框架【笔记-Nacos注册中心】

接上篇,继续学习微服务框架中的Nacos注册中心。 Nacos注册中心 一、认识和安装Nacos 1.认识Nacos Nacos 是阿里巴巴的产品,现在是 SpringCloud 中的一个组件。相比 Eureka 功能更加丰富。 2.安装Nacos 下面给大家展示windows安装Nacos步骤:…

网络互联技术与实践教程(汪双硕、姚羽)——第四章 路由技术

第四章 路由技术 4.1 路由原理 路由是指通过相互连接的网络将数据从源地点转发到目标地点的过程。在路由过程中,数据通常会经过一个或多个中间节点,路由发生在网络层。路由包含两个主要的动作:确定最佳路径和通过网络传输信息,后…

刷题笔记【6】| 快速刷完67道剑指offer(Java版)

本文已收录于专栏🌻《刷题笔记》文章目录前言🎨 1、包含min函数的栈题目描述思路(双栈法)🎨 2、栈的压入弹出序列题目描述思路(辅助栈)🎨 3、从上往下打印二叉树题目描述思路&#x…

chapter-4-数据库语句

以下课程来源于MOOC学习—原课程请见:数据库原理与应用 考研复习 概述 SQL发展 注:关键词是哪些功能,尤其第一个create alter drop是定义功能 1.SQL功能强大,实现了数据定义、数据操纵、数据控制等功能 2.SQL语言简洁&#xff…

【Java版oj】day26跳台阶扩展问题、快到碗里来

目录 一、跳台阶扩展问题 (1)原题再现 (2)问题分析 (3)完整代码 二、快到碗里来 (1)原题再现 (2)问题分析 (3)完整代码 一、跳台…

tomcat配置虚拟路径映射磁盘文件列表图片回显和放大功能实现springboot项目的文件虚拟路径映射

tomcat映射磁盘图片1.以E盘为例,在E盘创建目录testReceive2.配置tomcat虚拟路径映射e盘本地文件3.代码层面创建上传文件(此处为图片)工具类3.1(校验图片格式、获取当前主机ip、上传图片至本机目的地,获取上传图片地址&…

javaWeb(HTTP、Tomcat、Servlet)

目录 HTTP Web 服务器 - Tomcat 简介 基本使用:下载、安装、卸载、启动、关闭、配置、部署项目 IDEA中创建 Maven Web项目​编辑 IDEA中使用 Tomcat Servlet 快速入门 Servlet 执行流程 Servlet 生命周期 Servlet 体系结构 Servlet urlPattern配置 XM…

【从零开始学Skynet】实战篇《球球大作战》(五):gateway代码设计(中)

1、编码和解码 我们来实现两个辅助方法str_unpack和str_pack,用于消息的解码和编码。 (1)str_unpack代码 local str_unpack function(msgstr)local msg {}while true dolocal arg, rest string.match( msgstr, "(.-),(.*)")if…

【web自动化测试】

文章目录web自动化测试第一章 web自动化入门1.什么是自动化?1.1 优点2.什么是自动化测试?2.1 自动化测试能解决什么问题?2.2 自动化相关知识2.2.1优点2.2.2 误区2.3 自动化测试分类3.什么是Web自动化测试?3.1 什么Web项目适合做自…

Flutter 了解 Element

一 Element 概念 这个玩意的概念。到底是什么 ? 官方解释是在树中特定位置的实例。 二 继承关系 element 有 ComponentElement 和 RenderObjectElement 之分 1 ComponentElement class StatelessElement extends ComponentElement class StatefulElement extend…