Spark 4:Spark Core 共享变量

news2024/11/29 2:37:09

广播变量

# coding:utf8
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    stu_info_list = [(1, '张大仙', 11),
                     (2, '王晓晓', 13),
                     (3, '张甜甜', 11),
                     (4, '王大力', 11)]
    # 1. 将本地Python List对象标记为广播变量
    broadcast = sc.broadcast(stu_info_list)

    score_info_rdd = sc.parallelize([
        (1, '语文', 99),
        (2, '数学', 99),
        (3, '英语', 99),
        (4, '编程', 99),
        (1, '语文', 99),
        (2, '编程', 99),
        (3, '语文', 99),
        (4, '英语', 99),
        (1, '语文', 99),
        (3, '英语', 99),
        (2, '编程', 99)
    ])

    def map_func(data):
        id = data[0]
        name = ""
        # 匹配本地list和分布式rdd中的学生ID  匹配成功后 即可获得当前学生的姓名
        # 2. 在使用到本地集合对象的地方, 从广播变量中取出来用即可
        for stu_info in broadcast.value:
            stu_id = stu_info[0]
            if id == stu_id:
                name = stu_info[1]

        return (name, data[1], data[2])


    print(score_info_rdd.map(map_func).collect())

"""
场景: 本地集合对象 和 分布式集合对象(RDD) 进行关联的时候
需要将本地集合对象 封装为广播变量
可以节省:
1. 网络IO的次数
2. Executor的内存占用
"""

累加器

# coding:utf8
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

    # Spark提供的累加器变量, 参数是初始值
    acmlt = sc.accumulator(0)

    def map_func(data):
        global acmlt
        acmlt += 1
        # print(acmlt)

    rdd2 = rdd.map(map_func)
    rdd2.cache()
    rdd2.collect()

    rdd3 = rdd2.map(lambda x:x)
    rdd3.collect()
    print(acmlt)

综合案例

# coding:utf8
import time

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import re

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 1. 读取数据文件
    file_rdd = sc.textFile("../data/input/accumulator_broadcast_data.txt")

    # 特殊字符的list定义
    abnormal_char = [",", ".", "!", "#", "$", "%"]

    # 2. 将特殊字符list 包装成广播变量
    broadcast = sc.broadcast(abnormal_char)

    # 3. 对特殊字符出现次数做累加, 累加使用累加器最好
    acmlt = sc.accumulator(0)

    # 4. 数据处理, 先处理数据的空行, 在Python中有内容就是True None就是False
    lines_rdd = file_rdd.filter(lambda line: line.strip())

    # 5. 去除前后的空格
    data_rdd = lines_rdd.map(lambda line: line.strip())

    # 6. 对数据进行切分, 按照正则表达式切分, 因为空格分隔符某些单词之间是两个或多个空格
    # 正则表达式 \s+ 表示 不确定多少个空格, 最少一个空格
    words_rdd = data_rdd.flatMap(lambda line: re.split("\s+", line))

    # 7. 当前words_rdd中有正常单词 也有特殊符号.
    # 现在需要过滤数据, 保留正常单词用于做单词计数, 在过滤 的过程中 对特殊符号做计数
    def filter_func(data):
        """过滤数据, 保留正常单词用于做单词计数, 在过滤 的过程中 对特殊符号做计数"""
        global acmlt
        # 取出广播变量中存储的特殊符号list
        abnormal_chars = broadcast.value
        if data in abnormal_chars:
            # 表示这个是 特殊字符
            acmlt += 1
            return False
        else:
            return True

    normal_words_rdd = words_rdd.filter(filter_func)
    # 8. 正常单词的单词计数逻辑
    result_rdd = normal_words_rdd.map(lambda x: (x, 1)).\
        reduceByKey(lambda a, b: a + b)

    print("正常单词计数结果: ", result_rdd.collect())
    print("特殊字符数量: ", acmlt)

广播变量解决了什么问题?
分布式集合RDD和本地集合进行关联使用的时候,降低内存占用以及减少网络IO传输,提高性能。
累加器解决了什么问题?
分布式代码执行中,进行全局累加。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/763632.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

设计模式day03

01gradle极速安装与配置入门 下载6.8.2版本,配置环境变量 配置镜像仓库 给gradle安装目录下init.d文件夹,放一个init.gradle文件,内容如下: gradle.projectsLoaded {rootProject.allprojects {buildscript {repositories {def JCENTER_URL…

go语言计算推算心率算法 http服务

目的 为了计算心率和并且将心率计算作为http服务来运行 几种计算方法 1 基本数据 a) hrv heart rate variability b) 呼吸 2 傅里叶变换 计算频率 高频和低频 3 隐形马尔科夫 模型 hmm 重在于推测概率 根据最近的心率计算 4 神经网络计算 基本计算 …

APP外包开发原生和H5的对比

在开发APP的技术中,除了原生开发外也可以使用H5框架来开发。原生开发的特点是质量高,用户体验更好,但成本高,适用于对质量要求高的APP项目。H5框架的特点是通用性较强,对开发人员的要求相对较低,成本也低&a…

23款奔驰GLE350轿跑加装前排原厂座椅通风系统,夏天必备的功能

通风座椅的主动通风功能可以迅速将座椅表面温度降至适宜程度,从而确保最佳座椅舒适性。该功能启用后,车内空气透过打孔皮饰座套被吸入座椅内部,持续时间为 8 分钟。然后,风扇会自动改变旋转方向,将更凉爽的环境空气从座…

怎么批量查询快递单号的物流状态并导出查询数据

随着互联网技术的不断发展,电商行业想做大做强,售后环节一定要做到位。想要做好售后服务,需要借助一些技巧与手段。今天小编给大家安利一款软件:“固乔快递查询助手”,这是一款可以批量查询快递信息的软件,…

python_PyQt5开发股票日数据更新工具

写在前面: 该工具更新的股票日数据来自优矿,为了把股票日数据在本地存储一份,这就面临需要定期更新的需求,为此开发了这个工具。 定期更新的股票日数据特征: 1 旧股票日数据(也就是上次更新的数据&#…

【机器学习】吴恩达课程2-单变量线性回归

一、单变量线性回归 1. 模型描述 监督学习的流程 & 单变量线性回归函数 代价函数:,其中 m 表示训练样本的数量 公式为预测值与实际值的差,平方为最小二乘法和最佳平方/函数逼近。 目标:最小化代价函数,即 2. 只…

PR模板-33组故障干扰文字标题动画 Motion Glitch Typography

Motion Glitch Typography包含33组故障干扰文字标题动画pr模板。不需要任何插件或脚本,只需点击几下,您的视频就有很酷的故障标题动画,适用于预告片、宣传片或任何类型的视频。 适用软件:Premiere Pro 2020 或更高版本 分辨率&a…

Go语言之函数补充defer语句,递归函数,章节练习

defer语句是go语言提供的一种用于注册延迟调用的机制,是go语言中一种很有用的特性。 defer语句注册了一个函数调用,这个调用会延迟到defer语句所在的函数执行完毕后执行,所谓执行完毕是指该函数执行了return语句、函数体已执行完最后一条语句…

密码学学习笔记(十四):SHA-3 Sponge Construction - 海绵结构

SHA-3算法满足了哈希函数的三个安全属性,并且和SHA-2的变体达到同样级别的安全性。此外,SHA-3算法不容易受到长度扩展攻击,并可用于计算秘密消息的哈希值。 SHA-3是一种建立在Permutation(置换)之上的密码算法。 置换就是假设有两个数组a和…

Hadoop第一课之环境配置

1.配置一个模板机 要求:IP DNS地址页 网址 防火墙 安装包 1.ip ifconfig 查询 先用虚拟机看一下自己的网关 vim search/provides 命令 查找 # 修改网络配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33 如果提示找不到vim命令,使用yum下载v…

Springboot Excel 最简单的 多sheet表 导入导出

前言 上周学习群里有人问到,多个sheet的导出导入,我第一反应就是easypoi不是自己就有方法了么? 后面一想,可能有些看客还处于是 找工具类,然后调试 的写代码 的 阶段,可能还不会去看jar包的一些函数。 既然…

SpringMVC入门篇5 --- 拦截器

目录 1. 简介 拦截器(Interceptor)是一种动态拦截方法调用的机制。 作用: 在指定的方法调用前后执行预先设定后的代码。阻止原始方法的执行。 拦截器与过滤器的区别 归属不同:Filter属于Servlet技术,Interceptor…

使用vue3 + Ts + Vite + ElementPlus实现一个抽奖程序

一. 说明 这是一个通过vue3 Ts Vite ElementPlus实现的一个抽奖程序。项目链接 二. 整体架构与功能描述 左侧设置了奖品说明,每个奖项配有文字和图片简介。总共设置了四个奖项,分别是特等奖1名,一等奖2名,二等奖5名&#xf…

平安养老险党委书记、董事长甘为民:聚焦养老主业 助推养老保障事业高质量发展

每经记者 涂颖浩 每经编辑 马子卿 随着人口老龄化趋势加剧,中国养老金融市场呈现出巨大的潜力,逐步迈入养老新时代。近日,平安养老险党委书记、董事长甘为民在接受《每日经济新闻》记者专访时表示,过往单纯的养老发展模式难以满足…

Jmeter性能测试 —— 性能测试的概念

性能测试的概念 性能测试是指通过特定方式,对被测系统按照一定策略施加压力,获取系统 响应时间、TPS(Transaction Per Second)、吞吐量、资源利用率等性能指标,以期保证生产系统的性能能够满足用户需求的过程。 性能…

浅析编译与链接

生成可执行文件的四个过程 当编写和构建计算机程序时,预处理、编译、汇编和链接是将源代码转化为可执行程序的关键过程。以下是对每个阶段的详细解释: 1. 预处理(Preprocessing):将.c/.cpp文件中的头文件展开、宏展开…

【PostgreSQL内核学习(一)—— Ubuntu源码安装PostgreSQL】

Ubuntu源码安装PostgreSQL 1. PostgreSQL官网下载压缩包2. 解压&安装2.1 解压文件2.2 安装依赖2.3 执行安装2.4 执行安装2.5 添加路径到文件 3. 初始化数据库与使用3.1 初始化数据库3.2 启动数据库服务3.3 启动数据库 1. PostgreSQL官网下载压缩包 下载地址:ht…

【黑马头条之freemarker入门】

本笔记内容为黑马头条项目的freemarker部分 目录 一、freemarker 介绍 二、环境搭建&&快速入门 1、创建测试工程 2、配置文件 3、创建模型类 4、创建模板 5、创建controller 6、创建启动类 7、测试 三、freemarker基础 1、基础语法种类 2、集合指令&#…

【iOS】—— 面向对象,Runtime,ARC等问题总结

对于暑假学习大多数是对之前学习的一个复习,在这里只做对之前学习欠缺知识的补充以及这些知识点涉及的一些问题,从问题入手学习。 文章目录 面向对象1.一个NSObject对象占多少内存?2.对象的isa指针指向哪里?3.OC的类信息存放在哪…