【Spark分布式内存计算框架——Spark Core】6. RDD 持久化

news2024/11/17 14:32:31

3.6 RDD 持久化

在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
在这里插入图片描述

缓存函数
可以将RDD数据直接缓存到内存中,函数声明如下:
在这里插入图片描述
但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别:
在这里插入图片描述
缓存级别
在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:
在这里插入图片描述
实际项目中缓存数据时,往往选择如下两种级别:
在这里插入图片描述
缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发。
在这里插入图片描述
释放缓存
当缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:
在这里插入图片描述
此函数属于eager,立即执行。

何时缓存数据
在实际项目开发中,什么时候缓存RDD数据,最好呢???

第一点:当某个RDD被使用多次的时候,建议缓存此RDD数据

  • 比如,从HDFS上读取网站行为日志数据,进行多维度的分析,最好缓存数据
    在这里插入图片描述
    第二点:当某个RDD来之不易,并且使用不止一次,建议缓存此RDD数据
  • 比如,从HBase表中读取历史订单数据,与从MySQL表中商品和用户维度信息数据,进行关联Join等聚合操作,获取RDD:etlRDD,后续的报表分析使用此RDD,此时建议缓存RDD数据
  • 案例:etlRDD.persist(StoageLeval.MEMORY_AND_DISK_2)

演示范例代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD中缓存函数,将数据缓存到内存或磁盘、释放缓存
*/
object SparkCacheTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
// 读取文本文件数据
val inputRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data", minPartitions = 2)
// 缓存数据: 将数据缓存至内存
inputRDD.cache()
inputRDD.persist()
// 使用Action函数触发缓存
println(s"Count = ${inputRDD.count()}")
// 释放缓存
inputRDD.unpersist()
// 缓存数据:选择缓存级别
/*
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
*/
inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
println(s"count: ${inputRDD.count()}")
// 应用程序运行结束,关闭资源
sc.stop()
}
}

第四章 SogouQ日志分析

使用搜狗实验室提供【用户查询日志(SogouQ)】数据,使用Spark框架,将数据封装到RDD中进行业务数据处理分析。数据网址:http://www.sogou.com/labs/resource/q.php

  • 1)、数据介绍:搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。

  • 2)、数据格式

    • 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
      在这里插入图片描述
    • 用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID
  • 3)、数据下载:分为三个数据集,大小不一样

    • 迷你版(样例数据, 376KB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip
    • 精简版(1天数据,63MB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.zip
    • 完整版(1.9GB):http://www.sogou.com/labs/resource/ftp.php?dir=/Data/SogouQ/SogouQ.zip

4.1 业务需求

针对SougoQ用户查询日志数据中不同字段,不同业务进行统计分析:
在这里插入图片描述
使用SparkContext读取日志数据,封装到RDD数据集中,调用Transformation函数和Action函数处理分析,灵活掌握Scala语言编程。

4.2 准备工作

在编程实现业务功能之前,首先考虑如何对【查询词】进行中文分词及将日志数据解析封装。

HanLP 中文分词
使用比较流行好用中文分词:HanLP,面向生产环境的自然语言处理工具包,HanLP 是由一系列模型与算法组成的 Java 工具包,目标是普及自然语言处理在生产环境中的应用。
官方网站:http://www.hanlp.com/,添加Maven依赖

<!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.7</version>
</dependency>

演示范例:HanLP 入门案例,基本使用

import java.util
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer
import scala.collection.JavaConverters._
/**
* HanLP 入门案例,基本使用
*/
object HanLPTest {
def main(args: Array[String]): Unit = {
// 入门Demo
val terms: util.List[Term] = HanLP.segment("杰克奥特曼全集视频")
println(terms)
println(terms.asScala.map(_.word.trim))
// 标准分词
val terms1: util.List[Term] = StandardTokenizer.segment("放假++端午++重阳")
println(terms1)
println(terms1.asScala.map(_.word.replaceAll("\\s+", "")))
val words: Array[String] =
"""00:00:00 2982199073774412 [360安全卫
士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html"""
.split("\\s+")
words.foreach(println)
println(words(2).replaceAll("\\[|\\]", ""))
}
}

样例类 SogouRecord
将每行日志数据封装到CaseClass样例类SogouRecord中,方便后续处理:

/**
* 用户搜索点击网页记录Record
* @param queryTime 访问时间,格式为:HH:mm:ss
* @param userId 用户ID
* @param queryWords 查询词
* @param resultRank 该URL在返回结果中的排名
* @param clickRank 用户点击的顺序号
* @param clickUrl 用户点击的URL
*/
case class SogouRecord(
queryTime: String, //
userId: String, //
queryWords: String, //
resultRank: Int, //
clickRank: Int, //
clickUrl: String //
)

4.3 业务实现

先读取数据,封装到SougoRecord类中,再按照业务处理数据。

读取数据
构建SparkContext实例对象,读取本次SogouQ.sample数据,封装到SougoRecord中 。

// TODO: 1. 本地读取SogouQ用户查询日志数据
//val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.sample")
val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.reduced")
//println(s"Count = ${rawLogsRDD.count()}")
// TODO: 2. 解析数据,封装到CaseClass样例类中
val recordsRDD: RDD[SogouRecord] = rawLogsRDD
// 过滤不合法数据,如null,分割后长度不等于6
.filter(log => null != log && log.trim.split("\\s+").length == 6)
// 对每个分区中数据进行解析,封装到SogouRecord
.mapPartitions{iter =>
iter.map{log =>
val arr: Array[String] = log.trim.split("\\s+")
SogouRecord(
arr(0), arr(1), arr(2).replaceAll("\\[|\\]", ""), //
arr(3).toInt, arr(4).toInt, arr(5) //
)
}
}
println(s"Count = ${recordsRDD.count()}, First = ${recordsRDD.first()}")

搜索关键词统计
获取用户【查询词】,使用HanLP进行分词,按照单词分组聚合统计出现次数,类似WordCount
程序,具体代码如下:

// =================== 3.1 搜索关键词统计 ===================
// a. 获取搜索词,进行中文分词
val wordsRDD: RDD[String] = recordsRDD.mapPartitions{iter =>
iter.flatMap{record =>
// 使用HanLP中文分词库进行分词
val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)
// 将Java中集合对转换为Scala中集合对象
import scala.collection.JavaConverters._
terms.asScala.map(term => term.word)
}
}
//println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")
// b. 统计搜索词出现次数,获取次数最多Top10
val top10SearchWords: Array[(Int, String)] = wordsRDD
.map(word => (word, 1)) // 每个单词出现一次
.reduceByKey((tmp, item) => tmp + item) // 分组统计次数
.map(tuple => tuple.swap)
.sortByKey(ascending = false) // 词频降序排序
.take(10) // 获取前10个搜索词
top10SearchWords.foreach(println)

运行结果如下,仅仅显示搜索最多关键词,其中需要过滤谓词:
在这里插入图片描述
用户搜索点击统计
统计出每个用户每个搜索词点击网页的次数,可以作为搜索引擎搜索效果评价指标。先按照用
户ID分组,再按照【查询词】分组,最后统计次数,求取最大次数、最小次数及平均次数。

// =================== 3.2 用户搜索点击次数统计 ===================
/*
每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度
先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数
*/
val clickCountRDD: RDD[((String, String), Int)] = recordsRDD
.map{record =>
// 获取用户ID和搜索词
val key = record.userId -> record.queryWords
(key, 1)
}
// 按照用户ID和搜索词组合的Key分组聚合
.reduceByKey((tmp, item) => tmp + item)
clickCountRDD
.sortBy(tuple => tuple._2, ascending = false)
.take(10).foreach(println)
println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")

程序运行结果如下:
在这里插入图片描述
搜索时间段统计
按照【访问时间】字段获取【小时】,分组统计各个小时段用户查询搜索的数量,进一步观察
用户喜欢在哪些时间段上网,使用搜狗引擎搜索,代码如下:

// =================== 3.3 搜索时间段统计 ===================
/*
从搜索时间字段获取小时,统计个小时搜索次数
*/
val hourSearchRDD: RDD[(String, Int)] = recordsRDD
// 提取小时
.map{record =>
// 03:12:50
record.queryTime.substring(0, 2)
}
// 分组聚合
.map(word => (word, 1)) // 每个单词出现一次
.reduceByKey((tmp, item) => tmp + item) // 分组统计次数
.sortBy(tuple => tuple._2, ascending = false)
hourSearchRDD.foreach(println)

程序运行结果如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/339018.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Kubernetes集群-部署Java项目

Kubernetes集群-部署Java项目&#xff08;SSG&#xff09; k8s部署项目java流程图 第一步 打包制作镜像 打包 java源码&#xff1a; application.properties #在有pom.xml的路径下执行 mvn clean package制作镜像&#xff1a; 将刚才打包后的文件夹传到&#xff0c;装有dock…

(考研湖科大教书匠计算机网络)第三章数据链路层-第十一节:虚拟局域网VLAN概述和实现机制

获取pdf&#xff1a;密码7281专栏目录首页&#xff1a;【专栏必读】考研湖科大教书匠计算机网络笔记导航 文章目录一&#xff1a;VLAN概述&#xff08;1&#xff09;分割广播域&#xff08;2&#xff09;虚拟局域网VLAN二&#xff1a;VLAN实现机制&#xff08;1&#xff09;IEE…

LeetCode题目笔记——15. 三数之和

文章目录题目描述题目链接题目难度——中等方法一&#xff1a;暴力&#xff08;参考&#xff09;代码/Python 参考方法二&#xff1a;哈希代码/Python参考方法三&#xff1a;排序双指针代码/Python代码/C总结题目描述 龙门阵&#xff1a;这个n个数之和是不是有什么深意啊&#…

Python中的类和对象(6)

文章目录1.多态2.类继承的多态3.自定义函数的多态4.鸭子类型5.思维导图1.多态 多态在编程中是一个非常重要的概念&#xff0c;它是指同一个运算符、函数或对象在不同的场景下&#xff0c;具有不同的作用效果&#xff0c;这么一个技能。 我们知道加号&#xff08;&#xff09;…

加载sklearn新闻数据集出错 fetch_20newsgroups() HTTPError: HTTP Error 403: Forbidden解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。喜欢通过博客创作的方式对所学的知识进行总结与归纳,不仅形成深入且独到的理…

大数据框架之Hadoop:入门(五)Hadoop编译源码(面试重点)

5.1 前期准备工作 1.CentOS联网 配置CentOS能连接外网。Linux虚拟机ping www.baidu.com 是畅通的 注意&#xff1a;采用root角色编译&#xff0c;减少文件夹权限出现问题 2.jar包准备(hadoop源码、JDK8、maven、ant 、protobuf) &#xff08;1&#xff09;hadoop-2.7.7-sr…

【TCP的拥塞控制】基于窗口的拥塞控制

TCP的拥塞窗口CWND大小和传输轮次n的关系如下所示。&#xff08;本题10分&#xff09; cwnd12481632333435363738394041422122232425261248N1234567891011121314151617181920212223242526 问题&#xff1a; &#xff08;1&#xff09;慢开始阶段的时间间隔&#xff1f;&#…

NFC enable NFC使能流程

同学,别退出呀,我可是全网最牛逼的 WIFI/BT/GPS/NFC分析博主,我写了上百篇文章,请点击下面了解本专栏,进入本博主主页看看再走呗,一定不会让你后悔的,记得一定要去看主页置顶文章哦。 NFC enable NFC使能流程 认识nfc系统如何工作,最好的方法就是了解nfc的各个流程,…

linux系统下SVN服务器搭建

linux新手&#xff0c;整了好几天才搞好&#xff0c;做下笔记以备后续使用&#xff1a; 1、下载svn服务器 yum -y install subversion 2、创建仓库 svnadmin create /opt/svn/pro/respos1 svnadmin create /opt/svn/pro/respos2 3、配置用户以及权限 1:cd到仓库目录下&#…

py3常用返回规则字符串的函数+ascii与char的转换

文章目录py3常用返回规则字符串的函数字符转ascii以及ascii转字符的方法为&#xff1a;py3常用返回规则字符串的函数 注明原来的网址为&#xff1a;https://docs.python.org/3.8/library/string.html string.ascii_letters 返回所有的大写、小写字母 string.ascii_lowercase 返…

寒假安全作业nginx-host绕过实例复现

1.测试环境搭建 LNMP架构的话&#xff0c;肯定就是linux、nginx、mysql、php四大组件。在后面的复现中我们还会用到https的一部分知识&#xff0c;故这里的nginx就需要使用虚拟主机并且配置https证书&#xff0c;且具有php解析功能。 1.1 基础nginx配置 #1.创建web目录 mkdir …

MySQL-InnoDB行格式浅析

简介 我们知道读写磁盘的速度非常慢&#xff0c;和内存读写差了几个数量级&#xff0c;所以当我们想从表中获取某些记录时&#xff0c; InnoDB 存储引擎需要一条一条的把记录从磁盘上读出来么&#xff1f; 不&#xff0c;那样会慢死&#xff0c;InnoDB 采取的方式是&#xff1a…

雅思经验(十三)

感觉这篇其实有点小难&#xff0c;我在精听的才发现那是六个实验对象&#xff0c;但是叫做six subjects在精听的时候感觉有些手忙脚乱&#xff0c;像是一团乱麻一样&#xff0c;但是也是没有什么关系。其实这才是查漏补缺&#xff0c;cello player 这是大提琴师violinists 小提…

08- 数据升维 (PolynomialFeatures) (机器学习)

在做数据升维的时候&#xff0c;最常见的手段就是将已知维度进行相乘&#xff08;或者自乘&#xff09;来构建新的维度 使用 np.concatenate()进行简单的&#xff0c;幂次合并&#xff0c;注意数据合并的方向axis 1 数据可视化时&#xff0c;注意切片&#xff0c;因为数据升维…

SpringDI自动装配BeanSpring注解配置和Java配置类

依赖注入 上篇博客已经提到了DI注入方式的构造器注入&#xff0c;下面采用set方式进行注入 基于set方法注入 public class User {private String name;private Address address;private String[] books;private List<String> hobbys;private Map<String,String>…

【Linux】多线程编程 - 概念/pthread库调用接口/互斥

目录 一.线程概念 1.Linux中线程如何实现 2.POSIX线程库: pthread第三方线程库 3.线程与进程的数据存放问题 4.线程的"高效"具体体现在哪 5.线程优缺点 二.线程控制 0.关于pthread_t类型 1.线程创建 2.线程终止 3.线程等待 4.线程分离 5.代码验证 三.线…

你期待已久的《动手学深度学习》(PyTorch版)来啦!

《动手学深度学习》全新PyTorch版本&#xff0c;李沐和亚马逊科学家阿斯顿张等大咖作者强强联合之作&#xff0c;机器学习、深度学习领域重磅教程&#xff0c;交互式实战环境&#xff0c;配套资源丰富&#xff01; 面向中文读者的能运行、可讨论的深度学习入门书。 动手学深度…

一个优质软件测试工程师简历的范文(一定要收藏)

很多刚转行软件测试的小伙伴是不是不知道怎么写好一份优质的软件测试工程师的简历。今天呢&#xff0c;就给大家分享一下一个优质软件测试工程师简历的范文。记得收藏起来哦。 下面的案例&#xff1a;2-3年的软件测试工程的简历 姓 名&#xff1a;XXX 学历&#xff1a…

2023备战金三银四,Python自动化软件测试面试宝典合集(五)

接上篇八、抓包与网络协议8.1 抓包工具怎么用 我原来的公司对于抓包这块&#xff0c;在 App 的测试用得比较多。我们会使用 fiddler 抓取数据检查结果&#xff0c;定位问题&#xff0c;测试安全&#xff0c;制造弱网环境;如&#xff1a;抓取数据通过查看请求数据&#xff0c;请…

FPGA实现不同分辨率视频切换输出,串口协议帧控制,提供工程源码和技术支持

目录1、不同分辨率视频切换输出原理2、设计思想和架构详解3、vivado工程详解4、上板调试验证并演示5、福利&#xff1a;工程代码的获取1、不同分辨率视频切换输出原理 不同分辨率的视频输出对应不同的时序和时钟&#xff0c;一般情况下是不存在同时或分时输出的&#xff0c;但…