Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))

news2024/11/15 4:40:39

1、RDD概述

1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫弹性分布式数据集,是Spark中对于分布式数据集的抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

1.2 RDD五大特性

1、一组分区,即是数据集的基本组成单位,标记数据是哪个分区的
2、一个计算每个分区的函数
3、RDD之间的依赖关系
4、一个Partitioner,即RDD的分片函数:控制分区的数据流向(键值对)
5、一个列表,储存存取每个Partition的优先位置(prefered Location)。如果节点和分区个数不对应优先把分区设置在那个节点。移动数据不如移动计算,除非资源不够。

2、RDD编程

2.1 RDD的创建

在Spark中创建RDD的创建方式可以分为三种:
1、从集合中创建
2、从外部储存创建
3、从其他RDD创建

2.1.1 IDEA环境准备

1、创建一个maven工程,工程名称叫SparkCore
2、在pom文件中添加spark-core的依赖和scala的编译插件

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
</dependencies>

3、如果不想运行时打印大量日志,可以在resources文件夹中添加log4j2.properties文件,并添加日志配置信息

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
2.1.2 创建IDEA快捷键

创建SparkContext和SparkConf时存在的模板代码,我们可以设置idea快捷键一键生成。
1、点击File->Settings…->Editor->Live Templates->output->Live Template

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

//第八步的代码
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// TODO. 编写代码

// x. 关闭sc
sc.stop();

设置自动导包
在这里插入图片描述

2.1.3 从集合中创建

1、创建包com.zhm.spark
2、创建类Test01_createRDDWithList

public class Test01_createRDDWithList {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo 编写代码--由字符串数组创建RDD
        JavaRDD<String> stringRDD = sparkContext.parallelize(Arrays.asList("hello", "zhm"));

        //4、收集RDD
        List<String> result = stringRDD.collect();

        //5、遍历打印输出结果
        result.forEach(System.out::println);


        //6、 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:
在这里插入图片描述

2.1.4从外部储存系统的数据集创建

外部存储系统的数据集创建RDD如:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等。
1、数据准备
在新建的SparkCore项目名称上右键–>新建input文件夹–>在input文件夹上右键–>新建word.txt。编辑如下内容

hello world
hello zhm
hello future

2、创建RDD

public class Test02_createRDDWithFile {
    public static void main(String[] args) {
       //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建SparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

        //3、编写代码--读取路径./input下的文件,并创建RDD
        JavaRDD<String> fileRDD = javaSparkContext.textFile("./input/word.txt");

        //4、收集RDD
        List<String> result = fileRDD.collect();

        //5、遍历打印输出结果
        result.forEach(System.out::println);

        //6、关闭 sparkContext
        javaSparkContext.stop();

    }
}

运行结果:
在这里插入图片描述

2.2 分区规则

2.2.1 从集合创建RDD

1、创建一个包名:com.zhm.spark.partition
2、代码验证

package com.zhm.spark.partition;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

/**
 * @ClassName Test01_ListPartition
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 11:42
 * @Version 1.0
 */
public class Test01_ListPartition {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");


        //2、创建sparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

        //3、编写代码
        // 0*(5/2)=0   1*(5/2)=2.5   (0,2.5] 左开右闭 1,2
        //1*(5/2)=2.5   2*(5/2)=5     (2.5,5]左开右闭 3,4,5
        JavaRDD<Integer> integerRDD = javaSparkContext.parallelize(Arrays.asList(11, 12, 36, 14, 05), 2);

//        4、将RDD储存问文件观察文件判断分区
        integerRDD.saveAsTextFile("output");


//        JavaRDD<String> stringRDD = javaSparkContext.parallelize(Arrays.asList("1", "2", "3", "4", "5"),2);
//        stringRDD.saveAsTextFile("output");


        //5、关闭javaSparkContext
        javaSparkContext.stop();
    }
}

运行结果:
在这里插入图片描述

在这里插入图片描述

2.2.2 从文件创建RDD
package com.zhm.spark.partition;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * @ClassName Test02_FilePartition
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 13:54
 * @Version 1.0
 */
public class Test02_FilePartition {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext对象
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

        //3、编写代码
        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("input/1.txt",3);

        //4、将stringJavaRDD储存到文件中
        stringJavaRDD.saveAsTextFile("output");

        //5、关闭资源
        javaSparkContext.stop();
    }
}


运行结果:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
1、分区规则
(1)分区数量的计算方式:
如果: JavaRDD stringJavaRDD = javaSparkContext.textFile(“input/1.txt”,3);
在这里插入图片描述

  • totalSize = 10 // totalSize指的是文件中的真实长度,这里需要确认你的文件换行符,不同的换行符是不一样的
  • goalSize = 10 / 3 = 3(byte) //表示每个分区存储3字节的数据
  • 分区数= totalSize/ goalSize = 10 /3 => 3,3,4
  • 由于第三个分区的4子节大于3子节的1.1倍,符合hadoop切片1.1倍的策略,因此会多创建一个分区,即3,3,3,1
    (2)Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系
    (3)数据读取位置计算是以偏移量为单位来进行计算的。
    (4)数据分区的偏移量范围的计算
    在这里插入图片描述

2.3 Transformation 转换算子

2.3.1 Value类型

创建包名com.zhm.spark.operator.value

2.3.1.1 map()映射

1、用法:给定映射函数f,map(f)以元素为粒度对RDD做数据转换
2、映射函数f:
(1)映射函数f可以带有明确签名函数,也可以是匿名内部函数
(2)映射函数f的参数类型必须与RDD的元素类型保持一致,而输出类型则任由开发者自行决定。
3、解释说明:
函数f是一个函数可以写作匿名子类,它可以接受一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并一次应用f函数,从而产生一个新的RDD。即这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。
4、需求:将Lover.txt文件中的每行结尾拼接“Thank you”
5、具体实现

package com.zhm.spark.operator.value;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * @ClassName StudyMap
 * @Description 对单个元素进行操作
 * @Author Zouhuiming
 * @Date 2023/6/27 14:04
 * @Version 1.0
 */
public class StudyMap {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、编写代码  对元素进行操作
        JavaRDD<String> stringJavaRDD = sparkContext.textFile("input/Lover.txt");

        JavaRDD<String> mapRDD = stringJavaRDD.map(s -> s + " Thank you");

        JavaRDD<String> mapRDD1 = stringJavaRDD.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s + "Thank you";
            }
        });

        //4、遍历打印输出结果
        mapRDD.collect().forEach(System.out::println);
        System.out.println("++++++++++++++++++++++++");
        mapRDD1.collect().forEach(System.out::println);

        //5 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果
在这里插入图片描述

2.3.1.2 flatMap()扁平化

flatMap其实和map算子一样,flatMap也是用来做数据映射的。
1、用法:flatMap(f),以元素为粒度,对RDD进行数据转换。
2、特点:
不同于map映射函数f的类型是(元素)->(元素)
flatMap的映射函数类型是(元素)->(集合)
3、过程:
(1)以元素为单位,创建集合
(2)去掉集合“外包装”,提前集合元素
4、功能说明
与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。
5、案例说明:创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中。
6、具体实现

package com.zhm.spark.operator.value;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * @ClassName StudyFLatMap
 * @Description 炸裂
 * @Author Zouhuiming
 * @Date 2023/6/27 14:09
 * @Version 1.0
 */
public class StudyFLatMap {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、编写逻辑代码--创建列表arrayList,其中每个元素的类型是字符串列表
        ArrayList<List<String>> arrayList = new ArrayList<>();

        arrayList.add(Arrays.asList("1","2","3"));
        arrayList.add(Arrays.asList("4","5","6"));
        arrayList.add(Arrays.asList("7","8","9"));

        //4、根据arraylist创建RDD
        JavaRDD<List<String>> listJavaRDD = sparkContext.parallelize(arrayList);

        //5、使用flatMap将RDD中每个元素进行转换打散,泛型为打散之后的数据
        JavaRDD<String> stringJavaRDD = listJavaRDD.flatMap(new FlatMapFunction<List<String>, String>() {
            @Override
            public Iterator<String> call(List<String> strings) throws Exception {
                return strings.iterator();
            }
        });

        //6、收集RDD,并打印输出
        System.out.println("---------输出集合构建的RDD之flatMap测试------------");
        stringJavaRDD.collect().forEach(System.out::println);


        //Todo 从文件读取数据的话要自己实现将元素转换为集合
        //7、读取文件中的数据
        JavaRDD<String> javaRDD = sparkContext.textFile("input/word.txt");

        //8、将每行数据按空格切分之后,转换为一个list数组再将String数组转换为list集合返回list集合的迭代器
        JavaRDD<String> stringJavaRDD1 = javaRDD.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] split = s.split(" ");
                return Arrays.asList(split).iterator();
            }
        });

        //9、收集RDD,并打印输出
        System.out.println("-----输出文件系统构建的RDD之flatMap测试---");
        stringJavaRDD1.collect().forEach(s -> {
            System.out.println(s);
        });

        //10 关闭 sparkContext
        sparkContext.stop();
    }
}


运行结果
在这里插入图片描述

2.3.1.3 filter()过滤

1、用法:filter(f),以元素为粒度对RDD执行判定函数f
2、判定函数
(1)f,指的是类型为(RDD元素类型)=> (Boolean)的函数
(2)判定函数f的形参类型,必须与RDD的元素类型保持一致,而f的返回结果,只能是True或者False。
3、功能说明
(1)在任何一个RDD上调用filter(f)方法时,会对该RDD中每一个元素应用f函数
(2)作用是保留RDD中满足f(即f返回值为True)的数据,过滤掉不满足f(即f返回值为false)的数据。
4、需求说明:创建一个RDD,过滤出对2取余等于0的数据
在这里插入图片描述
5、代码实现

package com.zhm.spark.operator.value;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

/**
 * @ClassName StudyFilter
 * @Description 过滤元素
 * @Author Zouhuiming
 * @Date 2023/6/27 14:26
 * @Version 1.0
 */
public class StudyFilter {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo  根据集合创建RDD
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

        // 根据数据与2取模,过滤掉余数不是0的数据元素
        JavaRDD<Integer> filterRDD = javaRDD.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        });

        System.out.println("------filter算子测试------");
        filterRDD.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}


运行结果:
在这里插入图片描述

2.3.1.4 groupBy()分组

1、用法:groupBy(f) ,以元素为粒度对每个元素执行函数f。
2、函数f:
(1)函数f为用户自定义实现内容,返回值任意
(2) 函数返回值为算子groupBy返回值的key,元素为value。
(3)算子groupBy的返回值为新的重新分区的K—V类型RDD
3、功能说明:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
4、案例说明:创建一个RDD,按照元素模以2的值进行分组。
5、代码实现

package com.zhm.spark.operator.value;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

/**
 * @ClassName StudyGroupBy
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 14:32
 * @Version 1.0
 */
public class StudyGroupBy {
    public static void main(String[] args) throws InterruptedException {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo 根据集合创建RDD
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

        //4、储存到文件查看groupby之前的情况
        javaRDD.saveAsTextFile("outputGroupByBefore");


        //5、对RDD执行groupBy操作,计算规则是value%2
        JavaPairRDD<Integer, Iterable<Integer>> integerIterableJavaPairRDD = javaRDD.groupBy(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                return integer % 2;
            }
        });

        //6、类型可以容易修改
        JavaPairRDD<Boolean, Iterable<Integer>> booleanIterableJavaPairRDD = javaRDD.groupBy(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        });

        //7、输出结果
        System.out.println("---执行groupBy之后的RDD分区情况---");
        integerIterableJavaPairRDD.collect().forEach(System.out::println);
        booleanIterableJavaPairRDD.collect().forEach(System.out::println);

        //x 关闭 sparkContext
        Thread.sleep(1000000000L);
        sparkContext.stop();
    }
}


运行结果:
在这里插入图片描述
6、说明:
(1)groupBy会存在shuffle过程
(2)shuffle:将不同的分区数据进行打乱重组的过程
(3)shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。
在这里插入图片描述

2.3.1.5 distinct()去重

1、用法:distinct(numPartitions), 实现对RDD进行分布式去重。
2、参数numPartitions:指定去重后的RDD的分区个数。
3、功能说明:对内部的元素去重,并将去重后的元素放到新的RDD中。
4、代码实现

package com.zhm.spark.operator.value;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

/**
 * @ClassName StudyDistinct
 * @Description 去重
 * @Author Zouhuiming
 * @Date 2023/6/27 14:44
 * @Version 1.0
 */
public class StudyDistinct {
    public static void main(String[] args) throws InterruptedException {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo 根据集合创建RDD
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 1, 2, 3, 4, 5, 6), 2);

        //4、使用distinct算子实现去重,底层使用分布式去重,慢但是不会OOM
        JavaRDD<Integer> distinctRDD = javaRDD.distinct();

        //5、收集打印
        distinctRDD.collect().forEach(System.out::println);


        Thread.sleep(100000000L);
        //6 关闭 sparkContext
        sparkContext.stop();
    }
}


运行结果:
在这里插入图片描述

5、distinct会存在shuffle过程。在这里插入图片描述

2.3.1.6 sortBy()排序

1、用法:RDD.sortBy(f, ascending, numpartitions)
2、参数介绍:
(1)函数f: 对每个元素都执行函数f,返回值类型和元素中的类型一致
(2)ascending:数据类型为Boolean,默认是True,参数决定了排序后,RDD中的元素的排列顺序,即升序/降序
(3)numpartitions:排序后的RDD的分区数。
3、功能说明
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。默认排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。
4、案例需求说明:创建一个RDD,按照数字大小分别实现正序和倒序排序
在这里插入图片描述
5、代码实现

package com.zhm.spark.operator.value;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

/**
 * @ClassName StudySortBy
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 14:48
 * @Version 1.0
 */
public class StudySortBy {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 3, 2, 9, 6, 5, 3), 2);

        //4、使用sortBy算子对javaRDD进行排序(泛型->以谁作为标准排序,true->为正序)

        JavaRDD<Integer> javaRDD1 = javaRDD.sortBy(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                return integer;
            }
        }, true, 2);

        //5、收集输出
        javaRDD1.collect().forEach(System.out::println);

        //x 关闭 sparkContext
        sparkContext.stop();
    }
}


运行结果:
在这里插入图片描述

2.3.2 Key-Value类型

创建包名:com.zhm.spark.operator.keyvalue

2.3.2.1 mapToPair()

1、用法:RDD.mapToPair(f) ,对父RDD中的每条记录都执行函数f,得到新的记录<k, v>
2、作用:将Value类型转换为key-Value类型
3、代码实现

package com.zhm.spark.operator.keyvalue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * @ClassName StudyMapToPair
 * @Description 将不是kv类型的RDD转换为kv类型的RDD
 * @Author Zouhuiming
 * @Date 2023/6/27 14:52
 * @Version 1.0
 */
public class StudyMapToPair {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        //4、使用mapToPair算子对javaRDD转换为kv类型的RDD

        JavaPairRDD<Integer, Integer> pairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
                return new Tuple2<>(integer, integer);
            }
        });

        //5、收集输出
        System.out.println("------由v型RDD转换得到的kv型RDD------");
        pairRDD.collect().forEach(System.out::println);

        //Todo 由集合直接创建KV型RDD
        JavaPairRDD<Integer, Integer> integerIntegerJavaPairRDD = sparkContext.parallelizePairs(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 2), new Tuple2<>(3, 3), new Tuple2<>(4, 5)));

        //6、收集输出
        System.out.println("-----由集合直接创建KV型RDD-----");
        integerIntegerJavaPairRDD.collect().forEach(System.out::println);
        
     sparkContext.stop();
    }
}

运行结果:
在这里插入图片描述

2.3.2.2 mapValues()只对Value进行操作

1、用法:newRDD = oldRdd.mapValues(func)
2、参数函数func:自定义实现的函数,仅对oldRdd中(k,v)数据的v作用。
3、功能说明:针对于(K,V)形式的类型只对V进行操作
4、需求说明:创建一个pairRDD,并将value添加字符尾缀" Fighting"
5、代码实现

package com.zhm.spark.operator.keyvalue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @ClassName StudyMapValues
 * @Description 对kv类型的RDD的v进行操作
 * @Author Zouhuiming
 * @Date 2023/6/27 15:01
 * @Version 1.0
 */
public class StudyMapValues {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo
        JavaPairRDD<Integer, Integer> javaPairRDD = sparkContext.parallelizePairs(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 2), new Tuple2<>(3, 3),
                new Tuple2<>(4, 4), new Tuple2<>(5, 5)), 2);

        //4、为kv型RDD的v拼接尾缀"Fighting"
        JavaPairRDD<Integer, String> resultRDD = javaPairRDD.mapValues(new Function<Integer, String>() {
            @Override
            public String call(Integer integer) throws Exception {
                return integer + " Fighting";
            }
        });

        //5、打印收集
        resultRDD.collect().forEach(System.out::println);


        //6 关闭 sparkContext
        sparkContext.stop();
    }
}


运行结果:
在这里插入图片描述

2.3.2.3 groupByKey()按照K重新分组

1、用法:KVRDD.groupByKey();
2、功能说明
groupByKey对每个key进行操作,但只生成一个结果集,并不进行聚合。
该操作可以指定分区器或者分区数(默认使用HashPartitioner)
3、需求说明
统计单词出现次数
在这里插入图片描述
4、代码实现

package com.zhm.spark.operator.keyvalue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @ClassName StudyGroupByKey
 * @Description 分组聚合
 * @Author Zouhuiming
 * @Date 2023/6/27 15:07
 * @Version 1.0
 */
public class StudyGroupByKey {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo
        JavaRDD<String> javaRDD = sparkContext.parallelize(Arrays.asList("a", "a", "a", "b", "b", "b", "b", "a"), 2);

        //4、根据JavaRDD创建KVRDD
        JavaPairRDD<String, Integer> pairRDD = javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        //5、聚合相同的key
        JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();

        //6、收集并输出RDD内容
        System.out.println("-----查看groupByKeyRDD的内容------");

        groupByKeyRDD.collect().forEach(System.out::println);
        sparkContext.stop();
    }
}

5、运行结果:
在这里插入图片描述

2.3.2.4 reduceByKey()按照K聚合V

1、用法:KVRDD.reduceByKey(f);
2、功能说明:将RDD[K,V]中的元素按照相同的K的V进行聚合。存在多种重载形式,可设置新RDD的分区数。
3、需求说明:统计单词出现次数
在这里插入图片描述
4、代码实现

package com.zhm.spark.operator.keyvalue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @ClassName StudyReduceByKey
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 15:15
 * @Version 1.0
 */
public class StudyReduceByKey {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo
        JavaRDD<String> javaRDD = sparkContext.parallelize(Arrays.asList("a", "a", "a", "b", "b", "b", "b", "a"), 2);

        //4、根据JavaRDD创建KVRDD
        JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        //5、聚合相同的key,统计单词出现的次数
        JavaPairRDD<String, Integer> result = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        //6、收集并输出RDD的内容
        System.out.println("查看--result--的内容");
        result.collect().forEach(System.out::println);

        //x 关闭 sparkContext
        sparkContext.stop();
    }
}


运行结果:
在这里插入图片描述

2.3.2.5 reduceByKey和groupByKey区别

1、educeByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。
2、 groupByKey:按照key进行分组,直接进行shuffle。
3、在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。

2.3.2.6 sortByKey()按照K进行排序

1、用法:kvRDD.sortByKey(true/false)
2、功能说明:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。
3、参数说明:true:为升序排列,false为降序排列
4、需求说明:创建一个pairRDD,按照key的正序和倒序进行排序
在这里插入图片描述
5、代码实现

package com.zhm.spark.operator.keyvalue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @ClassName StudySortByKey
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 15:47
 * @Version 1.0
 */
public class StudySortByKey {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、Todo 创建kvRDD
        JavaPairRDD<Integer, String> javaPairRDD = sparkContext.parallelizePairs(Arrays.asList(new Tuple2<>(4, "d"), new Tuple2<>(3, "c"), new Tuple2<>(1, "a"),
                new Tuple2<>(2, "b")));

        //4、收集输出
        System.out.println("排序前:");
        javaPairRDD.collect().forEach(System.out::println);

        //5、对RDD按照key进行排序
        JavaPairRDD<Integer, String> sortByKeyRDD = javaPairRDD.sortByKey(true);

        //收集输出
        System.out.println("排序后:");
        sortByKeyRDD.collect().forEach(System.out::println);


        //x 关闭 sparkContext
        sparkContext.stop();
    }
}


运行结果:
在这里插入图片描述

2.4 Action行动算子

行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。
创建包名:com.zhm.spark.operator.action

2.4.1 collect():以数组的形式返回数据集

1、用法:RDD.collect();
2、功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。
在这里插入图片描述
注意:所以的数据都会被拉取到Driver端,慎用
3、需求说明:创建一个RDD,并将RDD内容收集到Driver端打印(代码实现在最后面)

2.4.2 count()返回RDD中元素个数

1、用法:RDD.count(),返回值为Long类型
2、功能说明:返回RDD中元素的个数
在这里插入图片描述
3、需求说明:创建一个RDD,统计该RDD的条数(代码实现在最后面)

2.4.3 first()返回RDD中的第一个元素

1、用法:RDD.first(), 返回值类型是元素类型
2、功能说明:返回RDD中的第一个元素
在这里插入图片描述
3、需求说明:创建一个RDD,返回该RDD中的第一个元素(代码实现在最后面)

2.4.4 take()返回由RDD前n个元素组成的数组

1、RDD.take(int num), 返回值为RDD中元素类型的List列表
2、功能说明:返回一个由RDD的前n个元素组成的数组
在这里插入图片描述
3、需求说明:创建一个RDD,取出前3个元素(代码实现在最后面)

2.4.5 countByKey()统计每种key的个数

1、用法:pairRDD.countByKey(), 返回值类型为Map<[RDD中key的类型] , Long>
2、功能说明:统计每种key的个数
在这里插入图片描述

3、需求说明:创建一个PairRDD,统计每种key的个数(代码实现在最后面)

2.4.6 save相关算子

1、saveAsTextFile(path)
(1)功能:将RDD保存成Text文件
(2)功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本。
2、 saveAsObjectFile(path)
(1)功能:序列化成对象保存到文件
(2)功能说明:用于将RDD中的元素序列化成对象,存储到文件中。
(代码实现在最后面)

2.4.7 foreach()遍历RDD中每一个元素

1、功能:遍历RDD中的每个元素,并依次应用函数
在这里插入图片描述
2、需求说明:创建一个RDD,对每个元素进行打印

2.4.8 所有的代码实现
package com.zhm.spark.operator.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @ClassName TestAll
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/28 13:47
 * @Version 1.0
 */
public class TestAll {
    public static void main(String[] args) {
        //设置往HDFS储存数据的用户名
        System.setProperty("HADOOP_USER_NAME","zhm");
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("TestAll");

        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        //3、获取RDD
//        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
        JavaPairRDD<Integer, String> javaPairRDD = sparkContext.parallelizePairs(Arrays.asList(new Tuple2<>(1, "z"), new Tuple2<>(2, "h"), new Tuple2<>(3, "m"),
                new Tuple2<>(1, "zhm")
        ),2);

        //4、collect():以数组的形式返回数据集
        //注意:所有的数据都会被拉取到Driver端,慎用
        System.out.println("-------------collect测试-------------");
        javaPairRDD.collect().forEach(System.out::println);

        //5、count():返回RDD中元素个数
        System.out.println("-------------count测试-------------");
        System.out.println(javaPairRDD.collect());

        //6、first():返回RDD中第一个元素
        System.out.println("-------------first测试-------------");
        System.out.println(javaPairRDD.first());

        //7、take():返回RDD前n个元素组成的数组
        System.out.println("-------------take测试-------------");
        javaPairRDD.take(3).forEach(System.out::println);

        //8、countByKey统计每种key的个数
        System.out.println("-------------countByKey测试-------------");
        System.out.println(javaPairRDD.countByKey());


        //9、save相关的算子
        //以文本格式储存数据
        javaPairRDD.saveAsTextFile("outputText");
        //以对象储存数据
        javaPairRDD.saveAsObjectFile("outputObject");

        //10、foreach():遍历RDD中每一个元素
        javaPairRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                System.out.println(integerStringTuple2._1+":"+integerStringTuple2._2);
            }
        });
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}


运行结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/732249.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pyecharts 绘制各种统计图的案例

Pyecharts 绘制各种统计图的案例 基础使用 from pyecharts import options as opts from pyecharts.charts import Bar, Line, Pie, Scatter from pyecharts.faker import Faker# 柱状图示例 def bar_chart():x_data Faker.choose()y_data Faker.values()bar (Bar().add_xa…

simulink实战 建模 简单车辆动力学模型

Gmg Discrete-TimeIntegrator 离散时间积分器

CentOS 7 搭建 Impala 4.1.2 + Kudu 1.15.0 测试环境

安装依赖 这部分不过于详细介绍&#xff0c;如果有现成环境也可以直接拿来使用。 Java 下载 java 安装包&#xff0c;需要登录 oracle&#xff0c;请自行下载。 cd /mnt tar zxvf jdk-8u202-linux-x64.tar.gz配置环境变量到 /etc/bashrc&#xff0c;并执行 source /etc/bas…

关于深度学习图像数据增广

数据增广方法在广义上可以按照产生新数据的方式分为数据变形和数据过采样。由于操作简单&#xff0c;同时数据量上的需求远比现在要低得多&#xff0c;早期对数据增广的应用多是数据变形类方法。对于图像数据&#xff0c;基本的图像变换操作都属于数据变形类增广方法&#xff0…

Jvm参数设置-JVM(八)

上篇文章说了逃逸分析和标量&#xff0c;代码实例解析了内存分配先从eden区域开始&#xff0c;当内存不足的时候&#xff0c;才会进入s0和s1&#xff0c;发生yangGC&#xff0c;之后大内存会放入old&#xff0c;因为我们昨天程序运行了一个45M的对象&#xff0c;于是小对象在ed…

详解------>数组笔试题(必备知识)

目录 本章将通过列题进一步了解sizeof 与strlen的区别&#xff0c;加强对数组的理解。 1&#xff1a;一维数组列题 2&#xff1a;字符数组列题 3&#xff1a;二维数组列题 首先在进行这些习题讲解之前我们需要知道的知识点 sizeof&#xff1a;是一个关键字&#xff0c;可以…

KMP--高效字符串匹配算法(Java)

KMP算法 KMP算法算法介绍代码演示: KMP算法 KMP算法是为了解决这一类问题,给定一个字符串str1,和一个字符串str2,如果str2属于str1d的字串,则返回字串第一个出现位置的下标,不存在返回-1. 注意: 子串是连续的. 举个例子 str1 “abc123abs” str1 长度假设m str2 “123”; str2…

pycharm汉化

安装pycharm 不多说了&#xff0c;直接下载安装即可 汉化 file -setting plugins 输入chinese进行搜索 点击 进行安装&#xff0c;等待安装完成 安装完成需要重启&#xff0c;点击重启&#xff0c;等待重启完成即可 出现上图&#xff0c;说明汉化成功了

【计算机视觉】YOLOv8的测试以及训练过程(含源代码)

文章目录 一、导读二、部署环境三、预测结果3.1 使用检测模型3.2 使用分割模型3.3 使用分类模型3.4 使用pose检测模型 四、COCO val 数据集4.1 在 COCO128 val 上验证 YOLOv8n4.2 在COCO128上训练YOLOv8n 五、自己训练5.1 训练检测模型5.2 训练分割模型5.3 训练分类模型5.4 训练…

Mybatis-xml和动态sql

xml映射方式 除了之前那种 select(语句) public void ...();通过注解定义sql语句&#xff0c;还可以通过xml的方式来定义sql语句 注意 在resource创建的是目录&#xff0c;要用斜线分隔 创建出文件后 先写约束 <?xml version"1.0" encoding"UTF-8"…

第4集丨JavaScript 使用原型(prototype)实现继承——最佳实战2

目录 一、临时构造器方式1.1 代码实现1.2 代码分析 二. 增加uber属性&#xff0c;用于子对象访问父对象2.1 实现分析2.2 代码实现 三. 将继承封装成extend()函数3.1 代码实现3.1.1 临时构造器实现extend()3.1.2 原型复制实现extend2() 3.2 代码测试3.2.1 测试extend()函数3.2.1…

uniapp打包嵌入app,物理返回键的问题

问题描述&#xff1a;将uniapp开发的应用打包成wgt包放入app后&#xff0c;发现手机自带的返回键的点击有问题&#xff0c;比如我从app原生提供的入口进入了uniapp的列表页&#xff0c;然后我又进入了详情页&#xff0c;这时候在详情页点击物理返回键的话&#xff0c;它直接就返…

C语言—最大公约数和最小公倍数

作者主页&#xff1a;paper jie的博客_CSDN博客-C语言,算法详解领域博主 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 本文录入于《算法详解》专栏&#xff0c;本专栏是针对于大学生&#xff0c;编程小白精心…

过河卒

题目描述 棋盘上 A 点有一个过河卒&#xff0c;需要走到目标 B 点。卒行走的规则&#xff1a;可以向下、或者向右。同时在棋盘上 C 点有一个对方的马&#xff0c;该马所在的点和所有跳跃一步可达的点称为对方马的控制点。因此称之为“马拦过河卒”。 棋盘用坐标表示&#xff…

云同步盘 vs 普通网盘:选择哪种更适合你?区别解析与选购指南!

云同步盘是一种基于云存储的在线服务&#xff0c;主要用于将本地文件存储到云端&#xff0c;并通过客户端软件实现文件的自动同步&#xff0c;从而保持本地和云端文件的同步更新。用户可以在任何设备上访问和共享这些文件。 云同步盘和普通云盘都是云存储服务&#xff0c;可以让…

Kubernetes CoreDNS

Kubernetes CoreDNS 1、DNS服务概述 coredns github 地址&#xff1a; https://github.com/kubernetes/kubernetes/blob/master/cluster/addons/dns/coredns/coredns.yaml.base service 发现是 k8s 中的一个重要机制&#xff0c;其基本功能为&#xff1a;在集群内通过服务名…

TL-ER2260T获取SSH密码并登录后台

TL-ER2260T获取SSH密码并登录后台 首先需要打开诊断模式 打开Ubuntu&#xff0c;通过如下指令计算SSH密码&#xff0c;XX-XX-XX-XX-XX-XX是MAC地址echo -n "XX-XX-XX-XX-XX-XX" | tr -d - | tr [a-z] [A-Z] | md5sum | cut -b 1-16SSH登录ssh -oKexAlgorithmsdiffie…

硬件打样和小批量生产

PCB 打样和小批量生产过程 包括PCB 定型、生产文件制作、元器件准备、装配图制作、贴片、全流程测试。 打样一般是 几块PCB 手工进行焊接。 其中生产文件根据加工厂 一般提供PCB或者Gerber。 元器件准备设计公司的物料管理&#xff0c;这里假设已经拿到了所需的物料。 装…

微信小程序开发22__在列表中 高亮选中某一项

思考一个问题: 在一个列表中&#xff0c;怎样实现高亮选中 某一项呢? 我们先看要实现的效果图 <!-- 这里data-index 用于点击时传递参数, 在js取时写法&#xff1a; e.target.dataset.index --> <view wx:for"{{info}}" class"{{indexnum?active:…

UTOPIA Automatic Generation of Fuzz Driver using Unit Tests

UTOPIA: Automatic Generation of Fuzz Driver using Unit Tests 这篇论文主要由三星研究院发表于2023 IEEE Symposium on Security and Privacy (SP)会议上 论文获取链接&#xff1a; https://gts3.org/assets/papers/2023/jeong:utopia.pdf 背景 模糊测试分为两种&#xf…