2023_Spark_实验十一:RDD基础算子操作

news2024/12/29 16:41:00

一、RDD的练习可以使用两种方式

  1. 使用Shell
  2. 使用IDEA

二、使用Shell练习RDD

当你打开 Spark 的交互式命令行界面(也就是 Spark shell)的时候,它已经自动为你准备好了一个叫做 sc 的特殊对象,这个对象是用来和 Spark 集群沟通的。你不需要,也不应该自己再创建一个这样的对象。

如果你想告诉 Spark 用哪个计算机或者计算机集群来执行你的命令,可以通过 --master 这个选项来设置。比如,你想在本地计算机上只用四个核心来运行,就可以在命令里加上 --master local[4]

$ ./bin/spark-shell --master local[4]

如果你有一些自己的代码打包成了 JAR 文件,想要在 Spark shell 里用,可以通过 --jars 选项,后面跟上你的 JAR 文件名,用逗号分隔,来把它们加入到可以识别的路径里。

$ ./bin/spark-shell --master local[4] --jars code.jar

此外,如果你需要一些额外的库或者 Spark 的扩展包,可以通过 --packages 选项,后面跟上这些库的 Maven 坐标(一种常用的依赖管理方式),用逗号分隔,来添加它们。假设你需要的包是 org.apache.spark:spark-mllib_2.13:3.4.1,这是Spark的机器学习库。

$ ./bin/spark-shell --master local[4] --packages "org.apache.spark:spark-mllib_2.13:3.4.1"

简单来说,这些选项就是让你告诉 Spark 怎么运行你的代码,以及在哪里找到运行代码所需要的资源。

RDD基础

// 从array中创建RDD
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.foreach(println)


// 读取文件创建RDD
val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
println(totalLength)

// 数据持久化
lineLengths.persist()
print(lineLengths.reduce((a, b) => a + b))


// 对象的函数
object MyFunctions {
  def func1(s: String): String = { s"打印RDD中的字符串,包含的字符串有: $s" }
}
val myRdd = lines.flatMap(lines => lines.split(" "))
myRdd.map(MyFunctions.func1).foreach(println)


import org.apache.spark.rdd.RDD
// 类的函数
class MyClass extends Serializable {
  def func1(s: String): String = { f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s" }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

val f1 = new MyClass()
f1.doStuff(myRdd).foreach(println)


// 类的应用
class MyClass2 extends Serializable {
  val field = "你好,测试案例..."
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
val f2 = new MyClass2()
f2.doStuff(myRdd).foreach(println)

// Pair RDD应用
val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
// 交换键和值的位置
val swappedCounts = counts.map(_.swap)
// 先根据值排序(降序),然后根据键排序(升序)
val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
val CountsDescondvalue = sortedByValueThenKeyDesc .map(_.swap)
CountsDescondvalue .collect()


// 广播变量 Broadcast Variables
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value

三、使用IDEA练习RDD

基于Spark3.4.1,IDEA练习基础的RDD

package test

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

/**
 * @projectName GNUSpark20204  
 * @package test  
 * @className test.RDD_spark341  
 * @description ${description}  
 * @author pblh123
 * @date 2024/9/26 23:08
 * @version 1.0
 *
 */
    
object RDD_spark341 extends App {

//  创建SparkSession sparkcontext
  val spark = SparkSession.builder
    .appName("RDD_spark341")
    .master("local[2]")
    .getOrCreate()
  val sc: SparkContext = spark.sparkContext

//  spark代码主体
  // 从array中创建RDD
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    distData.foreach(println)


    // 读取文件创建RDD
    val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
    val lineLengths = lines.map(s => s.length)
    val totalLength = lineLengths.reduce((a, b) => a + b)
    println(totalLength)

    // 数据持久化
    lineLengths.persist()
    print(lineLengths.reduce((a, b) => a + b))


    // 对象的函数
    object MyFunctions {
      def func1(s: String): String = {
        s"打印RDD中的字符串,包含的字符串有: $s"
      }
    }

    val myRdd = lines.flatMap(lines => lines.split(" "))
    myRdd.map(MyFunctions.func1).foreach(println)


    import org.apache.spark.rdd.RDD

    // 类的函数
    class MyClass extends Serializable {
      def func1(s: String): String = {
        f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s"
      }

      def doStuff(rdd: RDD[String]): RDD[String] = {
        rdd.map(func1)
      }
    }

    val f1 = new MyClass()
    f1.doStuff(myRdd).foreach(println)


    // 类的应用
    class MyClass2 extends Serializable {
      val field = "你好,测试案例..."

      def doStuff(rdd: RDD[String]): RDD[String] = {
        rdd.map(x => field + x)
      }
    }

    val f2 = new MyClass2()
    f2.doStuff(myRdd).foreach(println)

    // Pair RDD应用
    val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
    val counts = pairs.reduceByKey((a, b) => a + b)
    // 交换键和值的位置
    val swappedCounts = counts.map(_.swap)
    // 先根据值排序(降序),然后根据键排序(升序)
    val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
    val CountsDescondvalue = sortedByValueThenKeyDesc.map(_.swap)
    println(CountsDescondvalue.collect())


    // 广播变量 Broadcast Variables
    val broadcastVar = sc.broadcast(Array(1, 2, 3))
    println(broadcastVar.value)

    val accum = sc.longAccumulator("My Accumulator")
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
    println(accum.value)

//  关闭sparkSesssion sparkcontext
  sc.stop()
  spark.stop()

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2170041.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

女性在网络安全行业崛起,引领行业新风向

1、网络安全自诞生之日起,就与女性有着不解之缘。 ●二战期间,美国雇佣了1万名女性作为“密码女孩”来破译日本人和德国人发送的密信。 ●英国同样雇用了7000多名女性在英国密码分析中心工作,约占全部工作人员的四分之三。 ●世界上的第一…

108.游戏安全项目:信息显示二-剑侠情缘基址分析

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 内容参考于:易道云信息技术研究院 本人写的内容纯属胡编乱造,全都是合成造假,仅仅只是为了娱乐,请不要盲目相信…

spring-boot web + vue

依赖的软件 maven 1. 官网下载zip 文件,比如apache-maven-3.9.9-bin.zip 2. 解压到某个盘符,必须保证父亲目录的名字包含英文,数字,破折号(-) 3. 设置环境变量M2_HOME, 并将%M2_HOME%\bin添加到windown…

openpnp - 散料飞达不要想着做万能版本,能够贴合现有的物料就好

文章目录 openpnp - 散料飞达不要想着做万能版本,能够贴合现有的物料就好概述笔记天真的版本改进的版本物料编带标准物料编带的样式对于散料飞达关心的尺寸不同编带宽度的散料飞达关键尺寸的列表8mm物料编带12mm物料编带16mm物料编带24mm物料编带32mm物料编带44mm物…

【Linux】环境变量(初步认识环境变量)

文章目录 1. 环境变量1.1 基本概念 2. 认识常见环境变量2.1 PATH2.2 HOME2.3 SHELL2.4 PWD2.5 USER 3. 理解环境变量 1. 环境变量 在main函数的命令行参数中,有argc、argv、env三个参数。 argc:命令函参数的个数argc:存放每个参数的具体数值…

FPGA学习(1)-mux2,2选1多路器

目录 1 开发板配套资料 1.1学习网址和资料网址 2.创建工程文件 2.1创建过程 2.2写程序及仿真测试 2.2.1 写程序生成电路 2.2.2仿真 2.2.3 生成执行文件并烧录 3.实验现象 买的小梅哥店铺的开发板:xc7z020clg400 看的小梅哥的视频:03C _基于ZYN…

提取出散射矩阵归一化相位的含义

散射矩阵的值是从图像中获得的,相位角是距离导致的,所以要归一化,VV/HH VV幅度/HH幅度。 VV相位-HH相位

Java-数据结构-Map与Set-(一) ٩(๑>◡<๑)۶

文本目录: ❄️一、搜索树: ☑ 1、概念: ☑ 2、操作-插入: 代码: ☑ 3、操作-查看: 代码: ☑ 4、操作-删除: 代码: ☑ 5、性能分析: ❄️二、搜索&#…

如何在Ubuntu上查看和刷新DNS缓存

DNS缓存是用于DNS查找的临时存储系统,负责将域名转换为IP地址。进行DNS查询时,系统会检查缓存中的相关信息。如果找到了,那么它会加速域名解析的过程。如果DNS缓存中的数据过时或不正确,则需刷新它以确保使用正确的信息。本文主要…

自己掏耳朵怎么弄干净?双十一必买的四大可视挖耳勺分享

我们在掏耳朵时是不是老是觉得要么掏不干净,要么太进去了弄到痛耳朵。因为耳道属于我们一个盲区,在使用棉签或者普通耳勺容易因为操作不当弄伤耳膜。可能还会照成不可逆的后果。所以自己在掏耳勺更加推荐大家使用可视挖耳勺会更加干净和安全。那么&#…

【MATLAB代码】二维环境下的RSSI定位程序,自适应锚点数量,带图像输出、坐标输出、中文注释

程序描述 MATLAB编写的RSSI定位程序,自适应锚点数量,带图像输出、坐标输出、中文注释。 功能概述: 本程序实现了在二维平面上通过接收信号强度指示(RSSI)进行定位的功能。它使用多个锚节点的信号强度测量来估计未知…

CSS链接

链接是网站的重要组成部分,几乎在每个网页上都能看到不少的链接,合理的设计链接的样式能够给网页的颜值加分。链接有四种不同的状态,分别是 link、visited、active 和 hover,可以通过以下伪类选择器来为链接的四种状态设置不同的样…

CentOS8使用chrony同步网络时间

文章目录 引言I CentOS8使用chrony网络时间同步安装chrony配置间同步服务器地址检查本机的时区设置时区chronyc命令II windows网络时间同步2.1 修改同步服务器2.2 修改同步频率引言 应用场景: 获取服务器时间进行船舶在线率统计 dtos.forEach(item -> {if(item.getDwtime(…

红绿灯倒计时读秒数字识别系统源码分享

红绿灯倒计时读秒数字识别检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of …

数据结构——初识树和二叉树

线性结构是一对一的关系,意思就是只有唯一的前驱和唯一的后继; 非线性结构,如树形结构,它可以有多个后继,但只有一个前驱;图形结构,它可以有多个前驱,也可以有多个后继。 树的定义…

kubeadm部署k8s集群,版本1.23.6;并设置calico网络BGP模式通信,版本v3.25--未完待续

1.集群环境创建 三台虚拟机,一台master节点,两台node节点 (根据官网我们知道k8s 1.24版本之后就需要额外地安装cri-dockerd作为桥接才能使用Docker Egine。经过尝试1.24后的版本麻烦事很多,所以此处我们选择1.23.6版本) 虚拟机环境创建参考…

【LeetCode】动态规划—63. 不同路径 II(附完整Python/C++代码)

动态规划—63. 不同路径 II 前言题目描述基本思路1. 问题定义:2. 理解问题和递推关系:3. 解决方法:3.1 动态规划方法3.2 空间优化的动态规划 4. 进一步优化:5. 小总结: 代码实现Python3代码实现Python 代码解释C代码实现C 代码解释 总结: 前言 本文将探讨“不同路径 II”这一问…

线性跟踪微分器TD详细测试(Simulink 算法框图+CODESYS ST+博途SCL完整源代码)

1、ADRC线性跟踪微分器 ADRC线性跟踪微分器(ST+SCL语言)_adrc算法在博途编程中scl语言-CSDN博客文章浏览阅读784次。本文介绍了ADRC线性跟踪微分器的算法和源代码,包括在SMART PLC和H5U平台上的实现。文章提供了ST和SCL语言的详细代码,并讨论了跟踪微分器在自动控制中的作用…

深入理解 Nuxt.js 中的 app:error 钩子

title: 深入理解 Nuxt.js 中的 app:error 钩子 date: 2024/9/27 updated: 2024/9/27 author: cmdragon excerpt: 摘要:本文深入讲解了Nuxt.js框架中的app:error钩子,介绍其在处理web应用中致命错误的重要作用、使用方法及实际应用场景。通过创建Nuxt项目、定义插件、触发…

基于nodejs+vue的水产品销售管理系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏:Java精选实战项目…