SparkSQL函数综合实践

news2025/1/22 10:28:50

文章目录

  • 1. 实战概述
  • 2. 实战步骤
    • 2.1 创建项目
    • 2.2 添加依赖
    • 2.3 设置源目录
    • 2.4 创建日志属性文件
    • 2.5 创建hive配置文件
    • 2.6 创建数据分析对象
      • 2.6.1 导入相关类
      • 2.6.2 创建获取Spark会话方法
      • 2.6.3 创建表方法
      • 2.6.4 准备数据文件
      • 2.6.5 创建加载数据方法
      • 2.6.6 创建薪水排行榜方法
      • 2.6.7 创建主方法
      • 2.6.8 查看完整代码
    • 2.7 启动metastore服务
    • 2.8 运行程序,查看结果
    • 2.8 在Spark Shell里运行程序
  • 3. 实战小结

1. 实战概述

  • 通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。通过创建 Hive 表、加载 JSON 数据并使用 Spark SQL 查询每个城市工资最高的前 N 名员工,实现了数据的高效处理与分析。实战涵盖了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数的使用,适用于大数据处理场景。

2. 实战步骤

2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述
  • java目录改成scala目录
    在这里插入图片描述

2.2 添加依赖

  • pom.xml文件里添加相关依赖
    在这里插入图片描述
  • 刷新项目依赖
    在这里插入图片描述

2.3 设置源目录

  • pom.xml里设置源目录
    在这里插入图片描述

2.4 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

2.5 创建hive配置文件

  • resources里创建hive-site.xml文件
    在这里插入图片描述
  • bigdata1云主机上执行命令:$HIVE_HOME/conf/hive-site.xml,拷贝其内容到resources里的hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://bigdata1:3306/metastore?useSSL=false</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>bigdata1</value>
    </property>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bigdata1:9083</value>
    </property>
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.server2.active.passive.ha.enable</name>
        <value>true</value>
    </property>
</configuration>

2.6 创建数据分析对象

  • 添加scala-sdk到项目
    在这里插入图片描述

  • 单击【Add to Modules…】菜单项
    在这里插入图片描述

  • 单击【OK】按钮即可

  • 创建net.huawei.sql
    在这里插入图片描述

  • net.huawei.sql包里创建DataAnalysis对象
    在这里插入图片描述

2.6.1 导入相关类

  • 导入三个类:SparkConfSparkSessionDataFrame
    在这里插入图片描述

2.6.2 创建获取Spark会话方法

  • 创建getSparkSession()方法
    在这里插入图片描述
// 获取SparkSession对象                                      
def getSparkSession(): SparkSession = {                  
  // 创建SparkConf对象                                       
  val conf = new SparkConf()                             
  conf.setMaster("local[*]")                             
  conf.setAppName("DataAnalysis")                        
  conf.set("dfs.client.use.datanode.hostname", "true")   
                                                         
  // 创建SparkSession对象                                    
  SparkSession.builder()                                 
    .config(conf)                                        
    .enableHiveSupport()                                 
    .getOrCreate()                                       
}                                                                           

2.6.3 创建表方法

  • 创建createTable()方法
    在这里插入图片描述
// 创建表                                                   
def createTable(spark: SparkSession): Unit = {           
  spark.sql(                                             
    s"""                                                 
       |CREATE TABLE IF NOT EXISTS salary_info           
       |  (city string, name string, salary double)      
       |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','  
       |""".stripMargin                                  
  )                                                      
}                                                        

2.6.4 准备数据文件

  • 在项目根目录创建data目录,在里面创建salary.json文件
    在这里插入图片描述
{"city": "北京", "name": "陈燕文", "salary": 5000.0}
{"city": "上海", "name": "李伟强", "salary": 8000.0}
{"city": "广州", "name": "王丽娜", "salary": 5500.0}
{"city": "北京", "name": "赵建国", "salary": 5200.0}
{"city": "上海", "name": "孙志强", "salary": 5300.0}
{"city": "广州", "name": "方云龙", "salary": 6800.0}
{"city": "北京", "name": "周晓峰", "salary": 6400.0}
{"city": "上海", "name": "吴雅婷", "salary": 5100.0}
{"city": "广州", "name": "郑文杰", "salary": 5600.0}
{"city": "上海", "name": "王海涛", "salary": 7500.0}
{"city": "北京", "name": "李雪梅", "salary": 5800.0}
{"city": "广州", "name": "童玉明", "salary": 7800.0}

2.6.5 创建加载数据方法

  • 创建loadData()方法
    在这里插入图片描述
// 加载数据                                                                          
def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {
  val fileDF: DataFrame = spark.read.format("json").load(inputPath)              
  fileDF.write.insertInto(tableName)                                             
}                                                                                

2.6.6 创建薪水排行榜方法

  • 创建salaryTopN()方法
    在这里插入图片描述
// 查询工资topN                                                                           
def salaryTopN(spark: SparkSession, topN: Int): Unit = {                              
  spark.sql(                                                                          
    s"""                                                                              
       |SELECT                                                                        
       |  city, name, salary                                                          
       |FROM                                                                          
       |  (                                                                           
       |    SELECT                                                                    
       |      city, name, salary,                                                     
       |      row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num   
       |    FROM                                                                      
       |      salary_info                                                             
       |  ) salary_rank                                                               
       |WHERE row_num <= $topN                                                        
       |""".stripMargin                                                               
  ).show()                                                                            
}                                                                                     
  • 代码说明salaryTopN 方法用于查询每个城市工资最高的前 topN 名员工。通过 row_number() 窗口函数按城市分组并按工资降序排序,生成行号 row_num,然后筛选出行号小于等于 topN 的记录。最终结果展示每个城市工资最高的前 topN 名员工的姓名和工资。

2.6.7 创建主方法

  • 通过 getSparkSession() 获取 SparkSession 实例,使用 createTable() 在 Hive 中创建表,调用 loadData() 加载数据并写入 Hive 表,通过 salaryTopN() 查询每个城市工资最高的前 N 名员工信息,最后释放资源。
    在这里插入图片描述
// 主方法                                                   
def main(args: Array[String]): Unit = {                  
  // 获取SparkSession对象                                    
  val spark = getSparkSession()                          
  // 创建表                                                 
  createTable(spark)                                     
  // 加载数据                                                
  loadData(spark, "data/salary.json", "salary_info")     
  // 查询工资top3                                            
  salaryTopN(spark, 3)                                   
}                                                        

2.6.8 查看完整代码

package net.huawei.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 功能:数据分析对象
 * 作者:华卫
 * 日期:2025年01月21日
 */
object DataAnalysis {
  // 获取SparkSession对象
  def getSparkSession(): SparkSession = {
    // 创建SparkConf对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("DataAnalysis")
    conf.set("dfs.client.use.datanode.hostname", "true")

    // 创建SparkSession对象
    SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
  }

  // 创建表
  def createTable(spark: SparkSession): Unit = {
    spark.sql(
      s"""
         |CREATE TABLE IF NOT EXISTS salary_info
         |  (city string, name string, salary double)
         |  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
         |""".stripMargin
    )
  }

  // 加载数据
  def loadData(spark: SparkSession, inputPath: String, tableName: String): Unit = {
    val fileDF: DataFrame = spark.read.format("json").load(inputPath)
    fileDF.write.insertInto(tableName)
  }

  // 查询工资topN
  def salaryTopN(spark: SparkSession, topN: Int): Unit = {
    spark.sql(
      s"""
         |SELECT
         |  city, name, salary
         |FROM
         |  (
         |    SELECT
         |      city, name, salary,
         |      row_number() OVER (PARTITION BY city ORDER BY salary DESC) AS row_num
         |    FROM
         |      salary_info
         |  ) salary_rank
         |WHERE row_num <= $topN
         |""".stripMargin
    ).show()
  }

  // 主方法
  def main(args: Array[String]): Unit = {
    // 获取SparkSession对象
    val spark = getSparkSession()
    // 创建表
    createTable(spark)
    // 加载数据
    loadData(spark, "data/salary.json", "salary_info")
    // 查询工资top3
    salaryTopN(spark, 3)
  }
}

2.7 启动metastore服务

  • 执行命令:hive --service metastore &
    在这里插入图片描述

2.8 运行程序,查看结果

  • 运行DataAnalysis对象
    在这里插入图片描述
  • hive客户端,查看创建的c
    在这里插入图片描述
  • 查看salary_info表的内容
    在这里插入图片描述
  • 在HDFS上查看salary_info表对应的目录
    在这里插入图片描述
  • 下载文件,查看内容
    在这里插入图片描述

2.8 在Spark Shell里运行程序

  • salary.json上传到HDFS的/data目录
    在这里插入图片描述

  • 在spark shell里执行命令::paste,粘贴代码
    在这里插入图片描述

  • Ctrl + D,查看结果
    在这里插入图片描述

3. 实战小结

  • 本次实战通过使用 Spark 和 Hive 进行数据分析,展示了从项目创建、依赖配置、数据加载到查询分析的完整流程。首先,我们创建了 Hive 表并加载了 JSON 数据,随后通过 Spark SQL 查询每个城市工资最高的前 N 名员工。实战中,我们使用了 SparkSession 初始化、Hive 表操作、数据加载及窗口函数等技术,实现了数据的高效处理与分析。通过本次实战,我们掌握了 Spark 和 Hive 的基本操作,并学会了如何在大数据场景下进行数据分析和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2280306.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux第103步_了解I2C总线框架

了解Linux中的I2C总线框架为后面做I2C实验做准备&#xff0c;学驱动&#xff0c;就是学习框架&#xff0c;了解是必须的。 1、了解Linux下的I2C子系统中的相关数据结构 struct i2c_adapter { struct module *owner; unsigned int class; /* classes to allow probing for …

开关电源基础

文章目录 线性电源与开关电源选用 开关稳压器脉宽调制简化的降压开关电源 开关电源类型输出电压分拓扑分 控制器与稳压器效率与 V o u t V_{out} Vout​ 同步与非同步隔离与非隔离非隔离式拓扑结构隔离式拓扑结构 线性电源与开关电源 线性稳压器就是我们通常说的LDO: 传输元件…

1. 基于图像的三维重建

1. 基于图像的三维重建 核心概念三维重建中深度图、点云的区别&#xff1f;深度图点云总结 深度图到点云还需要什么步骤&#xff1f;1. **获取相机内参**2. **生成相应的像素坐标**3. **计算三维坐标**4. **构建点云**5. **处理颜色信息&#xff08;可选&#xff09;**6. **去除…

智慧脚下生根,智能井盖监测终端引领城市安全新革命

在繁忙的都市生活中&#xff0c;我们往往只关注地面的繁华与喧嚣&#xff0c;却忽略了隐藏在地面之下的基础设施——井盖。这些看似不起眼的井盖&#xff0c;实则承担着排水、通讯、电力等重要功能&#xff0c;是城市安全运转的重要一环。然而&#xff0c;传统的井盖管理面临着…

62,【2】 BUUCTF WEB [强网杯 2019]Upload1

进入靶场 此处考点不是SQL&#xff0c;就正常注册并登录进去 先随便传一个 进行目录扫描&#xff0c;我先用爆破代替 先随便后面写个文件名 为了提供payload位置 www.tar.gz真的存在 返回浏览器修改url就自动下载了 看到tp5,应该是ThinkPHP5框架 参考此博客的思路方法c[强网杯…

IDEA导入Maven工程不识别pom.xml

0 现象 把阿里 sentinel 项目下载本地后&#xff0c;IDEA 中却没显示 maven 工具栏。 1 右键Maven Projects 点击IDEA右侧边栏的Maven Projects&#xff0c;再点击&#xff1a; 在出现的选择框中选择指定的未被识别的pom.xml即可&#xff1a; 2 Add as maven project 右键p…

LDD3学习9--数据类型和定时器

这部分对应的是第七章和第十一章&#xff0c;因为内容也不是很多&#xff0c;就一起写了。里面的内容基本上就是一个个的点&#xff0c;所以也就一个个点简单总结一下。 1 数据类型 1.1 数据长度 不同操作系统类型长度可能不一样&#xff0c;看图的话最好用u8&#xff0c;u16&…

latex去掉bibliography自带的reference

latex要去掉bibliography自带的reference其实很简单&#xff0c; 只需要加以下命令&#xff1a; \begingroup % 去掉thebibliography环境自带的“参考文献”标题 \renewcommand{\section}[2]{} 即可。 效果如图

AI在SEO中的关键词优化策略探讨

内容概要 在当今数字化时代&#xff0c;人工智能&#xff08;AI&#xff09;正逐渐重塑搜索引擎优化&#xff08;SEO&#xff09;行业。AI技术的快速发展使得SEO策略发生了翻天覆地的变化&#xff0c;特别是在关键词优化方面。关键词优化的基本概念是通过选择与用户搜索意图密…

Windows 上安装 MongoDB 的 zip 包

博主介绍&#xff1a; 大家好&#xff0c;我是想成为Super的Yuperman&#xff0c;互联网宇宙厂经验&#xff0c;17年医疗健康行业的码拉松奔跑者&#xff0c;曾担任技术专家、架构师、研发总监负责和主导多个应用架构。 近期专注&#xff1a; RPA应用研究&#xff0c;主流厂商产…

Kotlin Bytedeco OpenCV 图像图像50 仿射变换 图像缩放

Kotlin Bytedeco OpenCV 图像图像50 仿射变换 图像缩放 1 添加依赖2 测试代码3 测试结果 在OpenCV中&#xff0c;仿射变换&#xff08;Affine Transformation&#xff09;和透视变换&#xff08;Perspective Transformation&#xff09;是两种常用的图像几何变换方法。 变换方…

传统企业怎样实现数字化转型升级?

​很多企业工厂都知道数字化转型是大势所趋。 但很多时候&#xff0c;仅仅知道是不行的&#xff0c;要贯彻落实才有意义。针对每个阶段&#xff0c;企业要有相应的行动方案&#xff0c;不行动&#xff0c;永远都是理论层面的&#xff0c;一点都没有用&#xff0c;这里就给大家…

HTML 表单和输入标签详解

HTML 表单是网页与用户交互的重要工具&#xff0c;它允许用户输入数据并将其提交到服务器。表单在网页中的应用非常广泛&#xff0c;例如登录、注册、搜索、评论等功能都离不开表单。本文将详细介绍 HTML 表单及其相关标签的使用方法&#xff0c;帮助你全面掌握表单的设计与实现…

机器学习(5):支持向量机

1 介绍 支持向量机&#xff08;Support Vector Machine&#xff0c;简称 SVM&#xff09;是一种监督学习算法&#xff0c;主要用于分类和回归问题。SVM 的核心思想是找到一个最优的超平面&#xff0c;将不同类别的数据分开。这个超平面不仅要能够正确分类数据&#xff0c;还要使…

ASP.NET Blazor部署方式有哪些?

今天我们来说说Blazor的三种部署方式&#xff0c;如果大家还不了解Blazor&#xff0c;那么我先简单介绍下Blazor Blazor 是一种 .NET 前端 Web 框架&#xff0c;在单个编程模型中同时支持服务器端呈现和客户端交互性&#xff1a; ● 使用 C# 创建丰富的交互式 UI。 ● 共享使用…

渗透测试--攻击常见的Web应用

本文章咱主要讨论&#xff0c;常见Web应用的攻击手法&#xff0c;其中并不完全&#xff0c;因为Web应用是在太多无法囊括全部&#xff0c;但其中的手法思想却值得我们借鉴&#xff0c;所以俺在此做了记录&#xff0c;希望对大家有帮助&#xff01;主要有以下内容&#xff1a; 1…

Spring Boot自动配置原理:如何实现零配置启动

引言 在现代软件开发中&#xff0c;Spring 框架已经成为 Java 开发领域不可或缺的一部分。而 Spring Boot 的出现&#xff0c;更是为 Spring 应用的开发带来了革命性的变化。Spring Boot 的核心优势之一就是它的“自动配置”能力&#xff0c;它极大地简化了 Spring 应用的配置…

PHP同城配送小程序

&#x1f680; 同城极速达——您生活中的极速配送大师 &#x1f4f1; 一款专为现代都市快节奏生活量身打造的同城配送小程序&#xff0c;同城极速达&#xff0c;集高效、便捷、智能于一身&#xff0c;依托ThinkPHPGatewayWorkerUniapp的强大架构&#xff0c;巧妙融合用户端、骑…

Kotlin Bytedeco OpenCV 图像图像57 图像ROI

Kotlin Bytedeco OpenCV 图像图像57 图像ROI 1 添加依赖2 测试代码3 测试结果 1 添加依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xmlns"http://maven.apache.o…

RabbitMQ集群安装rabbitmq_delayed_message_exchange

1、单节点安装rabbitmq安装延迟队列 安装延迟队列rabbitmq_delayed_message_exchange可以参考这个文章&#xff1a; rabbitmq安装延迟队列-CSDN博客 2、集群安装rabbitmq_delayed_message_exchange 在第二个节点 join_cluster 之后&#xff0c;start_app 就会报错了 (CaseC…