Spark SQL实战(08)-整合Hive

news2024/10/7 12:27:01

1 整合原理及使用

Apache Spark 是一个快速、可扩展的分布式计算引擎,而 Hive 则是一个数据仓库工具,它提供了数据存储和查询功能。在 Spark 中使用 Hive 可以提高数据处理和查询的效率。

场景

历史原因积累下来的,很多数据原先是采用Hive来进行处理的,现想改用Spark操作数据,须要求Spark能够无缝对接已有的Hive的数据,实现平滑过渡。

MetaStore
Hive底层的元数据信息是存储在MySQL中,$HIVE_HOME/conf/hive-site.xml

Spark若能直接访问MySQL中已有的元数据信息 $SPARK_HOME/conf/hive-site.xml

前置条件

在使用 Spark 整合 Hive 之前,需要安装配置以下软件:

  • Hadoop:用于数据存储和分布式计算。
  • Hive:用于数据存储和查询。
  • Spark:用于分布式计算。

整合 Hive

在 Spark 中使用 Hive,需要将 Hive 的依赖库添加到 Spark 的类路径中。在 Java 代码中,可以使用 SparkConf 对象来设置 Spark 应用程序的配置。下面是一个示例代码:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class SparkHiveIntegration {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SparkHiveIntegration")
                .setMaster("local[*]")
                .set("spark.sql.warehouse.dir", "/user/hive/warehouse");
        SparkSession spark = SparkSession.builder()
                .config(conf)
                .enableHiveSupport()
                .getOrCreate();
        spark.sql("SELECT * FROM mytable").show();
        spark.stop();
    }
}

在上面的代码中,首先创建了一个 SparkConf 对象,设置了应用程序的名称、运行模式以及 Hive 的元数据存储路径。然后,创建了一个 SparkSession 对象,启用了 Hive 支持。最后,使用 Spark SQL 查询语句查询了一个名为 mytable 的 Hive 表,并将结果打印出来。最后,停止了 SparkSession 对象。

需要注意的是,Spark SQL 语法与 Hive SQL 语法略有不同,可以参考 Spark SQL 官方文档。

2 ThiriftServer使用

javaedge@JavaEdgedeMac-mini sbin % pwd
/Users/javaedge/Downloads/soft/spark-2.4.3-bin-2.6.0-cdh5.15.1/sbin

javaedge@JavaEdgedeMac-mini sbin % ./start-thriftserver.sh --master local --jars /Users/javaedge/.m2/repository/mysql/mysql-connector-java/8.0.15/mysql-connector-java-8.0.15.jar

starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /Users/javaedge/Downloads/soft/spark-2.4.3-bin-2.6.0-cdh5.15.1/logs/spark-javaedge-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-JavaEdgedeMac-mini.local.out

beeline

内置了一个客户端工具:

javaedge@JavaEdgedeMac-mini bin % ./beeline -u jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
log4j:WARN No appenders could be found for logger (org.apache.hive.jdbc.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to: Spark SQL (version 2.4.3)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
0: jdbc:hive2://localhost:10000>

当你执行一条命令后:

就能在 Web UI 看到该命令记录:

3 通过代码访问数据

总是手敲命令行肯定太慢了,我们更多是代码访问:

package com.javaedge.bigdata.chapter06

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object JDBCClientApp {

  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn: Connection = DriverManager.getConnection("jdbc:hive2://localhost:10000")
    val pstmt: PreparedStatement = conn.prepareStatement("show tables")

    val rs: ResultSet = pstmt.executeQuery()

    while (rs.next()) {
      println(rs.getObject(1) + " : " + rs.getObject(2))
    }
  }
}

最后打成 jar 包,扔到服务器定时运行即可执行作业啦。

ThiriftServer V.S Spark Application 例行作业

Thrift Server 独立的服务器应用程序,它允许多个客户端通过网络协议访问其上运行的 Thrift 服务。Thrift 服务通常是由一组 Thrift 定义文件定义的,这些文件描述了可以从客户端发送到服务器的请求和响应消息的数据结构和协议。Thrift Server 可以使用各种编程语言进行开发,包括 Java、C++、Python 等,并支持多种传输和序列化格式,例如 TSocket、TFramedTransport、TBinaryProtocol 等。使用 Thrift Server,您可以轻松地创建高性能、可伸缩和跨平台的分布式应用程序。

Spark Application,基于 Apache Spark 的应用程序,它使用 Spark 编写的 API 和库来处理大规模数据集。Spark Application 可以部署在本地计算机或云环境中,并且支持各种数据源和格式,如 Hadoop 分布式文件系统(HDFS)、Apache Cassandra、Apache Kafka 等。Spark Application 可以并行处理数据集,以加快数据处理速度,并提供了广泛的机器学习算法和图形处理功能。使用 Spark Application,您可以轻松地处理海量数据,提取有价值的信息和洞察,并帮助您做出更明智的业务决策。

因此,Thrift Server 和 Spark Application 适用不同的场景和应用程序:

  • 需要创建一个分布式服务并为多个客户端提供接口,使用 Thrift Server
  • 需要处理大规模数据集并使用分布式计算和机器学习算法来分析数据,使用 Spark Application

4 Spark 代码访问 Hive 数据

5 Spark SQL 函数实战

parallelize

SparkContext 一个方法,将一个本地数据集转为RDD。parallelize` 方法接受一个集合作为输入参数,并根据指定的并行度创建一个新的 RDD。

语法:

// data表示要转换为 RDD 的本地集合
// numSlices表示 RDD 的分区数,通常等于集群中可用的 CPU 核心数量。 
val rdd = 
sc.parallelize(data, numSlices)

将一个包含整数值的本地数组转换为RDD:

import org.apache.spark.{SparkConf, SparkContext}

// 创建 SparkConf 对象
val conf = new SparkConf().setAppName("ParallelizeExample").setMaster("local[*]")

// 创建 SparkContext 对象
val sc = new SparkContext(conf)

// 定义本地序列
val data = Seq(1, 2, 3, 4, 5)

// 使用 parallelize 方法创建 RDD
val rdd = sc.parallelize(data)

// 执行转换操作
val result = rdd.map(_ * 2)

// 显示输出结果
result.foreach(println)

创建了一个包含整数值的本地序列 data,然后使用 parallelize 方法将其转换为一个 RDD。接下来,我们对 RDD 进行转换操作,并打印输出结果。

使用 parallelize 方法时,请确保正确配置 Spark 应用程序,并设置正确 CPU 核心数量和内存大小。否则,可能会导致应用程序性能下降或崩溃。

5.1 内置函数

都在这:

统计 PV、UV 实例

package com.javaedge.bigdata.chapter06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 内置函数
 */
object BuiltFunctionApp {

  def main(args: Array[String]): Unit = {


    val spark: SparkSession = SparkSession.builder()
      .master("local").appName("HiveSourceApp")
      .getOrCreate()

    // day  userid
    val userAccessLog = Array(
      "2016-10-01,1122",
      "2016-10-01,1122",
      "2016-10-01,1123",
      "2016-10-01,1124",
      "2016-10-01,1124",
      "2016-10-02,1122",
      "2016-10-02,1121",
      "2016-10-02,1123",
      "2016-10-02,1123"
    )

    import spark.implicits._

    // Array ==> RDD
    val userAccessRDD: RDD[String] = spark.sparkContext.parallelize(userAccessLog)

    val userAccessDF: DataFrame = userAccessRDD.map(x => {
      val splits: Array[String] = x.split(",")
      Log(splits(0), splits(1).toInt)
    }).toDF

    userAccessDF.show()

    import org.apache.spark.sql.functions._

    // select day, count(user_id) from xxx group by day;
    userAccessDF.groupBy("day").agg(count("userId").as("pv")).show()

    userAccessDF.groupBy("day").agg(countDistinct("userId").as("uv")).show()
    spark.stop()
  }

  private case class Log(day: String, userId: Int)
}

5.2 自定义函数

package com.javaedge.bigdata.chapter06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


/**
 * 统计每个人爱好的个数
 * pk:3
 * jepson: 2
 *
 *
 * 1)定义函数
 * 2)注册函数
 * 3)使用函数
 */
object UDFFunctionApp {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local").appName("HiveSourceApp")
      .getOrCreate()


    import spark.implicits._

    val infoRDD: RDD[String] = spark.sparkContext.textFile(
      "/Users/javaedge/Downloads/sparksql-train/data/hobbies.txt")
    val infoDF: DataFrame = infoRDD.map(_.split("###")).map(x => {
      Hobbies(x(0), x(1))
    }).toDF

    infoDF.show(false)

    // TODO... 定义函数 和 注册函数
    spark.udf.register("hobby_num", (s: String) => s.split(",").size)

    infoDF.createOrReplaceTempView("hobbies")

    //TODO... 函数的使用
    spark.sql("select name, hobbies, hobby_num(hobbies) as hobby_count from hobbies").show(false)

    // select name, hobby_num(hobbies) from xxx

    spark.stop()
  }

  private case class Hobbies(name: String, hobbies: String)
}

output:
+------+----------------------+
|name  |hobbies               |
+------+----------------------+
|pk    |jogging,Coding,cooking|
|jepson|travel,dance          |
+------+----------------------+

+------+----------------------+-----------+
|name  |hobbies               |hobby_count|
+------+----------------------+-----------+
|pk    |jogging,Coding,cooking|3          |
|jepson|travel,dance          |2          |
+------+----------------------+-----------+

6 总结

通过上述示例代码,可以看到如何在 Java 中使用 Spark 整合 Hive。通过使用 Hive 的数据存储和查询功能,可以在 Spark 中高效地处理和分析数据。当然,还有许多其他功能和配置可以使用,例如设置 Spark 应用程序的资源分配、数据分区、数据格式转换等等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/441896.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Node内置模块 【url模块与queryString】

文章目录 🌟前言🌟url 模块🌟 URL各部分说明🌟 将URL字符串转换为对象🌟 将对象格式化为URL字符串:url.format(urlObj)🌟 URL路径处理:url.resolve(from, to) 🌟 querySt…

MySQL-四大类日志

目录 🍁MySQL日志分为4大类 🍁错误日志 🍃修改系统配置 🍁二进制日志 🍃查看二进制日志 🍃删除二进制日志 🍃暂时停止二进制日志的功能 🍁事务日志(或称redo日志) 🍁慢查…

SSM整合、环境配置以及详细综合测试(单表查询、多表查询和数据分页、前后端分离、Vue3)

SSM整合、环境配置以及基础综合测试 准备:创建maven项目以及项目框架准备 SSM整合简介 介绍: SSM(SpringSpringMVCMyBatis) 整合,就是三个框架协同开发。Spring整合Mybatis就是将Mybatis核心配置文件当中数据源的配置、事务处理、以及工厂的配置&…

OpenGL入门教程之 深入三角形

一、引言 本教程使用GLEW和GLFW库。  通过本教程,你能轻松的、深入的理解OpenGL如何绘制一个三角形。  如果你不了解OpenGL是什么,可以阅读OpenGL深入理解。 二、基本函数和语句介绍 通过阅读以下的函数,你的大脑里能留下关于OpenGL基本函…

通过CSIG—走进合合信息探讨生成式AI及文档图像处理的前景和价值

一、前言 最近有幸参加了由中国图象图形学学会(CSIG)主办,合合信息、CSIG文档图像分析与识别专业委员会联合承办的“CSIG企业行——走进合合信息”的分享会,这次活动以“图文智能处理与多场景应用技术展望”为主题,聚…

安全防御第四天:防病毒网关

一、恶意软件 1.按照传播方式分类 (1)病毒 病毒是一种基于硬件和操作系统的程序,具有感染和破坏能力,这与病毒程序的结构有关。病毒攻击的宿主程序是病毒的栖身地,它是病毒传播的目的地,又是下一次感染的出…

尚融宝21-整合springcloud

目录 一、整合注册中心nacos 二、整合openFeign (一)准备工作 (二)导入依赖 (三)接口的远程调用 (四)配置超时控制和日志打印 三、整合Sentinel 四、整合gateway服务网关 …

【Spring从成神到升仙系列 五】从根上剖析 Spring 循环依赖

👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙…

基于SpringBoot+Vue家乡特色推荐系统

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦。 💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精…

【李老师云计算】HBase+Zookeeper部署及Maven访问(HBase集群实验)

索引 前言1. Zookeeper1.1 主机下载Zookeeper安装包1.2 主机解压Zookeeper1.3 ★解决解压后文件缺失1.4 主机配置Zookeeper文件1.4.1 配置zoo_sample.cfg文件1.4.2 配置/data/myid文件 1.5 主机传输Zookeeper文件到从机1.6 从机修改Zookeeper文件1.6.1 修改zoo.cfg文件1.6.2 修…

一文带你了解MySQL的前世今生,架构,组成部分,特点,适用场景

文章目录 一、MySQL的由来二、MySQL的架构2.1 客户端2.2 服务器 三、 MySQL的主要组成部分3.1 连接管理器3.2 查询缓存3.3 解析器3.4 查询优化器3.5 执行器3.6 存储引擎 四、MySQL的特点五、MySQL的应用场景六、总结 一、MySQL的由来 MySQL最初是由瑞典公司MySQL AB的Michael …

4年功能测试,我一进阶python接口自动化测试,跳槽拿了20k......

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 很多人在这求职市…

让ChatGPT告诉你Java的发展前景

Java版电商购物系统项目实战 最近很多人问我Java的发展前景怎么样?该怎么学Java基础?java这么卷还该不该学等等。那今天老王以电商场景为例,再结合ChatGPT的回答和大家聊的一下Java有哪些应用前景和技术层面的落地方案。(在收获干…

【Spring】-- 02 -- Spring中Bean的配置、作用域

一、Bean的配置 Spring用于生产和管理Spring容器中的Bean,需要开发者对Spring的配置文件进行配置。在实际开发中,最常采用XML格式的配置方式,即通过XML文件来注册并管理Bean之间的依赖关系。 在Spring中,XML配置文件的根元素是…

iOS问题记录 - Xcode 14.3版本运行项目报错

文章目录 前言开发环境问题描述问题分析解决方案最后 前言 看到Xcode有新版本,没忍住点了升级,然后问题来了。 开发环境 macOS 13.3Xcode: 14.3 问题描述 Xcode 14.2版本运行项目一切正常,升级到14.3版本后运行报错。 运行到模拟器的报…

【PWN刷题__ret2text】——CTFHub之 简单的 ret2text

萌新第一阶段自然是了解做题的套路、流程,简单题要多做滴 目录 前言 一、checksec查看 二、IDA反汇编 三、exp编写 前言 经典的ret2text流程 一、checksec查看 64位程序,什么保护都没有,No canary found——可以栈溢出控制返回 二、IDA反汇…

“MySQL5.6”、“索引优化”,其实都是索引下推

如果你在面试中,听到“MySQL5.6”、“索引优化” 之类的词语,你就要立马get到,这个问的是“索引下推”。 什么是索引下推 索引下推(Index Condition Pushdown,简称ICP),是MySQL5.6版本的新特性,它能减少回…

学习实践-Alpaca-Lora (羊驼-Lora)(部署+运行+微调-训练自己的数据集)

Alpaca-Lora模型GitHub代码地址 1、Alpaca-Lora内容简单介绍 三月中旬,斯坦福发布的 Alpaca (指令跟随语言模型)火了。其被认为是 ChatGPT 轻量级的开源版本,其训练数据集来源于text-davinci-003,并由 Meta 的 LLaMA …

OpenAI对实现强人工智能AGI的规划:《Planing for AGI and beyond》

OpenAI对实现AGI的长期和短期的计划:《Planing for AGI and beyond》 返回论文和资料目录 原文地址 1.导读 OpenAI最近这些年发布了很多令人印象深刻的模型,毫无疑问,OpenAI已经走在了人工智能领域的最前沿。但是很多人只注意到这些模型&…

Nacos Docker Kubernetes ⽣态

博主介绍:✌全网粉丝4W,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战、定制、远程,博主也曾写过优秀论文,查重率极低,在这方面…