【Spark分布式内存计算框架——离线综合实战】4. IP 工具类

news2024/11/29 12:40:48

2.2 IP 工具类

需要将IP地址代码封装到工具类中,方便后续使用,在包【cn.itcast.spark.utils】创建工具类:IpUtils.scala,定义方法【convertIpToRegion】,传递参数【ip地址和DbSearch对象】,返回Region对象:封装ip、province和city样例类。

1)、Region样例类代码:

package cn.itcast.spark.etl
/**
* 封装解析IP地址信息至CaseClass中
*
* @param ip ip地址
* @param province 省份
* @param city 城市
*/
case class Region(
ip: String, //
province: String, //
city: String //
)

2)、工具类IpUtils代码如下:

package cn.itcast.spark.utils
import cn.itcast.spark.etl.Region
import org.lionsoul.ip2region.{DataBlock, DbSearcher}
/**
* IP地址解析工具类
*/
object IpUtils {
/**
* IP地址解析为省份和城市
* @param ip ip地址
* @param dbSearcher DbSearcher对象
* @return Region 省份城市信息
*/
def convertIpToRegion(ip: String, dbSearcher: DbSearcher): Region = {
// a. 依据IP地址解析
val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)
val region: String = dataBlock.getRegion // 中国|0|海南省|海口市|教育网
// b. 分割字符串,获取省份和城市
val Array(_, _, province, city, _) = region.split("\\|")
// c. 返回Region对象
Region(ip, province, city)
}
}

2.3 Hive 表创建

将广告数据ETL后保存到Hive 分区表中,启动Hive交互式命令行【$HIVE_HOME/bin/hive】(必须在Hive中创建,否则有问题),创建数据库【itcast_ads】和表【pmt_ads_info】。

1)、创建数据库【itcast_ads】

-- 创建数据库,不存在时创建
-- DROP DATABASE IF EXISTS itcast_ads;
CREATE DATABASE IF NOT EXISTS itcast_ads;
USE itcast_ads;

2)、创建表【pmt_ads_info】,设置日期分区字段【date_str】,数据存储为parquet格式

/*
在Spark中不创建表,直接saveAsTable写入表且指定分区列时,Hive中可以查询表数据但查不到表的创建和修改信息,
此时创建的表也不是分区表。
*/
-- 设置表的数据snappy压缩
set parquet.compression=snappy ;
-- 创建表,不存在时创建
-- DROP TABLE IF EXISTS itcast_ads.pmt_ads_info;
CREATE TABLE IF NOT EXISTS itcast_ads.pmt_ads_info(
adcreativeid BIGINT,
adorderid BIGINT,
adpayment DOUBLE,
adplatformkey STRING,
adplatformproviderid BIGINT,
adppprice DOUBLE,
adprice DOUBLE,
adspacetype BIGINT,
adspacetypename STRING,
advertisersid BIGINT,
adxrate DOUBLE,
age STRING,
agentrate DOUBLE,
ah BIGINT,
androidid STRING,
androididmd5 STRING,
androididsha1 STRING,
appid STRING,
appname STRING,
apptype BIGINT,
aw BIGINT,
bidfloor DOUBLE,
bidprice DOUBLE,
callbackdate STRING,
channelid STRING,
cityname STRING,
client BIGINT,
cnywinprice DOUBLE,
cur STRING,
density STRING,
device STRING,
devicetype BIGINT,
district STRING,
email STRING,
idfa STRING,
idfamd5 STRING,
idfasha1 STRING,
imei STRING,
imeimd5 STRING,
imeisha1 STRING,
initbidprice DOUBLE,
ip STRING,
iptype BIGINT,
isbid BIGINT,
isbilling BIGINT,
iseffective BIGINT,
ispid BIGINT,
ispname STRING,
isqualityapp BIGINT,
iswin BIGINT,
keywords STRING,
lang STRING,
lat STRING,
lomarkrate DOUBLE,
mac STRING,
macmd5 STRING,
macsha1 STRING,
mediatype BIGINT,
networkmannerid BIGINT,
networkmannername STRING,
openudid STRING,
openudidmd5 STRING,
openudidsha1 STRING,
osversion STRING,
paymode BIGINT,
ph BIGINT,
processnode BIGINT,
provincename STRING,
putinmodeltype BIGINT,
pw BIGINT,
rate DOUBLE,
realip STRING,
reqdate STRING,
reqhour STRING,
requestdate STRING,
requestmode BIGINT,
rtbcity STRING,
rtbdistrict STRING,
rtbprovince STRING,
rtbstreet STRING,
sdkversion STRING,
sessionid STRING,
sex STRING,
storeurl STRING,
tagid STRING,
tel STRING,
title STRING,
userid STRING,
uuid STRING,
uuidunknow STRING,
winprice DOUBLE,
province STRING,
city STRING
)
PARTITIONED BY (date_str string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat';
-- STORED AS PARQUET ;

3)、查看表的分区及删除分区命令

-- 待DataFrame将数据保存Hive表
SHOW CREATE TABLE itcast_ads.pmt_ads_info ;
SHOW PARTITIONS itcast_ads.pmt_ads_info ;
SELECT ip, province, city FROM itcast_ads.pmt_ads_info WHERE date_str = '' limit 10;
-- 删除分区表数据
ALTER TABLE itcast_ads.pmt_ads_info DROP IF EXISTS PARTITION (date_str='2020-04-23');

此外,需要注意,使用MySQL8作为Hive MetaStore存储,初始化MetaStore时,建议先创建数据库,在手动创建相关元数据表,命令如下:

-- 配置Hive MetaStore存储为MySQL8时,手动初始化
-- 第一步、创建数据库
CREATE DATABASE hive_metastore CHARACTER SET latin1;
-- 第二步、初始化
/export/server/hive/bin/schematool --dbType mysql --initSchema

2.4 日期获取

由于广告数据属于离线业务分析,每日处理前一天数据,所以程序中需要获取到今日日期(或昨日日期),有两种方式:

第一种方式、编写日期工具类:DateUtils.scala,使用FastDateFormat转换获取日期

package cn.itcast.spark.utils
import java.util.{Calendar, Date}
import org.apache.commons.lang3.time.FastDateFormat
/**
* 日期时间工具类
*/
object DateUtils {
/**
* 获取当前的日期,格式为:20190710
*/
def getTodayDate(): String = {
// a. 获取当前日期
val nowDate = new Date()
// b. 转换日期格式
FastDateFormat.getInstance("yyyy-MM-dd").format(nowDate)
}
/**
* 获取昨日的日期,格式为:20190710
*/
def getYesterdayDate(): String = {
// a. 获取Calendar对象
val calendar: Calendar = Calendar.getInstance()
// b. 获取昨日日期
calendar.add(Calendar.DATE, -1)
// c. 转换日期格式
FastDateFormat.getInstance("yyyy-MM-dd").format(calendar)
}
}

第二种方式、SparkSQL中自带函数库:date_sub、date_add、current_date
在这里插入图片描述

2.5 数据ETL

编写Spark Application类:PmtEtlRunner,完成数据ETL操作,主要任务三点:

/**
* 广告数据进行ETL处理,具体步骤如下:
* 第一步、加载json数据
* 第二步、解析IP地址为省份和城市
* 第三步、数据保存至Hive表
*/

全部基于SparkSQL中DataFrame数据结构,使用DSL编程方式完成,其中涉及到DataFrame转换为RDD方便操作,对各个部分业务逻辑实现,封装到不同方法中:

第一点、解析IP地址为省份和城市,封装到:processData方法,接收DataFrame,返回DataFrame
在这里插入图片描述
第二点、保存数据DataFrame至Hive表或Parquet文件,封装到:saveAsHiveTable或saveAsParquet方法,接收DataFrame,无返回值Unit
在这里插入图片描述
ETL运行主类【PmtEtlRunner】中MAIN方法业务实现具体思路步骤如下:

// 1. 创建SparkSession实例对象
// 2. 加载json数据
// 3. 解析IP地址为省份和城市
// 4. 保存ETL数据至Hive分区表
// 5. 应用结束,关闭资源

完整代码如下:

package cn.itcast.spark.etl
import cn.itcast.spark.config.ApplicationConfig
import cn.itcast.spark.utils.{IpUtils, SparkUtils}
import org.apache.spark.SparkFiles
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}
/**
* 广告数据进行ETL处理,具体步骤如下:
* 第一步、加载json数据
* 第二步、解析IP地址为省份和城市
* 第三步、数据保存至Hive表
* TODO: 基于SparkSQL中DataFrame数据结构,使用DSL编程方式
*/
object PmtEtlRunner{
/**
* 对数据进行ETL处理,调用ip2Region第三方库,解析IP地址为省份和城市
*/
def processData(dataframe: DataFrame): DataFrame = {
// 获取SparkSession对象,并导入隐式转换
val spark: SparkSession = dataframe.sparkSession
import spark.implicits._
// 解析IP地址数据字典文件分发
spark.sparkContext.addFile(ApplicationConfig.IPS_DATA_REGION_PATH)
// TODO: 由于DataFrame弱类型(无泛型),不能直接使用mapPartitions或map,建议转换为RDD操作
// a. 解析IP地址
val newRowsRDD: RDD[Row] = dataframe.rdd.mapPartitions { iter =>
// 创建DbSearcher对象,针对每个分区创建一个,并不是每条数据创建一个
val dbSearcher = new DbSearcher(new DbConfig(), SparkFiles.get("ip2region.db"))
// 针对每个分区数据操作, 获取IP值,解析为省份和城市
iter.map { row =>
// i. 获取IP值
val ipValue: String = row.getAs[String]("ip")
// ii. 调用工具类解析ip地址
val region: Region = IpUtils.convertIpToRegion(ipValue, dbSearcher)
// iii. 将解析省份和城市追加到原来Row中
val newSeq: Seq[Any] = row.toSeq :+
region.province :+
region.city
// iv. 返回Row对象
Row.fromSeq(newSeq)
}
}
// b. 自定义Schema信息
val newSchema: StructType = dataframe.schema // 获取原来DataFrame中Schema信息
// 添加新字段Schema信息
.add("province", StringType, nullable = true)
.add("city", StringType, nullable = true)
// c. 将RDD转换为DataFrame
val df: DataFrame = spark.createDataFrame(newRowsRDD, newSchema)
// d. 添加一列日期字段,作为分区列
df.withColumn("date_str", date_sub(current_date(), 1).cast(StringType))
}
/**
* 保存数据至Parquet文件,列式存储
*/
def saveAsParquet(dataframe: DataFrame): Unit = {
dataframe
// 降低分区数目,保存文件时为一个文件
.coalesce(1)
.write
// 选择覆盖保存模式,如果失败再次运行保存,不存在重复数据
.mode(SaveMode.Overwrite)
// 设置分区列名称
.partitionBy("date_str")
.parquet("dataset/pmt-etl/")
}
/**
* 保存数据至Hive分区表中,按照日期字段分区
*/
def saveAsHiveTable(dataframe: DataFrame): Unit = {
dataframe
.coalesce(1)
.write
.format("hive") // 一定要指定为hive数据源,否则报错
.mode(SaveMode.Append)
.partitionBy("date_str")
.saveAsTable("itcast_ads.pmt_ads_info")
}
def main(args: Array[String]): Unit = {
// 设置Spark应用程序运行的用户:root, 默认情况下为当前系统用户
System.setProperty("user.name", "root")
System.setProperty("HADOOP_USER_NAME", "root")
// 1. 创建SparkSession实例对象
val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
import spark.implicits._
// 2. 加载json数据
val pmtDF: DataFrame = spark.read.json(ApplicationConfig.DATAS_PATH)
//pmtDF.printSchema()
//pmtDF.show(10, truncate = false)
// 3. 解析IP地址为省份和城市
val etlDF: DataFrame = processData(pmtDF)
//etlDF.printSchema()
//etlDF.select($"ip", $"province", $"city", $"date_str").show(10, truncate = false)
// 4. 保存ETL数据至Hive分区表
//saveAsParquet(etlDF)
saveAsHiveTable(etlDF)
// 5. 应用结束,关闭资源
//Thread.sleep(1000000)
spark.stop()
}
}

运行完成以后,启动Spark JDBC/ODBC ThriftServer服务,beeline客户端连接,查看表分区和数据条目数:
在这里插入图片描述

2.6 Spark 分布式缓存

在解析IP地址时,创建DbSearcher对象,需要加载【数据字典文件ip2region.db】,实际项目中进行【分布式缓存】数据文件,节省存储资源和提升程序性能。在Spark引用程序中通过addFile函数来分发数据文件,将数据分发到计算节点中:

第一步、addFile方法可以接收本地文件(或者HDFS上的文件),甚至是文件夹(如果是文件夹,必须是HDFS路径);
在这里插入图片描述
第二步、Spark的Driver和Exector可以通过SparkFiles.get()方法来获取文件的绝对路径
在这里插入图片描述
注意:addFile把添加的本地文件传送给所有的Worker,这样能够保证在每个Worker上正确访问到文件。另外,Worker会把文件放在临时目录下。因此,比较适合用于文件比较小,计算比较复杂的场景。如果文件比较大,网络传送的消耗时间也会增长。在SparkContext中有一个函数:addJar,添加在SparkContext实例运行的作业所依赖的jar,其实Spark内部通过spark.jars参数以及spark.yarn.dist.jars函数传进去的Jar都是通过这个函数分发到Task的

2.7 扩展:Spark on Hive

SparkSQL集成Hive,加载读取Hive表数据进行分析,称之为Spark on Hive;此外,Hive 框架底层分区引擎,可以将MapReduce改为Spark,称之为Hive on Spark,两种区别如下:

  • Spark on Hive:Spark通过Spark-SQL使用Hive语句,操作Hive,底层运行的还是Spark RDD
    • 1)、通过SparkSQL,加载Hive的配置文件,获取到Hive的元数据信息
    • 2)、SparkSQL获取到Hive的元数据信息之后,就可以拿到Hive的所有表的数据
    • 3)、接下来就可以通过SparkSQL来操作Hive表中的数据

在这里插入图片描述

  • Hive on Spark:
    • 把Hive查询从MapReduce计算引擎替换为Spark Rdd 操作。相对于Spark on Hive,这个要实现起来则麻烦很多, 必须重新编译Spark和导入jar包,不过目前大部分使用的是Spark on Hive。

Spark如何操作Hive分区表,包括利用Spark DataFrame创建Hive的分区表和Spark向已经存在Hive分区表里插入数据,通常是在Hive中创建好表,保存DataFrame数据至Hive表。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/367376.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构-树的理解

目录 一:要解决的问题,出发点 1.演进 树的定义: 树的深度(高度) 平衡二叉树(AVL树) 红黑树: B树: 深夜有感,灵感乍现,忽然感觉对这个数据结…

Unity(二)--通过简单例子了解UGUI几个常用对象操作(Text,Image,Button)

目录 文本框等UI对象的添加Canvas 画布给Canvas添加脚本,绑定要操作的对象文本框Text的使用图像Image的使用更换图片Type:显示图片相关按钮Button的使用过渡导航事件绑定文本框等UI对象的添加 Canvas 画布 当创建一个UI元素的时候,如果没有Canvas 画布,就会自动创建一个画布…

学习资料|SSH隧道端口转发功能详解

概念ssh隧道大致可以分为3种,分别为本地端口转发,远程端口转发,动态端口转发,本文将让你彻底搞懂这3个转发的命令表达形式,让你能够灵活运用解决生活中的各种特殊场景。如果你正在使用mobaxterm、xshell、secureCRT、p…

怎样深度学习?主题碾压式学习法

怎样最深度的学习?【主题碾压式!】 对一个学习主题,大体量投入学习资源 进行对比和实践 会取得突破 限定在社会科学和社会应用范围 趣讲大白话:大力出奇迹 【趣讲信息科技:84期,下期预告:很少有…

C++条件变量唤醒问题 notify_one() 唤醒不及时问题

条件变量唤醒问题 & notify_one() 唤醒不及时问题 因为我对于 C中条件变量的等待唤醒部分、notify_all & notify_one 的区别方面有些疑点,因此就有了以下的同 chatgpt 的沟通,希望同样能够帮助到大家 感叹于 chatgpt的强大 问题? 我比…

(三十二)大白话MySQL一起来看看INSRET语句的undo log回滚日志长什么样?

昨天我们讲解了undo log回滚日志的作用,说白了,就是你执行事务的时候,里面很多INSERT、UPDATE和DELETE语句都在更新缓存页里的数据,但是万一事务回滚,你必须有每条SQL语句对应的undo log回滚日志,根据回滚日…

Docker 名词介绍

Docker核心名词镜像文件镜像:简单理解为就是一个安装包,里面包含容器所需要运行的的基础文件和配置信息,比如:redis镜像、mysql镜像等。镜像的来源方式:1. 自己做镜像 比如(自己开发微服务项目)2. 拉取别人…

python学习笔记——数据类型总结

1.基本数据类型  数据类型对应的内置函数:将其他类型,转换成自己的类型。 int()float()bool()str()list()tuple()set()dict() 2.数据类型对比  3.列表 w [a,b,c] #查 print(w[0]) print(w[0:3:2]) #增 w.appe…

css 属性和属性值的定义

文章目录css文本属性作业列表属性背景属性作业css文本属性 序号属性描述说明1font-size字体大小浏览器默认16px;2font-family字体当字体是中文字体,英文字体,中间有空格时候,要加双引号,多字体之间用逗号隔开 默认微软…

绿通科技在创业板开启申购:超额募资约19亿元,收入依赖贴牌

2月23日,广东绿通新能源电动车科技股份有限公司(下称“绿通科技”,SZ:301322)开启申购。据贝多财经了解,绿通科技本次上市的发行价为131.11元/股,发行数量为1749万股,市盈率73.75倍。 按发行价…

为什么数字孪生技术对工业物联网基础设施至关重要

随着工业物联网基础设施的不断建设和发展,数字孪生技术的重要性也变得越来越明显。由于数字孪生模型是工厂或其资产的虚拟版本,其高度精确和详细的特点使决策者获得了更高的可见性。下面让我们了解一下数字孪生技术给工业物联网基础设施带来的有效帮助。…

【Axure教程】自动生成页码的中继器表格

当表格数据较多时,我们经常会分页显示,这时我们就需要用到页码的元件了。所以作者今天就教大家如何在Axure中制作一个能自动根据中继器表格的数据以及分页情况,自动生成对应页码的原型模板。一、效果展示1、页码能根据表格数据和每页显示条数…

Apache Commons FileUpload Apache Tomcat拒绝服务漏洞解决方案

近日,安全狗应急响应中心关注到Apache官方发布安全公告,披露在Apache Commons FileUpload<1.5版本中存在一处拒绝服务漏洞(CVE-2023-24998)。Commons FileUpload是Apache组织提供的免费的上传组件。由于Apache Commons…

pyaudio声卡信息中hostApi是什么意思?

hostApi是声卡驱动协议,声卡驱动模式,有如下很多类。下面的类型是网上找的PortAudio的类,不不确定是不是python的。typedef enum PaHostApiTypeId{paInDevelopment0, /* use while developing support for a new host API */paDirectSound1,p…

EMC诊断技术

第一课 探讨EMC诊断技术-滤波篇EMC法规:CISPR16-1、GB/T 6113.1 GB/T 7343dBuV3dB是1.41倍6dB是2倍20dB是10倍差模噪声在电源/信号到地上走,差模噪声是电源/地/信号/到EGNDEMI滤波器的性能指标:滤波器插入损坏(共模插损、差模插损)

儿童全脑九大能力,3-6岁的家长都应该知道

什么是全脑? 人的大脑分左右两个半球,形态虽然相似,功能却各有不同。其中,左脑负责文字、数学、计算、分析、逻辑、顺序、事实和记忆,掌管右侧肢体的感觉和运动;右脑则负责颜色、音乐、想象、韵律、感觉、…

【原创】java+swing+mysql物业管理系统设计与实现

之前的文章里也讲过物业管理系统的开发,不过使用的是javaweb技术,bs架构,网页的形式。今天我们主要介绍使用javaswing技术同样去开发一套物业管理系统。以方便管理员进行物业信息的管理。 功能分析: 物业管理系统主要是为了方便…

买卖股票的最佳时机II-力扣122-java贪心策略

一、题目描述给你一个整数数组 prices ,其中 prices[i] 表示某支股票第 i 天的价格。在每一天,你可以决定是否购买和/或出售股票。你在任何时候 最多 只能持有 一股 股票。你也可以先购买,然后在 同一天 出售。 返回 你能获得的 最大 利润 。…

SAP PI PO JDBC接口培训视频

SAP PI PO JDBC接口培训视频XML Document Format for the Message Protocol XML SQL Format You can modify one or more database tables by means of a message. Depending on the content of the message, you can either insert (INSERT), update (UPDATE), or delete (DEL…

Java ”框架 = 注解 + 反射 + 设计模式“ 之 反射详解

Java ”框架 注解 反射 设计模式“ 之 反射详解 每博一文案 无论幸福还是苦难,无论光荣还是屈辱,你都要自己遭遇与承受。—————— 《平凡的世界》 孙少平多少美好的东西消失和毁灭了,世界还像什么事也没有发生,是的&#…