Python第二语言(十三、PySpark实战)

news2025/1/11 22:40:26

目录

1.开篇

2. PySpark介绍

3. PySpark基础准备

3.1 PySpark安装

3.2 掌握PySpark执行环境入口对象的构建

3.3 理解PySpark的编程模型

4. PySpark:RDD对象数据输入

4.1 RDD对象概念:PySpark支持多种数据的输入,完成后会返回RDD类的对象;

4.2 Python数据容器转RDD对象.parallelize(数据容器对象)

4.3 RDD存在很多计算的方法

4.4 读取文件转RDD对象:通过SparkContext入口对象来读取文件,构建RDD对象;

5. PySpark:RDD对象数据计算(一)

5.1 给Spark设置环境变量(不设置的时候,控制台会报错,出现找不到python.exe解释器的情况)

5.2 RDD的map方法:将RDD的数据根据函数进行一条条处理

5.3 RDD的flatMap方法:基本和map一样,但是多一个功能:将嵌套list给转成单list;[[1, 2, 3], [4, 5, 6]]转成[1, 2, 3, 4, 5, 6]

5.4 RDD的reduceByKey方法:将key分组后进行value逻辑处理;

6. 数据计算案例(一):完成使用PySpark进行单词技术的案例

7. PySpark:RDD对象数据计算(二)

7.1 RDD的filter方法:传入T泛型数据,返回bool,为false 的数据丢弃,为true的数据保留;(函数对RDD数据逐个处理,得到True的保留至返回值的RDD中)

7.2 RDD的distinct方法:对RDD数据进行去重,返回新RDD;

7.3 RDD的sortBy方法:对RDD的容器按照指定规则排序,返回新RDD;

8. 数据计算案例(二):计算城市中的商品以及销售额

8.1 需求

8.2 文件数据

8.3 需求一实现:处理结果自动返回的是一个二元元组;

8.4 需求二实现:将字典中的数据处理,返回一个list;

8.5 需求三实现:过滤除北京的数据,并只返回一个参数category,是list列表,并进行去重,去重后的结果进行collect输出;

9. 将RDD的结果数据输出为Python对象的各类方法


导航:

Python第二语言(一、Python start)-CSDN博客

Python第二语言(二、Python语言基础)-CSDN博客

Python第二语言(三、Python函数def)-CSDN博客

Python第二语言(四、Python数据容器)-CSDN博客

Python第二语言(五、Python文件相关操作)-CSDN博客

Python第二语言(六、Python异常)-CSDN博客

Python第二语言(七、Python模块)-CSDN博客

Python第二语言(八、Python包)-CSDN博客

Python第二语言(九、Python第一阶段实操)-CSDN博客

Python第二语言(十、Python面向对象(上))-CSDN博客

Python第二语言(十一、Python面向对象(下))-CSDN博客

Python第二语言(十二、SQL入门和实战)-CSDN博客

Python第二语言(十三、PySpark实战)-CSDN博客

Python第二语言(十四、高阶基础)-CSDN博客

1.开篇

  • PySpark大数据计算第三方库,Spark是大数据开发的核心技术;
  • python的spark中使用map时 Python worker exited unexpectedly (crashed)
    • 将原本的python12解释器降低版本到python10版本解释器,降低python解释器版本,因为版本不兼容;
    •  
    •  记得下载使用的包;

2. PySpark介绍

  • Apache Spark是用于大规模数据(large-scala data)处理的统一(unifield)分析引擎;
  • Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据;
  • Python On Spark:Python语言,是Spark重点支持的方向;

PySpark第三方库:

  • PySpark是由Spark官方开发的Python语言第三方库;
  • Python开发者可以使用pip程序快速安装PySpark并像其它第三方库一样使用;
  • 主要作用:
    • 进行数据处理;
    • 提交至Spark集群,进行分布式集群计算;

3. PySpark基础准备

3.1 PySpark安装

安装命令: pip install pyspark

加速下载命令:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

3.2 掌握PySpark执行环境入口对象的构建
  • PySpark是分布式集群的操作,setMaster(xxx).\setAppName(xxx)是用来控制集群的代码,图中代码用的是单机的;
  • setAppName是Spark任务的名称;
  • PySpark的执行环境入口对象是:类SparkContext的类对象,所有PySpark的功能都是从SparkContext对象作为开始;
# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").\
    setAppName("test_spark_app")

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

# 打印PySpark的运行版本
print(sc.version)

# 停止SparkContext对象的运行
sc.stop()

3.3 理解PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口;

  • PySpark的编程三大步骤:
    1. 数据输入:通过SparkContex类对象的成员方法完成数据的读取操作,读取后得到RDD类对象;
    2. 数据处理计算:通过RDD类对象的成员方法,完成各种数据计算的需求;
    3. 数据输出:将处理完成后的RDD对象,调用各种成员方法完成,写出文件,转换位list等操作;

4. PySpark:RDD对象数据输入

  • RDD就是PySpark计算后返回的对象容器;
4.1 RDD对象概念:PySpark支持多种数据的输入,完成后会返回RDD类的对象;

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets);

  • PySpark针对数据的处理,都是以RDD对象作为载体;
    1. 数据存储在RDD内;
    2. 各类数据的计算方法,也都是RDD的成员方法;
    3. RDD的数据计算方法,返回值依旧是RDD对象;
  • 比如说JSON文件、文本文件、数据库数据,都是可以通过SparkContext类对象,经过RDD对象的处理,并返回给文件文件或JSON文件,或者数据库;
4.2 Python数据容器转RDD对象.parallelize(数据容器对象)
  • 提示:
    1. 字符串会被拆分出1个个的字符,存入RDD对象;
    2. 字典仅有key会被存入RDD对象;
    3. RDD对象返回的是容器,与list一样结果;
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# 通过parallelize方法将Python对象加载到Spark内,称为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})

# 使用collect方法查看RDD中的内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

sc.stop()

4.3 RDD存在很多计算的方法

4.4 读取文件转RDD对象:通过SparkContext入口对象来读取文件,构建RDD对象;
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# 通过textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("dataText")

# 打印RDD内容
print(rdd.collect())
sc.stop()

小结:

  • RDD对象称之为分布式弹性数据集,是PySpark中数据计算的载体,可以:
    1. 提供数据存储;
    2. 提供数据计算的各类方法;
    3. 数据计算的方法,返回值依旧是RDD(RDD迭代计算);

5. PySpark:RDD对象数据计算(一)

  • 可以对list容器计算,可以对dict字典容器计算,可以对str字符串进行计算,所有的容器都可以通过RDD计算;
5.1 给Spark设置环境变量(不设置的时候,控制台会报错,出现找不到python.exe解释器的情况)

os.path.exists 返回值为True或False;

from pyspark import SparkConf, SparkContext
import os

# 配置Spark环境变量
os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'

# 检查PYSPARK_PYTHON路径
print(os.path.exists('C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'))
# 检查PYSPARK_DRIVER_PYTHON路径
print(os.path.exists('C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'))

5.2 RDD的map方法:将RDD的数据根据函数进行一条条处理

1. 介绍:

  • RDD对象内置丰富的:成员方法(算子)
  • map算子:是将RDD的数据一条条处理(处理的逻辑是将python中的函数作为参数进行传递,这个函数,参数会将RDD种的每条数据都进行处理)最终返回一个新的RDD对象;
    • map()中的参数 (T) → U:T代表传入一个参数,U代表一个返回值;(意思代表传入的参数是一个,还有一个返回值,T是泛型,不用指定数据类型)
    • map()中的参数 (T) → T:T代表传入一个参数,T代表一个返回值;(意思代表传入的参数是一个,还有一个返回值,T是泛型,传入的是什么值,那么返回的就是什么类型)

2. func函数传递:

        func函数作为参数:代表的是RDD中的每个值,都会进行func函数的处理;是RDD中的每一个元素都会被RDD处理一遍;

可以简写成:rdd2 = rdd.map(lambda x: x * 10) # 简写的函数

3. 案例:

  • 这里存在一个大坑,如果是python312版本去使用map函数,会报错 Python worker exited unexpectedly (crashed) ,降低版本即可,我用的版本10;
  • 结果:RDD中的每一个元素都会被传递给func进行处理,*10操作;
from pyspark import SparkConf, SparkContext
import os

# 配置Spark环境变量
os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.map(lambda x: x * 10)  # 简写的函数
print(rdd2.collect())
sc.stop()

4. map链式调用:

from pyspark import SparkConf, SparkContext
import os

# 配置Spark环境变量
os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5])

rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)  # 链式调用:将map进行第一个*10数据计算,再进行map+5数据计算

print(rdd2.collect())

5. 小结:

  1. map算子(成员方法):
    • 接受一个处理函数,可用lambda表达式快速编写;
    • 对RDD内的元素逐个处理,并返回一个新的RDD;
  2. 链式调用:对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子;
5.3 RDD的flatMap方法:基本和map一样,但是多一个功能:将嵌套list给转成单list;[[1, 2, 3], [4, 5, 6]]转成[1, 2, 3, 4, 5, 6]
from pyspark import SparkConf, SparkContext
import os

if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(["zhangSan lisi yiyi", "zhangSan yiyi wangWu", "wangWu yiyi zhangSan"])

    print(rdd.map(lambda x: x.split(" ")).collect())
    print("-----------------------------------------")
    print(rdd.flatMap(lambda x: x.split(" ")).collect())  # 将嵌套list转成单list,对数据接触嵌套

5.4 RDD的reduceByKey方法:将key分组后进行value逻辑处理;
  • 二元元组:[('a', 1), ('a', 1), ('b', 1)]这就是二元元组,元组中只有两个元素;

  • 自动按照key分组,完成组内数据(value)的聚合操作:就是会按照元组中的key,就是'a', 'a', 'b'进行key的value聚合,1, 1, 1是value;(value聚合的逻辑是,按照传入的func函数逻辑来进行聚合)

    假设这是二元元组数据要进行reduceByKey算子处理:

reduceByKey计算方式:

1. 思路:

  • 先分组,key值等于a和a一组,b和b一组:然后在进行函数lambda a, b: a+b进行处理,也即是分组后,a=a+a, b=b+b+b;结果[('b', 3), ('a', 2)]
  • 再解释:b有三个值,那么lambda a, b: a+b中表示的是b:1, 1, 1 的三个值,去进行函数处理的时候,先是第一个1和第二1进行相加,这时候相加是a+b,分组后与key无关系,那么第一个1和第二个1相加后等于2,这时候发现还有第三个1,这时候再次把第一次相加的结果,与第三个1进行a+b处理,2+1是前后者参数的相加处理;最终得到按照key分组聚合value的结果;
  • 最终解释:将数据分组后,每个组的数据进行lambda a, b: a + b 操作,每个组中的数据,进行a + b操作,意思就是将当前组的所有value进行相加操作;

2. 实现:

  • 功能:针对KV型RDD,自动按照key分组,然后根据提供的聚合逻辑,完成组内数据(value)的聚合操作;
  • rdd.reduceByKey(func):
    from pyspark import SparkConf, SparkContext
    import os
    
    if __name__ == '__main__':
        os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
        conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
        sc = SparkContext(conf=conf)
    
        rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
    
        result = rdd.reduceByKey(lambda a, b: a + b)  # 分组计算
        print(result.collect())

6. 数据计算案例(一):完成使用PySpark进行单词技术的案例

  • 题目:读取文件,求出文件中单词出现的次数;
  • 文件:
  • 思路:

    先将字符串进行读取,然后按照空格分割['key', 'key'],在进行分割后的数组重组为(key, 1) 的形式,后面利用rdd的reduceByKey方法,将分组后的key,进行聚合操作,因为value都是1,所以可以得出对单词出现的次数,进行统计操作;

  • 根据 (key, 1) 重组后的数据应该是:

    [('key1', 1), ['key1', 1], ('key2', 1), ['key2', 1]]

  • 然后得出最终结果:
    from pyspark import SparkConf, SparkContext
    import os
    
    if __name__ == '__main__':
        os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
        conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
        sc = SparkContext(conf=conf)
    
        # 1.读取数据文件
        """
        假设你有一个大文件,里面有 300MB 的数据,如果你指定分区数为 3,Spark 会尝试将这个文件分成 3 个分区,每个分区大约 100MB。
        如果你的集群有 3 个节点,每个节点可以并行处理一个分区,这样就可以更快地完成任务。
        """
        file = sc.textFile("word", 3)  # ("xx" , 3):3是指文件被分成的最小分区数(partitions)
    
        # 2.将所有单词读取出来
        words = file.flatMap(lambda line: line.split(' '))  # 结果:['python', 'java', ...]
    
        # 3.将所有单词加1做value
        word_one = words.map(lambda x: (x, 1))  # 结果:[('python', 1), ('java', 1), ('php', 1), ('c#', 1),...]
    
        # 4.分组并求和
        result = word_one.reduceByKey(lambda a, b: a + b)
    
        # 5.打印结果
        print(result.collect())

7. PySpark:RDD对象数据计算(二)

7.1 RDD的filter方法:传入T泛型数据,返回bool,为false 的数据丢弃,为true的数据保留;(函数对RDD数据逐个处理,得到True的保留至返回值的RDD中)

  • 功能:过滤想要的数据进行保留;
  • filter算子作用:
    • 接受一个处理函数,可用lambda快速编写;
    • 函数对RDD数据逐个处理,得到True的保留至返回值的RDD中;
from pyspark import SparkConf, SparkContext
import os

if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5])

    # 保留基数
    print(rdd.filter(lambda x: x % 2 == 1).collect())

7.2 RDD的distinct方法:对RDD数据进行去重,返回新RDD;
from pyspark import SparkConf, SparkContext
import os

if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 1, 2, 3, 4, 5, 4, 5])

    # 对rdd对象进行去重
    print(rdd.distinct().collect())

7.3 RDD的sortBy方法:对RDD的容器按照指定规则排序,返回新RDD;
  • func: (T) → U告知按照rdd中的哪个数据进行排序,比如lambda x: x[1] 表示按照rdd中的第二列元素进行排序;
  • numPartitions:目前默认就为1;

结果:

按照元组tople中的第二位元素进行排序,按照降序;

lambda x: x[1]:计算规则,将所有容器的每一个元素按照函数规则处理,x是遍历的元组,x[1]是传入的元组的第二位元素,所以规则就是按照元组的第二位元素进行降序排序;

from pyspark import SparkConf, SparkContext
import os

if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([("zhangSan", 99), ("lisi", 88), ("wangWu", 100)])

    # 对结果进行排序
    final_rdd = rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    print(final_rdd.collect())

  • sortBy算子小结:
    • 接收一个处理函数,可用lambda快速编写;
    • 函数表示用来决定排序的依据;
    • 可以控制升序或降序;
    • 全局排序需要设置分区数为1;

8. 数据计算案例(二):计算城市中的商品以及销售额

8.1 需求
  1. 需求一:各个城市销售额排名,从大到小;

    先按行读取文件,并对json进行split分割,按照|符号,得到最终的字典,使用Spark.reduceByKey进行分组,分组时传递func计算函数,将所有分组后的城市销售额进行a+b的形式,聚合起来,最终得到结果,并按照降序的排序方式排序输出;

  2. 需求二:全部城市,有哪些商品类别在售卖;

    文件读取后,将城市的categpry商品类别,distinct使用去重;

  3. 需求三:北京市有哪些商品类别在售卖;

    将除了北京市的所有数据进行filter过滤,过滤后只留下category并进行去重得到结果;

8.2 文件数据

{"id":1,"timestamp":"2024-06-01T01:03.00Z","category":"电脑","areaName":"杭州","money":"3000"}|{"id":2,"timestamp":"2024-06-01T01:03.00Z","category":"电脑","areaName":"杭州","money":"3500"}
{"id":3,"timestamp":"2024-06-01T01:03.00Z","category":"食品","areaName":"杭州","money":"3000"}|{"id":4,"timestamp":"2024-06-01T01:03.00Z","category":"食品","areaName":"杭州","money":"3700"}
{"id":5,"timestamp":"2024-06-01T01:03.00Z","category":"服饰","areaName":"北京","money":"3000"}|{"id":6,"timestamp":"2024-06-01T01:03.00Z","category":"服饰","areaName":"北京","money":"3900"}
8.3 需求一实现:处理结果自动返回的是一个二元元组;
from pyspark import SparkConf, SparkContext
import os

if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)

    # 1.读取文件得到RDD
    file_rdd = sc.textFile("orders")
    # 2. 取出一个个JSON字符串
    json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
    # 3. 将一个个JSON字符串转换为字典
    dict_rdd = json_str_rdd.map(lambda x: json.loads(x))

    # print(dict_rdd.collect())

    # 4.取出城市和销售额数据
    city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))

    # 5.按城市分组按销售额聚合
    city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)

    # 6.按销售额聚合结果进行排序
    result_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    print("需求1的结果:", result_rdd.collect())

前三步数据结果:

完整数据结果:

8.4 需求二实现:将字典中的数据处理,返回一个list;
from pyspark import SparkConf, SparkContext
import os

if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)

    # 1.读取文件得到RDD
    file_rdd = sc.textFile("orders")
    # 2. 取出一个个JSON字符串
    json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
    # 3. 将一个个JSON字符串转换为字典
    dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
    # 4.取出全部的商品类别
    category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
    print("需求2的结果:", category_rdd.collect())

8.5 需求三实现:过滤除北京的数据,并只返回一个参数category,是list列表,并进行去重,去重后的结果进行collect输出;
from pyspark import SparkConf, SparkContext
import os

if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
    sc = SparkContext(conf=conf)

    # 1.读取文件得到RDD
    file_rdd = sc.textFile("orders")
    # 2. 取出一个个JSON字符串
    json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
    # 3. 将一个个JSON字符串转换为字典
    dict_rdd = json_str_rdd.map(lambda x: json.loads(x))

    # 4. 过滤北京的数据
    beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
    # 5.取出全部商品类别
    result_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
    print("需求3的结果:", result_rdd.collect())

9. 将RDD的结果数据输出为Python对象的各类方法

  • 数据输出:将RDD输出的值转成文件或Python对象;
  • collect算子:将各个分区内的数据,统一收集到Driver中,形成一个list对象;
  • reduce算子:对RDD数据集按照你传入的逻辑进行聚合;
  • task算子:取出RDD的前N个元素,组合成list返回;
  • count算子:计算RDD有多少条数据,返回值是一个数字;
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = 'C:/Users/Administrator/AppData/Local/Programs/Python/Python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

if __name__ == '__main__':
    rdd = sc.parallelize([1, 2, 3, 4, 5])

    # collect算子,输出RDD为list对象
    rdd_list: list = rdd.collect()
    print("collect算子结果:", rdd_list)
    print("collect算子类型是:", type(rdd_list))

    # reduce算子,对RDD进行两两聚合
    num = rdd.reduce(lambda a, b: a + b)
    print("reduce算子结果:", num)

    # take算子,取出RDD前N个元素,组成list返回
    take_list = rdd.take(3)
    print("take算子结果:", take_list)

    # count,统计rdd内有多少条数据,返回值为数字
    num_count = rdd.count()
    print("count算子结果:", num_count)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1820157.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【cocos creator 3.x】 修改builtin-unlit 加了一个类似流光显示的mask参数

效果见图: shader 代码修改如下, 主要看 USE_MASK_UVY 关键字部分修改: // Copyright (c) 2017-2020 Xiamen Yaji Software Co., Ltd. CCEffect %{techniques:- name: opaquepasses:- vert: unlit-vs:vertfrag: unlit-fs:fragproperties: &a…

数据可视化:Seaborn

安装Seaborn 进入虚拟环境,在终端中键入 pip install seaborn 即可安装。 初步使用Seaborn 在使用seaborn之前,我们先了解一下seaborn是什么,seaborn是以matplotlib为底层的更简便的python第三方库,它可以更快捷地去设置图形的一…

为CAP面板添加简单的Authentication登录验证功能 C#|.net

终于搞定了CAP Dashboard的登录验证功能! 因为网上找不到简单的CAP Dashboard的登录验证功能,所以这个功能摸索着开发了好久。 这个Authentication认证功能,不仅适用于CAP面板,也适用于懒得开发登录页面,但是又需要简单用户名密码登录的网页。 做过后端的比较熟悉,CAP面…

【SCAU数据挖掘】数据挖掘期末总复习题库简答题及解析——上

1.K-Means 假定我们对A、B、C、D四个样品分别测量两个变量,得到的结果见下表。 样品 变量 X1X2 A 5 3 B -1 1 C 1 -2 D -3 -2 利用K-Means方法将以上的样品聚成两类。为了实施均值法(K-Means)聚类,首先将这些样品随意分成两类(A、B)和(C、…

MYSQL 查看SQL执行计划

一、explain explain select id,db,user,host,command,time,state,info from information_schema.processlist order by time desc; id: 查询的标记,可以查看不同查询的执行顺序。 select_type: 查询的类型,如SIMPLE、SUBQUERY、PRIMARY等。 table: …

深入理解指针(二)

目录 1. 数组名的理解 2. 使用指针访问数组 3. ⼀维数组传参的本质 4. 冒泡排序 5. 二级指针 6. 指针数组 7. 指针数组模拟二维数组 1. 数组名的理解 有下面一段代码: #include <stdio.h> int main() {int arr[10] { 1,2,3,4,5,6,7,8,9,10 };int* p &arr[…

【python】通行网格地图四叉树化 (leeccode 427)

【python】通行网格地图四叉树化 受到Leecode 427题的启发&#xff0c;427. 建立四叉树 想将由0和1组成的网格地图绘制为四叉树地图&#xff0c;0表示可通行网格&#xff0c;1表示不可通行网格。 import matplotlib.pyplot as plt import matplotlib.patches as patches …

一文教会你静态住宅代理IP的优势和选择技巧,跨境小白收好这份指南!

一、什么是静态住宅代理IP&#xff1f; 静态住宅代理IP是指分配给个人住宅网络的IP地址&#xff0c;这些IP地址在长时间内保持不变。它们是从互联网服务提供商&#xff08;ISP&#xff09;获取的&#xff0c;因此拥有更高的可信度和较低的被封禁风险。静态住宅代理IP因其独特的…

SpringBoot3 常用的第三方接口调用十种方式

环境&#xff1a;SpringBoot.3.3.0 简介 在项目中调用第三方接口是日常开发中非常常见的。调用方式的选择通常遵循公司既定的技术栈和架构规范&#xff0c;以确保项目的一致性和可维护性。无论是RESTful API调用、Feign声明式HTTP客户端、Apache HttpClient等调用方式&#x…

Word同行内的文字如何左右分别对齐

先打开标尺&#xff08;视图-标尺&#xff09; 开右边&#xff0c;选一个制表位置&#xff0c;比如我选34 切回开始&#xff0c;点段落段落右下角 然后 然后 我修改为35&#xff08;因为“6月13日”总共3个字符&#xff09; 在文字中间按下Tab键&#xff0c;效果如下

Spring Boot 自定义校验注解

1.创建注解&#xff0c;可参考其他检验注解进行创建 2.创建校验类&#xff0c;需实现ContraintValidator并重写isValid方法,注意范型中表示给那个注解(State)提供校验及校验类型&#xff08;String&#xff09;,然后自行编写校验规则true为检验成功&#xff0c;false为失败 3.使…

网工内推 | 外企、上市公司运维工程师,有软考中高项证书优先

01 优尼派特&#xff08;苏州&#xff09;物流有限公司 &#x1f537;招聘岗位&#xff1a;软件运维测试工程师 &#x1f537;任职要求&#xff1a; 1、负责公司自主研发的软件售后服务工作, 包括软件的安装, 调试, 升级,培训, 参数配置, 需求与Bug的处理; 2、负责数据库升级及…

unDraw —— 免费且可定制的插画库,为您的设计注入灵魂

&#x1f3a8; unDraw —— 免费且可定制的插画库&#xff0c;为您的设计注入灵魂 在寻找能够完美融入您品牌风格的插画吗&#xff1f;unDraw&#xff0c;一个提供大量免费插画资源的网站&#xff0c;可能是您的理想选择&#xff01; &#x1f310; 网站特色 免费且开源 unDraw…

Doris集群管理工具Doris Manager安装使用(已踩坑)

背景&#xff1a;Doris集群管理、监控相对复杂&#xff0c;就想着有没有免费的、好用的管理工具&#xff0c;就发现了Doris Manager&#xff0c;给大家分享一下。 官网&#xff1a;https://docs.selectdb.com/docs/enterprise/cluster-manager-guide/deployment-guide/deployme…

【算法训练记录——Day28】

Day28——回溯算法Ⅳ 1.复原IP地址2.[全排列](https://leetcode.cn/problems/permutations/submissions/539240290/)3.[全排列Ⅱ](https://leetcode.cn/problems/permutations-ii/description/) ● 93.复原IP地址 ● 78.子集 ● 90.子集II 1.复原IP地址 思路&#xff1a;相当于…

【OceanBase DBA早下班系列】—— 性能问题如何 “拍CT“ (一键获取火焰图和扁鹊图)

1. 前言 最近接连遇到几个客户的环境在排查集群性能问题&#xff0c;总结了一下&#xff0c;直接教大家如何去获取火焰图、扁鹊图&#xff08;调用关系图&#xff09;&#xff0c;直击要害&#xff0c;就像是内脏的疾病去医院看病&#xff0c;上来先照一个CT&#xff0c;通过分…

HarmonyOS Next 系列之HTTP请求封装和Token持久化存储(四)

系列文章目录 HarmonyOS Next 系列之省市区弹窗选择器实现&#xff08;一&#xff09; HarmonyOS Next 系列之验证码输入组件实现&#xff08;二&#xff09; HarmonyOS Next 系列之底部标签栏TabBar实现&#xff08;三&#xff09; HarmonyOS Next 系列之HTTP请求封装和Token…

家用洗地机排行榜前十名:2024十大王牌机型精准种草

最近很多人都在问我洗地机相关的问题&#xff0c;不愧是改善家庭生活品质的“三神器”之一。洗地机依靠其清洁力和清洁效率吸引了越来越多的平时需要做家务人群的兴趣&#xff0c;为了解答大家关于洗地机的各种疑问&#xff0c;我把市面上目前非常火爆的洗地机型号和参数都进行…

探索未来通信的新边界:AQChat一款融合AI的在线匿名聊天

探索未来通信的新边界&#xff1a;AQChat一款融合AI的在线匿名聊天 在数字时代&#xff0c;即时通讯变得无处不在&#xff0c;但隐私和性能仍旧是许多用户和开发者关注的焦点。今天&#xff0c;我要介绍一个开创性的开源项目 —— AQChat&#xff0c;它不仅重定义了在线匿名聊…

Spring IoC注解

一、回顾反射机制 反射的调用三步&#xff1a;1&#xff09;获取类。2&#xff09;获取方法。3&#xff09;调用方法 调用方法&#xff1a;调用哪个对象&#xff0c;哪个方法&#xff0c;传什么参数&#xff0c;返回什么值。 方法&#xff08;Do&#xff09;类&#xff1a; …