Spark 核心编程

news2025/1/25 7:33:02

文章目录

  • Spark 核心编程
    • 一、RDD
      • 1、分布式计算模拟
        • (1) 搭建基础的架子
        • (2) 客户端向服务器发送计算任务

Spark 核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
1)RDD:弹性分布式数据集
2)累加器:分布式共享只写变量
3)广播变量:分布式共享只读变量
接下来让我们看看这三大数据结构是如何数据处理中使用的

一、RDD

1、分布式计算模拟

(1) 搭建基础的架子

首先分为两部分,我们把Excuter当成服务器,把Driver当成客户端。然后用客户端去连接服务器,然后客户端发送数据给服务器。
Excuter (服务器)
第一步设置服务器的端口号,ServerScket(9998)方法,里面的参数是端口号,这可以随便写。然后第二步等待客户端发送数据过来accept()方法。然后第三步使用getInputStream输入流接收客户端发送过来的数据,使用输入流的read()方法,这个就是从客户端拿到的数据,然后把这个数据给输出。最后把输出流,数据等待,还有服务器依次都给关闭。

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.InputStream
import java.net.{ServerSocket, Socket}

//这个是做计算准备的,主要是逻辑代码部分
//这个相当于是服务器,然后Driver相当于是客户端,客户端连接服务器就可以直接使用了
class Excuter {

}
object Excuter{
  def main(args: Array[String]): Unit = {
    //启动服务器,接收数据 这个端口号是随便写的
    val server = new ServerSocket(9998) //这个是网络编程的
    println("服务器启动,等待接收数据")

    //等待客户端的链接
    val client: Socket = server.accept() //等待客户端发送过来的数据,accept()方法
    val in: InputStream = client.getInputStream //输入流接收数据
    val i = in.read() //这个就是拿到的值
    println("接收到客户端发送的数据:" + i) //把客户端拿到的数据给输出

    in.close()  //把输入流给关闭掉
    client.close()
    server.close() //把服务器给关闭掉

  }
}

在这里插入图片描述
Driver (客户端)
首先客户端连接服务器的端口号Socket("localhost",9998)方法,第一个参数是连接方式,这里是本地连接,第二个参数是服务器的端口号。然后第二步就向服务器发送数据,getOutputStream方法输出流,然后使用输出流的write()方法写出数据。然后使用输出流的flush()方法,flush方法的作用是,刷新此输出流并强制写出所有缓冲的输出字节。然后用完之后就把输出流和客户端给关闭了。

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.OutputStream
import java.net.Socket

//这个是用来执行程序的
class Driver {

}
object Driver{
  def main(args: Array[String]): Unit = {
    //连接服务器 本地连接,然后第二个参数是服务器定义的端口号
    val client = new Socket("localhost",9998) //这个相当于是是客户端,连接服务器
    val out: OutputStream = client.getOutputStream //向服务器发东西,用getOutputStream()
    out.write(2)
    out.flush()

    out.close() //用完了吧这个输出流给关掉
    client.close() //然后把这个客户端也关掉
  }
}

(2) 客户端向服务器发送计算任务

Excuter 类里面是服务器,Driver是客户端,Task 里面是准备数据和逻辑操作的,那个Driver 里面创建一个Task 对象然后把Task 用ObjectOutputstream 输出流把对象给输出到Excuter接收,接收也是使用ObjectIntputstream 对象输入流进行接收,因为输出的是一个操作逻辑,用字节流接收肯定不对,所有要用对象。然后Excuter 拿到Task之后,就可以直接使用里面的函数了。Task里面要混入Serializable 特质,因为在网络中肯定是无法直接传送一个对象过去的,所以要进行序列化。
在这里插入图片描述7
Excuter 代码:

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

//这个是做计算准备的,主要是逻辑代码部分
//这个相当于是服务器,然后Driver相当于是客户端,客户端连接服务器就可以直接使用了
class Excuter {

}
object Excuter{ //要混入序列化的特征,不然不能那个传一个对象过去
  def main(args: Array[String]): Unit = {
    //启动服务器,接收数据 这个端口号是随便写的
    val server = new ServerSocket(9998) //这个是网络编程的
    println("服务器启动,等待接收数据")

    //等待客户端的链接
    val client: Socket = server.accept() //等待客户端发送过来的数据,accept()方法
    val in: InputStream = client.getInputStream //输入流接收数据
    val objin: ObjectInputStream = new ObjectInputStream(in) //输出流失obj那么接收也应该是obj
    val task: Task = objin.readObject().asInstanceOf[Task] //这个就是拿到的值 ,但是这里不应该是AnyRef,所以要进行转换
    val ints = task.compute() //上面已经拿到了传过来的操作了,所以可以直接使用里面定义的函数了
    println("计算节点的计算结果为:" + ints) //把客户端拿到的数据给输出

    objin.close()  //把输入流给关闭掉
    client.close()
    server.close() //把服务器给关闭掉

  }
}

Driver 代码:

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

//这个是用来执行程序的
class Driver {

}
object Driver {
  def main(args: Array[String]): Unit = {
    //连接服务器 本地连接,然后第二个参数是服务器定义的端口号
    val client = new Socket("localhost",9998) //这个相当于是是客户端,连接服务器
    val out: OutputStream = client.getOutputStream //向服务器发东西,用getOutputStream()

    val objout = new ObjectOutputStream(out) //定义这个Object的输出,因为上面那个是输出字节的不能传输对象

    val task:Task = new Task() //然后创建一个task
    objout.writeObject(task) //把task 传入给objout 对象输出流
    objout.flush()

    objout.close() //用完了吧这个输出流给关掉
    client.close() //然后把这个客户端也关掉
    println("客户端发送数据完毕")
  }
}

Task 代码:

package com.atguigu.bigdata.spark.core.wc.test2

class Task extends Serializable { //要混入序列化的特征,不然不能那个传一个对象过去
  val datas = List(1,2,3,4)  //这个是数据

  val logic = (num:Int) => {num * 2} //匿名函数  这个是逻辑

  //计算
  def compute() = {
    datas.map(logic)  //莫logic 上面定义的逻辑操作传入进去

  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/168916.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构与算法理论知识点】1.1基本概念

1.1基本概念 为什么要学习数据结构与算法? AlgorithmsData StructuresPrograms---- Niklaus Wirth ( Pascal程序设计语言之父、结构化程序设计首创者、图灵奖获得者) 计算机程序:使用计算机求解问题算法是求解问题的步骤的描述:从蛮力到策…

套接字编程(二)UDP服务端与客户端的通信模拟实现

目录 一、前言 二、UDP客户端流程信息 1、创建套接字 2、为套接字绑定地址信息(不推荐) 3、发送数据(将数据放入发送缓冲区中) 4、接收数据(从socket结构体接收缓冲区中取出数据) 5、关闭套接字 三…

机器学习基本概念及问题梳理

前言:整理西瓜书第一、二章中的基本概念 待办:第二章评估方法、性能度量及后续内容未整理 下图梳理机器学习中部分概念 模型评估与选择相关知识点: 错误率(error rate, E):如果在m个样本中有a个样本分类…

WordPress安全指南:19个步骤让您的WordPress安全防线坚如磐石

谈到WordPress安全性,您可以采取很多措施来锁定您的网站,以防止黑客和漏洞影响您的电子商务网站或博客。您最不想发生的事情是一天早上醒来发现您的网站一团糟。因此,今天我们将分享许多技巧、策略和技术,您可以使用这些技巧、策略…

WEBSHELL管理工具流量特征——基础篇

前言 前一阵子帮别人做取证题目,有很多关于WEBSHELL的流量要分析,想起来还有没好好分析过于是准备写篇文章总结一下帮助大家能够快速的辨别WEBSHELL流量,下面我们展开文章来讲。 中国菜刀 这个应该是大家最熟悉的WEBSHELL管理工具&#xf…

NeuRay学习笔记

Neural Rays for Occlusion-aware Image-based Rendering 主页:https://liuyuan-pal.github.io/NeuRay/ 论文:https://arxiv.org/abs/2107.13421 Code:https://github.com/liuyuan-pal/NeuRay 效果: desktop摘要 We present a ne…

一文读懂 UniProt 数据库(2023 最新版)

一、UniProt 数据库介绍 Uniprot (Universal Protein )是包含蛋白质序列,功能信息,研究论文索引的蛋白质数据库,整合了包括EBI( European Bioinformatics Institute),SIB&#xff0…

【面试题】前端最新面试题-浏览器 dom、bom篇

原文见:语雀(https://www.yuque.com/deepstates/interview/fsitlt) ● BOM ● window对象 ○ frames ■ iframe ■ 跨窗口通信 ■ 同源策略/跨域 ○ navigator ● DOM ○ DOM结构 ○ DOM操作 ○ DOM事件 ■ 表单事件 ● 浏览器渲染 ○ 进程、…

Vue组件化编程的组件通信

对于组件化编程,组件之间的通信技术无疑是非常重要的内容,需要将细节牢牢把握。 组件通信,就是子组件放置在父组件内之后,父组件如何向子组件传递参数以及子组件如何与外部组件进行互动。 这部分的知识很重要,需要展开…

基于Ubuntu20.04搭建OpenHarmony v3.0.6的qemu仿真环境

基于Ubuntu20.04搭建OpenHarmony v3.0.6的qemu仿真环境0. 前言1. 安装Ubuntu1.1 更换华为源1.2 安装必要工具2. 下载代码2.1 解压与目录设置3. 配置环境3.1 安装库和工具3.2 设置python版本3.3 安装编译工具hb3.4 切换dash为bash4. 编译4.1 hb构建4.2 启动qemu5. 第二种环境配置…

Java之日期与时间、JDK8新增日期类、包装类、正则表达式、Arrays类、常见算法和Lambda表达式

目录日期与时间DateSimpleDateFormatCalendar概述JDK8新增日期类概述、LocalTime /LocalDate / LocalDateTimeInstantDateTimeFormatterDuration/PeriodchronoUnit包装类正则表达式Arrays类Arrays类概述,常用功能演示Arrays类对于Comparator比较器的支持常见算法选择…

[论文翻译] GIKT: A Graph-based Interaction Model forKnowledge Tracing

摘要随着在线教育的快速发展,知识追踪(KT)已成为追踪学生知识状态并预测他们在新问题上的表现的基本问题。在线教育系统中的问题通常很多,并且总是与更少的技能相关联。然而,以往的文献未能将问题信息与高阶问题-技能相…

计网必会:电路交换和分组交换

电路交换和分组交换的概念和区别,为什么分组交换更有效? 电路交换:由于电路交换在通信之前要在通信双方之间建立一条被双方独占的物理通路(由通信双方之间的交换设备和链路逐段连接而成) 特点是源和目标点建立起名副其…

C语言中的void*是什么?

目录1.void *是什么2.void*的解引用3.void*类型的应用场景1.void *是什么 我们之前学过许多类型的指针变量,如整形指针,字符指针,甚至数组指针,函数指针等。 int a 10; int *pa &a;//整形指针pa接受一个整形变量a的地址但…

阻塞队列-BlockingQueue

一、BlockingQueue介绍BlockingQueue 继承了 Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。两个附加操作:支持阻塞…

Win10 每天蓝屏多次,蓝屏代码0x3B:SYSTEM_SERVICE_EXCEPTION

环境: 联想E14笔记本 Win10 专业版 问题描述: Win10 每天发生蓝屏多次,蓝屏代码0x3B:SYSTEM_SERVICE_EXCEPTION 查看事件查看器,系统日志筛选ID1001的事件,蓝屏多次基本上都是3B这错误代码 解决方案: 1.禁用AMD显…

vue3 pinia 状态管理(清晰明了)

前言 最近学习cloud项目,前端使用到 vue3 ts 等技术,其中包括 pinia ,从一脸懵到渐渐清晰过程,在此记录一下,若有不足,希望大佬可以指出。 中文官方文档:https://pinia.web3doc.top/ 一、什…

2022年海南省职业院校技能大赛“网络安全”比赛任务书

2022年海南省职业院校技能大赛“网络安全” 比赛任务书 一、竞赛时间 总计:360分钟 二、竞赛任务书内容 (一)拓扑图 (二)A模块基础设施设置/安全加固(350分) 一、项目和任务描述&#xff…

服务器怎么防勒索病毒

行业背景 随着金融行业信息化建设的飞速发展,金融行业信息化系统经过多年的发展建设,目前信息化程度已经达到了较高水平。信息技术在提高管理水平、促进业务创新、提升企业竞争力方面发挥着日益重要的作用。 需求分析 随着金融信息化的深入发展&#…

Linux调试器-gdb使用

目录 1. 背景 2. 开始使用 3. 理解 创建需要调试的代码 debug&&release 4 详细调试 list/l 行号 list/l 函数名 r或run break(b) info b(reak) d num disable breakpoints enable breakpoints n (next) s(step) breaktrac…