第三章 业务报表分析
一般的系统需要使用报表来展示公司的运营情况、 数据情况等,本章节对数据进行一些常见报表的开发,广告数据业务报表数据流向图如下所示:
具体报表的需求如下:
相关报表开发说明如下:
- 第一、数据源:每天的日志数据,即ETL的结果数据,存储在Hive分区表,依据分区查询数据;
- 第二、报表分为两大类:基础报表统计(上图中①)和广告投放业务报表统计(上图中②);
- 第三、不同类型的报表的结果存储在MySQL不同表中,上述7个报表需求存储7个表中:
各地域分布统计:region_stat_analysis
广告区域统计:ads_region_analysis
广告APP统计:ads_app_analysis
广告设备统计:ads_device_analysis
广告网络类型统计:ads_network_analysis
广告运营商统计:ads_isp_analysis
广告渠道统计:ads_channel_analysis
- 第四、由于每天统计为定时统计,各个报表中加上统计日期字段:report_date;
3.1 报表运行主类
所有业务报表统计放在一个应用程序中,在实际运行时,要么都运行,要么都不运行,创建报表运行主类:PmtReportRunner.scala,将不同业务报表需求封装到不同类中进行单独处理,其中编程逻辑思路如下:
// 1. 创建SparkSession实例对象
// 2. 从Hive表中加载广告ETL数据,日期过滤
// 3. 依据不同业务需求开发报表
// 4. 应用结束,关闭资源
具体代码PmtReportRunner.scala如下:
package cn.itcast.spark.report
import cn.itcast.spark.utils.SparkUtils
import org.apache.spark.sql.functions.{current_date, date_sub}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* 针对广告点击数据,依据需求进行报表开发,具体说明如下:
* - 各地域分布统计:region_stat_analysis
* - 广告区域统计:ads_region_analysis
* - 广告APP统计:ads_app_analysis
* - 广告设备统计:ads_device_analysis
* - 广告网络类型统计:ads_network_analysis
* - 广告运营商统计:ads_isp_analysis
* - 广告渠道统计:ads_channel_analysis
*/
object PmtReportRunner {
def main(args: Array[String]): Unit = {
// 设置Spark应用程序运行的用户:root, 默认情况下为当前系统用户
System.setProperty("user.name", "root")
System.setProperty("HADOOP_USER_NAME", "root")
// 1. 创建SparkSession实例对象
val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
import spark.implicits._
// 2. 从Hive表中加载广告ETL数据
val pmtDF: DataFrame = spark.read
.table("itcast_ads.pmt_ads_info")
.where($"date_str" === date_sub(current_date(), 1))
//pmtDF.printSchema()
//pmtDF.select("uuid", "ip", "province", "city").show(20, truncate = false)
// 如果没有加载到数据,结束程序
if(pmtDF.isEmpty){
System.exit(-1)
}
// TODO: 由于多张报表的开发,使用相同的数据,所以缓存
pmtDF.persist(StorageLevel.MEMORY_AND_DISK)
// 3. 依据不同业务需求开发报表
/*
不同业务报表统计分析时,两步骤:
i. 编写SQL或者DSL分析
ii. 将分析结果保存MySQL数据库表中
*/
// 3.1. 地域分布统计:region_stat_analysis
//RegionStateReport.doReport(pmtDF)
// 3.2. 广告区域统计:ads_region_analysis
//AdsRegionAnalysisReport.doReport(pmtDF)
// 3.3. 广告APP统计:ads_app_analysis
//AdsAppAnalysisReport.processData(pmtDF)
// 3.4. 广告设备统计:ads_device_analysis
//AdsDeviceAnalysisReport.processData(pmtDF)
// 3.5. 广告网络类型统计:ads_network_analysis
//AdsNetworkAnalysisReport.processData(pmtDF)
// 3.6. 广告运营商统计:ads_isp_analysis
//AdsIspAnalysisReport.processData(pmtDF)
// 3.7. 广告渠道统计:ads_channel_analysis
//AdsChannelAnalysisReport.processData(pmtDF)
// 数据不再使用,释放资源
pmtDF.unpersist()
// 4. 应用结束,关闭资源
//Thread.sleep(1000000)
spark.stop()
}
}
上述代码中,考虑到如果要处理昨日广告数据ETL没有完成,那么Hive分区表中没有数据,所以加载数据以后,调用DataFrame.isEmpty判断是否不为空。
3.2 各地域数量分布
按照地域(省份province和城市city)统计广告数据分布情况,看到不同地区有多少数据,从而能够地区优化公司运营策略,最终结果如下图所示:
数据库创建表
在MySQL数据库中创建数据库【itcast_ads_report】和表【region_stat_analysis】,DDL语句:
-- 创建数据库,不存在时创建
-- DROP DATABASE IF EXISTS itcast_ads_report;
CREATE DATABASE IF NOT EXISTS itcast_ads_report;
USE itcast_ads_report;
-- 创建表
-- DROP TABLE IF EXISTS itcast_ads_report.region_stat_analysis ;
CREATE TABLE `itcast_ads_report`.`region_stat_analysis` (
`report_date` varchar(255) NOT NULL,
`province` varchar(255) NOT NULL,
`city` varchar(255) NOT NULL
`count` bigint DEFAULT NULL,
PRIMARY KEY (`report_date`,`province`,`city`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
后面将报表结果数据保存MySQL表中时,采用的是自己编写代码,并不使用DataFrame自带format(“jdbc”)方式,不能满足需求:当某日报表统计程序运行多次时,插入数据到结果表中,采用Append最佳方式,主键冲突;采用OverWrite方式,将会将表删除,以前统计结果也都删除。因此,调用DataFrame中foreachPartition方法,将每个分区数据保存到表中,INSTER语句:
INSERT
INTO
itcast_ads_report.region_stat_analysis
(report_date, province, city, count)
VALUES
(?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
count=VALUES(count) ;
说明:Navicate 连接MySQL8 时,可以会报错,进行如下相关设置:
SHOW VARIABLES LIKE 'validate_password%';
/*
+--------------------------------------+--------+
| Variable_name | Value |
+--------------------------------------+--------+
| validate_password.check_user_name | ON |
| validate_password.dictionary_file | |
| validate_password.length | 8 |
| validate_password.mixed_case_count | 1 |
| validate_password.number_count | 1 |
| validate_password.policy | MEDIUM |
| validate_password.special_char_count | 1 |
+--------------------------------------+--------+
*/
set global validate_password.policy=LOW;
set global validate_password.length = 6 ;
SHOW VARIABLES LIKE 'validate_password%';
/*
+--------------------------------------+-------+
| Variable_name | Value |
+--------------------------------------+-------+
| validate_password.check_user_name | ON |
| validate_password.dictionary_file | |
| validate_password.length | 6 |
| validate_password.mixed_case_count | 1 |
| validate_password.number_count | 1 |
| validate_password.policy | LOW |
| validate_password.special_char_count | 1 |
+--------------------------------------+-------+
*/
flush privileges;
SELECT user,host,plugin from mysql.user ;
/*
+------------------+-----------+-----------------------+
| user | host | plugin |
+------------------+-----------+-----------------------+
| root | % | caching_sha2_password |
| mysql.infoschema | localhost | caching_sha2_password |
| mysql.session | localhost | caching_sha2_password |
| mysql.sys | localhost | caching_sha2_password |
+------------------+-----------+-----------------------+
*/
ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
编写SQL
在Beeline客户端,编写SQL语句,完成报表开发需求,语句如下:
SELECT CAST(DATE_SUB(NOW(), 1) AS STRING) AS report_date,
province,
city,
COUNT(1) AS count
FROM itcast_ads.pmt_ads_info
WHERE date_str="2020-04-25"
GROUP BY province, city
ORDER BY count
DESC LIMIT 10 ;
执行语句返回结果截图:
报表开发
编写【RegionStateReport.scala】类,创建【doReport】方法,接收DataFrame为参数,进行报表统计,并最终保存至MySQL表中,封装保存结果数据代码至saveToMySQL方法中。
1)、业务实现代码
package cn.itcast.spark.report
import cn.itcast.spark.config.ApplicationConfig
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
/**
* 报表开发:按照地域维度(省份和城市)分组统计广告被点击次数
* 地域分布统计:region_stat_analysis
*/
object RegionStateReport {
/**
* 不同业务报表统计分析时,两步骤:
* i. 编写SQL或者DSL分析
* ii. 将分析结果保存MySQL数据库表中
*/
def doReport(dataframe: DataFrame): Unit = {
// 导入隐式转换及函数库
import dataframe.sparkSession.implicits._
import org.apache.spark.sql.functions._
// i. 使用DSL(调用DataFrame API)报表开发
val resultDF: DataFrame = dataframe
// 按照地域维度分组(省份和城市)
.groupBy($"province", $"city")
// 直接count函数统计,列名称为count
.count()
// 按照次数进行降序排序
.orderBy($"count".desc)
// 添加报表字段(报表统计的日期)
.withColumn(
"report_date", // 报表日期字段
// TODO:首先获取当前日期,再减去1天获取昨天日期,转换为字符串类型
date_sub(current_date(), 1).cast(StringType)
)
//resultDF.printSchema()
resultDF.show(50, truncate = false)
// ii. 保存分析报表结果到MySQL表中
//saveResultToMySQL(resultDF)
// 将DataFrame转换为RDD操作,或者转换为Dataset操作
//resultDF.coalesce(1).rdd.foreachPartition(iter => saveToMySQL(iter))
}
}
运行PmtReportRunner报表主类程序,结果如下:
2)、可以直接使用DataFrame.format(“jdbc”)至MySQL表
/**
* 保存数据至MySQL表中,直接使用DataFrame Writer操作,但是不符合实际应用需求
*/
def saveResultToMySQL(dataframe: DataFrame): Unit = {
dataframe
.coalesce(1)
.write
// Overwrite表示,当表存在时,先删除表,再创建表和插入数据, 所以不用此种方式
//.mode(SaveMode.Overwrite)
// TODO: 当多次运行程序时,比如对某日广告数据报表分析运行两次,由于报表结果主键存在数据库表中,产生
冲突,导致报错失败
.mode(SaveMode.Append)
.format("jdbc")
// 设置MySQL数据库相关属性
.option("driver", ApplicationConfig.MYSQL_JDBC_DRIVER)
.option("url", ApplicationConfig.MYSQL_JDBC_URL)
.option("user", ApplicationConfig.MYSQL_JDBC_USERNAME)
.option("password", ApplicationConfig.MYSQL_JDBC_PASSWORD)
.option("dbtable", "itcast_ads_report.region_stat_analysis")
.save()
}
保存方式选择Append追加或覆写Overwrite,都会出现问题,所以在实际项目开发中,使用SparkSQL分析数据报表报错数据库时,往往不会使用dataframe.write.jdbc方式。
3)、自己编写JDBC代码,插入数据到数据库表中:当主键存在时更新值,不存在时插入值。
/**
* 方式一:
* REPLACE INTO test(title,uid) VALUES ('1234657','1003');
* 方式二:
* INSERT INTO table (a,b,c) VALUES (1,2,3) ON DUPLICATE KEY UPDATE c=c+1;
*/
定义方法【saveToMySQL】传递参数【Iterator[Row]:每个分区数据】,代码如下:
import java.sql.{Connection, DriverManager, PreparedStatement}
/**
* 保存数据至MySQL数据库,使用函数foreachPartition对每个分区数据操作,主键存在时更新,不存在时插入
*/
def saveToMySQL(datas: Iterator[Row]): Unit = {
// a. 加载驱动类
Class.forName(ApplicationConfig.MYSQL_JDBC_DRIVER)
// 声明变量
var conn: Connection = null
var pstmt: PreparedStatement = null
try{
// b. 获取连接
conn = DriverManager.getConnection(
ApplicationConfig.MYSQL_JDBC_URL, //
ApplicationConfig.MYSQL_JDBC_USERNAME, //
ApplicationConfig.MYSQL_JDBC_PASSWORD
)
// c. 获取PreparedStatement对象
val insertSql ="""
|INSERT
|INTO
| itcast_ads_report.region_stat_analysis
| (report_date, province, city, count)
|VALUES (?, ?, ?, ?)
| ON DUPLICATE KEY UPDATE
| count=VALUES (count)
|""".stripMargin
pstmt = conn.prepareStatement(insertSql)
conn.setAutoCommit(false)
// d. 将分区中数据插入到表中,批量插入
datas.foreach{ row =>
pstmt.setString(1, row.getAs[String]("report_date"))
pstmt.setString(2, row.getAs[String]("province"))
pstmt.setString(3, row.getAs[String]("city"))
pstmt.setLong(4, row.getAs[Long]("count"))
// 加入批次
pstmt.addBatch()
}
// TODO: 批量插入
pstmt.executeBatch()
conn.commit()
}catch {
case e: Exception => e.printStackTrace()
}finally {
if(null != pstmt) pstmt.close()
if(null != conn) conn.close()
}
}
采用批量插入的方式将RDD分区数据插入到MySQL表中,提升性能。
完整代码
报表开发程序【RegionStateReport.scala】完整代码如下:
package cn.itcast.spark.report
import java.sql.{Connection, DriverManager, PreparedStatement}
import cn.itcast.spark.config.ApplicationConfig
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
/**
* 报表开发:按照地域维度(省份和城市)分组统计广告被点击次数
* 地域分布统计:region_stat_analysis
*/
object RegionStateReport {
/**
* 不同业务报表统计分析时,两步骤:
* i. 编写SQL或者DSL分析
* ii. 将分析结果保存MySQL数据库表中
*/
def doReport(dataframe: DataFrame): Unit = {
// 导入隐式转换及函数库
import dataframe.sparkSession.implicits._
import org.apache.spark.sql.functions._
// i. 使用DSL(调用DataFrame API)报表开发
val resultDF: DataFrame = dataframe
// 按照地域维度分组(省份和城市)
.groupBy($"province", $"city")
// 直接count函数统计,列列名称为count
.count()
// 按照次数进行降序排序
.orderBy($"count".desc)
// 添加报表字段(报表统计的日期)
.withColumn(
"report_date", // 报表日期字段
// TODO:首先获取当前日期,再减去1天获取昨天日期,转换为字符串类型
date_sub(current_date(), 1).cast(StringType)
)
//resultDF.printSchema()
resultDF.show(10, truncate = false)
// ii. 保存分析报表结果到MySQL表中
//saveResultToMySQL(resultDF)
// 将DataFrame转换为RDD操作,或者转换为Dataset操作
resultDF.coalesce(1).rdd.foreachPartition(iter => saveToMySQL(iter))
}
/**
* 保存数据至MySQL表中,直接使用DataFrame Writer操作,但是不符合实际应用需求
*/
def saveResultToMySQL(dataframe: DataFrame): Unit = {
dataframe
.coalesce(1)
.write
// Overwrite表示,当表存在时,先删除表,再创建表和插入数据, 所以不用此种方式
//.mode(SaveMode.Overwrite)
// TODO: 当多次运行程序时,比如对某日广告数据报表分析运行两次,由于报表结果主键存在数据库表中,产生
冲突,导致报错失败
.mode(SaveMode.Append)
.format("jdbc")
// 设置MySQL数据库相关属性
.option("driver", ApplicationConfig.MYSQL_JDBC_DRIVER)
.option("url", ApplicationConfig.MYSQL_JDBC_URL)
.option("user", ApplicationConfig.MYSQL_JDBC_USERNAME)
.option("password", ApplicationConfig.MYSQL_JDBC_PASSWORD)
.option("dbtable", "itcast_ads_report.region_stat_analysis")
.save()
}
/**
* 保存数据至MySQL数据库,使用函数foreachPartition对每个分区数据操作,主键存在时更新,不存在时插入
*/
def saveToMySQL(datas: Iterator[Row]): Unit = {
// a. 加载驱动类
Class.forName(ApplicationConfig.MYSQL_JDBC_DRIVER)
// 声明变量
var conn: Connection = null
var pstmt: PreparedStatement = null
try{
// b. 获取连接
conn = DriverManager.getConnection(
ApplicationConfig.MYSQL_JDBC_URL, //
ApplicationConfig.MYSQL_JDBC_USERNAME, //
ApplicationConfig.MYSQL_JDBC_PASSWORD
)
// c. 获取PreparedStatement对象
val insertSql ="""
|INSERT
|INTO
| itcast_ads_report.region_stat_analysis
| (report_date, province, city, count)
|VALUES (?, ?, ?, ?)
| ON DUPLICATE KEY UPDATE count= VALUES(count)
|""".stripMargin
pstmt = conn.prepareStatement(insertSql)
conn.setAutoCommit(false)
// d. 将分区中数据插入到表中,批量插入
datas.foreach{ row =>
pstmt.setString(1, row.getAs[String]("report_date"))
pstmt.setString(2, row.getAs[String]("province"))
pstmt.setString(3, row.getAs[String]("city"))
pstmt.setLong(4, row.getAs[Long]("count"))
// 加入批次
pstmt.addBatch()
}
// TODO: 批量插入
pstmt.executeBatch()
conn.commit()
}catch {
case e: Exception => e.printStackTrace()
}finally {
if(null != pstmt) pstmt.close()
if(null != conn) conn.close()
}
}
}