【spark-spring boot】学习笔记

news2025/1/6 18:12:25

目录

  • 说明
  • RDD学习
    • RDD介绍
    • RDD案例
      • 基于集合创建RDD
      • RDD存入外部文件中
    • 转换算子 操作
      • map 操作
        • 说明
        • 案例
      • flatMap操作
        • 说明
        • 案例
      • filter 操作
        • 说明
        • 案例
      • groupBy 操作
        • 说明
        • 案例
      • distinct 操作
        • 说明
        • 案例
      • sortBy 操作
        • 说明
        • 案例
      • mapToPair 操作
        • 说明
        • 案例
      • mapValues操作
        • 说明
        • 案例
      • groupByKey操作
        • 说明
        • 案例
      • reduceByKey操作
        • 说明
        • 案例
      • sortByKey操作
        • 说明
        • 案例
    • 行动算子 操作
      • collect 操作
        • 说明
        • 案例
      • count 操作
        • 说明
        • 案例
      • first操作
        • 说明
        • 案例
      • take操作
        • 说明
        • 案例
      • countByKey操作
        • 说明
        • 案例
      • saveAsTextFile 操作
        • 说明
        • 案例
      • foreach操作
        • 说明
        • 案例

说明

本文依赖于上一篇文章:【spark学习】 spring boot 整合 spark 的相关内容,请先阅读上一篇文章,将spark需要的环境先配置好,并测试通过之后再进行此文章的阅读和操作。

RDD学习

RDD介绍

想象一下你有一个大大的数据表,里面包含了很多很多的信息。如果你想对这些数据进行操作,比如筛选出符合条件的数据、或者对数据做一些计算,RDD 就是 Spark 用来存储和操作这些数据的一种方式。它有两个基本操作:转换操作(就像是加工数据)和 行动操作(获取结果)

RDD案例

基于集合创建RDD

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    /**
     * 创建 RDD 算子
     */
    @PostConstruct
    public void createRDD() {
        // 从集合创建 RDD
        List<String> lists = Arrays.asList("hello", "spark", "hi", "spark", "hadoop");
        JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDD
        parallelize.collect().forEach(System.out::println); // 打印
    }
}

输出结果:
在这里插入图片描述

RDD存入外部文件中

将RDD存入外部文件,用于观察文件结果和分区结果。txt文件数等于默认分区数。从结果中看出默认分区为4个。

注意:存放结果的文件夹路径必须没有被创建。

以下​案例中,将基于集合生成的RDD保存到saveRddDemo文件夹中​。

package www.zhangxiaosan.top.timer;


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    /**
     * 创建 RDD 算子
     */
    @PostConstruct
    public void createRDD() {
        // 从集合创建 RDD
        List<String> lists = Arrays.asList("hello", "spark", "hi", "spark", "hadoop");
        
        // parallelize(): 创建RDD,RDD中创建默认分区数
        // parallelize( 元素, 分区数): 创建RDD,RDD中指定分区数
        JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDD
        
        parallelize.collect().forEach(System.out::println); // 打印

        // 存储的目录文件夹路径。此处为项目中的路径且目录必须为不存在的路径。
        String fileSavePath="I:\\zhang\\SpringBootSparkDemo\\src\\main\\resources\\saveRddDemo";
        parallelize.saveAsTextFile(fileSavePath);// 开始将RDD保存到文件中 
    }
}

​运行结果:
在文件夹中存放的是运行程序生成的文件。如下图。
_SUCCESS文件​:成功标记
part-XXX 文件:保存的数据文件 ​,结果中有4个文件,说明默认分区为4个。
在这里插入图片描述

转换算子 操作

转换算子(Transformation)是指那些返回一个新的 RDD 的操作。转换算子不会立即执行计算,而是构建一个执行计划,只有当行动算子(Action)触发计算时,转换算子的操作才会实际执行。转换算子可以用来处理和转换数据,比如映射、过滤、聚合等。

map 操作

说明

map() 方法传入一个函数作为参数。map() 方法会将RDD中的元素逐一进行调用,函数的参数即是RDD的元素。 如:map( 函数( RDD元素 ) )。 map() 方法会返回一个新的RDD。

案例

将RDD中的每个元素都拼接上字符串 “ say hi ”

package www.zhangxiaosan.top.timer;


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    /**
     * 创建 RDD 算子
     */
    @PostConstruct
    public void createRDD() {
        // 从集合创建 RDD
        List<String> lists = Arrays.asList("张三", "李四", "王五");
        JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDD
		
		// lambda表达式传入匿名函数,进行元素拼接字符。
        // item 表示RDD中的元素
        parallelize = parallelize.map(item -> item + " say hi"); 
        
        parallelize.collect().forEach(System.out::println); // 打印
    }
}

输出结果:
在这里插入图片描述

flatMap操作

说明

flatMap() 方法传入一个函数作为参数 。flatMap() 方法会将RDD中的元素逐一进行调用,函数的参数即是RDD的元素。 如:flatMap( 函数( RDD元素 ) )。 flatMap() 方法会返回一个新的RDD。
flatMap() 方法 会将 RDD的元素扁平化处理成一个集合。

例如:
假设你有一个箱子,箱子里面放着几个小盒子,每个小盒子里又有一些玩具。flatMap 就是一个工具,它能帮你把每个小盒子里的玩具拿出来,直接放进一个大盒子里,最终把所有玩具放在一个地方。
每个小盒子里的玩具可以不止一个,甚至可能没有玩具(比如有的盒子是空的)。flatMap 会把每个盒子里的玩具都拿出来,放到一个大盒子里,最终得到一个扁平的大盒子,里面是所有玩具。

案例

将集合:[ [1, 2, 3, 4, 5],[hello, spark],[张三, 李四] ]
扁平化处理成:[ 1, 2, 3, 4, 5, hello, spark, 张三, 李四]

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void flatMapDemo() {
    	// 声明集合:[ [1, 2, 3, 4, 5],[hello, spark],[张三, 李四] ]
        List<List<String>> arrs = Arrays.asList(
                Arrays.asList("1", "2", "3", "4", "5"),
                Arrays.asList("hello", "spark"),
                Arrays.asList("张三", "李四")
        );
        
        //输出声明的集合呢容
        System.out.println("原始集合打印:");
        arrs.forEach(item -> System.out.print(" " + item));

		// 分隔符
        System.out.println();
        System.out.println("-----------");

        System.out.println("flatMap操作后:");
        // 创建集合的RDD
        JavaRDD<List<String>> parallelize = javaSparkContext.parallelize(arrs);

		// flatMap操作
        List<String> collect = parallelize.flatMap(i -> i.iterator()).collect(); 

		// 打印 flatMap操作 后的集合
        collect.forEach(item -> System.out.print(" " + item));
        System.out.println();
    }
}

filter 操作

说明

filter() 方法传入一个函数作为参数,函数返回值只能为Boolean值。filter() 方法会将RDD中的元素逐一进行调用,函数的参数即是RDD的元素。 如:filter( 函数( RDD元素 ) )。 filter() 方法会返回一个新的RDD。filter() 将RDD元素中满足条件的元素保留,即函数返回为true的元素;RDD中不满足条件的元素,即函数返回为false的元素过滤。

案例

过滤单数,保留双数


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void filterDemo() {
    	// 声明集合
        List<Integer> arrs = Arrays.asList(-1, 0, 1, 2, 3, 4, 5);

        System.out.println("原始集合打印:");
        arrs.forEach(item -> System.out.print(" " + item));
		// 输出过滤前的数据
        System.out.println();
        System.out.println("-----------");

        System.out.println("filter操作后:");
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

		// 过滤掉 单数 (取模不等于0的数字为单数,否则为偶数)。item % 2 == 0 的数字为双数,返回有true保留
        List<Integer> collect = parallelize.filter(item -> item % 2 == 0).collect();
		
		// 输出过滤后的数据
        collect.forEach(item -> System.out.print(" " + item));
        System.out.println();
    }
}

运行结果:
在这里插入图片描述

groupBy 操作

说明

将数据按某些条件进行分组。每个组由键值对组成。
groupBy() 常和以下聚合函数一起使用,来对分组数据进行统计分析。
常用的聚合操作包括:

  1. count(): 统计每个组的元素数量
  2. sum(): 计算每个组的元素总和
  3. avg(): 计算每个组的平均值
  4. max():计算每个组的最大值
  5. min(): 计算每个组的最小值
  6. agg(): 自定义聚合操作,可以结合多个聚合函数
案例

给写生成绩按照分数来分组。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void groupByDemo() {
    	// 定义学生成绩数据
        List<Tuple2<String, Integer>> students = Arrays.asList(
                new Tuple2<>("张三", 85),
                new Tuple2<>("李四", 90),
                new Tuple2<>("王五", 85),
                new Tuple2<>("赵六", 95),
                new Tuple2<>("孙七", 90)
        );
        
        // 创建RDD
        JavaRDD<Tuple2<String, Integer>> parallelize = javaSparkContext.parallelize(students);

        // 使用 groupBy 进行分组,按成绩分组
        JavaPairRDD<Object, Iterable<Tuple2<String, Integer>>> integerIterableJavaPairRDD = parallelize.groupBy(tuple -> tuple._2());// 使用 groupBy 按成绩分组。_2表示元组的第二个元素,即分数

        // 打印结果
        integerIterableJavaPairRDD.sortByKey().foreach(item -> System.out.println("成绩:" + item._1() + ",学生:" + item._2()));

    }
}

结果如下:
在这里插入图片描述

distinct 操作

说明

对RDD的元素进行分布式去重。返回新的RDD,新的RDD内的元素不重复出现。

案例

将集合中重复的元素去除。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    
    @PostConstruct
    public void distinctDemo() {
        // 声明集合
        List<Integer> arrs = Arrays.asList(1,2,3,4,5,6,1,4,6);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // distinct()方法,用于去除重复元素。
        JavaRDD<Integer> distinct = parallelize.distinct();

        // 打印结果
        distinct.collect().forEach(System.out::println);
    }
}

运行结果:
在这里插入图片描述

sortBy 操作

说明

对RDD的元素进行排序,返回新的RDD。
sortBy() 可传入3个参数:
参数1:函数,每个RDD元素都会传入此函数中,此函数定义排序规则。
参数2:boolean值。定义排序顺序。true为升序,false降序。
参数3:Integer 整数,指定分区数量。

案例

将集合中的元素降序排序。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    
    @PostConstruct
    public void sortByDemo() {
	    // 创建无序集合
        List<Integer> arrs = Arrays.asList(7,2,5,4,1,9,6,3,8);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // 使用 sortBy 进行排序,按元素值排序,并返回一个RDD。
        JavaRDD<Integer> sortBy = parallelize.sortBy(item -> item, false, 1);

        // 打印结果
        sortBy.collect().forEach(System.out::println);
    }
}

运行结果:
在这里插入图片描述

mapToPair 操作

说明

mapToPair()是将一个普通的 RDD 转换为 JavaPairRDD 的一个方法。 JavaPairRDD 中的每个元素都是一个键值对。
mapToPair对RDD的元素进行映射成一个由键值对组成的 RDD(即映射成JavaPairRDD),即将每个元素转换成一个 Tuple2 对象,其中第一个元素是键(key),第二个元素是值(value)。

案例

将集合 [“张三:85”, “李四:90”, “王五:85”, “赵六:95”, “孙七:90”] 中的学生和成绩按照冒号分隔,形成键值对,姓名为键,值为成绩。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void mapToPairDemo() {
        // 创建包含学生成绩的列表
        List<String> students = Arrays.asList("张三:85", "李四:90", "王五:85", "赵六:95", "孙七:90");
        
        // 将数据并行化为 RDD
        JavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);

        // 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对
        JavaPairRDD<String, Integer> studentsPairRDD = studentsRDD.mapToPair((PairFunction<String, String, Integer>) s -> {
            // 根据冒号分隔学生姓名和成绩,返回一个 (姓名, 成绩) 的元组
            String[] parts = s.split(":");
            // 返回一个 (姓名, 成绩) 的元组
            return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
        });

        // 打印结果
        studentsPairRDD.collect().forEach(System.out::println);
    }
}

运行结果:
在这里插入图片描述

mapValues操作

说明

mapValues() 是一个用于处理 JavaPairRDD 的方法。它可以对 RDD 中的每个键值对的 值(value) 进行转换,同时保留原来的 键(key) 不变。

案例

将集合 [“张三:85”, “李四:90”, “王五:85”, “赵六:95”, “孙七:90”] 中的学生和成绩按照冒号分隔,形成键值对,姓名为键,值为成绩。并将成绩在原分数上+5分。


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void mapToPairDemo() {
        // 创建包含学生成绩的列表
        List<String> students = Arrays.asList("张三:85", "李四:90", "王五:85", "赵六:95", "孙七:90");

        // 将数据并行化为 RDD
        JavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);

        // 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对
        JavaPairRDD<String, Integer> studentsPairRDD = studentsRDD.mapToPair((PairFunction<String, String, Integer>) s -> {
            // 根据冒号分隔学生姓名和成绩,返回一个 (姓名, 成绩) 的元组
            String[] parts = s.split(":");
            // 返回一个 (姓名, 成绩) 的元组
            return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
        });

        // 打印结果
        System.out.println("原始分数:");
        studentsPairRDD.collect().forEach(System.out::println);

        System.out.println("\n+5分后:");
        // item 表示每个键值对中的值,即分数
        JavaPairRDD<String, Integer> newValeRdd = studentsPairRDD.mapValues(item -> item + 5);

        // 打印结果
        newValeRdd.collect().forEach(System.out::println);
    }
}

运行结果:
在这里插入图片描述

groupByKey操作

说明

groupByKey() 是一个用于处理 JavaPairRDD 的方法。它根据键对数据进行分组,将所有具有相同键的元素聚集到一起,生成一个新的 RDD,其中每个键对应一个包含所有相同键值的集合。
每个键对应的值收集到一个 Iterable 容器中,然后返回一个新的 RDD,其中每个键对应一个包含该键的所有值的 Iterable。

案例

将集合 [“张三:85”, “李四:90”, “陈八:95”, “王五:85”, “黄六:92”] 中的学生和成绩按照冒号分隔,形成键值对,成绩为键,值为姓名。并将键值对中的键进行分类,将相同分数的学生归成一组。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void groupByKeyDemo(){
        // 创建包含学生成绩的列表
        List<String> students = Arrays.asList("张三:85", "李四:90", "陈八:95", "王五:85", "黄六:92");

        // 将数据并行化为 RDD
        JavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);

        // 使用 mapToPair 将每个元素转换为 (成绩, 姓名) 的键值对
        JavaPairRDD<Integer, String> studentsPairRDD = studentsRDD.mapToPair(s -> {
            String[] parts = s.split(":");
            return new Tuple2<Integer, String>(Integer.parseInt(parts[1]), parts[0]);
        });
        
        // 根据键值对中的键进行分类
        JavaPairRDD<Integer, Iterable<String>> groupedRDD = studentsPairRDD.groupByKey();
		
		// 打印结果
        groupedRDD.collect().forEach(pair -> {
            System.out.println("成绩: " + pair._1() + ", 姓名: " + pair._2());
        });
  	}
}

执行结果:
在这里插入图片描述

reduceByKey操作

说明

reduceByKey()用于对 JavaPairRDD 数据进行聚合的一个方法。它根据键对值进行合并,并在分区内进行局部聚合,从而减少了跨节点的数据传输,通常比 groupByKey() 更高效。

案例

计算学生的总分。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void groupByKeyDemo(){
        // 创建包含学生成绩的列表
        List<String> students = Arrays.asList("张三:85", "李四:90", "张三:95", "王五:85", "李四:92");

        // 将数据并行化为 RDD
        JavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);
        
        // 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对
        JavaPairRDD<String, Integer> studentsPairRDD = studentsRDD.mapToPair(s -> {
            String[] parts = s.split(":");
            return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
        });

        // 使用 groupByKey 按学生姓名进行成绩求和
        JavaPairRDD<String, Integer> groupedRDD = studentsPairRDD.reduceByKey((a, b) -> a + b);

        // 打印结果
        groupedRDD.collect().forEach(pair -> {
            System.out.println("姓名: " + pair._1() + ", 成绩: " + pair._2());
        });
    }
}

运行结果:
在这里插入图片描述

sortByKey操作

说明

对 JavaPairRDD 数据按键进行排序的方法。默认为升序。
传入参数为Boolean值:true 升序 ,false降序

案例

将学生成绩按照成绩从高到低降序排序

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    @PostConstruct
    public void sortByKeyDemo(){
    	// 定义原始数据
        List<Tuple2<String, Integer>> students = Arrays.asList(
                new Tuple2<>("张三", 85),
                new Tuple2<>("李四", 90),
                new Tuple2<>("王五", 65),
                new Tuple2<>("赵六", 95),
                new Tuple2<>("孙七", 90)
        );
		// 生成RDD
        JavaPairRDD<String, Integer> studentsPairRDD = javaSparkContext.parallelizePairs(students);

        // 将学生原集合(姓名,成绩)格式的数据转换为(成绩,姓名)格式的键值对
        JavaPairRDD<Integer, String> integerStringJavaPairRDD = studentsPairRDD.mapToPair(item -> new Tuple2<>(item._2(), item._1()));
		
		// 将(成绩,姓名)格式的键值对按照键降序排序,即按照成绩降序排序
        JavaPairRDD<Integer, String> stringIntegerJavaPairRDD = integerStringJavaPairRDD.sortByKey(false);
		
		// 打印结果
        stringIntegerJavaPairRDD .collect() .forEach(pair -> {
            System.out.println("成绩: " + pair._1() + ", 姓名: " + pair._2());
        });
    }
}

运行结果:
在这里插入图片描述

行动算子 操作

行动算子(Action)是指会触发 Spark 作业的执行,并且会产生一个结果或者副作用的操作。与 转换算子(Transformation)不同,转换算子只会定义数据转换的计算逻辑,而不会立即执行。只有在遇到行动算子时,Spark 才会真正开始计算,并将结果返回给用户或写入外部存储。

当你调用一个行动算子时,Spark 会从头开始执行所有必要的转换操作,并将结果返回给你或者存储到外部系统(如 HDFS、数据库等)。

行动算子通常会返回一个具体的结果,例如一个列表、一个数值,或者在某些情况下,可能会执行一些副作用操作(例如将数据写入磁盘)。

collect 操作

说明

将分布式 RDD 中的所有数据项拉取到本地驱动程序(Driver)中,通常作为一个数组、列表或其他集合类型。因为 collect() 会将整个 RDD 数据集拉到本地,所以如果数据量非常大,可能会导致内存溢出(OutOfMemoryError)。

注意事项:
数据量大时的风险:如果 RDD 中包含的数据量非常大,调用 collect() 会导致所有数据被加载到本地驱动程序的内存中,这可能会导致内存溢出错误(OutOfMemoryError)。因此,建议在数据集非常大的情况下谨慎使用 collect()。

建议:对于大规模数据集,通常会使用其他行动算子(如 take())获取一个数据的子集,避免一次性加载所有数据。

并行计算的代价:尽管 collect() 会将所有数据从分布式环境中拉回到单个节点,但它不会对数据进行额外的计算,只会执行之前定义的转换操作。因此,它本质上是将整个数据集的计算结果从集群中汇总回来。

案例

收集RDD数据并打印。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    @PostConstruct
    public void collectDemo(){
        // 定义一个整数集合
        List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // collect()方法,用于将RDD中的数据收集到Driver端,并返回一个List。
        List<Integer> collect = parallelize.collect();

        // 打印结果
        collect.forEach(item -> System.out.println(item));
    }
}

执行结果:
在这里插入图片描述

count 操作

说明

统计RDD中的元素数量。

案例

统计RDD中的元素数量,并输出


import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;
    @PostConstruct
    public void countDemo(){
        // 定义一个整数集合
        List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // count()方法,统计数量。
        Long total = parallelize.count();

        // 打印结果
        System.out.println("元素数量 = " + total);
    }
}

运行结果:
在这里插入图片描述

first操作

说明

返回RDD中的第一个元素

案例

获取RDD中的第一个元素并打印

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void firstDemo(){
        // 创建一个集合
        List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // 获取第一个元素
        Integer first = parallelize.first();

        // 打印结果
        System.out.println("第一个元素 = " + first);
    }
}

运行结果:在这里插入图片描述

take操作

说明

在RDD中从头获取指定数量的元素,返回获取的元素集合。

案例

获取前3个元素

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void takeDemo(){
        // 创建一个集合
        List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);

        // 创建RDD
        JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);

        // 获取前三个元素
        List<Integer> takes = parallelize.take(3);

        // 打印结果
        System.out.println("前三个元素 = " + takes);
    }
}

运行结果:
在这里插入图片描述

countByKey操作

说明

统计RDD中每种键的数量。

案例

根据键值对,统计键值对中不同键的数量。并打印。

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    @PostConstruct
    public void countByKeyDemo(){
        // 创建一个集合
        List<Tuple2<String,Integer>> arrs = Arrays.asList(
                new Tuple2<String,Integer>("张三",15),
                new Tuple2<String,Integer>("张三",20),
                new Tuple2<String,Integer>("李四",20),
                new Tuple2<String,Integer>("李四",30),
                new Tuple2<String,Integer>("李四",50),
                new Tuple2<String,Integer>("王五",10)
        );

        // 创建JavaPairRDD 对象,JavaPairRDD对象的元素为键值对。
        JavaPairRDD<String, Integer> parallelize = javaSparkContext.parallelizePairs(arrs);

        // 使用 countByKey() 方法,统计相同键的数量。
        Map<String, Long> countByKey = parallelize.countByKey();

        // 打印结果
        countByKey.forEach((key, value) -> System.out.println("键: " + key + ", 数量: " + value));
    }
}

运行结果:
在这里插入图片描述

saveAsTextFile 操作

说明

将 RDD 的数据以txt文件保存到外部存储系统。传入指定路径,文件会生成到该路径下。

注意
输出路径不能存在:如果输出路径已经存在,saveAsTextFile 会抛出异常
文件分区:Spark 会将每个分区的数据写入一个独立的文件。因此,如果 RDD 有多个分区,它会生成多个文件,每个文件对应一个分区的数据 。 文件名会根据分区编号进行自动命名,通常形式是:part-00***
文本格式:保存时,RDD 中的每个元素会被转换为文本行。默认情况下,Spark 会把 RDD 中的每个元素的 toString() 输出到文件中。
路径支持分布式存储:saveAsTextFile 支持将数据保存到本地文件系统、HDFS、S3 等分布式存储中。路径的格式取决于存储系统的类型。例如,如果要保存到 HDFS,路径应该以 hdfs:// 开头。

案例

将RDD存入外部文件。

以下​案例中,将基于集合生成的RDD保存到saveRddDemo文件夹中​。具体参考本文中的 RDD存入外部文件中 内容

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

/**
 * spark 案例
 */
@Slf4j
@Component
public class DemoTimer {
    @Autowired
    JavaSparkContext javaSparkContext;

    @Autowired
    SparkSession sparkSession;

    /**
     * 创建 RDD 算子
     */
    @PostConstruct
    public void createRDD() {
        // 从集合创建 RDD
        List<String> lists = Arrays.asList("hello", "spark", "hi", "spark", "hadoop");
        
        // parallelize(): 创建RDD,RDD中创建默认分区数
        // parallelize( 元素, 分区数): 创建RDD,RDD中指定分区数
        JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDD
        
        parallelize.collect().forEach(System.out::println); // 打印

        // 存储的目录文件夹路径。此处为项目中的路径且目录必须为不存在的路径。
        String fileSavePath="I:\\zhang\\SpringBootSparkDemo\\src\\main\\resources\\saveRddDemo";
        parallelize.saveAsTextFile(fileSavePath);// 开始将RDD保存到文件中 
    }
}

foreach操作

说明

循环遍历RDD中的元素

案例

以上案例中大部分用到此方法,是否用方式看以上案例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2248812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot 3 集成 Spring Security(2)授权

文章目录 授权配置 SecurityFilterChain基于注解的授权控制自定义权限决策 在《Spring Boot 3 集成 Spring Security&#xff08;1&#xff09;》中&#xff0c;我们简单实现了 Spring Security 的认证功能&#xff0c;通过实现用户身份验证来确保系统的安全性。Spring Securit…

Apache OFBiz xmlrpc XXE漏洞(CVE-2018-8033)

目录 1、漏洞描述 2、EXP下载地址 3、EXP利用 1、漏洞描述 Apache OFBiz是一套企业资源计划&#xff08;ERP&#xff09;系统。它提供了广泛的功能&#xff0c;包括销售、采购、库存、财务、CRM等。 Apache OFBiz还具有灵活的架构和可扩展性&#xff0c;允许用户根据业务需求…

【Android】ARouter的使用及源码解析

文章目录 简介介绍作用 原理关系 使用添加依赖和配置初始化SDK添加注解在目标界面跳转界面不带参跳转界面含参处理返回结果 源码基本流程getInstance()build()navigation()_navigation()Warehouse ARouter初始化init帮助类根帮助类组帮助类 completion 总结 简介 介绍 ARouter…

springboot整合hive

springboot整合hive pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.…

IntelliJ IDEA 中,自动导包功能

在 IntelliJ IDEA 中&#xff0c;自动导包功能可以极大地提高开发效率&#xff0c;减少手动导入包所带来的繁琐和错误。以下是如何在 IntelliJ IDEA 中设置和使用自动导包功能的详细步骤&#xff1a; 一、设置自动导包 打开 IntelliJ IDEA&#xff1a; 启动 IntelliJ IDEA 并打…

【MySQL课程学习】:MySQL安装,MySQL如何登录和退出?MySQL的简单配置

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;MySQL课程学习 &#x1f337;追光的人&#xff0c;终会万丈光芒 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 MySQL在Centos 7环境下的安装&#xff1a; 卸载…

Easyexcel(7-自定义样式)

相关文章链接 Easyexcel&#xff08;1-注解使用&#xff09;Easyexcel&#xff08;2-文件读取&#xff09;Easyexcel&#xff08;3-文件导出&#xff09;Easyexcel&#xff08;4-模板文件&#xff09;Easyexcel&#xff08;5-自定义列宽&#xff09;Easyexcel&#xff08;6-单…

(计算机网络)期末

计算机网络概述 物理层 信源就是发送方 信宿就是接收方 串行通信--一次只发一个单位的数据&#xff08;串行输入&#xff09; 并行通信--一次可以传输多个单位的数据 光纤--利用光的反射进行传输 传输之前&#xff0c;要对信源进行一个编码&#xff0c;收到信息之后要进行一个…

uniapp跨域问题解决方案

uniapp跨域问题解决方案 引言 在使用 uni-app 本地开发 H5> 平台时&#xff0c;需要使用浏览器进行调试&#xff0c;而浏览器会有跨域的问题。比如直接通过本地IP地址去访问开发中的页面&#xff0c;同时这个页面会调一些现有的接口时&#xff0c;就面临着跨域的问题。 解决…

Android 基于Camera2 API进行摄像机图像预览

前言 近期博主准备编写一个基于Android Camera2的图像采集并编码为h.264的应用&#xff0c;准备分为三个阶段来完成&#xff0c;第一阶段实现Camera2的摄像机预览&#xff0c;第二阶段完成基于MediaCodec H.264编码&#xff0c;第三阶段完成基于MediaCodec H.264解码,针对不同…

设计模式:11、迭代器模式(游标)

目录 0、定义 1、迭代器模式的四种角色 2、迭代器模式的UML类图 3、示例代码 4、迭代器的next()方法与集合的get(int index)方法的效率对比&#xff08;LinkedList为例&#xff09; 0、定义 提供一种方法顺序访问一个聚合对象中的各个元素&#xff0c;而又不需要暴露该对象…

UE5连接VR(pico,quest)进行PC VR开发(没有废话全是干货)

一、PICO VR连接UE 首先picoVR&#xff0c;不管是pico neo3还是pico4&#xff0c;用到的软件就只有三个 分别是pico互联助手PICO 互联 | PICO (picoxr.com)、steam VR&#xff0c;虚幻引擎5 pico互联助手 在pico互联助手中你需要选择两种连接方式&#xff08;推荐USB连接&a…

《UnityShader 入门精要》更复杂的光照

代码&示例图见&#xff1a;zaizai77/Shader-Learn: 实现一些书里讲到的shader 到了这里就开启了书里的中级篇&#xff0c;之后会讲解 Unity 中的渲染路径&#xff0c;如何计算光照衰减和阴影&#xff0c;如何使用高级纹理和动画等一系列进阶内容 Unity 中的渲染路径 在U…

用nextjs开发时遇到的问题

这几天已经基本把node后端的接口全部写完了&#xff0c;在前端开发时考虑时博客视频类型&#xff0c;考虑了ssr&#xff0c;于是选用了nextJs&#xff0c;用的是nextUi,tailwincss,目前碰到两个比较难受的事情。 1.nextUI个别组件无法在服务器段渲染 目前简单的解决方法&…

Golang项目:实现一个内存缓存系统

要求 支持设定过期时间&#xff0c;精确到秒支持设定最大内存&#xff0c;当内存超过时做出合适的处理支持并发安全按照以下接口安全 type Cache interface{//size : 1KB 100KB 1MB 2MB 1GBSetMaxMemory(size string )bool//将value写入缓存Set(key string, val interface{},e…

Softing线上研讨会 | Ethernet-APL:推动数字时代的过程自动化

| &#xff08;免费&#xff09;线上研讨会时间&#xff1a;2024年11月19日 16:00~16:30 / 23:00~23:30 Ethernet-APL以10Mb/s的传输速率为过程工业中的现场设备带来了无缝以太网连接和本质安全电源&#xff0c;这不仅革新了新建工厂&#xff0c;也适用于改造现有工厂。 与现…

《Deep Multimodal Learning with Missing Modality: A Survey》中文校对版

文章汉化系列目录 文章目录 文章汉化系列目录摘要1 引言2 方法论分类&#xff1a;概述2.1 数据处理方面2.2 策略设计方面 3 数据处理方面的方法3.1 模态填充3.1.1 模态组合方法3.1.2 模态生成方法 3.2 面向表示的模型3.2.1 协调表示方法3.2.2 表示组合方法。3.2.3 表示生成方法…

python爬虫案例——猫眼电影数据抓取之字体解密,多套字体文件解密方法(20)

文章目录 1、任务目标2、网站分析3、代码编写1、任务目标 目标网站:猫眼电影(https://www.maoyan.com/films?showType=2) 要求:抓取该网站下,所有即将上映电影的预约人数,保证能够获取到实时更新的内容;如下: 2、网站分析 进入目标网站,打开开发者模式,经过分析,我…

鸿蒙安全控件之位置控件简介

位置控件使用直观且易懂的通用标识&#xff0c;让用户明确地知道这是一个获取位置信息的按钮。这满足了授权场景需要匹配用户真实意图的需求。只有当用户主观愿意&#xff0c;并且明确了解使用场景后点击位置控件&#xff0c;应用才会获得临时的授权&#xff0c;获取位置信息并…

MATLAB矩阵元素的修改及删除

利用等号赋值来进行修改 A ( m , n ) c A(m,n)c A(m,n)c将将矩阵第 m m m行第 n n n列的元素改为 c c c&#xff0c;如果 m m m或 n n n超出原来的行或列&#xff0c;则会自动补充行或列&#xff0c;目标元素改为要求的&#xff0c;其余为 0 0 0 A ( m ) c A(m)c A(m)c将索引…