Spark04: Transformation与Action开发

news2024/9/24 9:24:59

一、创建RDD的三种方式

RDD是Spark编程的核心,在进行Spark编程时,首要任务是创建一个初始的RDD这样就相当于设置了Spark应用程序的输入源数据然后在创建了初始的RDD之后,才可以通过Spark 提供的一些高阶函数,对这个RDD进行操作,来获取其它的RDD。

Spark提供三种创建RDD方式:集合、本地文件、HDFS文件

  • 使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造一些测试数据,来测试后面的spark应用程序的流程。
  • 使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件。
  • 使用HDFS文件创建RDD,是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进行离线批处理操作。

1. 使用集合创建RDD

如果要通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了。

调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)

引入依赖:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.3</version>
<!--            <scope>provided</scope>-->
        </dependency>

Scala代码:

package com.sanqian.scala

import org.apache.spark.{SparkConf, SparkContext}

object CreateRddByArrayScala {
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByArray").setMaster("local")
    val sc = new SparkContext(conf)
    //创建集合
    val arr = Array(1, 2, 3, 4, 5)
    //基于集合创建RDD
    val rdd = sc.parallelize(arr)
    //对集合中的元素求和
    val sum = rdd.reduce(_ + _)

    //注意:这行println代码是在driver进程中执行的
    println(sum)
  }
}

注意:val arr = Array(1,2,3,4,5)还有println(sum)代码是在driver进程中执行的,这些代码不会并行执行,parallelize还有reduce之类的操作是在worker节点中执行的

Java代码:

package com.sanqian.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.Arrays;
import java.util.List;

public class CreateRddByArrayJava {
    public static void main(String[] args) {
        //创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByArrayJava").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //创建集合
        List<Integer> arr = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> rdd = sc.parallelize(arr);
        Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        System.out.println(sum);

        sc.stop();
    }
}

2. 使用本地文件和HDFS文件创建RDD

  • 通过SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD,RDD中的每个元素就是文件中的一行文本内容
  • textFile()方法支持针对目录、压缩文件以及通配符创建RDD。
  • Spark默认会为HDFS文件的每一个Block创建一个partition,也可以通过textFile()的第二个参数手动设置分区数量,只能比Block数量多,不能比Block数量少,比Block数量少的话你的设置是不生效的

 Scala代码:

package com.sanqian.scala

import org.apache.spark.{SparkConf, SparkContext}

object CreateRddByFileScala {
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByArray").setMaster("local")
    val sc = new SparkContext(conf)

    var path = "D:\\data\\words.txt"
    path = "hdfs://bigdata01:9000/words.txt"
    val rdd = sc.textFile(path)
    val length = rdd.map(_.length).reduce(_ + _)
    println(length)
    sc.stop()
  }
}

Java代码:

package com.sanqian.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

import java.util.Arrays;
import java.util.List;

public class CreateRddByFileJava {
    public static void main(String[] args) {
        //创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByArrayJava").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //创建集合
        String path = "D:\\data\\words.txt";
        path = "hdfs://bigdata01:9000/words.txt";
        JavaRDD<String> rdd = sc.textFile(path, 2);

        //获取每一行数据的长度
        JavaRDD<Integer> numRDD = rdd.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return s.length();
            }
        });
        //计算文件内数据的总长度
        Integer sum = numRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        System.out.println(sum);

        sc.stop();
    }
}

 二、Transformation和Action介绍

 Spark对RDD的操作可以整体分为两类:

  • Transformation和Action这里的Transformation可以翻译为转换,表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter等等
  • Action可以翻译为执行,表示是触发任务执行的操作,主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序

不管是Transformation操作还是Action操作,一般会把它们称之为算子,例如:map算子,reduce算子。

其中Transformation算子有一个特性:lazy,lazy特性在这里指的是,如果一个spark任务中只定义了transformation算子,那么即使你执行这个任务,任务中的算子也不会执行。也就是说,transformation是不会触发spark任务的执行,它们只是记录了对RDD所做的操作,不会执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。Spark通过lazy这种特性,来进行底层的spark任务执行的优化,避免产生过多中间结果。

Action的特性:执行Action操作才会触发一个Spark任务的运行,从而触发这个Action之前所有的
Transformation的执行。

三、常用Transformation介绍

那下面我们先来看一下Spark中的Transformation算子

1.官方文档


先来看一下官方文档,进入2.4.3的文档界面

 

 2. 常用算子

算子介绍
map将RDD中的每个元素进行处理,一进一出
filter对RDD中每个元素进行判断,返回true则保留
flatMap与map类似,但是每个元素都可以返回一个或多个新元素
groupByKey根据key进行分组,每个key对应一个Iterable<value>
reduceByKey对每个相同key对应的value进行reduce操作
sortByKey对每个相同key对应的value进行排序操作(全局排序)
join对两个包含<key,value>对的RDD进行join操作
distinct对RDD中的元素进行全局去重

 

 

 

 

 

 

 

 

3. Transformation操作开发实战

  • map:对集合中每个元素乘以2
  • filter:过滤出集合中的偶数
  • flatMap:将行拆分为单词
  • groupByKey:对每个大区的主播进行分组
  • reduceByKey:统计每个大区的主播数量
  • sortByKey:对主播的音浪收入排序
  • join:打印每个主播的大区信息和音浪收入
  • distinct:统计当天开播的大区信息

4. Scala代码:

package com.sanqian.scala

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:transformation 实战
 * map: 对集合中的每个元素乘以2
 * filter: 过滤出集合中的偶数
 * flatMap: 将行拆分为单词
 * groupByKey:对每个大区的主播进行分组
 * reduceByKey: 统计每个大区的主播数量
 * sortByKey: 对主播的音浪收入排序
 * join: 打印每个主播的大区信息和音浪收入
 * distinct: 统计当天开播的主播数量
 */
object TransformationOpScala {


  def main(args: Array[String]): Unit = {
    val sc = getSparkContext
    //map: 对集合中的每个元素乘以2
    //    mapOp(sc)
    //filter: 过滤出集合中的偶数
    //    filterOp(sc)
    //flatMap: 将行拆分为单词
    //    flatMapOp(sc)

    //groupByKey:对每个大区的主播进行分组
//    groupByKeyOp2(sc)
    //reduceByKey: 统计每个大区的主播数量
//    reduceBykeyOp(sc)
    //sortByKey: 对主播的音浪收入排序
//    sortByKeyOp(sc)
    //join: 打印每个主播的大区信息和音浪收入
//    joinOp(sc)
    //distinct: 统计当天开播的大区信息
    distinctOp(sc)
  }
  def distinctOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
    //由于是统计开播的大区信息,需要根据大区信息去重,所以只保留大区信息
    rdd.map(_._2).distinct().foreach(println(_))
  }

  def joinOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
    val rdd2 = sc.parallelize(Array((150001, 400), (150002, 200), (15003, 300), (15004, 100)))

    rdd.join(rdd2).foreach(tup => {
      //用户id
      val uid = tup._1
      val area_gold = tup._2
      println(uid + "\t" + area_gold._1 +"\t" + area_gold._2)
    })
  }

  def sortByKeyOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, 400), (150002, 200), (15003, 300), (15004, 100)))
//    rdd.map(tup => (tup._2, tup._1)).sortByKey(ascending = false)
//    .foreach(println(_))
    //sortBy的使用,可以动态指定排序字段比较灵活
    rdd.sortBy(_._2, ascending = false).foreach(println(_))
  }
  def reduceBykeyOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
    rdd.map(tup => (tup._2, 1)).reduceByKey(_ + _).foreach(println(_))
  }

  def groupByKeyOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
    rdd.map(tup => (tup._2, tup._1)).groupByKey().foreach(tup => {
      //获取大区信息
      val area = tup._1
      print(area + ":")
      //获取同一个大区对应的所有用户id
      val it = tup._2
      for (uid <- it) {
        print(uid + " ")
      }
      println()
    })
  }
  // 如果tuple中的数据列超过了2列怎么办?
  // 把需要作为key的那一列作为tuple2的第一列,剩下的可以再使用tuple2包装一下
  //注意:如果你的数据结构比较复杂,可以在执行每一个算子之后都调用foreach打印一下,确认数据的格式
  def groupByKeyOp2(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US", "male"), (150002, "CN", "female"), (15003, "CN", "male"), (15004, "IN", "male")))
    rdd.map(tup => (tup._2, (tup._1, tup._3))).groupByKey().foreach(tup => {
      //获取大区信息
      val area = tup._1
      print(area + ":")
      //获取同一个大区对应的所有用户id
      val it = tup._2
      for ((uid, sex) <- it) {
        print("<" + uid + "," + sex + "> ")
      }
      println()
    })
  }

  def mapOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    rdd.map(_ * 2).foreach(println(_))
  }

  def flatMapOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array("good good study", "day day up"))
    rdd.flatMap(_.split(" ")).foreach(println(_))
  }

  def filterOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    //满足条件的保存下来
    rdd.filter(_ % 2 == 0).foreach(println(_))
  }

  def getSparkContext = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("TransformationOpScala").setMaster("local")
    new SparkContext(conf)
  }
}

5. Java代码:

package com.sanqian.java;

import jdk.nashorn.internal.ir.FunctionCall;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;
import java.util.Iterator;

public class TransformationOpJava {
    public static void main(String[] args) {
        JavaSparkContext sc = getSparkContext();
        //map: 对集合中的每个元素乘以2
//        mapOp(sc);
        //filter: 过滤出集合中的偶数
//        filterOp(sc);
        //flatMap: 将行拆分为单词
//        flatMapOp(sc);
        //groupByKey:对每个大区的主播进行分组
//        groupByKeyOp(sc);
//        groupByKeyOp2(sc);
        //reduceByKey: 统计每个大区的主播数量
//        reduceByKeyOp(sc);
        //sortByKey: 对主播的音浪收入排序
//        sortByKeyOp(sc);
        //join: 打印每个主播的大区信息和音浪收入
//        joinOp(sc);
        //distinct: 统计每个大区的主播
        distinctOp(sc);

    }

    private static void distinctOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t5 = new Tuple2<Integer, String>(150001, "US");
        Tuple2<Integer, String> t6 = new Tuple2<Integer, String>(150002, "CN");
        Tuple2<Integer, String> t7 = new Tuple2<Integer, String>(150003, "CN");
        Tuple2<Integer, String> t8 = new Tuple2<Integer, String>(150004, "IN");
        JavaRDD<Tuple2<Integer, String>> rdd2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));
        rdd2.map(new Function<Tuple2<Integer, String>, String>() {
            @Override
            public String call(Tuple2<Integer, String> v1) throws Exception {
                return v1._2;
            }
        }).distinct().foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

    private static void joinOp(JavaSparkContext sc) {
        Tuple2<Integer, Integer> t1 = new Tuple2<Integer, Integer>(150002, 200);
        Tuple2<Integer, Integer> t3 = new Tuple2<Integer, Integer>(150001, 400);
        Tuple2<Integer, Integer> t2 = new Tuple2<Integer, Integer>(150003, 300);
        Tuple2<Integer, Integer> t4 = new Tuple2<Integer, Integer>(150004, 100);
        JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));

        Tuple2<Integer, String> t5 = new Tuple2<Integer, String>(150001, "US");
        Tuple2<Integer, String> t6 = new Tuple2<Integer, String>(150002, "CN");
        Tuple2<Integer, String> t7 = new Tuple2<Integer, String>(150003, "CN");
        Tuple2<Integer, String> t8 = new Tuple2<Integer, String>(150004, "IN");
        JavaRDD<Tuple2<Integer, String>> rdd2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));

        JavaPairRDD<Integer, Integer> rddPair = rdd.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup) throws Exception {
                return new Tuple2<Integer, Integer>(tup._1, tup._2);
            }
        });
        JavaPairRDD<Integer, String> rdd2Pair = rdd2.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<Integer, String> tup) throws Exception {
                return new Tuple2<Integer, String>(tup._1, tup._2);
            }
        });

        rddPair.join(rdd2Pair).foreach(new VoidFunction<Tuple2<Integer, Tuple2<Integer, String>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Integer, String>> tuple) throws Exception {
                //主播编号
                Integer uid = tuple._1;
                //大区和音浪收入信息
                Tuple2<Integer, String> tu = tuple._2();
                System.out.println(uid + "\t" + tu._1 + "\t" + tu._2);
            }
        });

    }

    private static void sortByKeyOp(JavaSparkContext sc) {
        //(150001, 400), (150002, 200), (15003, 300), (15004, 100)
        Tuple2<Integer, Integer> t1 = new Tuple2<Integer, Integer>(150002, 200);
        Tuple2<Integer, Integer> t3 = new Tuple2<Integer, Integer>(150001, 400);
        Tuple2<Integer, Integer> t2 = new Tuple2<Integer, Integer>(150003, 300);
        Tuple2<Integer, Integer> t4 = new Tuple2<Integer, Integer>(150004, 100);
        JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));



        /*
        rdd.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup) throws Exception {
                return new Tuple2<Integer, Integer>(tup._2, tup._1);
            }
        }).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override
            public void call(Tuple2<Integer, Integer> tup) throws Exception {
                System.out.println(tup);
            }
        });
         */
        rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, Integer> v1) throws Exception {
                return v1._2();
            }
        }, false, 1).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override
            public void call(Tuple2<Integer, Integer> tup) throws Exception {
                System.out.println(tup);
            }
        });
    }


    private static void reduceByKeyOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
        Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
        Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
        JavaRDD<Tuple2<Integer, String>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));

        rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
                return new Tuple2<String, Integer>(tup._2, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println(tuple);
            }
        });
    }

    private static void groupByKeyOp2(JavaSparkContext sc) {
        Tuple3<Integer, String, String> t1 = new Tuple3<Integer, String, String>(150001, "US", "male");
        Tuple3<Integer, String, String> t2 = new Tuple3<Integer, String, String>(150002, "CN", "female");
        Tuple3<Integer, String, String> t3 = new Tuple3<Integer, String, String>(150003, "CN", "female");
        Tuple3<Integer, String, String> t4 = new Tuple3<Integer, String, String>(150004, "IN", "male");
        JavaRDD<Tuple3<Integer, String, String>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));

        rdd.mapToPair(new PairFunction<Tuple3<Integer, String, String>, String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<String, Integer>> call(Tuple3<Integer, String, String> tup) throws Exception {
                return new Tuple2<String, Tuple2<String, Integer>>(tup._2(), new Tuple2<String, Integer>(tup._3(), tup._1()));
            }
        }).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<String, Integer>>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Tuple2<String, Integer>>> tup) throws Exception {
                //大区信息
                String area = tup._1;
                System.out.print(area + ":");
                //获取同一个大区所有用户对应的性别信息
                Iterable<Tuple2<String, Integer>> it = tup._2;
                for (Tuple2<String, Integer> tu : it) {
                    System.out.print("<" + tu._2 + "," + tu._1 + ">");
                }
                System.out.println();

            }
        });
    }

    private static void groupByKeyOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
        Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
        Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
        JavaRDD<Tuple2<Integer, String>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
                return new Tuple2<String, Integer>(tup._2, tup._1);
            }
        }).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> tup) throws Exception {
                //获取大区信息
                String area = tup._1;
                System.out.print(area + ":");
                Iterable<Integer> it = tup._2;
                for (Integer uid : it) {
                    System.out.print(uid + " ");
                }
                System.out.println();
            }
        });
    }

    private static void flatMapOp(JavaSparkContext sc) {
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("good good study", "day day up"));
        rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        }).foreach(new VoidFunction<String>() {
            @Override
            public void call(String word) throws Exception {
                System.out.println(word);
            }
        });
    }

    private static void filterOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        rdd.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        }).foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
    }

    private static void mapOp(JavaSparkContext sc) {
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        rdd.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                return integer * 2;
            }
        }).foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
    }

    public static JavaSparkContext getSparkContext() {
        SparkConf conf = new SparkConf();
        conf.setAppName("TransformationOpJava").setMaster("local");
        return new JavaSparkContext(conf);
    }


}

四、常用Action介绍

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/137577.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python学生管理系统(pyqt5 含界面)

学生管理系统项目流程 项目模块 账号登陆 人脸识别 增添学生信息 删除学生信息 改动学生信息 查询学生信息 项目主体框架 app.py为主代码&#xff0c;负责用于界面打开展示。img文件夹负责放置项目qrc的图像page文件夹为单独页面的类plugin文件夹为功能模块的类ui文件夹…

机器人中的数值优化|【一】数值优化基础

数值优化基础 凸集 Convex Sets 凸集的定义 令X是线性空间。如果对于X的子集S中的所有x和y&#xff0c;并且在区间 [0,1]中的所有t&#xff0c;点 (1−t)xty(1-t)x ty(1−t)xty也属于S&#xff0c;则S称为凸集。 不失一般性&#xff0c;对于所有的凸集&#xff0c;其线性组…

Zookeeper详解(二)——API 事件监听

Java API znode是zooKeeper集合的核心组件&#xff0c;zookeeper API提供了一小组方法使用zookeeper集合来操纵znode的所有细节。 客户端应该遵循以下步骤&#xff0c;与zookeeper服务器进行清晰和干净的交互。 连接到zookeeper服务器。zookeeper服务器为客户端分配会话ID。…

uniapp 之 接入小程序客服

目录 前言 小程序客服 代码只需要一步 配置也需要一步​​​​​​​ 前言 小程序客服 因老大 看到别人家有在线客服这个功能&#xff0c;就让我也做一个&#xff0c;这个功能很简单 效果图1 代码只需要一步 <button type"default" open-type"con…

MATLAB | 绘图复刻(六) | 分组环形热图

有粉丝问我Ecology Letters, (2021) 24: 1018–1028 Soil carbon persistence governed by plant input and mineral protection at regional and global scales 这篇文章中的Figure 2咋画&#xff0c;原图长这样&#xff1a; 复刻效果&#xff1a; 完整步骤 0 数据定义 按…

node.js创建网站实例1

1.node.js安装 我的电脑环境&#xff1a;win10 网址&#xff1a;https://nodejs.org/en/ 我下载了18.12.1版本 一路next默认安装&#xff0c;安装完成后&#xff0c;运行cmd&#xff0c;查看版本号 会同时安装npm&#xff0c;也可以同时查看版本号 2.创建第一个网站实例hell…

内卷对于2022是一种无奈,也是一种修行

其实我们谁也不知道2023年对于我们普通的开发人员来说会有什么样的试炼&#xff0c;因为2022年身边有太多的人&#xff0c;为了工作&#xff0c;为了生活&#xff0c;为了家庭&#xff0c;为了理想&#xff0c;不得不选择走向别人看似很卷的那条路。 对于我们周围的人来说&…

【Vim】基本操作及命令集详解

概述 Vim 是从 vi 发展出来的一个文本编辑器。vi 内置在Linux系统中&#xff0c;是vim的简化版编辑器&#xff0c;vim则需要进行安装使用。Vim代码补全、编译及错误跳转等方便编程的功能特别丰富&#xff0c;可以实现高效率移动和高效的输入&#xff0c;在程序员中被广泛使用。…

CPT203-Software Engineering(3)

文章目录9. Software Design9.1 Architecture Design9.1.1 Architectural patterns9.2 Component-level Design9.2.1 Component9.2.2 Views of component9.2.3 Component-level design process9.3 User Interface Design9.3.1 Interface Design Process9.3.2 Interface Design …

蓝桥杯Python练习题16-最大最小公倍数

资源限制   内存限制&#xff1a;256.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 问题描述   已知一个正整数N&#xff0c;问从1~N中任选出三个数&#xff0c;他们的最小公倍数最大可以为多少。 输入格式   输入一…

三维数学(一)

视频教程&#xff1a;https://www.bilibili.com/video/BV12s411g7gU?p155 向量 一个数字列表&#xff0c;表示各个维度上的有向位移&#xff1b;同时也是一个有大小有方向的物理量&#xff0c;大小及向量的模长&#xff0c;而方向即空间中向量的指向&#xff0c;可以表示物体…

TikTok Shop 越南站点收入已达Lazada 的 80%

让我们一起来看看今日都有哪些新鲜事吧&#xff01;01 TikTok Shop 越南站点收入已达Lazada 的 80% 据越南电商平台数据分析软件Metric.vn 统计&#xff0c;Shopee、Lazada、Tiki 和 Sendo 仍然主导着越南电子商务市场&#xff0c;1-11 月&#xff0c;共销售了 13 亿件产品。其…

简化开发小技巧-Mybatis-Plus的使用和常用操作

目录 简介 快速使用 pom 代码 mapper service 使用 常用操作 简单或操作查询 多条件或查询 更新字段为null 方法一&#xff0c;如果要更新的字段是String类型&#xff0c; 方法二&#xff0c; 使用mybatis-plus的字段注入。 方法三&#xff0c;使用UpdateWrapper…

基于R的Bilibili视频数据建模及分析——预处理篇

基于R的Bilibili视频数据建模及分析——预处理篇 文章目录基于R的Bilibili视频数据建模及分析——预处理篇0、写在前面1、项目介绍1.1 项目背景1.2 数据来源1.3 数据集展示2、数据预处理2.1 删除空数据2.2 增加id字段2.3 处理数值字段3、参考资料0、写在前面 实验环境 Python版…

Stable Diffusion背后原理(Latent Diffusion Models)

前言 2023年第一篇博客&#xff0c;大家新年好呀~ 这次来关注一下Stable Diffusion背后的原理&#xff0c;即 High-Resolution Image Synthesis with Latent Diffusion Models 这篇论文。 之前关注的那些工作只能工作到 256256256 \times 256256256 像素(resize成这个后才输…

设计模式简介

一、设计模式简介 编写软件过程中&#xff0c;程序员面临着来自耦合性&#xff0c;内聚性 以及可维护性&#xff0c;可扩展性&#xff0c;重用性&#xff0c;灵活性等多方面的挑战&#xff0c;设计模式是为了让程序&#xff08;软件&#xff09;&#xff0c;具有更好的&#xf…

04SpringCloudAlibaba服务注册中心—Consul

目录 Consul简介 Consul是什么What is Consul? | Consul by HashiCorp Consul能做什么 Consul下载&#xff1a;Downloads | Consul by HashiCorp Consul使用&#xff1a;Spring Cloud Consul 中文文档 参考手册 中文版 安装并运行Consul 1、官网安装说明&#xff1a;In…

开发板测试手册——系统启动、文件传送操作步骤详解(1)

目 录 前 言 4 1 评估板快速测试 5 1.1 系统启动测试 5 1.2 文件传送测试 11 1.2.1 通过 Linux 系统启动卡 11 1.2.2 通过 OpenSSH 12 1.3 LED 测试 15 1.4 KEY 测试 15 1.5 DDR 读写测试 16 1.6 SD 卡读写测试 17 1.7 eMMC 读写测试 18 前 言 本指导文档适用开发…

2022 CSDN年度报告已出炉

2022年已过&#xff0c;我们迎来了2023年&#xff0c;那么在2022年&#xff0c;你在CSDN平台都做了些什么&#xff0c;收获了什么呢&#xff1f;2022 CSDN年度报告已出炉&#xff0c;来看看你的2022年度报告吧。 点此查看2022年度报告 或扫码查看你的2022 CSDN年度报告哦&…

23种设计模式(三)——策略模式【组件协作】

文章目录意图什么时候使用策略真实世界类比策略模式的实现策略模式的优缺点亦称&#xff1a;Strategy 意图 定义了一组策略&#xff0c;分别在不同类中封装起来&#xff0c;每种策略都可以根据当前场景相互替换&#xff0c;从而使策略的变化可以独立于操作者。比如我们要去某个…