【Spark分布式内存计算框架——Spark Core】7. RDD Checkpoint、外部数据源

news2025/1/11 18:07:07

第五章 RDD Checkpoint

RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。

在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;

在这里插入图片描述
演示范例代码如下:

import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD数据Checkpoint设置,案例演示
*/
object SparkCkptTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// TODO: 设置检查点目录,将RDD数据保存到那个目录
sc.setCheckpointDir("datas/spark/ckpt/")
// 读取文件数据
val datasRDD = sc.textFile("datas/wordcount/wordcount.data")
// TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
datasRDD.checkpoint()
datasRDD.count()
// TODO: 再次执行count函数, 此时从checkpoint读取数据
datasRDD.count()
// 应用程序运行结束,关闭资源
Thread.sleep(100000)
sc.stop()
}
}

持久化和Checkpoint的区别:
1)、存储位置

  • Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存);
  • Checkpoint 可以保存数据到 HDFS 这类可靠的存储上;
    2)、生命周期
  • Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法;
  • Checkpoint的RDD在程序结束后依然存在,不会被删除;
    3)、Lineage(血统、依赖链、依赖关系)
  • Persist和Cache,不会丢掉RDD间的依赖链/依赖关系,因为这种缓存是不可靠的,如果出现了一些错误(例如 Executor 宕机),需要通过回溯依赖链重新计算出来;
  • Checkpoint会斩断依赖链,因为Checkpoint会把结果保存在HDFS这类存储中,更加的安全可靠,一般不需要回溯依赖链;
    在这里插入图片描述

第六章 外部数据源

Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如下两个场景:
1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析

  • 日志数据:电商网站的商家操作日志
  • 订单数据:保险行业订单数据
    2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL表中
  • 网站基本分析(pv、uv。。。。。)

6.1 HBase 数据源

Spark可以从HBase表中读写(Read/Write)数据,底层采用TableInputFormat和TableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输出格式OutputFoamt。
在这里插入图片描述

HBase Sink
回 顾 MapReduce 向 HBase 表 中 写 入 数 据 , 使 用 TableReducer , 其 中 OutputFormat 为TableOutputFormat,读取数据Key:ImmutableBytesWritable,Value:Put。

写 入 数 据 时 , 需 要 将 RDD 转换为 RDD[(ImmutableBytesWritable, Put)] 类 型 , 调 用saveAsNewAPIHadoopFile方法数据保存至HBase表中。

HBase Client连接时,需要设置依赖Zookeeper地址相关信息及表的名称,通过Configuration设置属性值进行传递。
在这里插入图片描述
范例演示:将词频统计结果保存HBase表,表的设计
在这里插入图片描述
代码如下:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 将RDD数据保存至HBase表中
*/
object SparkWriteHBase {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// TODO: 1、构建RDD
val list = List(("hadoop", 234), ("spark", 3454), ("hive", 343434), ("ml", 8765))
val outputRDD: RDD[(String, Int)] = sc.parallelize(list, numSlices = 2)
// TODO: 2、将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数,要求RDD是(key, Value)
// TODO: 组装RDD[(ImmutableBytesWritable, Put)]
/**
* HBase表的设计:
* 表的名称:htb_wordcount
* Rowkey: word
* 列簇: info
* 字段名称: count
*/
val putsRDD: RDD[(ImmutableBytesWritable, Put)] = outputRDD.mapPartitions{ iter =>
iter.map { case (word, count) =>
// 创建Put实例对象
val put = new Put(Bytes.toBytes(word))
// 添加列
put.addColumn(
// 实际项目中使用HBase时,插入数据,先将所有字段的值转为String,再使用Bytes转换为字节数组
Bytes.toBytes("info"), Bytes.toBytes("cout"), Bytes.toBytes(count.toString)
)
// 返回二元组
(new ImmutableBytesWritable(put.getRow), put)
}
}
// 构建HBase Client配置信息
val conf: Configuration = HBaseConfiguration.create()
// 设置连接Zookeeper属性
conf.set("hbase.zookeeper.quorum", "node1.itcast.cn")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase")
// 设置将数据保存的HBase表的名称
conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
/*
def saveAsNewAPIHadoopFile(
path: String,// 保存的路径
keyClass: Class[_], // Key类型
valueClass: Class[_], // Value类型
outputFormatClass: Class[_ <: NewOutputFormat[_, _]], // 输出格式OutputFormat实现
conf: Configuration = self.context.hadoopConfiguration // 配置信息
): Unit
*/
putsRDD.saveAsNewAPIHadoopFile(
"datas/spark/htb-output-" + System.nanoTime(), //
classOf[ImmutableBytesWritable], //
classOf[Put], //
classOf[TableOutputFormat[ImmutableBytesWritable]], //
conf
)
// 应用程序运行结束,关闭资源
sc.stop()
}
}

运行完成以后,使用hbase shell查看数据:
在这里插入图片描述
HBase Source
回 顾 MapReduce 从 读 HBase 表 中 的 数 据 , 使 用 TableMapper , 其 中 InputFormat 为TableInputFormat,读取数据Key:ImmutableBytesWritable,Value:Result。

从HBase表读取数据时,同样需要设置依赖Zookeeper地址信息和表的名称,使用Configuration设置属性,形式如下:
在这里插入图片描述

此外,读取的数据封装到RDD中,Key和Value类型分别为:ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化,性能要比Java 序列化要好,创建SparkConf对象设置相关属性,如下所示:
在这里插入图片描述
范例演示:从HBase表读取词频统计结果,代码如下

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 从HBase 表中读取数据,封装到RDD数据集
*/
object SparkReadHBase {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// TODO: 设置使用Kryo 序列化方式
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// TODO: 注册序列化的数据类型
.registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// TODO: a. 读取HBase Client 配置信息
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node1.itcast.cn")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase")
// TODO: b. 设置读取的表的名称
conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
/*
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]
): RDD[(K, V)]
*/
val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf, //
classOf[TableInputFormat], //
classOf[ImmutableBytesWritable], //
classOf[Result] //
)
println(s"Count = ${resultRDD.count()}")
resultRDD
.take(5)
.foreach { case (rowKey, result) =>
println(s"RowKey = ${Bytes.toString(rowKey.get())}")
// HBase表中的每条数据封装在result对象中,解析获取每列的值
result.rawCells().foreach { cell =>
val cf = Bytes.toString(CellUtil.cloneFamily(cell))
val column = Bytes.toString(CellUtil.cloneQualifier(cell))
val value = Bytes.toString(CellUtil.cloneValue(cell))
val version = cell.getTimestamp
println(s"\t $cf:$column = $value, version = $version")
}
}
// 应用程序运行结束,关闭资源
sc.stop()
}
}

运行结果:
在这里插入图片描述

6.2 MySQL 数据源

实际开发中常常将分析结果RDD保存至MySQL表中,使用foreachPartition函数;此外Spark中提供JdbcRDD用于从MySQL表中读取数据。

调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中,保存时考虑降低RDD分区数目和批量插入,提升程序性能。

范例演示:将词频统计WordCount结果保存MySQL表tb_wordcount。

  • 建表语句
USE db_test ;
CREATE TABLE `tb_wordcount` (
`count` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`word` varchar(100) NOT NULL,
PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ;
  • 演示代码
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 将词频统计结果保存到MySQL表中
*/
object SparkWriteMySQL {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// 1. 从HDFS读取文本数据,封装集合RDD
val inputRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data")
// 2. 处理数据,调用RDD中函数
val resultRDD: RDD[(String, Int)] = inputRDD
// 3.a 每行数据分割为单词
.flatMap(line => line.split("\\s+"))
// 3.b 转换为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 3.c 按照Key分组聚合
.reduceByKey((tmp, item) => tmp + item)
// 3. 输出结果RDD保存到MySQL数据库
resultRDD
// 对结果RDD保存到外部存储系统时,考虑降低RDD分区数目
.coalesce(1)
// 对分区数据操作
.foreachPartition{iter => saveToMySQL(iter)}
// 应用程序运行结束,关闭资源
sc.stop()
}
/**
* 将每个分区中的数据保存到MySQL表中
* @param datas 迭代器,封装RDD中每个分区的数据
*/
def saveToMySQL(datas: Iterator[(String, Int)]): Unit = {
// a. 加载驱动类
Class.forName("com.mysql.cj.jdbc.Driver")
// 声明变量
var conn: Connection = null
var pstmt: PreparedStatement = null
try{
// b. 获取连接
conn = DriverManager.getConnection(
"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
ode=true",
"root", "123456"
)
// c. 获取PreparedStatement对象
val insertSql = "INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?)"
pstmt = conn.prepareStatement(insertSql)
conn.setAutoCommit(false)
// d. 将分区中数据插入到表中,批量插入
datas.foreach{case (word, count) =>
pstmt.setString(1, word)
pstmt.setLong(2, count.toLong)
// 加入批次
pstmt.addBatch()
}
// TODO: 批量插入
pstmt.executeBatch()
conn.commit()
}catch {
case e: Exception => e.printStackTrace()
}finally {
if(null != pstmt) pstmt.close()
if(null != conn) conn.close()
}
}
}
  • 运行程序,查看数据库表的数据
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/338888.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

六、mybatis与spring的整合

Spring整合Mybaits的步骤 引入依赖 在Spring整合Mybaits的时候需要引入一个中间依赖包mybatis-spring <dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.5.5</version> </dependency&g…

通过抓包分析gPRC协议

通过抓包分析gPRC协议 前言 gRPC 是一个高性能、开源和通用的 RPC 框架&#xff0c;面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本&#xff0c;分别是&#xff1a;grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C, Node.js, Python, Ruby, Objective-C, PHP 和 …

windows + vscode + rust

1 安装VSCODE略2 安装rust插件1、说明&#xff1a;第4步本人是一个一个点击状态。上图禁用按钮在没装之前是显示“安装”按钮&#xff0c;应该点击“安装”也可以。2、还需要安装C插件&#xff0c;搜索C即可&#xff0c;装微软的3 创建rust工程由于初次使用&#xff0c;不知道目…

实战项目-课程潜在会员用户预测(朴素贝叶斯&神经网络)

目录1、背景介绍2、朴素贝叶斯2.1 模型介绍2.2 模型实现3、人工神经网络1、背景介绍 目标&#xff1a;将根据用户产生的数据对课程潜在的会员用户&#xff08;可能产生购买会员的行为&#xff09;进行预测。 平台的一位注册用户是否购买会员的行为应该是建立在一定背景条件下…

TCP、UDP网络编程面试题

TCP、UDP、Socket、HTTP网络编程面试题 什么是网络编程 网络编程的本质是多台计算机之间的数据交换。数据传递本身没有多大的难度&#xff0c;不就是把一个设备中的数据发送给其他设备&#xff0c;然后接受另外一个设备反馈的数据。现在的网络编程基本上都是基于请求/响应方式…

linux 进程

文章目录1、进程的状态2、进程的组织3、进程的控制3.1、进程的创建fork 函数fork 拷贝和共享fork 原理fork 的写时复制exec 函数族exec 原理3.2、进程的终止exit 函数* 僵死进程* 孤儿进程3.3、进程的阻塞和唤醒3.4、进程的切换4、进程间通信5、进程调度算法进程是可执行程序的…

特斯拉无人驾驶解读

来源于Tesla AI Day Tesla无人驾驶算法的核心任务就是如何理解我们所看到的一切呢?也就是说,不使用高端的设备,比如激光雷达,仅仅使用摄像头就能够将任务做得很好。Tesla使用环绕型的8个摄像头获得输入。 第一步是特征提取模块Backbone,无论什么任务都离不开特征…

chatgpt怎么安装?国内怎么玩chatgpt?

关于chatgpt的传言最近真的是闹得沸沸扬扬&#xff0c;主要是这个chatgpt人工智能的冲击力实在是太大了&#xff0c;它学习了大量的语言知识&#xff0c;具有很强的语言能力&#xff0c;无论是写作&#xff0c;还是诗歌&#xff0c;甚至是代码都是不在话下&#xff0c;美国大学…

【数据库】 如何对数据库进行操作

目录 一&#xff0c;SQL语句基础 1&#xff0c; SQL简介 &#xff08;1&#xff09; SQL语句分类 &#xff08;2&#xff09;SQL语句的书写规范 二&#xff0c;数据库操作 1、查看 &#xff08;1&#xff09;查看所有数据库 &#xff08;2&#xff09;查看有没有指定的数…

ubuntu20下Qt5.14.2+OpenCV(含Contrib)-4.5.0环境搭建

Qt若要能处理图片和视频&#xff0c;就必须安装OpenCV&#xff0c;而OpenCV中很多的高级功能如人脸识别等都包含在Contrib扩展模块中&#xff0c;需要将Contrib与OpenCV一起联合编译&#xff0c;目前所用这两个版本都是4.5.0版。 一、下载OpenCV OpenCV的官方下载地址为http:…

SSM整合SpringSecurity简单使用

一、SpringSecurity 1.1 什么是SpringSecurity Spring Security 的前身是 Acegi Security &#xff0c;是 Spring 项目组中用来提供安全认证服务的框架。(官网地址) Spring Security 为基于J2EE企业应用软件提供了全面安全服务。特别是使用领先的J2EE解决方案-Spring框架开发…

用网络调试助手测试PLC-Reocrder收听模式的过程

目录 一、测试环境 二、步骤及要点说明 1、PLC-Recorder的通道配置 2、PLC-Recorder启动采集 3、配置网络调试助手 4、启动调试助手的连接&#xff0c;并点击“启动批量发送” 5、停止发送&#xff0c;查看发送和接收的情况 三、小结 一、测试环境 Windows10操作系统&a…

Docker进阶 - 11. Docker Compose 编排服务

注&#xff1a;本文只对一些重要步骤和yml文件进行一些讲解&#xff0c;其他的具体程序没有记录。 目录 1. 原始的微服务工程编排(不使用Compose) 2. 使用Compose编排微服务 2.1 编写 docker-compose.yml 文件 2.2 修改并构建微服务工程镜像 2.3 启动 docker-compose 服务…

数据结构(二):单向链表、双向链表

数据结构&#xff08;二&#xff09;一、什么是链表1.数组的缺点2.链表的优点3.链表的缺点4.链表和数组的区别二、封装单向链表1. append方法&#xff1a;向尾部插入节点2. toString方法&#xff1a;链表元素转字符串3. insert方法&#xff1a;在任意位置插入数据4.get获取某个…

RNN神经网络初探

目录1. 神经网络与未来智能2. 回顾数据维度和神经网络1. 神经网络与未来智能 2. 回顾数据维度和神经网络 循环神经网络&#xff0c;主要用来处理时序的数据&#xff0c;它对每个词的顺序是有要求的。 循环神经网络如何保存记忆功能&#xff1f; 当前样本只有 3 个特征&#x…

git基本概念图示【学习】

基本概念工作区&#xff08;Working Directory&#xff09;就是你在电脑里能看到的目录&#xff0c;比如名字为 gafish.github.com 的文件夹就是一个工作区本地版本库&#xff08;Local Repository&#xff09;工作区有一个隐藏目录 .git&#xff0c;这个不算工作区&#xff0c…

新方案:从错误中学习,点云分割中的自我规范化层次语义表示

前言 LiDAR 语义分割通过直接作用于传感器提供的原始内容来完成细粒度的场景理解而受到关注。最近的解决方案展示了如何使用不同的学习技术来提高模型的性能&#xff0c;而无需更改任何架构或数据集。遵循这一趋势&#xff0c;论文提出了一个从粗到精的设置&#xff0c;该设置从…

查找与排序 练习题

1、下列排序算法中&#xff0c;▁▁B▁▁ 是稳定的。 A.简单选择排序 B.冒泡排序 C.希尔排序 D.快速排序 解析&#xff1a;稳定排序是每次排序得到的结果是唯一的&#xff0c;不稳定排序得到的结果不唯一。 稳定&#xff1a;冒泡排序、归并排序、基数排序 不稳定&#x…

DolphinSchedule基于事件驱动的高性能并发编程

文章目录前言前置知识异步编程基于时间驱动的异步编程模式&#xff08;EAP Event-based Asynchronous Pattern &#xff09;实现EAPDolphinSchedule结合Netty实现Master与Worker之间的高性能处理能力的设计方案设计代码实现总结前言 研究DolphinSchedule的内因在于对调度系统并…

内存访问局部性特征

分享一道360的C语言笔试题。x是一个行列均为1000的二维数组&#xff0c;下面代码运行效率最高的是哪个&#xff1f; 二维数组大家都很熟悉&#xff0c;正常人遍历二维数组都是一行一行来的&#xff0c;为什么很少有人按列去遍历&#xff1f; 这道笔试题其实考察的就是遍历效率…