Spark 3:Spark Core RDD持久化

news2024/11/18 22:28:44

RDD 的数据是过程数据

b8da17ba7f4f4942b11bb5f211111136.png

RDD 的缓存

a8536b8352164f449bcb9d6c550cfe4a.png

# coding:utf8
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd1 = sc.textFile("../data/input/words.txt")
    rdd2 = rdd1.flatMap(lambda x: x.split(" "))
    rdd3 = rdd2.map(lambda x: (x, 1))

    # 缓存到内存中
    rdd3.cache()
    # 先放内存,如果不够则放硬盘,2份副本
    rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)

    rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
    print(rdd4.collect())

    rdd5 = rdd3.groupByKey()
    rdd6 = rdd5.mapValues(lambda x: sum(x))
    print(rdd6.collect())

    # 清理缓存
    rdd3.unpersist()
    time.sleep(100000)

06100dee265d4fe587b2abfeb2291e1f.png

e68c6e5996a148c2991e884258f5bc4a.png

103ceba93d6c42daa77820cb985257f8.png

RDD 的CheckPoint

fa83627f868d444db9e8f3b5efc06e8d.png

# coding:utf8
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 1. 告知spark, 开启CheckPoint功能
    sc.setCheckpointDir("hdfs://node1:8020/output/ckp")
    rdd1 = sc.textFile("../data/input/words.txt")
    rdd2 = rdd1.flatMap(lambda x: x.split(" "))
    rdd3 = rdd2.map(lambda x: (x, 1))

    # 调用checkpoint API 保存数据即可
    rdd3.checkpoint()

    rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
    print(rdd4.collect())

    rdd5 = rdd3.groupByKey()
    rdd6 = rdd5.mapValues(lambda x: sum(x))
    print(rdd6.collect())

    rdd3.unpersist()
    time.sleep(100000)

6297822106df4f648b5759fd320c6ca8.png

961fa643346945a1b842bddda7222db3.png

Cache和Checkpoint区别
Cache是轻量化保存RDD数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留RDD血缘关系)。
CheckPoint是重量级保存RDD数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留RDD血缘关系)。
Cache 和 CheckPoint的性能对比?
Cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快。
CheckPoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本) 。

案例练习:搜索引擎日志分析

0dd1f3dcea24466fa400012cb8541847.png

bcdb47e47d32466aa25847a43dd57614.png

60c99356504f4f02aa731a890bfd9597.png

# coding:utf8

# 导入Spark的相关包
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
from defs import context_jieba, filter_words, append_words, extract_user_and_word
from operator import add

if __name__ == '__main__':
    # 0. 初始化执行环境 构建SparkContext对象
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 1. 读取数据文件
    file_rdd = sc.textFile("hdfs://node1:8020/input/SogouQ.txt")

    # 2. 对数据进行切分 \t
    split_rdd = file_rdd.map(lambda x: x.split("\t"))

    # 3. 因为要做多个需求, split_rdd 作为基础的rdd 会被多次使用.
    split_rdd.persist(StorageLevel.DISK_ONLY)

    # TODO: 需求1: 用户搜索的关键`词`分析
    # 主要分析热点词
    # 将所有的搜索内容取出
    # print(split_rdd.takeSample(True, 3))
    context_rdd = split_rdd.map(lambda x: x[2])

    # 对搜索的内容进行分词分析
    words_rdd = context_rdd.flatMap(context_jieba)

    # print(words_rdd.collect())
    # 院校 帮 -> 院校帮
    # 博学 谷 -> 博学谷
    # 传智播 客 -> 传智播客
    filtered_rdd = words_rdd.filter(filter_words)
    # 将关键词转换: 传智播 -> 传智播客
    final_words_rdd = filtered_rdd.map(append_words)
    # 对单词进行 分组 聚合 排序 求出前5名
    result1 = final_words_rdd.reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
        take(5)

    print("需求1结果: ", result1)

    # TODO: 需求2: 用户和关键词组合分析
    # 1, 我喜欢传智播客
    # 1+我  1+喜欢 1+传智播客
    user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))
    # 对用户的搜索内容进行分词, 分词后和用户ID再次组合
    user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)
    # 对内容进行 分组 聚合 排序 求前5
    result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).\
        sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
        take(5)

    print("需求2结果: ", result2)

    # TODO: 需求3: 热门搜索时间段分析
    # 取出来所有的时间
    time_rdd = split_rdd.map(lambda x: x[0])
    # 对时间进行处理, 只保留小时精度即可
    hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))
    # 分组 聚合 排序
    result3 = hour_with_one_rdd.reduceByKey(add).\
        sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
        collect()

    print("需求3结果: ", result3)

    time.sleep(100000)


import jieba


def context_jieba(data):
    """通过jieba分词工具 进行分词操作"""
    seg = jieba.cut_for_search(data)
    l = list()
    for word in seg:
        l.append(word)
    return l


def filter_words(data):
    """过滤不要的 谷 \ 帮 \ 客"""
    return data not in ['谷', '帮', '客']


def append_words(data):
    """修订某些关键词的内容"""
    if data == '传智播': data = '传智播客'
    if data == '院校': data = '院校帮'
    if data == '博学': data = '博学谷'
    return (data, 1)


def extract_user_and_word(data):
    """传入数据是 元组 (1, 我喜欢传智播客)"""
    user_id = data[0]
    content = data[1]
    # 对content进行分词
    words = context_jieba(content)

    return_list = list()
    for word in words:
        # 不要忘记过滤 \谷 \ 帮 \ 客
        if filter_words(word):
            return_list.append((user_id + "_" + append_words(word)[0], 1))

    return return_list

提交到集群运行

4bda8984859e4bdb8760a429a5209708.png

e35dfa2d151241b5a10bd5d21198ad38.png为什么要在全部的服务器安装jieba库?
因为YARN是集群运行,Executor可以在所有服务器上执行,所以每个服务器都需要有jieba库提供支撑。
如何尽量提高任务计算的资源?
计算CPU核心和内存量,通过--executor-memory 指定executor内存,通过--executor-cores 指定
executor的核心数,通过--num-executors 指定总executor数量。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/569483.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

flask实现简易图书管理系统

项目结构 技术选型 flask 做后端, 提供数据和渲染html 暂时没有提供mysql, 后续会更新操作mysql和样式美化的版本 起一个flask服务 flask是python的一个web框架, 下面演示如何提供http接口, 并返回json数据 main.py # flask创建http接口 from flask import Flask, request, jso…

Linux 的软件生态 软件包管理器(yum)编译器 - vim

Linux的软件生态 在 手机 ipad 笔记本等等我们使用的产品当中,会有应用商店等等可以下载软件的 软件包管理器,当我们想要下载某一款软件的时候,就去这个应用商店当中搜索,或者是去寻找,找到之后,进行下载&a…

Linux Kernel源码阅读: x86-64 系统调用实现细节(超详细)

0、前言 本文采用Linux 内核 v3.10 版本 本文不涉及调试、跟踪及异常处理的细节 一、系统调用简介 系统调用是用户空间程序与内核交互的主要机制。系统调用与普通函数调用不同,因为它调用的是内核里的代码。使用系统调用时,需要特殊指令以使处理器权限转…

初识Linux操作系统及常用的Linux命令

Linux是一种自由和开放源码的类UNIX操作系统,也是一种基于POSIX和Unix的多用户、多任务、支持多线程和多CPU的操作系统。伴随互联网的发展,企业对服务器速度和安全的要求越来越高,Linux系统由于具有性能稳定、防火墙组件性能高效、配置简单等…

Java内存模型的抽象结构 JMM

并发编程模型的两个关键问题 线程之间如何通信及线程之间如何同步。 线程之间如何通信:共享内存,消息传递线程之间如何同步通信是指线程之间以何种机制来 交换信息同步是指程序中用于控制不同线程间 操作发生相对顺序 的机制在共享内存的并发模型里&a…

Maven 详细教程(万字长文)

目录 一、Maven的简介二、Maven安装与配置三、Maven POM四、创建 Maven 项目五、Maven项目的构建与测试六、Maven依赖七、Maven仓库(本地仓库远程仓库)八、Maven生命周期(cleansitedefault)九、Maven常用插件十、Maven 版本号约定…

大模型时代的prompt学习(持续更新)

目录 为什么要学prompt基本原则prompt撰写框架Base Prompt FrameworkCRISPE Prompt Framework 场景撰写文案文档竞品分析产品设计数据分析 chain of thoughtzero shotin context learning(few shot)Self-Consistency Program-Aidedprompt tipsTo Do and Not To Doadd examples引…

水下图像0

d_r_1_.jpg 一个拖着电线的水下六足机器人在海水中作业 A robot is exploring the reef on the sea floor A hexapod robot works next to reef at the bottom of the sea A rectangular deep-sea robot swims past a patch of reef An underwater robot is detecting coral …

神经网络视觉AI“后时代”自瞄实现与对抗

通俗一点来说,自瞄是在FPS射击游戏中最为常见的作弊手段之一,当下最火爆的CSGO也深受其扰,在此我说些我自己的看法,欢迎大家在下方留言讨论; (1)软件层面 在神经网络方面的视觉AI应用流行之前&…

面试官:工作三年,还来面初级软件测试?恐怕你的软件测试工程师的头衔要加双引号...

相信身为测试工程师的你可能经历过这些: 已经工作三年了,每个项目都会加班加点全力以赴去完成,薪资增长幅度却不如人意。 听说年后离职的老同事,金三刚拿下高薪offer,年薪直奔50万了。 由于现在的公司接触不到新技术&…

docker(一)安装部署卸载以及基础命令使用

文章目录 1、安装1.1、安装插件1.2、设置源:1.3、安装docker卸载docker:1.4、配置国内源 2、基础命令3、提交某个镜像为新的镜像4、docker 存档 1、安装 机器配置: 1.1、安装插件 [rootdophin ~]# yum -y install yum-utils1.2、设置源: …

Spring相关面试题(Spring核心)

Spring相关面试题 谁定义了bean的生命周期IOC初始化 IOC启动阶段 (Spring容器的启动流程)Spring-IOC是什么IOC是什么DI是什么 依赖注入 DI的三种方式Spring-AOP是什么OOPAOPAOP实现方式动JDK动态代理和CGLIB动态代理 JDK动态代理和CGLIB动态代理IOC 和 AOP 的联系和区别 BeanFa…

【2023 · CANN训练营第一季】应用开发(初级)第五章——媒体数据处理

1.媒体数据处理 受网络结构和训练方式等因素的影响,绝大多数神经网络模型对输入数据都有格式上的限制。在计算机视觉领域,这个限制大多体现在图像的尺寸、色域、归一化参数等。如果源图或视频的尺寸、格式等与网络模型的要求不一致时,我们需…

LLMs开源模型们和数据集简介

本篇文章整理下目前常用的LLMs模型们和数据集简介。 BackBones ​https://github.com/FreedomIntelligence/LLMZoo 可以看到目前被广泛用来作为LLMs的backbone的模型有以下特点: Backbone:基于某个开源backbone,如GLM、LLaMA、BLOOMZ&#…

Android 文本识别:MLKIT + PreviewView

随着移动设备的普及和摄像头的高像素化,利用相机进行文本识别成为了一种流行的方式。MLKit 是 Google 提供的一款机器学习工具包,其中包含了丰富的图像和语言处理功能,包括文本识别。PreviewView 是 Android Jetpack 的一部分,它提…

2 files found with path ‘lib/arm64-v8a/libwechatbacktrace.so‘ from inputs

2 files found with path lib/arm64-v8a/libwechatbacktrace.so from inputs 解决方案,在app module的build.gradle里面的 android { } 块里面添加: packagingOptions {exclude lib/arm64-v8a/libwechatbacktrace.so} 如果有多个,就再增加行…

“宝石与石头”:一道简单却巧妙的力扣算法题

本篇博客会讲解力扣“771. 宝石与石头”的解题思路,这是题目链接。 先来审题: 以下是输出示例: 以下是提示: 本题可以使用数组模拟哈希表来实现。先把宝石字符串中的字符标识到数组的对应位置,每次拿石头字符串中的…

ChatGpt免费的镜像网站

目录 1.ChatGpt 简介 2.ChatGpt 免费网站合集 2.1 https://chat21.zhulei.xyz/ 2.2 Vega AI 创作平台 2.3 AI文本工具站 2.4 FancyPig (jqrai.one) 2.5 AiDuTu 1.ChatGpt 简介 ChatGPT是美国人工智能研究实验室OpenAI新推出的一种人工智能技术驱动的自然语言处理工具&…

geotools简介

geotools简介 官网 https://docs.geotools.org/latest/userguide/index.html 架构图 特性 1. 主要特性 Geotools主要提供各种GIS算法,实现各种数据格式的读写和显示。在显示方面要差一些,只是用Swing实现了地图的简单查看和操作。用户可以根据Geoto…

运营-17.留存

如何定义留存 某段时间内的新增用户,经过一段时间后,又继续使用应用的被认作是留存 用户,这部分用户占当时新增用户的比例即是留存率,即用户没有流失; 例如: 5月份新增用户200,这200人在6月份启…