SparkSQL与Hive的整合

news2024/12/12 6:03:59

文章目录

      • SparkSQL与Hive的整合
        • 1.1. Spark On Hive
          • 1.1.1. Hive的准备工作
          • 1.1.2. Spark的准备工作
          • 1.1.3. Spark代码开发
          • 1.1.4. Spark On Hive案例
        • 1.2. Hive On Spark
        • 1.3. SparkSQL命令行
        • 1.4. SparkSQL分布式查询引擎
          • 1.4.1. 开启ThriftServer服务
          • 1.4.2. beeline连接ThriftServer
          • 1.4.3. 代码连接
          • 1.4.4. 任务查看

SparkSQL与Hive的整合

1.1. Spark On Hive

SparkSQL其实就是一个Spark框架下的执行引擎,可以对结构化的数据使用SQL的方式,将SQL翻译成为SparkCore的代码去完成计算。SparkSQL支持不同的数据源,可以读取各种数据文件的数据、可以通过JDBC读取MySQL的数据,在实际开发过程中,有时候我们需要使用SparkSQL去处理Hive中的数据。这就是SparkSQL与Hive的整合方式之一:Spark On Hive

其实Spark只是一个计算引擎,本身是没有元数据管理的功能的。而我们在前面使用到的无论是DSL风格的处理方式,还是SQL风格的处理方式,所谓的“元数据”、“表”,其实都是向DataFrame注册的。DataFrame中记录了“表”、“字段”、“类型”等信息,就可以将SQL语句解析成为Spark程序来运行了。

但是Hive不同,Hive本身就是有一个元数据库(MetaStore)的,因此我们需要使用SparkSQL处理Hive的数据的时候,无需再注册表、注册字段等信息,直接从Hive的元数据库(MetaStore)中获取元数据即可。

1.1.1. Hive的准备工作
  1. 配置Hive的元数据服务:修改hive的配置文件 hive-site.xml

    <!-- 配置Hive的MetaStore服务,使用thrift协议,设置好主机名和端口号 -->
    <property>
    	<name>hive.metastore.uris</name>
    	<value>thrift://qianfeng01:9083</value>
    </property>
    
  2. 启动Hive的元数据服务

    # 开启Hive的metastore服务
    # 这种方式开启的服务是一个前台进程,不方便使用
    hive --service metastore
    
    # 开启Hive的metastore服务,并设置为后台进程
    # 这种方式开启的元数据服务是后台进程,方便交互了,但是不方便查看日志,并且随着session的退出,服务会中断
    hive --service metastore &
    
    # 启动后台进程,将日志输出到指定位置
    nohup hive --service metastore > /var/log/metastore.log 2>&1 &
    
1.1.2. Spark的准备工作
  1. 在spark的conf目录下,创建hive-site.xml文件,存放连接到hive的配置信息

    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    
    <configuration>
        <property>
            <name>hive.metastore.warehouse.dir</name>
            <value>/user/hive/warehouse</value>
        </property>
        <property>
            <name>hive.metastore.uris</name>
            <value>thrift://qianfeng01:9083</value>
        </property>
    </configuration>
    

    Spark程序在运行的时候,相关的配置信息的加载次序:

    • 首先加载conf目录下的配置文件。
    • 再加载代码中进行的配置。

    其实只需要让SparkSQL程序知道metastore服务在哪里就可以了,如果不配置上面的这个文件也可以,不过就需要在代码中配置了。为了避免每一次在写程序的时候,都在代码里面去配置,简单起见,就直接创建这个文件,将连接到Hive元数据服务的配置都放进去。这样每次Spark程序在启动的时候,都可以自动的加载到。

  2. 准备MySQL的驱动包

    因为Hive的元数据保存到了MySQL数据库,Spark SQL程序如果需要访问的话,肯定需要从MySQL数据库中读取元数据信息。此时就必须要这个jar包了。

    将准备好的mysql-connector-java-8.0.26.jar文件存放到spark的jars目录下。

    注意:

    • 如果需要运行本地模式,那么本地的Spark的jars目录下需要存放有这个jar包。
    • 如果需要运行集群模式,那么集群中的Spark的jars目录下需要存放有这个jar包。
1.1.3. Spark代码开发
# @Author   : 千锋大数据教研院
# @Company  : 北京千锋互联科技有限公司

from pyspark.sql import SparkSession

# 这里的 .enableHiveSupport() 表示的就是打开Hive支持,此时就可以访问到Hive的数据了。
# 注意:
# 如果没有在spark的conf目录下面创建hive-site.xml并正确的设置hive的元数据服务
# 那么在创建SparkSession对象的时候,就必须要设置hive的元数据服务信息
# .config("spark.sql.warehouse.dir", "hdfs://qianfeng01:9820/user/hive/warehouse")
# .config("hive.metastore.uris", "thrift://qianfeng01:9083")
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("hive-enable")\
    .enableHiveSupport()\
    .getOrCreate()

# spark.sql("select * from mydb.emp").show()
spark.sql("select * from mydb.emp join mydb.dept on mydb.emp.deptno = mydb.dept.deptno;").show()

spark.stop()
1.1.4. Spark On Hive案例

基本的Spark On Hive的程序就编写完成了。我们也可以结合之前的内容,整合其他的数据源与Hive配合使用

在Hive中有一张表,存储了用户的名字与身份证号。读取这个表中的数据,通过身份证号解析出生日、性别、年龄等信息,并将结果保存到Hive中。

"""
需求: 从 Hive 的mydb.users表中通过身份证号,解析出用户的生日、年龄、性别信息,并将结果存入到一个新的表中
res:
    username
    idcard
    phone
    birthday
    age
    gender
create table if not exists mydb.res(
    username string,
    idcard string,
    phone string,
    birthday string,
    age string,
    gender string
)
row format delimited
fields terminated by ','
"""


import os
import re
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import MapType, StringType


os.environ.setdefault("HADOOP_USER_NAME", "root")


def calculate_age(year, month, day) -> int:
    now = datetime.datetime.now()
    age = now.year - year
    if now.month < month:
        age -= 1
    elif now.month == month and now.day < day:
        age -= 1

    return age


def parse_idcard(idcard: str) -> dict:
    # 1. 验证身份证号码是否合法
    m = re.fullmatch(r'(\d{6})'
                     r'(?P<year>(19|20)\d{2})'
                     r'(?P<month>0[1-9]|1[0-2])'
                     r'(?P<day>[012][0-9]|10|20|30|31)'
                     r'\d{2}'
                     r'(?P<gender>\d)'
                     r'[0-9xX]', idcard)
    if m is None:
        return {}

    # 2. 解析每一部分
    year = m.group('year')
    month = m.group('month')
    day = m.group('day')
    age = calculate_age(int(year), int(month), int(day))
    gender = '男' if int(m.group('gender')) % 2 != 0 else '女'
    birthday = '-'.join([year, month, day])

    return {"birthday": birthday, "age": age, "gender": gender}


with SparkSession.builder.master("local[*]").appName("exercise").enableHiveSupport().getOrCreate() as spark:
    # 注册 UDF 函数
    spark.udf.register("parse_idcard", parse_idcard, MapType(StringType(), StringType()))

    # 查询数据
    res = spark.sql("""
                select
                    username, 
                    idcard,
                    phone,
                    parse_idcard(idcard)['birthday'] as birthday,
                    parse_idcard(idcard)['age'] as age,
                    parse_idcard(idcard)['gender'] as gender
                from   
                    mydb.users
            """)

    # 将查询结果写出到 Hive 指定的表中,这个表需要提前存在
    res.write.insertInto("mydb.res")
1.2. Hive On Spark

其实Hive On Spark的意思就是,将Hive的底层计算引擎替换成Spark!Hive默认的计算引擎是MapReduce,而这个是可以替换的。只需要使用set hive.execution.engine=spark即可完成替换,同时需要指定Spark的Master。

# 使用Hive On Spark非常简单
# 只要用set hive.execution.engine命令设置Hive的执行引擎为spark即可
# 默认是mr
set hive.execution.engine=spark;
# 这里,是完全可以将其设置为Spark Master的URL地址的
set spark.master=spark://192.168.10.101:7077
# 注意上面这种配置是只适用于匹配的版本才可以,如果高版本的话现在是没有这种功能的,需要自行编译
# 参考官方文档:https://cwiki.apache.org//confluence/display/Hive/Hive+on+Spark:+Getting+Started

但是需要注意,HiveOnSpark并不是适合所有场景的,因为Spark是内存计算的计算引擎,需要消耗大量的内存资源,不利于其他程序的计算应用。因此需要使用Spark来处理Hive的数据的时候,SparkOnHive是一个比较常见的选择。

1.3. SparkSQL命令行

在Spark的bin目录下,有一个脚本文件spark-sql,这个脚本文件会启动一个命令交互界面,可以使得我们在命令行上直接使用Spark来操作Hive的数据。

在3.3.1.章节的部分,已经在spark的conf目录下面创建出来一个hive-site.xml文件,其中定义了hive的元数据相关的信息,这样我们就可以直接使用了。

image-20230214181016887

1.4. SparkSQL分布式查询引擎

在Spark中有一个服务是ThriftServer服务,通过这个服务,用户可以通过JDBC连接ThriftServer来访问SparkSQL的数据。连接后可以直接通过编写SQL语句访问SparkSQL的数据。在配置ThriftServer的时候,至少需要配置ThriftServer的主机名和端口号,如果需要使用Hive的数据的话,还需要再提供Hive的Metastore的URIs。

如果你前面已经配置完成了Spark On Hive,那么在你的Spark的conf目录下已经存在了一个文件:hive-site.xml,在这个文件中已经配置好了Hive的Metastore的URIs了。

1.4.1. 开启ThriftServer服务
$SPARK_HOME/bin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000\
--hiveconf hive.server2.thrift.bind.host=qianfeng01\
--master local[*]

这里的--master可以设置为local模式、Standalone模式或者YARN模式。

1.4.2. beeline连接ThriftServer

ThriftServer服务启动之后,默认件监听的是10000端口,我们可以使用一些客户端工具来连接到这个服务。例如beeline。

image-20230216144155303

1.4.3. 代码连接

如果需要需要使用ThriftServer连接到SparkSQL,进而操作Hive的数据的话,我们需要安装Hive的依赖。

pip3 install pyhive
# @Author   : 千锋大数据教研院
# @Company  : 北京千锋互联科技有限公司


from pyhive import hive

# 通过Spark ThriftServer,创建到Hive的连接对象,
conn = hive.Connection(host="qianfeng01", port=10000, username="root", database="mydb")
# 创建一个光标对象,用来操作hive
cursor = conn.cursor()

with conn, cursor:
    # 执行SQL语句
    cursor.execute("select * from emp join dept on emp.deptno = dept.deptno")
    result = cursor.fetchall()

    for r in result:
        print(r)
1.4.4. 任务查看

ThriftServer提交到Spark的任务,我们可以通过http://192.168.10.101:4040/jobs/来查看到。

image-20230216145609484

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2258080.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

借助Dynamsoft的批量条码扫描,推动无人机仓储管理新高度

随着企业规模扩大和供应链变得越来越复杂&#xff0c;库存管理已成为仓库运营商面临的一项重大挑战。传统技术主要依赖于人工&#xff0c;而现在正被无人机和条形码识别等先进技术所取代。这些创新可以提高库存跟踪的准确性和效率&#xff0c;同时最大限度地减少人为错误并优化…

web网页连接MQTT,显示数据与下发控制命令

web网页连接MQTT&#xff0c;显示数据与下发控制命令 零、前言 在完成一些设备作品后&#xff0c;常常会因为没有一个上位机用来实时检测数据和下发命令而苦恼&#xff0c;在上一篇文章中提到了怎么白嫖阿里云服务器&#xff0c;并且在上面搭建了属于自己的web网站。那么现在…

数学学院项目开发总结

数学学院项目开发总结 学生成长平台 后端:技术栈gogingorm 负责内容:入团申请审核后端部分 前台 学生入团申请表单的提交根据审核状态判断不同的跳转页面 后台 活动的创建和关闭和信息提交和审核时间的管理 表单的审核流转: 班级审批基本信息审批 - > 学生会纪权部道…

flutter编译e: Daemon compilation failed: null java.lang.Exception错误解决

文章目录 错误描述解决方法修复步骤1. 清理项目缓存2. 检查项目路径一致性3. 强制禁用增量编译4. 更新依赖项5. 检查 Kotlin 和 Gradle 插件版本6. 删除 Kotlin 编译器守护进程7. 重新编译项目 错误描述 flutter应用编译时报如下错误e: Daemon compilation failed: null java.…

如何通过看板进行跨境电商的圣诞商品数据分析与优化选品流程?

引言 随着圣诞季的临近&#xff0c;跨境电商迎来了重要的销售时机。选品工作对于跨境电商的成功至关重要&#xff0c;直接关系到销售业绩和利润。本文结合相关网页信息&#xff0c;深入探讨跨境电商在圣诞期间如何利用信息整合工具展开选品工作&#xff0c;并优化选品流程。同…

Maven学习(依赖版本维护、依赖传递、解决Maven依赖冲突的3种方式)

目录 一、Maven的依赖版本维护。 &#xff08;1&#xff09;为什么需要依赖版本维护&#xff1f; &#xff08;2&#xff09;依赖统一管理的具体操作步骤。 第一步。在pom.xml文件中使用标签定义jar包的版本。 第二步。在的对应jar的中使用"${}"引入上面定义好的版本…

OpenCV 功能函数介绍

一&#xff0c; 二值化函数 功能&#xff1a; 用于对图像进行二值化处理 参数&#xff1a; cv2.threshold(输入你的图像所对应的灰度图&#xff0c; 阈值&#xff1a;是浮点还是整数取决予图像的数据类型 最大值;高于阈值的像素值&#xff0c; 阈值类型&#xff1a;cv2.THR…

JAVA根据Word模板生成word文件

本次要做一个小工具&#xff0c;读取excel数据&#xff0c;然后生成word文件。 直接上代码&#xff1a; 一、引用包 <dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>3.15</version></dep…

什么是定性数据分析?有哪些定性数据分析技术及应用实践?

众所周知&#xff0c;定性数据分析软件&#xff08;QDA 软件&#xff09;为研究人员提供了显著的优势&#xff0c;特别是在节省文书工作时间方面&#xff0c;“让研究人员可以从事更有意义的分析工作”。 使用它是从事实际有效研究的最关键步骤之一。然而&#xff0c;知道您需要…

【前端】JavaScript中的闭包与垃圾回收机制详解

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: 前端 文章目录 &#x1f4af;前言&#x1f4af;垃圾回收机制&#xff08;Garbage Collection, GC&#xff09;垃圾回收的核心原理核心过程 函数作用域与垃圾回收运行分析输出结果 垃圾回收的局限性与挑战 &#x1f4a…

android studio 读写文件操作(应用场景三)

android studio版本&#xff1a;2023.3.1 patch2 例程&#xff1a;filesaveandread 其实我写这个都是我记录我要做后个数独小游戏&#xff0c;每一个都是为了解决一个问题。即是分享也是备忘&#xff0c;反正我什么都不会&#xff0c;就是一顿瞎改&#xff0c;不行就研究。这…

分库分表基本概念讲解

一、基本概念 产生背景 在数据爆炸的年代&#xff0c;单表数据达到千万级别&#xff0c;甚至过亿的量&#xff0c;都是很常见的情景。这时候再对数据库进行操作就是非常吃力的事情了&#xff0c;select个半天都出不来数据&#xff0c;这时候业务已经难以维系。不得已&#xf…

华为自反ACL实验

一、实验背景 做这个实验的原因是最近公司里上了三台小程序服务器&#xff0c;由于三台服务器的端口都映射出去了&#xff0c;领导要求A网段的三台服务器不能访问内网B&#xff0c;C网段&#xff0c;同时B、C网段内网用户可以访问A段的94、95、96服务器&#xff1b; 也就是PC4\…

美图撕掉蔡文胜标签

卖掉比特币的美图不投机了。 作者|周立青 编辑|杨舟 12月5日&#xff0c;比特币突破10万美元大关&#xff0c;曾花费1亿美元购入虚拟货币的美图宣布已出售所有加密货币。 美图在港交所发布公告称&#xff0c;自2024年11月起&#xff0c;公司已开始出售其持有的加密货币&…

git拉取代码报错问题:Pulling is not possible because you have unmerged files. hint

我们在工作中&#xff0c;需要切换到另外一个分支&#xff0c;拉取代码的时候会报这样的问题&#xff1a; Pulling is not possible because you have unmerged files. hint: Fix them up in the work tree, and then use git add/rm <file> hint: as appropriate to ma…

青训营-豆包MarsCode技术训练营试题解析三十七

引言 随着AI领域的发展&#xff0c;底层算法确实起到了决定性的作用。为了跟上这个快速发展的领域&#xff0c;我们需要不断学习和提升自己的技能。刷题是一种很好的方式&#xff0c;可以帮助我们巩固基础知识&#xff0c;提高解决问题的能力。 介绍 ‌豆包青训营‌是由字节…

openlayers地图缓存添加

//通过安装包localforage&#xff08;npm install localforage&#xff09;或https://cdnjs.cloudflare.com/ajax/libs/localforage/1.10.0/localforage.min.js tileCacheStore.js import localforage from localforage var tileCacheStorenull;// 从缓存中获取该瓦片 functio…

《IP 电话:选型指南与应用优势》

《IP 电话&#xff1a;选型指南与应用优势》 一、IP 电话概述二、IP 电话的选型1. 功能与应用2. 性能及可靠性3. 兼容及可升级性4. 外观设计及管理需求5. 性价比 三、IP 电话的应用场景1. 企业办公2. 工厂和仓库3. 酒店和旅游业4. 医疗机构5. 零售业6. 教育机构7. 个人用户 四、…

解决Logitech G hub 无法进入一直转圈的方案(2024.12)

如果你不是最新版本无法加载尝试以下方案&#xff1a;删除AppData 文件夹下的logihub文件夹 具体路径&#xff1a;用户名根据实际你的请情况修改 C:\Users\Administrator\AppData\Local 如果你有通过lua编译脚本&#xff0c;记得备份&#xff01;&#xff01; ↓如果你是最新…

[网络爬虫] Jsoup : HTML 解析工具

1 概述 简介 Jsoup是一款基于Java的HTML解析器&#xff0c;它提供了一种简单、灵活且易于使用的API&#xff0c;用于从URL、文件或字符串中解析HTML文档。它可以帮助开发人员从HTML文档中提取数据、操作DOM元素、处理表单提交等。 主要特点 Jsoup的主要特点包括&#xff1a; 简…