2.2 IP 工具类
需要将IP地址代码封装到工具类中,方便后续使用,在包【cn.itcast.spark.utils】创建工具类:IpUtils.scala,定义方法【convertIpToRegion】,传递参数【ip地址和DbSearch对象】,返回Region对象:封装ip、province和city样例类。
1)、Region样例类代码:
package cn.itcast.spark.etl
/**
* 封装解析IP地址信息至CaseClass中
*
* @param ip ip地址
* @param province 省份
* @param city 城市
*/
case class Region(
ip: String, //
province: String, //
city: String //
)
2)、工具类IpUtils代码如下:
package cn.itcast.spark.utils
import cn.itcast.spark.etl.Region
import org.lionsoul.ip2region.{DataBlock, DbSearcher}
/**
* IP地址解析工具类
*/
object IpUtils {
/**
* IP地址解析为省份和城市
* @param ip ip地址
* @param dbSearcher DbSearcher对象
* @return Region 省份城市信息
*/
def convertIpToRegion(ip: String, dbSearcher: DbSearcher): Region = {
// a. 依据IP地址解析
val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)
val region: String = dataBlock.getRegion // 中国|0|海南省|海口市|教育网
// b. 分割字符串,获取省份和城市
val Array(_, _, province, city, _) = region.split("\\|")
// c. 返回Region对象
Region(ip, province, city)
}
}
2.3 Hive 表创建
将广告数据ETL后保存到Hive 分区表中,启动Hive交互式命令行【$HIVE_HOME/bin/hive】(必须在Hive中创建,否则有问题),创建数据库【itcast_ads】和表【pmt_ads_info】。
1)、创建数据库【itcast_ads】
-- 创建数据库,不存在时创建
-- DROP DATABASE IF EXISTS itcast_ads;
CREATE DATABASE IF NOT EXISTS itcast_ads;
USE itcast_ads;
2)、创建表【pmt_ads_info】,设置日期分区字段【date_str】,数据存储为parquet格式
/*
在Spark中不创建表,直接saveAsTable写入表且指定分区列时,Hive中可以查询表数据但查不到表的创建和修改信息,
此时创建的表也不是分区表。
*/
-- 设置表的数据snappy压缩
set parquet.compression=snappy ;
-- 创建表,不存在时创建
-- DROP TABLE IF EXISTS itcast_ads.pmt_ads_info;
CREATE TABLE IF NOT EXISTS itcast_ads.pmt_ads_info(
adcreativeid BIGINT,
adorderid BIGINT,
adpayment DOUBLE,
adplatformkey STRING,
adplatformproviderid BIGINT,
adppprice DOUBLE,
adprice DOUBLE,
adspacetype BIGINT,
adspacetypename STRING,
advertisersid BIGINT,
adxrate DOUBLE,
age STRING,
agentrate DOUBLE,
ah BIGINT,
androidid STRING,
androididmd5 STRING,
androididsha1 STRING,
appid STRING,
appname STRING,
apptype BIGINT,
aw BIGINT,
bidfloor DOUBLE,
bidprice DOUBLE,
callbackdate STRING,
channelid STRING,
cityname STRING,
client BIGINT,
cnywinprice DOUBLE,
cur STRING,
density STRING,
device STRING,
devicetype BIGINT,
district STRING,
email STRING,
idfa STRING,
idfamd5 STRING,
idfasha1 STRING,
imei STRING,
imeimd5 STRING,
imeisha1 STRING,
initbidprice DOUBLE,
ip STRING,
iptype BIGINT,
isbid BIGINT,
isbilling BIGINT,
iseffective BIGINT,
ispid BIGINT,
ispname STRING,
isqualityapp BIGINT,
iswin BIGINT,
keywords STRING,
lang STRING,
lat STRING,
lomarkrate DOUBLE,
mac STRING,
macmd5 STRING,
macsha1 STRING,
mediatype BIGINT,
networkmannerid BIGINT,
networkmannername STRING,
openudid STRING,
openudidmd5 STRING,
openudidsha1 STRING,
osversion STRING,
paymode BIGINT,
ph BIGINT,
processnode BIGINT,
provincename STRING,
putinmodeltype BIGINT,
pw BIGINT,
rate DOUBLE,
realip STRING,
reqdate STRING,
reqhour STRING,
requestdate STRING,
requestmode BIGINT,
rtbcity STRING,
rtbdistrict STRING,
rtbprovince STRING,
rtbstreet STRING,
sdkversion STRING,
sessionid STRING,
sex STRING,
storeurl STRING,
tagid STRING,
tel STRING,
title STRING,
userid STRING,
uuid STRING,
uuidunknow STRING,
winprice DOUBLE,
province STRING,
city STRING
)
PARTITIONED BY (date_str string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat';
-- STORED AS PARQUET ;
3)、查看表的分区及删除分区命令
-- 待DataFrame将数据保存Hive表
SHOW CREATE TABLE itcast_ads.pmt_ads_info ;
SHOW PARTITIONS itcast_ads.pmt_ads_info ;
SELECT ip, province, city FROM itcast_ads.pmt_ads_info WHERE date_str = '' limit 10;
-- 删除分区表数据
ALTER TABLE itcast_ads.pmt_ads_info DROP IF EXISTS PARTITION (date_str='2020-04-23');
此外,需要注意,使用MySQL8作为Hive MetaStore存储,初始化MetaStore时,建议先创建数据库,在手动创建相关元数据表,命令如下:
-- 配置Hive MetaStore存储为MySQL8时,手动初始化
-- 第一步、创建数据库
CREATE DATABASE hive_metastore CHARACTER SET latin1;
-- 第二步、初始化
/export/server/hive/bin/schematool --dbType mysql --initSchema
2.4 日期获取
由于广告数据属于离线业务分析,每日处理前一天数据,所以程序中需要获取到今日日期(或昨日日期),有两种方式:
第一种方式、编写日期工具类:DateUtils.scala,使用FastDateFormat转换获取日期
package cn.itcast.spark.utils
import java.util.{Calendar, Date}
import org.apache.commons.lang3.time.FastDateFormat
/**
* 日期时间工具类
*/
object DateUtils {
/**
* 获取当前的日期,格式为:20190710
*/
def getTodayDate(): String = {
// a. 获取当前日期
val nowDate = new Date()
// b. 转换日期格式
FastDateFormat.getInstance("yyyy-MM-dd").format(nowDate)
}
/**
* 获取昨日的日期,格式为:20190710
*/
def getYesterdayDate(): String = {
// a. 获取Calendar对象
val calendar: Calendar = Calendar.getInstance()
// b. 获取昨日日期
calendar.add(Calendar.DATE, -1)
// c. 转换日期格式
FastDateFormat.getInstance("yyyy-MM-dd").format(calendar)
}
}
第二种方式、SparkSQL中自带函数库:date_sub、date_add、current_date
2.5 数据ETL
编写Spark Application类:PmtEtlRunner,完成数据ETL操作,主要任务三点:
/**
* 广告数据进行ETL处理,具体步骤如下:
* 第一步、加载json数据
* 第二步、解析IP地址为省份和城市
* 第三步、数据保存至Hive表
*/
全部基于SparkSQL中DataFrame数据结构,使用DSL编程方式完成,其中涉及到DataFrame转换为RDD方便操作,对各个部分业务逻辑实现,封装到不同方法中:
第一点、解析IP地址为省份和城市,封装到:processData方法,接收DataFrame,返回DataFrame
第二点、保存数据DataFrame至Hive表或Parquet文件,封装到:saveAsHiveTable或saveAsParquet方法,接收DataFrame,无返回值Unit
ETL运行主类【PmtEtlRunner】中MAIN方法业务实现具体思路步骤如下:
// 1. 创建SparkSession实例对象
// 2. 加载json数据
// 3. 解析IP地址为省份和城市
// 4. 保存ETL数据至Hive分区表
// 5. 应用结束,关闭资源
完整代码如下:
package cn.itcast.spark.etl
import cn.itcast.spark.config.ApplicationConfig
import cn.itcast.spark.utils.{IpUtils, SparkUtils}
import org.apache.spark.SparkFiles
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}
/**
* 广告数据进行ETL处理,具体步骤如下:
* 第一步、加载json数据
* 第二步、解析IP地址为省份和城市
* 第三步、数据保存至Hive表
* TODO: 基于SparkSQL中DataFrame数据结构,使用DSL编程方式
*/
object PmtEtlRunner{
/**
* 对数据进行ETL处理,调用ip2Region第三方库,解析IP地址为省份和城市
*/
def processData(dataframe: DataFrame): DataFrame = {
// 获取SparkSession对象,并导入隐式转换
val spark: SparkSession = dataframe.sparkSession
import spark.implicits._
// 解析IP地址数据字典文件分发
spark.sparkContext.addFile(ApplicationConfig.IPS_DATA_REGION_PATH)
// TODO: 由于DataFrame弱类型(无泛型),不能直接使用mapPartitions或map,建议转换为RDD操作
// a. 解析IP地址
val newRowsRDD: RDD[Row] = dataframe.rdd.mapPartitions { iter =>
// 创建DbSearcher对象,针对每个分区创建一个,并不是每条数据创建一个
val dbSearcher = new DbSearcher(new DbConfig(), SparkFiles.get("ip2region.db"))
// 针对每个分区数据操作, 获取IP值,解析为省份和城市
iter.map { row =>
// i. 获取IP值
val ipValue: String = row.getAs[String]("ip")
// ii. 调用工具类解析ip地址
val region: Region = IpUtils.convertIpToRegion(ipValue, dbSearcher)
// iii. 将解析省份和城市追加到原来Row中
val newSeq: Seq[Any] = row.toSeq :+
region.province :+
region.city
// iv. 返回Row对象
Row.fromSeq(newSeq)
}
}
// b. 自定义Schema信息
val newSchema: StructType = dataframe.schema // 获取原来DataFrame中Schema信息
// 添加新字段Schema信息
.add("province", StringType, nullable = true)
.add("city", StringType, nullable = true)
// c. 将RDD转换为DataFrame
val df: DataFrame = spark.createDataFrame(newRowsRDD, newSchema)
// d. 添加一列日期字段,作为分区列
df.withColumn("date_str", date_sub(current_date(), 1).cast(StringType))
}
/**
* 保存数据至Parquet文件,列式存储
*/
def saveAsParquet(dataframe: DataFrame): Unit = {
dataframe
// 降低分区数目,保存文件时为一个文件
.coalesce(1)
.write
// 选择覆盖保存模式,如果失败再次运行保存,不存在重复数据
.mode(SaveMode.Overwrite)
// 设置分区列名称
.partitionBy("date_str")
.parquet("dataset/pmt-etl/")
}
/**
* 保存数据至Hive分区表中,按照日期字段分区
*/
def saveAsHiveTable(dataframe: DataFrame): Unit = {
dataframe
.coalesce(1)
.write
.format("hive") // 一定要指定为hive数据源,否则报错
.mode(SaveMode.Append)
.partitionBy("date_str")
.saveAsTable("itcast_ads.pmt_ads_info")
}
def main(args: Array[String]): Unit = {
// 设置Spark应用程序运行的用户:root, 默认情况下为当前系统用户
System.setProperty("user.name", "root")
System.setProperty("HADOOP_USER_NAME", "root")
// 1. 创建SparkSession实例对象
val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
import spark.implicits._
// 2. 加载json数据
val pmtDF: DataFrame = spark.read.json(ApplicationConfig.DATAS_PATH)
//pmtDF.printSchema()
//pmtDF.show(10, truncate = false)
// 3. 解析IP地址为省份和城市
val etlDF: DataFrame = processData(pmtDF)
//etlDF.printSchema()
//etlDF.select($"ip", $"province", $"city", $"date_str").show(10, truncate = false)
// 4. 保存ETL数据至Hive分区表
//saveAsParquet(etlDF)
saveAsHiveTable(etlDF)
// 5. 应用结束,关闭资源
//Thread.sleep(1000000)
spark.stop()
}
}
运行完成以后,启动Spark JDBC/ODBC ThriftServer服务,beeline客户端连接,查看表分区和数据条目数:
2.6 Spark 分布式缓存
在解析IP地址时,创建DbSearcher对象,需要加载【数据字典文件ip2region.db】,实际项目中进行【分布式缓存】数据文件,节省存储资源和提升程序性能。在Spark引用程序中通过addFile函数来分发数据文件,将数据分发到计算节点中:
第一步、addFile方法可以接收本地文件(或者HDFS上的文件),甚至是文件夹(如果是文件夹,必须是HDFS路径);
第二步、Spark的Driver和Exector可以通过SparkFiles.get()方法来获取文件的绝对路径
注意:addFile把添加的本地文件传送给所有的Worker,这样能够保证在每个Worker上正确访问到文件。另外,Worker会把文件放在临时目录下。因此,比较适合用于文件比较小,计算比较复杂的场景。如果文件比较大,网络传送的消耗时间也会增长。在SparkContext中有一个函数:addJar,添加在SparkContext实例运行的作业所依赖的jar,其实Spark内部通过spark.jars参数以及spark.yarn.dist.jars函数传进去的Jar都是通过这个函数分发到Task的
2.7 扩展:Spark on Hive
SparkSQL集成Hive,加载读取Hive表数据进行分析,称之为Spark on Hive;此外,Hive 框架底层分区引擎,可以将MapReduce改为Spark,称之为Hive on Spark,两种区别如下:
- Spark on Hive:Spark通过Spark-SQL使用Hive语句,操作Hive,底层运行的还是Spark RDD
- 1)、通过SparkSQL,加载Hive的配置文件,获取到Hive的元数据信息
- 2)、SparkSQL获取到Hive的元数据信息之后,就可以拿到Hive的所有表的数据
- 3)、接下来就可以通过SparkSQL来操作Hive表中的数据
- Hive on Spark:
- 把Hive查询从MapReduce计算引擎替换为Spark Rdd 操作。相对于Spark on Hive,这个要实现起来则麻烦很多, 必须重新编译Spark和导入jar包,不过目前大部分使用的是Spark on Hive。
Spark如何操作Hive分区表,包括利用Spark DataFrame创建Hive的分区表和Spark向已经存在Hive分区表里插入数据,通常是在Hive中创建好表,保存DataFrame数据至Hive表。