Spark-java版

news2025/1/20 22:41:10

SparkContext初始化

相关知识
  • SparkConf 是SparkContext的构造参数,储存着Spark相关的配置信息,且必须指定Master(比如Local)和AppName(应用名称),否则会抛出异常;
  • SparkContext 是程序执行的入口,一个SparkContext代表一个 Application
初始化过程的主要核心:
  1. 依据SparkConf创建一个Spark执行环境SparkEnv
  2. 创建并初始化Spark UI,方便用户监控,默认端口为 4040
  3. 设置Hadoop相关配置及Executor环境变量;
  4. 创建和启动TaskSchedulerDAGScheduler
初始化方式:
  1. SparkConf conf = new SparkConf().setAppName(appName).setMaster(master)
  2. JavaSparkContext sc=new JavaSparkContext(conf)

程序运行完后需使用sc.stop()关闭SparkContext
 

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import java.util.List;

public class Edu {
    public static void main(String[] args) {
        /********** Begin **********/
        //第一步:设置SparkConf
        SparkConf conf = new SparkConf().setAppName("educoder").setMaster("local");
        //第二步:初始化SparkContext
       JavaSparkContext sc = new JavaSparkContext(conf);
        /********** End **********/
        
        List<String> data = Arrays.asList("hello");
        JavaRDD<String> r1 = sc.parallelize(data);
        System.out.print(r1.collect());
        
		/********** Begin **********/
        //第三步:关闭SparkContext
       sc.stop();
        /********** End **********/

    }
}

集合并行化创建RDD

任务描述

本关任务:计算并输出各个学生的总成绩。

相关知识

为了完成本关任务,你需要掌握:1.集合并行化创建RDD,2.reduceByKey

集合创建RDD

Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。相当于是,集合中的部分数据会到一个节点上,而另一部分数据会到其他节点上。然后就可以用并行的方式来操作这个分布式数据集合,即RDD
 

ArrayList<Integer> list = new ArrayList<Integer>();
list.add(1);
list.add(2);
list.add(3);
  1. JavaRDD<Integer> rdd = sc.parallelize(list,3);//参数1:Seq集合,必须。参数2:分区数,默认为该Application分配到的资源的CPU核数
  2. Integer sum = rdd.reduce((a, b) -> a + b);
  3. System.out.print(sum);

输出:6

reduceByKey()

对元素为RDD[K,V]对的RDDKey相同的元素的Value进行聚合。

List<Tuple2<String,Integer>> list = Arrays.asList(new Tuple2("hive",2),new Tuple2("spark",4),new Tuple2("hive",1));
JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);
List<Tuple2<String, Integer>> result = listRDD.reduceByKey((x, y) -> x + y).collect();

输出: (spark,4) (hive,3)

collect() :以数组的形式返回RDD中的所有元素,收集分布在各个worker的数据到driver节点。

编程要求

根据提示,在右侧编辑器begin-end处补充代码,计算并输出各个学生的总成绩。

  • ("bj",88): bj指学生姓名,88指学生成绩。
测试说明

平台会对你编写的代码进行测试:

预期输出: (bj,254) (sh,221) (gz,285)
 

package step1;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.*;
public class JStudent {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("JStudent");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String,Integer>> list = Arrays.asList(
             new Tuple2("bj",88),new Tuple2("sh",67),
             new Tuple2("gz",92),new Tuple2("bj",94),
             new Tuple2("sh",85),new Tuple2("gz",95),
             new Tuple2("bj",72),new Tuple2("sh",69),
             new Tuple2("gz",98));
        //第一步:创建RDD
        JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);
        //第二步:把相同key的进行聚合
        JavaPairRDD<String, Integer> result = listRDD.reduceByKey((x, y) -> x + y);
        //第三步:收集
        List<Tuple2<String, Integer>> collect = result.collect();
        //第四步:输出
        for (Tuple2 c:collect){
            System.out.println(c);
        }
        sc.stop();
    }
}

读取外部数据集创建RDD
 

任务描述

本关任务:读取文本文件,按照文本中数据,输出老师及其出现次数。

相关知识

为了完成本关任务,你需要掌握:1.读取文件创建RDD,2.本关所需算子。

读取文件

textFile()

JavaRDD<String> rdd = sc.textFile("/home/student.txt")//文件路径
算子

(1)mapToPair:此函数会对一个RDD中的每个元素调用f函数,其中原来RDD中的每一个元素都是T类型的,调用f函数后会进行一定的操作把每个元素都转换成一个<K2,V2>类型的对象

ArrayList<Integer> list = new ArrayList<Integer>();
list.add(1);
list.add(2);
list.add(3);
JavaRDD<Integer> rdd = sc.parallelize(list);
JavaPairRDD<Integer,String> result = rdd.mapToPair(x -> new Tuple2(x,1)

输出:(1,1)(2,1)(3,1)

(2) reduceByKey() :对元素为RDD[K,V]对的RDDKey相同的元素的Value进行聚合

List<Tuple2<String,Integer>> list = Arrays.asList(new Tuple2("hive",2),new Tuple2("spark",4),new Tuple2("hive",1));
JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);
List<Tuple2<String, Integer>> result = listRDD.reduceByKey((x, y) -> x + y).collect();

输出: (spark,5) (hive,3)

编程要求

根据提示,在右侧编辑器begin-end处补充代码,输出老师姓名和出现次数。

  • 输入文件样例:

bigdata,laozhang bigdata,laoduan javaee,xiaoxu

bigdata指科目,laozhang指老师名称。

预期输出: (laoliu,1) (laoli,3) (laoduan,5) (laozhang,2) (laozhao,15) (laoyang,9) (xiaoxu,4)
 

package step2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class JTeachers {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("JTeachers");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String dataFile = "file:///root/step2_files";
              //第一步:以外部文件方式创建RDD
        JavaRDD<String> teaRDD = sc.textFile(dataFile);
        //String name = line.split(",")[1];
        //第二步:将文件中每行的数据切分,得到自己想要的返回值
        Integer one = 1;
        JavaPairRDD<String, Integer> teacher = teaRDD.mapToPair(line ->{
            String names = line.split(",")[1];
            Tuple2<String, Integer> t2 = new Tuple2<>(names, one);
            return t2;
        });
        //第三步:将相同的key进行聚合
        JavaPairRDD<String, Integer> tea = teacher.reduceByKey((x, y) -> x + y);
        //第四步:将结果收集起来
        List<Tuple2<String, Integer>> result = tea.collect();
        //第五步:输出
        for (Tuple2 t:result){
            System.out.println(t);
        }
        sc.stop();
    }
}

map算子完成转换操作

相关知识

为了完成本关任务,你需要掌握:如何使用map算子。

map

将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。

图中每个方框表示一个RDD分区,左侧的分区经过自定义函数f:T->U映射为右侧的新RDD分区。但是,实际只有等到Action算子触发后,这个f函数才会和其他函数在一个Stage中对数据进行运算。

map 案例
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
System.out.println("init:" + list);
JavaRDD<Integer> rdd = sc.parallelize(list);
JavaRDD<Integer> map = rdd.map(x -> x * 2);
System.out.println("result :" + map.collect());

输出:

init :[1, 2, 3, 4, 5, 6] result :[2, 4, 6, 8, 10, 12]

说明:rdd1的元素(1 , 2 , 3 , 4 , 5 , 6)经过map算子(x -> x*2)转换成了rdd2(2 , 4 , 6 , 8 , 10)

编程要求

根据提示,在右侧编辑器begin-end处补充代码,完成以下需求:

需求1:使用map算子,将rdd的数据(1, 2, 3, 4, 5)按照下面的规则进行转换操作,规则如下:

  • 偶数转换成该数的平方;

  • 奇数转换成该数的立方。

需求2:使用map算子,将rdd的数据("dog", "salmon", "salmon", "rat", "elephant")按照下面的规则进行转换操作,规则如下:

  • 将字符串与该字符串的长度组合成一个元组,例如
  1. dog --> (dog,3)
  2. salmon --> (salmon,6)
    package net.educoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    import java.util.Arrays;
    import java.util.List;
    public class Step1 {
        private static SparkConf conf;
        private static JavaSparkContext sc;
        static {
            conf = new SparkConf().setAppName("Step1").setMaster("local");
            sc = new JavaSparkContext(conf);
        }
        /**
         * 返回JavaRDD
         *
         * @return JavaRDD
         */
        public static JavaRDD<Integer> MapRdd() {
            List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
            JavaRDD<Integer> rdd = sc.parallelize(list);
            /**
             *
             * 需求:使用map算子,将rdd的数据进行转换操作
             * 规则如下:
             *      偶数转换成该数的平方
             *      奇数转换成该数的立方
             *
             */
            /********** begin ***********/
            JavaRDD<Integer> map = rdd.map(num -> {
                if (num % 2 == 0) {
                    return num * num;
                } else {
                    return num * num * num;
                }
            });
            return map;
            /********** end ***********/
        }
        /**
         * 返回JavaRDD
         *
         * @return JavaRDD
         */
        public static JavaRDD<Tuple2> MapRdd2() {
            List<String> list = Arrays.asList("dog", "salmon", "salmon", "rat", "elephant");
            JavaRDD<String> rdd = sc.parallelize(list);
            /**
             *
             * 需求:使用map算子,将rdd的数据进行转换操作
             * 规则如下:
             *      将字符串与该字符串的长度组合成一个元组,例如:dog  -->  (dog,3),salmon   -->  (salmon,6)
             *
             */
           /********** begin ***********/
            JavaRDD<Tuple2> map = rdd.map(str -> {
                int i = str.length();
                return new Tuple2(str, i);
            });
            return map;
           /********** end ***********/
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1268973.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Halcon的二维码姿态矫正

任务要求&#xff1a; 下图中的二维码进行校正。&#xff08;HALCON10.0自带例图&#xff0c;路径&#xff1a;“images/datacode/ecc200/ecc200_to_preprocess_001.png”&#xff09; 任务分析&#xff1a; 图中的二维码存在畸变&#xff0c;需对其进行透射变换。首先获得图…

Java EE 多线程

文章目录 1. 认识线程1.1 什么是进程1.2 什么是线程1.2.1. 线程是怎么做到的呢&#xff1f;1.2.2. 进程和线程的关系 1.3 多线程编程1.3.1. 第一个多线程程序1.3.2. 使用 jconsole 命令查看线程1.3.3. 实现 Runnable 接口&#xff0c;重写 run1.3.4. 继承 Thread 重写 run&…

机器学习:领域自适应学习

训练一个分类器是小问题 上难度 训练数据和测试数据不一致&#xff0c;比如训练数据是黑白的&#xff0c;测试时彩色的&#xff0c;结果准确率非常低。 训练数据和测试数据有点差距的时候&#xff0c;能不能效果也能好呢&#xff1f;这就用到了领域自使用domain adptation 用一…

Hive数据倾斜之:数据类型不一致导致的笛卡尔积

Hive数据倾斜之&#xff1a;数据类型不一致导致的笛卡尔积 目录 Hive数据倾斜之&#xff1a;数据类型不一致导致的笛卡尔积一、问题描述二、原因分析三、精度损失四、问题解决 一、问题描述 如果两张表的jion&#xff0c;关联键分布较均匀&#xff0c;没有明显的热点问题&…

【Android Jetpack】Room数据库

文章目录 引入EntitiesPrimary Key主键索引和唯一性对象之间的关系外键获取关联的Entity对象嵌套对象Data Access Objects&#xff08;DAOs&#xff09;使用Query注解的方法简单的查询带参数查询返回列的子集可被观察的查询 数据库迁移用法 引入 原始的SQLite有以下两个缺点: …

uniapp2023年微信小程序头像+昵称分别获取

1、DOM <view class"m-user"><view class"user-info"><!--头像 GO--><button class"avatar avatar-wrapper" open-type"chooseAvatar" chooseavatar"onChooseAvatar"slot"right"><im…

GPU - cuda 安装

GPU - cuda 安装 环境搭建安装 0.确认你的电脑上有英伟达显卡 通过winR输入:control /name Microsoft.DeviceManager打开显示适配器&#xff0c;能看到显卡即可。我的版本是 3060 驱动版本 31.0.15.4617 1.查看主机显卡驱动版本. 主机GPU驱动版本决定你的主机最高能支持到什…

技术前沿丨Teranode如何实现无限扩容

​​发表时间&#xff1a;2023年9月15日 BSV区块链协会的技术团队目前正在努力开发Teranode&#xff0c;这是一款比特币节点软件&#xff0c;其最终目标是实现比特币的无限扩容。然而&#xff0c;正如BSV区块链协会网络基础设施负责人Jake Jones在2023年6月举行的伦敦区块链大会…

k8s中Pod控制器简介,ReplicaSet、Deployment、HPA三种处理无状态pod应用的控制器介绍

目录 一.Pod控制器简介 二.ReplicaSet&#xff08;简写rs&#xff09; 1.简介 &#xff08;1&#xff09;主要功能 &#xff08;2&#xff09;rs较完整参数解释 2.创建和删除 &#xff08;1&#xff09;创建 &#xff08;2&#xff09;删除 3.扩容和缩容 &#xff08…

点击元素以外的事件监听

在项目中&#xff0c;我们经常会遇到需要监听目标元素以外的区域被点击或鼠标移入移出等需求。 例如下面我们有一个表格里面嵌套表单的组件 我希望点击n行的时候&#xff0c;n行的元素变成表单元素进行输入或者选择&#xff0c; 当我点击其他其他区域n行又会恢复成数据展示…

C语言二十一弹 --打印空心正方形

C语言实现打印空心正方形 思路&#xff1a;观察图中空心正方形&#xff0c;可知首行列和尾行列被黑色外框包裹&#xff0c;其它均为空。所以按观察打印即可。 总代码 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h>int main() {int n 0;while (scanf("%d&q…

2023年双十一报告(B站平台)

2023年双11购物节自10月24日开启预售&#xff0c;持续至11月13日落下帷幕。在购物狂欢期间&#xff0c;B站以更加成熟的姿态参战今年双11。 据B站官方数据显示&#xff0c;双11期间&#xff0c;B站带货GMV同比增长251%。其中视频带货GMV同比增长376%&#xff0c;直播带货GMV同…

Springboot 使用 阿里的 druid 连接池 启用 wall sql防火墙的情况下怎么支持多sql同时执行?

1、问题如上&#xff0c;看了不少网上的文章&#xff0c;在我这都不生效&#xff0c;网上主要的解决思路有两个。 第一个是&#xff1a;去掉配置文件中的 wall filter # 修改之前 spring.datasource.druid.filtersstat,wall,log4j# 修改之前 spring.datasource.druid.filte…

Python+requests+Jenkins接口自动化测试实例

在做功能测试的基础上&#xff0c;我平时也会用postman测试接口&#xff0c;不过postman只能测试一个一个接口&#xff0c;不能连贯起来&#xff0c;特别是我们公司的接口很多都是要用到token的&#xff0c;导致我每次测个需要登录的接口都要去获取到token&#xff0c;做了很多…

什么是主机安全,有什么作用?

当今数字化时代&#xff0c;网络安全威胁和风险日益突出&#xff0c;已成为企业面临的重大安全挑战。网络攻击者不断尝试利用各种技术和手段对企业网络资源进行探测和攻击&#xff0c;如&#xff1a;利用漏洞、木马、钓鱼、勒索等方式窃取数据、破坏系统、篡改信息。因此&#…

win10下安装 Anaconda + Cuda + Cudnn + Pycharm + Pytorch

1.安装Anaconda &#xff08;1-1&#xff09;下载Ananconda, Anaconda官网 选择windows版本&#xff1b; &#xff08;1-2&#xff09;安装Anaconda,一般选择【Just Me】 &#xff08;1-3&#xff09;建议不要装在C盘&#xff0c;后期多环境的python环境和各种库文件会占用很多…

Oracle 11g安装过程

文章目录 前言1.下载安装包2.安装2.1本地安装文件2.2 安装过程 3.查看是否安装成功3.1 查看oracle是否安装成功3.2 查看oracle服务 前言 本文仅用于记录亲自安装oracle的过程 1.下载安装包 官网地址&#xff1a; Oracle Database 11g Release 2 (11.2.0.1.0) 注意&#xff…

电脑开机提示“未正确启动”怎么办?

有时我们在打开电脑时&#xff0c;会出现蓝屏&#xff0c;并提示“电脑未正确启动”&#xff0c;那么&#xff0c;这该怎么办呢&#xff1f;下面我们就来了解一下。 方法一&#xff1a;执行系统还原 我们在上文中提到了Windows无法正确启动的问题可能是由于三方程序或者近期的…

web自动化之源selenium

&#x1f4d1;打牌 &#xff1a; da pai ge的个人主页 &#x1f324;️个人专栏 &#xff1a; da pai ge的博客专栏 ☁️宝剑锋从磨砺出&#xff0c;梅花香自苦寒来 &#x1f324;️ 什么是自动化以为什…

从源码解析Containerd容器启动流程

从源码解析Containerd容器启动流程 本文从源码的角度分析containerd容器启动流程以及相关功能的实现。 本篇containerd版本为v1.7.9。 更多文章访问 https://www.cyisme.top 本文从ctr run命令出发&#xff0c;分析containerd的容器启动流程。 ctr命令 查看文件cmd/ctr/comman…