文章目录
- Spark On Hive 详解
- 一、项目配置
- 1. 创建工程
- 2. 配置文件
- 3. 工程目录
- 二、代码实现
- 2.1 Class SparkFactory
- 2.2 Object SparkFactory
Spark On Hive 详解
本文基于Spark重构基于Hive的电商数据分析的项目需求,在重构的同时对Spark On Hive的全流程进行详细的讲解。
一、项目配置
1. 创建工程
首先,创建一个空的Maven工程,在创建之后,我们需要检查一系列配置,以保证JDK版本的一致性。同时,我们需要创建出Scala的编码环境。具体可参考以下文章:
Maven工程配置与常见问题解决指南
和
Scala01 —— Scala基础
2. 配置文件
2.1 在Spark On Hive
的项目中,我们需要有两个核心配置文件。
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ybg</groupId>
<artifactId>warehouse_ebs_2</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>3.1.2</spark.version>
<spark.scala.version>2.12</spark.scala.version>
<hadoop.version>3.1.3</hadoop.version>
<mysql.version>8.0.33</mysql.version>
<hive.version>3.1.2</hive.version>
<hbase.version>2.3.5</hbase.version>
<jackson.version>2.10.0</jackson.version>
</properties>
<dependencies>
<!-- spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- mysql -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- hive-exec -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- HBase 驱动 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
</project>
- log4j.properties
log4j.properties
文件的主要作用是配置日志系统的行为,包括控制日志信息的输出和实现滚动事件日志。
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
----------------------- 滚动事件日志代码 -----------------------
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.logfile.DatePattern='.'yyyy-MM-dd
log4j.appender.logfile.append=true
---------------------------------------------------------------
log4j.appender.logfile.File=log/spark_first.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
2.2 组件核心配置文件
在工程的resources
目录下,需要存放在虚拟机中大数据服务的核心组件的配置文件,以便于Spark On Hive中调用大数据组件服务能够正常进行。
3. 工程目录
二、代码实现
-
创建数据校验方法 check:
用于确保配置项的值有效。
检查值是否为 null。
对字符串类型的值进行非空和正则表达式匹配校验。 -
创建配置设置方法 set:
先校验配置项名称和值的有效性。
使用 SparkConf.set 方法设置有效的配置项和值。 -
单例对象 SparkFactory:
提供基础配置方法,如设置应用名称、主节点等。
提供 baseConfig 方法集中进行基础配置。
提供 end 方法返回配置好的 SparkSession 实例。 -
在 SparkFactory 类中实现上述方法:
定义 build 方法,返回包含 check 和 set 方法的 Builder 对象。
在 Builder 对象中实现各种配置方法,每个方法都调用 set 方法。
使用 SparkSession.builder() 方法在 end 方法中创建并返回 SparkSession 实例。
SparkFactory配置表如下:
配置表
2.1 Class SparkFactory
- 作用:
SparkFactory
类的作用是能够工厂化地创建和配置SparkSession
实例,通过一系列的set
和check
方法来确保配置项的有效性和正确性,并最终生成一个配置好的SparkSession
实例。 - 注意:我们需要在Spark官网配置页获取所有配置项的标准名称。
- 代码
class SparkFactory {
def build():Builder={
new Builder {
val conf = new SparkConf()
/**
* 数据校验
* @param title 校验主题
* @param value 待校验的值
* @param regex 若待校验值为字符串,且有特定的规则,那么提供正则表达式进一步验证格式
*/
private def check(title:String,value:Any,regex:String=null)={
if (null == value) {
throw new RuntimeException()(s"value for $title null pointer exception")
}
if(value.isInstanceOf[String]){
if(value.toString.isEmpty){
throw new RuntimeException(s"value for $title empty string exception")
}
if(regex!=null){
if(!value.toString.matches(regex)){
throw new RuntimeException(s"$title is not match regex $regex")
}
}
}
}
/**
* 先检查配置项名称是否正确
* 再检查配置项的值是否正确
* @param item 配置项名称
* @param value 配置项值
* @param regexValue 配置项正则规则
*/
private def set(item:String,value:String,regexValue:String=null)={
check("name_of_config_item",item,"^spark\\..*")
check(item,value,regexValue)
conf.set(item,value)
}
// Base
private def setBaseAppName(appName:String)={
set("spark.app.name",appName,"^\\w+$")
}
private def setBaseMaster(master:String)={
set("spark.master",master,"yarn|spark://([a-z]\\w+|\\d{1,3}(\\.\\d{1,3}){3}):\\d{4,5}|local(\\[\\*|[1-9][0-9]*])")
}
private def setBaseDeployMode(deployMode:String)={
set("spark.submit.deployMode",deployMode,"client|cluster")
}
private def setBaseEventLogEnabled(eventLogEnabled:Boolean)={
set("spark.eventLog.enabled",s"$eventLogEnabled")
}
override def baseConfig(appName: String, master: String = "local[*]", deployMode: String = "client", eventLogEnabled: Boolean = false): Builder = {
setBaseAppName(appName)
setBaseMaster(master)
setBaseDeployMode(deployMode)
setBaseEventLogEnabled(eventLogEnabled)
this
}
// Driver
private def setDriverMemory(memoryGB:Int)={
set("spark.driver.memory",s"${memoryGB}g","[1-9]\\d*")
}
private def setDriverCoreNum(coreNum: Int) = {
set("spark.driver.cores", s"${coreNum}g", "[1-9]\\d*")
}
private def setDriverMaxResultGB(maxRstGB:Int)={
set("spark.driver.maxResultSize",s"${maxRstGB}g","[1-9]\\d*")
}
private def setDriverHost(driverHost:String)={
set("spark.submit.deployMode",driverHost,"localhost|[a-z]\\w+")
}
override def optimizeDriver(memoryGB: Int = 2, coreNum: Int = 1, maxRstGB: Int = 1, driverHost: String = "localhost"): Builder = {
setDriverCoreNum(coreNum)
setDriverMemory(memoryGB)
/**
* 每一个Spark行动算子触发的所有分区序列化结果大小上限
*/
setDriverMaxResultGB(maxRstGB)
/**
* Standalone 模式需要设置 DriverHost,便于 executor 与 master 通信
*/
if (conf.get("spark.master").startsWith("spark://")) {
setDriverHost(driverHost)
}
setDriverHost(driverHost)
this
}
// Executor
private def setExecutorMemory(memoryGB: Int) = {
set("spark.executor.memory", s"${memoryGB}g", "[1-9]\\d*")
}
private def setExecutorCoreNum(coreNum: Int) = {
set("spark.executor.cores", s"$coreNum", "[1-9]\\d*")
}
override def optimizeExecutor(memoryGB:Int=1,coreNum:Int=1):Builder={
setExecutorMemory(memoryGB)
/**
* Yarn模式下只能由1个核
* 其他模式下,核数为所有可用的核
*/
if(!conf.get("spark.master").equals("yarn")){
setExecutorCoreNum(coreNum)
}
this
}
// Limit
private def setLimitMaxCores(maxCores:Int)={
set("spark.cores.max",s"$maxCores","[1-9]\\d*")
}
private def setLimitMaxTaskFailure(maxTaskFailure:Int)={
set("spark.task.maxFailures",s"$maxTaskFailure","[1-9]\\d*")
}
private def setLimitMaxLocalWaitS(maxLocalWaitS:Int)={
set("spark.locality.wait",s"${maxLocalWaitS}s","[1-9]\\d*")
}
override def optimizeLimit(maxCores:Int=4,maxTaskFailure:Int=3,maxLocalWaitS:Int=3):Builder={
if (conf.get("spark.master").startsWith("spark://")) {
setLimitMaxCores(maxCores)
}
/**
* 单个任务允许失败最大次数,超出会杀死本次任务
*/
setLimitMaxTaskFailure(maxTaskFailure)
/**
* 数据本地化读取加载的最大等待时间
* 大任务:建议适当增加此值
*/
setLimitMaxLocalWaitS(maxLocalWaitS)
this
}
// Serializer
override def optimizeSerializer(serde:String="org.apache.spark.serializer.JavaSerializer",clas:Array[Class[_]]=null):Builder={
/**
* 设置将需要通过网络发送或快速缓存的对象序列化工具类
* 默认为JavaSerializer
* 为了提速,推荐设置为KryoSerializer
* 若采用 KryoSerializer,需要将所有自定义的实体类(样例类)注册到配置中心
*/
set("spark.serializer",serde,"([a-z]+\\.)+[A-Z]\\w*")
if(serde.equals("org.apache.spark.serializer.KryoSerializer")){
conf.registerKryoClasses(clas)
}
this
}
// Net
private def setNetTimeout(netTimeoutS:Int)={
set("spark.cores.max",s"${netTimeoutS}s","[1-9]\\d*")
}
private def setNetSchedulerMode(schedulerMode:String)={
set("spark.scheduler.mode",schedulerMode,"FAIR|FIFO")
}
override def optimizeNetAbout(netTimeOusS:Int=120,schedulerMode:String="FAIR"):Builder={
/**
* 所有和网络交互相关的超时阈值
*/
setNetTimeout(netTimeOusS)
/**
* 多人工作模式下,建议设置为FAIR
*/
setNetSchedulerMode(schedulerMode)
this
}
// Dynamic
private def setDynamicEnabled(dynamicEnabled:Boolean)={
set("spark.dynamicAllocation.enabled",s"$dynamicEnabled")
}
private def setDynamicInitialExecutors(initialExecutors:Int)={
set("spark.dynamicAllocation.initialExecutors",s"$initialExecutors","[1-9]\\d*")
}
private def setDynamicMinExecutors(minExecutors:Int)={
set("spark.dynamicAllocation.minExecutors",s"$minExecutors","[1-9]\\d*")
}
private def setDynamicMaxExecutors(maxExecutors:Int)={
set("spark.dynamicAllocation.maxExecutors",s"$maxExecutors","[1-9]\\d*")
}
override def optimizeDynamicAllocation(dynamicEnabled:Boolean=false,initialExecutors:Int=3,minExecutors:Int=0,maxExecutors:Int=6):Builder={
/**
* 根据应用的工作需求,动态分配executor
*/
setDynamicEnabled(dynamicEnabled)
if(dynamicEnabled){
setDynamicInitialExecutors(initialExecutors)
setDynamicMinExecutors(minExecutors)
setDynamicMaxExecutors(maxExecutors)
}
this
}
override def optimizeShuffle(parallelism:Int=3,shuffleCompressEnabled:Boolean=false,maxSizeMB:Int=128,shuffleServiceEnabled:Boolean=true):Builder={
null
}
override def optimizeSpeculation(enabled:Boolean=false,interval:Int=15,quantile:Float=0.75F):Builder={
null
}
override def warehouseDir(hdfs:String):Builder={
null
}
override def end():SparkSession={
SparkSession.builder().getOrCreate()
}
}
}
}
2.2 Object SparkFactory
object SparkFactory {
trait Builder{
// 默认值能给就给
/**
* 基本配置
* @param appName
* @param master 默认是本地方式
* @param deployMode 默认是集群模式
* @param eventLogEnabled 生产环境打开,测试环境关闭
* @return
*/
def baseConfig(appName:String,master:String="local[*]",deployMode:String="client",eventLogEnabled:Boolean=false):Builder
/**
* 驱动端优化配置
* @param memoryGB 驱动程序的内存大小
* @param coreNum 驱动程序的核数
* @param maxRstGB 驱动程序的最大结果大小
* @param driverHost 驱动程序的主机地址:驱动程序会在主机地址上运行,并且集群中的其他节点会通过这个地址与驱动程序通信
* @return
*/
def optimizeDriver(memoryGB:Int=2,coreNum:Int=1,maxRstGB:Int=1,driverHost:String="localhost"):Builder
def optimizeExecutor(memoryGB:Int=1,coreNum:Int=1):Builder
/**
* 整体限制配置
* @param maxCores 整体可用的最大核数
* @param maxTaskFailure 单个任务失败的最大次数
* @param maxLocalWaitS 容错机制:数据读取阶段允许等待的最长时间,超过时间切换到其他副本。
* @return
*/
def optimizeLimit(maxCores:Int=4,maxTaskFailure:Int,maxLocalWaitS:Int=30):Builder
/**
* 默认使用:Java序列化
* 推荐使用:Kryo序列化 提速或对速度又要i去
* 所有的自定义类型都要注册到Spark中,才能完成序列化。
* @param serde 全包路径
* @param classes 自定义类型,默认认为不需要指定,Class[_]表示类型未知。
* @return Builder
*/
def optimizeSerializer(serde:String="org.apache.spark.serializer.JavaSerializer",clas:Array[Class[_]]=null):Builder
/**
* 在Spark的官方配置中,netTimeOutS可能被很多超时的数据调用。
* @param netTimeOusS 判定网络超时的时间
* @param schedulerMode 可能很多任务一起跑,因此公平调度
* @return
*/
def optimizeNetAbout(netTimeOusS:Int=180,schedulerMode:String="FAIR"):Builder
/**
* 动态分配->按需分配
* 类似于配置线程池中的最大闲置线程数,根据需要去做动态分配
* @param dynamicEnabled 是否开启动态分配
* @param initialExecutors 初始启用的Executors的数量
* @param minExecutors 最小启用的Executors的数量
* @param maxExecutors 最大启用的Executors的数量
* @return
*/
def optimizeDynamicAllocation(dynamicEnabled:Boolean=false,initialExecutors:Int=3,minExecutors:Int=0,maxExecutors:Int=6):Builder
/**
* 特指在没有指定分区数时,对分区数的配置。
* 并行度和初始启用的Executors的数量一致,避免额外开销。
*
* @param parallelism
* @param shuffleCompressEnabled
* @param maxSizeMB
* @param shuffleServiceEnabled
* @return
*/
def optimizeShuffle(parallelism:Int=3,shuffleCompressEnabled:Boolean=false,maxSizeMB:Int=128,shuffleServiceEnabled:Boolean=true):Builder
/**
* 推测执行,将运行时间长的任务,放到队列中,等待运行时间短的任务运行完成后,再运行。
* @param enabled
* @param interval Spark检查任务执行时间的时间间隔,单位是秒。
* @param quantile 如果某个任务的执行时间超过指定分位数(如75%的任务执行时间),则认为该任务执行时间过长,需要启动推测执行。
*/
def optimizeSpeculation(enabled:Boolean=false,interval:Int=15,quantile:Float=0.75F):Builder
def warehouseDir(hdfs:String):Builder
def end():SparkSession
}
}