文章目录
- Spark 核心编程
- 一、RDD
- 1、分布式计算模拟
- (1) 搭建基础的架子
- (2) 客户端向服务器发送计算任务
Spark 核心编程
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
1)RDD:弹性分布式数据集
2)累加器:分布式共享只写变量
3)广播变量:分布式共享只读变量
接下来让我们看看这三大数据结构是如何数据处理中使用的
一、RDD
1、分布式计算模拟
(1) 搭建基础的架子
首先分为两部分,我们把Excuter当成服务器,把Driver当成客户端。然后用客户端去连接服务器,然后客户端发送数据给服务器。
Excuter (服务器):
第一步设置服务器的端口号,ServerScket(9998)
方法,里面的参数是端口号,这可以随便写。然后第二步等待客户端发送数据过来accept()
方法。然后第三步使用getInputStream
输入流接收客户端发送过来的数据,使用输入流的read()
方法,这个就是从客户端拿到的数据,然后把这个数据给输出。最后把输出流,数据等待,还有服务器依次都给关闭。
package com.atguigu.bigdata.spark.core.wc.test2
import java.io.InputStream
import java.net.{ServerSocket, Socket}
//这个是做计算准备的,主要是逻辑代码部分
//这个相当于是服务器,然后Driver相当于是客户端,客户端连接服务器就可以直接使用了
class Excuter {
}
object Excuter{
def main(args: Array[String]): Unit = {
//启动服务器,接收数据 这个端口号是随便写的
val server = new ServerSocket(9998) //这个是网络编程的
println("服务器启动,等待接收数据")
//等待客户端的链接
val client: Socket = server.accept() //等待客户端发送过来的数据,accept()方法
val in: InputStream = client.getInputStream //输入流接收数据
val i = in.read() //这个就是拿到的值
println("接收到客户端发送的数据:" + i) //把客户端拿到的数据给输出
in.close() //把输入流给关闭掉
client.close()
server.close() //把服务器给关闭掉
}
}
Driver (客户端):
首先客户端连接服务器的端口号Socket("localhost",9998)
方法,第一个参数是连接方式,这里是本地连接,第二个参数是服务器的端口号。然后第二步就向服务器发送数据,getOutputStream
方法输出流,然后使用输出流的write()
方法写出数据。然后使用输出流的flush()
方法,flush方法的作用是,刷新此输出流并强制写出所有缓冲的输出字节。然后用完之后就把输出流和客户端给关闭了。
package com.atguigu.bigdata.spark.core.wc.test2
import java.io.OutputStream
import java.net.Socket
//这个是用来执行程序的
class Driver {
}
object Driver{
def main(args: Array[String]): Unit = {
//连接服务器 本地连接,然后第二个参数是服务器定义的端口号
val client = new Socket("localhost",9998) //这个相当于是是客户端,连接服务器
val out: OutputStream = client.getOutputStream //向服务器发东西,用getOutputStream()
out.write(2)
out.flush()
out.close() //用完了吧这个输出流给关掉
client.close() //然后把这个客户端也关掉
}
}
(2) 客户端向服务器发送计算任务
Excuter 类里面是服务器,Driver是客户端,Task 里面是准备数据和逻辑操作的,那个Driver 里面创建一个Task 对象然后把Task 用ObjectOutputstream
输出流把对象给输出到Excuter接收,接收也是使用ObjectIntputstream
对象输入流进行接收,因为输出的是一个操作逻辑,用字节流接收肯定不对,所有要用对象。然后Excuter 拿到Task之后,就可以直接使用里面的函数了。Task里面要混入Serializable
特质,因为在网络中肯定是无法直接传送一个对象过去的,所以要进行序列化。
7
Excuter 代码:
package com.atguigu.bigdata.spark.core.wc.test2
import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}
//这个是做计算准备的,主要是逻辑代码部分
//这个相当于是服务器,然后Driver相当于是客户端,客户端连接服务器就可以直接使用了
class Excuter {
}
object Excuter{ //要混入序列化的特征,不然不能那个传一个对象过去
def main(args: Array[String]): Unit = {
//启动服务器,接收数据 这个端口号是随便写的
val server = new ServerSocket(9998) //这个是网络编程的
println("服务器启动,等待接收数据")
//等待客户端的链接
val client: Socket = server.accept() //等待客户端发送过来的数据,accept()方法
val in: InputStream = client.getInputStream //输入流接收数据
val objin: ObjectInputStream = new ObjectInputStream(in) //输出流失obj那么接收也应该是obj
val task: Task = objin.readObject().asInstanceOf[Task] //这个就是拿到的值 ,但是这里不应该是AnyRef,所以要进行转换
val ints = task.compute() //上面已经拿到了传过来的操作了,所以可以直接使用里面定义的函数了
println("计算节点的计算结果为:" + ints) //把客户端拿到的数据给输出
objin.close() //把输入流给关闭掉
client.close()
server.close() //把服务器给关闭掉
}
}
Driver 代码:
package com.atguigu.bigdata.spark.core.wc.test2
import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket
//这个是用来执行程序的
class Driver {
}
object Driver {
def main(args: Array[String]): Unit = {
//连接服务器 本地连接,然后第二个参数是服务器定义的端口号
val client = new Socket("localhost",9998) //这个相当于是是客户端,连接服务器
val out: OutputStream = client.getOutputStream //向服务器发东西,用getOutputStream()
val objout = new ObjectOutputStream(out) //定义这个Object的输出,因为上面那个是输出字节的不能传输对象
val task:Task = new Task() //然后创建一个task
objout.writeObject(task) //把task 传入给objout 对象输出流
objout.flush()
objout.close() //用完了吧这个输出流给关掉
client.close() //然后把这个客户端也关掉
println("客户端发送数据完毕")
}
}
Task 代码:
package com.atguigu.bigdata.spark.core.wc.test2
class Task extends Serializable { //要混入序列化的特征,不然不能那个传一个对象过去
val datas = List(1,2,3,4) //这个是数据
val logic = (num:Int) => {num * 2} //匿名函数 这个是逻辑
//计算
def compute() = {
datas.map(logic) //莫logic 上面定义的逻辑操作传入进去
}
}