Spark in Action: Spark-based Log Cleaning and Data Statistics, plus Zeppelin Usage

Spark: Log Analysis in Practice

I. User Behavior Logs

1. Concept

All of the behavior a user generates while visiting the site (visits, page views, searches, clicks).

	Also called the user behavior trail, or traffic log.

2. Why Analyze Logs

Log analysis tells us:
	page view counts
	how sticky the site is
	what to recommend

3. How Logs Are Produced

(1)Nginx

(2)Ajax

4. Log Contents

A log record contains:
	1. Client system attributes: operating system, browser, etc.
	2. Access behavior: clicked URL, referring page (referer), time spent on the page
	3. Access information: session_id, visitor identity info (city / ISP)

Note: the Nginx log format can be configured to capture exactly the fields needed.

5. Why Logs Matter

(1) The eyes of the site
	ad placement and revenue
(2) The nerves of the site
	page layout (which shapes user experience)
(3) The brain of the site

II. Offline Data Processing

1. Processing Pipeline

1) Data collection
	Flume: the web logs produced by the servers are written into HDFS

2) Data cleaning
	Spark / Hive / MapReduce --> HDFS (Hive / Spark SQL tables)

3) Data processing
	Statistical analysis according to the business logic
	Spark / Hive / MapReduce --> HDFS (Hive / Spark SQL tables)

4) Persisting the results
	RDBMS (MySQL) / NoSQL (HBase, Redis)

5) Visualization
	Graphical display: pie charts, bar charts, maps, line charts
	ECharts, HUE, Zeppelin

A minimal end-to-end sketch of steps 2-5 is shown below.
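
The sketch assumes the raw logs have already been collected to HDFS by Flume and cleaned into parquet (the cleaning jobs are built in section V); the paths, table name, and JDBC settings are illustrative placeholders, and the actual jobs in this article write to MySQL through a hand-written JDBC DAO rather than the DataFrame jdbc writer used here.

import org.apache.spark.sql.{SaveMode, SparkSession}

// Outline of the offline pipeline: read cleaned data (step 2), compute statistics
// with Spark SQL (step 3), persist to MySQL (step 4), then chart in Zeppelin (step 5).
object OfflinePipelineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("OfflinePipelineSketch").getOrCreate()

    val cleaned = spark.read.parquet("hdfs:///imooc/clean")   // placeholder path
    cleaned.createOrReplaceTempView("access_logs")

    val topN = spark.sql(
      "select day, cmsId, count(1) as times from access_logs " +
        "group by day, cmsId order by times desc")

    topN.write.mode(SaveMode.Append).format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/imooc_project")   // placeholder JDBC target
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "day_video_access_topn_stat")
      .option("user", "root")
      .option("password", "******")
      .save()

    spark.stop()
  }
}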

III. Project Requirements

code/video
Requirement 1:
	Count the Top N most-visited courses / articles on the imooc main site

Requirement 2:
	Top N most popular courses on the imooc main site, per city
	a. derive the city from the IP address
	b. use window functions in Spark SQL

Requirement 3:
	Top N most popular courses on the imooc main site, by traffic

IV. Log Fields

Fields we need:
	access time, accessed URL, traffic consumed by the request, client IP address

Log processing:
	Logs are normally partitioned, using the access time in each record,
	for example by d, h, or m5 (one partition per 5 minutes); a small sketch of the partition keys follows below.

Input:  access time, accessed URL, traffic consumed, client IP address
Output: URL, cmsType (video/article), cmsId (content id), traffic, ip, city, access time, day
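
As a small illustration of that partitioning scheme, a helper like the one below (hypothetical, not part of the original project) derives the d / h / m5 keys from the cleaned yyyy-MM-dd HH:mm:ss timestamp produced by the DateUtils class in the next section:

import org.apache.commons.lang3.time.FastDateFormat

// Illustrative helper: derive the d / h / m5 partition keys from a cleaned
// timestamp such as "2017-05-11 14:09:14".
object PartitionKeys {
  private val source  = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  private val dayFmt  = FastDateFormat.getInstance("yyyyMMdd")
  private val hourFmt = FastDateFormat.getInstance("yyyyMMddHH")
  private val minFmt  = FastDateFormat.getInstance("mm")

  def keys(time: String): (String, String, String) = {
    val date = source.parse(time)
    val day  = dayFmt.format(date)                                   // d : 20170511
    val hour = hourFmt.format(date)                                  // h : 2017051114
    val m5   = hour + f"${minFmt.format(date).toInt / 5 * 5}%02d"    // m5: 201705111405
    (day, hour, m5)
  }
}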

Maven packaging: install the ipdatabase jar (used later for the IP-to-city lookup) into the local repository:

mvn install:install-file -Dfile=D:\ipdatabase-master\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar2 -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

V. Data Cleaning

1. Parsing the Raw Log

package com.saddam.spark.MuKe.ImoocProject

import org.apache.spark.sql.SparkSession

/**
  * Cleaning step 1: extract the required columns from the raw log.
  *
  * Set a breakpoint to inspect each field while debugging.
  */
object SparkStatFormatJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkStatFormatJob")
      .master("local[2]")
      .getOrCreate()

    val logRDD = spark.sparkContext.textFile("D:\\Spark\\DataSets\\access.20161111.log")

//    logRDD.take(10).foreach(println)

    val result = logRDD.map(line => {

      val splits = line.split(" ")
      val ip = splits(0)
      /**
        * The 3rd and 4th fields of the raw log together form the full timestamp:
        * [10/Nov/2016:00:01:02 +0800] ==> yyyy-MM-dd HH
        */
      //TODO parse it with the date utility class
      val time = splits(3) + " " + splits(4)

      // strip the surrounding quotes, e.g. "http://www.imooc.com/code/1852"
      val url = splits(11).replaceAll("\"", "")

      val traffic = splits(9)

      // a tuple would also work:
      // (ip, DateUtils.parse(time), url, traffic)

      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    })

    // preview the first 20 cleaned records
    result.take(20).foreach(println)

//    result.saveAsTextFile("D:\\Spark\\OutPut\\log_local_2")
/*
sample (ip, raw time) pairs from an earlier debug run:
(10.100.0.1,[10/Nov/2016:00:01:02 +0800])
(117.35.88.11,[10/Nov/2016:00:01:02 +0800])
(182.106.215.93,[10/Nov/2016:00:01:02 +0800])
(10.100.0.1,[10/Nov/2016:00:01:02 +0800])
 */

    spark.stop()
  }
}

2. Date Utility Class

package com.saddam.spark.MuKe.ImoocProject

import java.util.{Date, Locale}
import org.apache.commons.lang3.time.FastDateFormat

/**
  * Date/time parsing utility.
  */
object DateUtils {

  // format of the input log timestamp
  // [10/Nov/2016:00:01:02 +0800]
  val SOURCE_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // target format
  // 2016-11-10 00:01:02
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /**
    * Parse a raw log timestamp into the target format.
    * @param time
    * @return
    */
  def parse(time: String) = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

  /**
    * Extract the log timestamp as epoch milliseconds.
    *
    * time: [10/Nov/2016:00:01:02 +0800]
    * @param time
    * @return
    */
  def getTime(time: String) = {
    try {
      SOURCE_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case e: Exception => 0L
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("[10/Nov/2016:00:01:02 +0800]"))
  }
}
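
The SparkStatCleanJobYarn job in section IX relies on a second cleaning step, AccessConvertUtil (parseLog and struct), whose code is not listed in this article. The sketch below is a reconstruction based on the output schema shown in the statistics jobs (url, cmsType, cmsId, traffic, ip, city, time, day); the IpHelper.findRegionByIp call is the assumed API of the ipdatabase jar installed earlier, so swap in your own IP-to-city lookup if yours differs.

package com.saddam.spark.MuKe.ImoocProject

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

/**
  * Sketch of cleaning step 2: turn a cleaned line (time \t url \t traffic \t ip)
  * into a Row matching the schema url, cmsType, cmsId, traffic, ip, city, time, day.
  */
object AccessConvertUtil {

  // schema of the cleaned DataFrame that is written out as parquet
  val struct = StructType(Array(
    StructField("url", StringType),
    StructField("cmsType", StringType),
    StructField("cmsId", LongType),
    StructField("traffic", LongType),
    StructField("ip", StringType),
    StructField("city", StringType),
    StructField("time", StringType),
    StructField("day", StringType)
  ))

  def parseLog(log: String): Row = {
    try {
      val splits = log.split("\t")
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      // URLs look like http://www.imooc.com/video/4500 or http://www.imooc.com/article/17894
      val prefix = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(prefix) + prefix.length).split("/")
      var cmsType = ""
      var cmsId = 0L
      if (cms.length > 1) {
        cmsType = cms(0)
        cmsId = cms(1).toLong
      }

      // assumed API of the ipdatabase jar; replace with your own IP-to-city lookup if needed
      val city = com.ggstar.util.ip.IpHelper.findRegionByIp(ip)

      val time = splits(0)                                 // e.g. 2017-05-11 14:09:14
      val day = time.substring(0, 10).replaceAll("-", "")  // partition column, e.g. 20170511

      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case _: Exception => Row("", "", 0L, 0L, "", "", "", "")  // malformed line
    }
  }
}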

VI. Implementing the Requirements

Requirement 1

					Count the Top N most-visited courses / articles on the imooc main site

Compute the statistics as required and write the results to the database
	--using the DataFrame API
	--using the SQL API
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object PopularVideoVisits {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession
      .builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF=spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")

    accessDF.printSchema()
    accessDF.show(false)
    /*
+----------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|url                               |cmsType|cmsId|traffic|ip             |city|time               |day     |
+----------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc.com/video/4500   |video  |4500 |304    |218.75.35.226  |未知 |2017-05-11 14:09:14|20170511|
|http://www.imooc.com/video/14623  |video  |14623|69     |202.96.134.133 |未知 |2017-05-11 15:25:05|20170511|
|http://www.imooc.com/article/17894|article|17894|115    |202.96.134.133 |未知 |2017-05-11 07:50:01|20170511|
     */

      // refactored so the day is passed in as a parameter
    val day="20170511"
      
      
    videoAccessTopNStat(spark,accessDF,day)

    // quick check of the MySQL utility class
    println(MySQLUtils.getConnection())
    
    /**
      * Top N courses by access count
      */
    def videoAccessTopNStat(spark:SparkSession,accessDF:DataFrame,day:String)={
      // implicit conversions for the $-column syntax
      import spark.implicits._

      //TODO approach 1: DataFrame API, video statistics
      val  videoAccessTopNDF= accessDF
        .filter($"day"===day && $"cmsType"==="video")
        .groupBy("day","cmsId")
        .agg(count("cmsId").as("times"))
      videoAccessTopNDF.printSchema()
      videoAccessTopNDF.show(false)

      //TODO approach 2: SQL API, article statistics
      accessDF.createOrReplaceTempView("temp")
      val videoAccessTopNSQL = spark.sql("select " +
        "day,cmsId,count(1) as times " +
        "from temp " +
        "where day='20170511' and cmsType='article' " +
        "group by day,cmsId " +
        "order by times desc")
      videoAccessTopNSQL.show(false)


      /**
        * TODO write the Top N course statistics to MySQL
        *
        */
      try{
        videoAccessTopNSQL.foreachPartition(partitionOfRecords=>{
          val list =new ListBuffer[DayVideoAccessStat]

          partitionOfRecords.foreach(info=>{
            val day=info.getAs[String]("day")
            val cmsId=info.getAs[Long]("cmsId")
            val times=info.getAs[Long]("times")

            list.append(DayVideoAccessStat(day,cmsId,times))
          })

          StatDAO.insertDayVideoAccessTopN(list)

        })}catch {
        case e:Exception=>e.printStackTrace()
      }

    }


    spark.stop()
  }

  /**
    * Entity class: per-day course access count
    */
  case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)


  /**
   * TODO MySQL helper
   */
  object MySQLUtils{
    def getConnection()={
      DriverManager.getConnection("jdbc:mysql://121.37.2x.xx:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
    }
    /**
      * Release the database connection and statement
      * @param connection
      * @param pstmt
      */
    def release(connection: Connection, pstmt: PreparedStatement): Unit = {
      try {
        if (pstmt != null) {
          pstmt.close()
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    }
  }

  /**
    * TODO DAO layer
    */
  object StatDAO{
    /**
      * Batch-save DayVideoAccessStat records to the database
      * insertDayVideoAccessTopN: per-day video access counts
      */
    def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {

      var connection:Connection = null
      var pstmt:PreparedStatement = null

      try {
        connection =MySQLUtils.getConnection()

        connection.setAutoCommit(false) // disable auto-commit; commit manually

        val sql = "insert into day_video_access_topn_stat2(day,cms_id,times) values (?,?,?)"
        pstmt = connection.prepareStatement(sql)

        for (ele <- list) {
          pstmt.setString(1, ele.day)
          pstmt.setLong(2, ele.cmsId)
          pstmt.setLong(3, ele.times)

          pstmt.addBatch()
        }

        pstmt.executeBatch() // execute the batch
        connection.commit() // manual commit
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        MySQLUtils.release(connection, pstmt)
      }
    }
    
  }

}

Requirement 2

				Top N most popular courses on the imooc main site, per city
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object PopularCityVideoVisits {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession
      .builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF=spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")

    accessDF.printSchema()
    accessDF.show(false)
      
     // refactored so the day is passed in as a parameter
    val day="20170511"
      
    //TODO Top N courses per city
    cityAccessTopNStat(spark,accessDF,day)
      
    /**
    * Top N courses per city
    * @param spark
    * @param accessDF
    */
  def cityAccessTopNStat(spark: SparkSession,accessDF:DataFrame,day:String)={
    import spark.implicits._
        val cityAccessTopNDF = accessDF
          .filter($"day" === day && $"cmsType" === "video")
          .groupBy("day", "city", "cmsId")
          .agg(count("cmsId").as("times"))
          .orderBy($"times".desc)
        cityAccessTopNDF.printSchema()
        cityAccessTopNDF.show(false)

    // using window functions in Spark SQL

    val top3DF=cityAccessTopNDF.select(
    cityAccessTopNDF("day"),
    cityAccessTopNDF("city"),
    cityAccessTopNDF("cmsId"),
    cityAccessTopNDF("times"),

    row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
      .orderBy(cityAccessTopNDF("times").desc)
    ).as("times_rank")
    ).filter("times_rank <=3") //.show(false)  //Top3
    /**
      * Write the per-city Top N course statistics to MySQL
      *
      */
    try{
      top3DF.foreachPartition(partitionOfRecords=>{
        val list =new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info=>{
          val day=info.getAs[String]("day")
          val cmsId=info.getAs[Long]("cmsId")
          val city=info.getAs[String]("city")
          val times=info.getAs[Long]("times")
          val timesRank=info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day,cmsId,city,times,timesRank))


        })

        StatDAO.insertDayCityVideoAccessTopN(list)

      })}catch {
      case e:Exception=>e.printStackTrace()
    }
  }
       spark.stop()
  }   
    
    /**
    * Entity class: per-day, per-city course access count
    */
    case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)
    
     /**
   * TODO MySQL helper
   */
  object MySQLUtils{
    def getConnection()={
      DriverManager.getConnection("jdbc:mysql://121.37.2x.xx:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
    }
    /**
      * Release the database connection and statement
      * @param connection
      * @param pstmt
      */
    def release(connection: Connection, pstmt: PreparedStatement): Unit = {
      try {
        if (pstmt != null) {
          pstmt.close()
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    }
  }
    
    
  /**
    * TODO DAO layer
    */
  object StatDAO{
      /**
    * Batch-save DayCityVideoAccessStat records to the database
    */
  def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()

      connection.setAutoCommit(false) // disable auto-commit; commit manually

      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?) "
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batch
      connection.commit() // manual commit
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
  }
}

Requirement 3

					Top N most popular courses on the imooc main site, by traffic
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object VideoTrafficVisits {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession
      .builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF=spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")

    accessDF.printSchema()
    accessDF.show(false)
    
    // refactored so the day is passed in as a parameter
    val day="20170511"
      
    //TODO Top N courses by traffic
    videoTrafficsTopNStat(spark,accessDF,day)
      
    /**
    * Top N courses by traffic
    */
  def videoTrafficsTopNStat(spark: SparkSession, accessDF:DataFrame,day:String): Unit = {
    import spark.implicits._

    val videoTrafficsTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
      //.show(false)

    /**
      * Write the Top N courses-by-traffic statistics to MySQL
      *
      */
    try{
      videoTrafficsTopNDF.foreachPartition(partitionOfRecords=>{
        val list =new ListBuffer[DayVideoTrafficsStat]
        partitionOfRecords.foreach(info=>{
          val day=info.getAs[String]("day")
          val cmsId=info.getAs[Long]("cmsId")
          val traffics=info.getAs[Long]("traffics")
          list.append(DayVideoTrafficsStat(day,cmsId,traffics))
        })

        StatDAO.insertDayVideoTrafficsAccessTopN(list)

      })}catch {
      case e:Exception=>e.printStackTrace()
    }
  }
      
      spark.stop()
  }
    
  /**
    * Entity class: per-day course traffic
    */
  case class DayVideoTrafficsStat(day:String,cmsId:Long,traffics:Long)
    
    
   
     /**
   * TODO MySQL helper
   */
  object MySQLUtils{
    def getConnection()={
      DriverManager.getConnection("jdbc:mysql://121.37.2x.2xx1:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
    }
    /**
      * Release the database connection and statement
      * @param connection
      * @param pstmt
      */
    def release(connection: Connection, pstmt: PreparedStatement): Unit = {
      try {
        if (pstmt != null) {
          pstmt.close()
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    }
  }
    
     /**
    * TODO DAO layer
    */
  object StatDAO{
      /**
    * Batch-save DayVideoTrafficsStat records to the database
    */
  def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()

      connection.setAutoCommit(false) // disable auto-commit; commit manually

      val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values (?,?,?) "
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batch
      connection.commit() // manual commit
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
  }
}

Deleting existing data

  /**
    * Delete the data for the specified day (so reruns do not leave duplicate rows)
    */
  def deleteData(day: String): Unit = {

    val tables = Array("day_video_access_topn_stat",
      "day_video_city_access_topn_stat",
      "day_video_traffics_topn_stat")

    var connection:Connection = null
    var pstmt:PreparedStatement = null

    try{
      connection = MySQLUtils.getConnection()

      for(table <- tables) {
        // delete from table ....
        val deleteSQL = s"delete from $table where day = ?"
        pstmt = connection.prepareStatement(deleteSQL)
        pstmt.setString(1, day)
        pstmt.executeUpdate()
      }
    }catch {
      case e:Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
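
In the statistics jobs above, deleteData(day) would typically be called once before the batch inserts, so that re-running a day replaces the old results instead of appending duplicates. An assumed call site, using the method names defined in this article:

    // before writing the results for the day
    StatDAO.deleteData(day)
    videoAccessTopNStat(spark, accessDF, day)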

VII. Zeppelin

Official site

https://zeppelin.apache.org/

1. Extract

[root@hadoop src]# tar zxvf zeppelin-0.7.1-bin-all.tgz

2. Rename

[root@hadoop src]# mv  zeppelin-0.7.1-bin-all zeppelin

3. Start

[root@hadoop bin]# ./zeppelin-daemon.sh start

4. Web UI

http://121.37.2xx.xx:8080

5. Configure the JDBC interpreter

Driver class: com.mysql.jdbc.Driver
URL:          jdbc:mysql://121.37.2x.xx:3306/imooc_project?
User:         root
Password:     xxxxxx

# MySQL driver jar (added as an interpreter dependency)
/usr/local/src/mysql-connector-java-5.1.27-bin.jar

6. Create a note

7. List the tables

%jdbc

show tables;

8. Charting the results

%jdbc

select cms_id,times from day_video_access_topn_stat;

VIII. Spark on YARN

Spark deployment modes

1) Local: used during development
2) Standalone: Spark's built-in cluster manager; a standalone cluster requires deploying Spark on every node
3) YARN: recommended for production, so that YARN handles resource scheduling for all cluster jobs (MapReduce, Spark) in one place
4) Mesos


Whichever mode is used, the application code stays exactly the same; only the --master value passed at submit time changes.

1. Overview

Spark supports pluggable cluster managers.

From YARN's point of view, a Spark application is just another client.

2. client mode

The Driver runs on the client, i.e. the machine that submits the Spark job.

The client talks to the allocated containers to schedule and run the job, so the client process must stay alive.

Logs are printed to the client console, which is convenient for testing.

3. cluster mode

The Driver runs inside the ApplicationMaster.

The client can exit as soon as the job is submitted, because the job is already running on YARN.

Driver logs are not visible in the submitting console (they live on the Driver, inside the AM); retrieve them with yarn logs -applicationId <appId>.

4. Comparing the two modes

Where the Driver runs

What the ApplicationMaster is responsible for

Where the run logs end up

5. Example

Set HADOOP_CONF_DIR to the Hadoop configuration directory before submitting.

Client mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --executor-memory 1G \
  --num-executors 1 \
  /usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar \
  4
Pi is roughly 3.1411378528446323
22/02/28 18:52:26 INFO server.ServerConnector: Stopped Spark@1b0a7baf{HTTP/1.1}{0.0.0.0:4040}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@8a589a2{/stages/stage/kill,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@192f2f27{/jobs/job/kill,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@1bdf8190{/api,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@4f8969b0{/,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6fefce9e{/static,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@74cec793{/executors/threadDump/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@f9b7332{/executors/threadDump,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@18e7143f{/executors/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@209775a9{/executors,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5db4c359{/environment/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2c177f9e{/environment,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@33617539{/storage/rdd/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@47874b25{/storage/rdd,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@290b1b2e{/storage/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@1fc0053e{/storage,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@77307458{/stages/pool/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@389adf1d{/stages/pool,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@7bf9b098{/stages/stage/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@72e34f77{/stages/stage,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6e9319f{/stages/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6fa590ba{/stages,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2416a51{/jobs/job/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@293bb8a5{/jobs/job,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@37ebc9d8{/jobs/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5217f3d0{/jobs,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.184.135:4040
22/02/28 18:52:26 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
22/02/28 18:52:26 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
22/02/28 18:52:26 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
22/02/28 18:52:26 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
22/02/28 18:52:26 INFO cluster.YarnClientSchedulerBackend: Stopped
22/02/28 18:52:26 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/02/28 18:52:26 INFO memory.MemoryStore: MemoryStore cleared
22/02/28 18:52:26 INFO storage.BlockManager: BlockManager stopped
22/02/28 18:52:26 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
22/02/28 18:52:26 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/02/28 18:52:26 INFO spark.SparkContext: Successfully stopped SparkContext
22/02/28 18:52:26 INFO util.ShutdownHookManager: Shutdown hook called
22/02/28 18:52:26 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a95834c0-d38b-457b-89b2-fed00d5bef56

Cluster mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \
  --executor-memory 1G \
  --num-executors 1 \
  /usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar \
  4


[root@hadoop01 spark]# ./bin/spark-submit   --class org.apache.spark.examples.SparkPi   --master yarn-cluster   --executor-memory 1G   --num-executors 1   /usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar 4
Warning: Master yarn-cluster is deprecated since 2.0. Please use master "yarn" with specified deploy mode instead.
22/02/28 18:54:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/28 18:54:32 WARN util.Utils: Your hostname, hadoop01.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.184.135 instead (on interface ens33)
22/02/28 18:54:32 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/02/28 18:54:32 INFO client.RMProxy: Connecting to ResourceManager at hadoop01/192.168.184.135:8032
22/02/28 18:54:32 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
22/02/28 18:54:33 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
22/02/28 18:54:33 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
22/02/28 18:54:33 INFO yarn.Client: Setting up container launch context for our AM
22/02/28 18:54:33 INFO yarn.Client: Setting up the launch environment for our AM container
22/02/28 18:54:33 INFO yarn.Client: Preparing resources for our AM container
22/02/28 18:54:33 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
22/02/28 18:54:35 INFO yarn.Client: Uploading resource file:/tmp/spark-c7e3fb91-c7d0-4f59-86ba-705a9f256144/__spark_libs__3085975169933820625.zip -> hdfs://hadoop01:9000/user/root/.sparkStaging/application_1646041633964_0004/__spark_libs__3085975169933820625.zip
22/02/28 18:54:35 INFO yarn.Client: Uploading resource file:/usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar -> hdfs://hadoop01:9000/user/root/.sparkStaging/application_1646041633964_0004/spark-examples_2.11-2.1.1.jar
22/02/28 18:54:35 INFO yarn.Client: Uploading resource file:/tmp/spark-c7e3fb91-c7d0-4f59-86ba-705a9f256144/__spark_conf__2818552262823480245.zip -> hdfs://hadoop01:9000/user/root/.sparkStaging/application_1646041633964_0004/__spark_conf__.zip
22/02/28 18:54:35 INFO spark.SecurityManager: Changing view acls to: root
22/02/28 18:54:35 INFO spark.SecurityManager: Changing modify acls to: root
22/02/28 18:54:35 INFO spark.SecurityManager: Changing view acls groups to:
22/02/28 18:54:35 INFO spark.SecurityManager: Changing modify acls groups to:
22/02/28 18:54:35 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
22/02/28 18:54:35 INFO yarn.Client: Submitting application application_1646041633964_0004 to ResourceManager
22/02/28 18:54:35 INFO impl.YarnClientImpl: Submitted application application_1646041633964_0004
22/02/28 18:54:36 INFO yarn.Client: Application report for application_1646041633964_0004 (state: ACCEPTED)
22/02/28 18:54:36 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: default
         start time: 1646045675928
         final status: UNDEFINED
         tracking URL: http://hadoop01:8088/proxy/application_1646041633964_0004/
         user: root
22/02/28 18:54:37 INFO yarn.Client: Application report for application_1646041633964_0004 (state: ACCEPTED)
22/02/28 18:54:38 INFO yarn.Client: Application report for application_1646041633964_0004 (state: ACCEPTED)
22/02/28 18:54:39 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:39 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 192.168.184.135
         ApplicationMaster RPC port: 0
         queue: default
         start time: 1646045675928
         final status: UNDEFINED
         tracking URL: http://hadoop01:8088/proxy/application_1646041633964_0004/
         user: root
22/02/28 18:54:40 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:41 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:42 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:43 INFO yarn.Client: Application report for application_1646041633964_0004 (state: FINISHED)
22/02/28 18:54:43 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 192.168.184.135
         ApplicationMaster RPC port: 0
         queue: default
         start time: 1646045675928
         final status: SUCCEEDED
         tracking URL: http://hadoop01:8088/proxy/application_1646041633964_0004/
         user: root
22/02/28 18:54:43 INFO util.ShutdownHookManager: Shutdown hook called
22/02/28 18:54:43 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-c7e3fb91-c7d0-4f59-86ba-705a9f256144

6. Retrieving logs by application ID

[root@hadoop01 spark]# yarn logs -applicationId application_1646041633964_0003

22/02/28 18:59:05 INFO client.RMProxy: Connecting to ResourceManager at hadoop01/192.168.184.135:8032
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/src/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/src/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
/tmp/logs/root/logs/application_1646041633964_0003 does not exist.
Log aggregation has not completed or is not enabled.

# Log aggregation is not enabled, so yarn logs cannot show the logs; view the container logs through the web UI instead.

7. Viewing the result in the web UI

http://hadoop01:8042/node/containerlogs/container_1646041633964_0004_01_000001/root

IX. Running the Spark Project on YARN

Package the project and its dependencies with Maven.

1. IDEA project code: word count

package com.bigdata

import org.apache.spark.sql.SparkSession

object WordCountYARN {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession
      .builder()
      .getOrCreate()

    if (args.length != 2) {
      println("Usage: WordCountYARN <inputPath> <outputPath>")
      System.exit(1)
    }
    val Array(inputPath, outputPath) = args

    val rdd = spark.sparkContext.textFile(inputPath)

    // word count on tab-separated input
    val counts = rdd.flatMap(x => x.split("\t")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

    counts.saveAsTextFile(outputPath)

    spark.stop()
  }

}

2.spark-submit

spark-submit \
  --class com.bigdata.WordCountYARN \
  --name WordCount \
  --master yarn \
  --executor-memory 1G \
  --num-executors 1 \
  /usr/local/src/spark/spark_jar/BYGJ.jar \
  hdfs://hadoop01:9000/wordcount.txt hdfs://hadoop01:9000/wc_output

3. Check the result

[root@hadoop01 spark_jar]# hadoop fs -cat /wc_output/part-*

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/src/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/src/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(hive,5)
(spark,5)
(hadoop,2)
(hbase,3)

-----------------------------------------

1. IDEA project code: log cleaning

package com.saddam.spark.MuKe.ImoocProject.LogClean

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkStatCleanJobYarn {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession
      .builder()
      .getOrCreate()

    if (args.length != 2) {
      println("Usage: SparkStatCleanJobYarn <inputPath> <outputPath>")
      System.exit(1)
    }
    val Array(inputPath, outputPath) = args

    val accessRDD = spark.sparkContext.textFile(inputPath)

    //TODO RDD -> DataFrame, using the parser and schema from AccessConvertUtil
    val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)), AccessConvertUtil.struct)

    accessDF
        .coalesce(1)
      .write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .partitionBy("day")
      .save(outputPath)
    
    spark.stop()
  }
}

2.spark-submit

spark-submit \
  --class com.saddam.spark.MuKe.ImoocProject.LogClean.SparkStatCleanJobYarn \
  --name SparkStatCleanJobYarn \
  --master yarn \
  --executor-memory 1G \
  --num-executors 1 \
  --files /usr/local/src/spark/spark_jar/ipDatabase.csv,/usr/local/src/spark/spark_jar/ipRegion.xlsx \
  /usr/local/src/spark/spark_jar/Spark.jar \
  hdfs://hadoop01:9000/access.log hdfs://hadoop01:9000/log_output

3. Check the result

Start spark-shell:

[root@hadoop01 datas]# spark-shell --master local[2] --jars /usr/local/src/mysql-connector-java-5.1.27-bin.jar

Locate the output file on HDFS:

/log_output/day=20170511/part-00000-36e30abb-3e42-4237-ad9f-a9f93258d4b2.snappy.parquet

Read the file:

scala>     spark.read.format("parquet").parquet("/log_output/day=20170511/part-00000-36e30abb-3e42-4237-ad9f-a9f93258d4b2.snappy.parquet").show(false)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+----------------------------------+-------+-----+-------+---------------+----+-------------------+
|url                               |cmsType|cmsId|traffic|ip             |city|time               |
+----------------------------------+-------+-----+-------+---------------+----+-------------------+
|http://www.imooc.com/video/4500   |video  |4500 |304    |218.75.35.226  |    |2017-05-11 14:09:14|
|http://www.imooc.com/video/14623  |video  |14623|69     |202.96.134.133 |    |2017-05-11 15:25:05|
|http://www.imooc.com/article/17894|article|17894|115    |202.96.134.133 |    |2017-05-11 07:50:01|
|http://www.imooc.com/article/17896|article|17896|804    |218.75.35.226  |    |2017-05-11 02:46:43|
|http://www.imooc.com/article/17893|article|17893|893    |222.129.235.182|    |2017-05-11 09:30:25|
|http://www.imooc.com/article/17891|article|17891|407    |218.75.35.226  |    |2017-05-11 08:07:35|
|http://www.imooc.com/article/17897|article|17897|78     |202.96.134.133 |    |2017-05-11 19:08:13|
|http://www.imooc.com/article/17894|article|17894|658    |222.129.235.182|    |2017-05-11 04:18:47|
|http://www.imooc.com/article/17893|article|17893|161    |58.32.19.255   |    |2017-05-11 01:25:21|
|http://www.imooc.com/article/17895|article|17895|701    |218.22.9.56    |    |2017-05-11 13:37:22|
|http://www.imooc.com/article/17892|article|17892|986    |218.75.35.226  |    |2017-05-11 05:53:47|
|http://www.imooc.com/video/14540  |video  |14540|987    |58.32.19.255   |    |2017-05-11 18:44:56|
|http://www.imooc.com/article/17892|article|17892|610    |218.75.35.226  |    |2017-05-11 17:48:51|
|http://www.imooc.com/article/17893|article|17893|0      |218.22.9.56    |    |2017-05-11 16:20:03|
|http://www.imooc.com/article/17891|article|17891|262    |58.32.19.255   |    |2017-05-11 00:38:01|
|http://www.imooc.com/video/4600   |video  |4600 |465    |218.75.35.226  |    |2017-05-11 17:38:16|
|http://www.imooc.com/video/4600   |video  |4600 |833    |222.129.235.182|    |2017-05-11 07:11:36|
|http://www.imooc.com/article/17895|article|17895|320    |222.129.235.182|    |2017-05-11 19:25:04|
|http://www.imooc.com/article/17898|article|17898|460    |202.96.134.133 |    |2017-05-11 15:14:28|
|http://www.imooc.com/article/17899|article|17899|389    |222.129.235.182|    |2017-05-11 02:43:15|
+----------------------------------+-------+-----+-------+---------------+----+-------------------+
only showing top 20 rows

X. Performance Tuning

1. Cluster-level tuning

Choosing a storage format: https://www.infoq.cn/article/bigdata-store-choose/

Choosing a compression codec:
	default: snappy
	change it with .config("spark.sql.parquet.compression.codec", "gzip")

2. Code-level tuning

Prefer high-performance operators

Reuse data that has already been computed

3. Parameter tuning

Parallelism:
	spark.sql.shuffle.partitions
	default 200
	the number of partitions used when shuffling data for joins or aggregations
spark-submit:
	--conf spark.sql.shuffle.partitions=500
IDEA:
	.config("spark.sql.shuffle.partitions", "500")


Partition column type inference:
	spark.sql.sources.partitionColumnTypeInference.enabled
spark-submit:
	--conf spark.sql.sources.partitionColumnTypeInference.enabled=false
IDEA:
	.config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

