Spark---转换算子、行动算子、持久化算子

news2025/1/11 21:08:42

一、转换算子和行动算子

1、Transformations转换算子

1)、概念

Transformations类算子是一类算子(函数)叫做转换算子,如map、flatMap、reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。

2)、Transformation类算子

filter :过滤符合条件的记录数,true保留,false过滤掉

map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。

flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。

sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。

reduceByKey:将相同的Key根据相应的逻辑进行处理。

sortByKey/sortBy:作用在K,V格式的RDD上,对Key进行升序或者降序排序。

2、Action行动算子

1)、概念:

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

2)、Action类算子

count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。

take(n):返回一个包含数据集前n个元素的集合。

first:first=take(1),返回数据集中的第一个元素。

foreach:循环遍历数据集中的每个元素,运行相应的逻辑。

collect:将计算结果回收到Driver端。

3)、demo:动态统计出现次数最多的单词个数,过滤掉。

  • 一千万条数据量的文件,过滤掉出现次数多的记录,并且其余记录按照出现次数降序排序。

假设有一个records.txt文件

hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark1
hello Spark
hello Spark
hello Spark2
hello Spark
hello Spark
hello Spark
hello Spark3
hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark4
hello Spark
hello Spark
hello Spark5
hello Spark
hello Spark

代码处理:

package com.bjsxt.demo;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;
/**
 * 动态统计出现次数最多的单词个数,过滤掉。
 * @author root
 *
 */
public class Demo1 {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.setMaster("local").setAppName("demo1");
		JavaSparkContext jsc = new JavaSparkContext(conf);
		JavaRDD<String> lines = jsc.textFile("./records.txt");
		JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Iterable<String> call(String t) throws Exception {
				return Arrays.asList(t.split(" "));
			}
		});
		JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String,String, Integer>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Integer> call(String t) throws Exception {
				return new Tuple2<String, Integer>(t, 1);
			}
		});
		
		JavaPairRDD<String, Integer> sample = mapToPair.sample(true, 0.5);
		
		final List<Tuple2<String, Integer>> take = sample.reduceByKey(new Function2<Integer,Integer,Integer>(){

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1+v2;
			}
			
		}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
					throws Exception {
				return new Tuple2<Integer, String>(t._2, t._1);
			}
		}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Integer> call(Tuple2<Integer, String> t)
					throws Exception {
				return new Tuple2<String, Integer>(t._2, t._1);
			}
		}).take(1);
		
		System.out.println("take--------"+take);
		
		JavaPairRDD<String, Integer> result = mapToPair.filter(new Function<Tuple2<String,Integer>, Boolean>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Boolean call(Tuple2<String, Integer> v1) throws Exception {
				return !v1._1.equals(take.get(0)._1);
			}
		}).reduceByKey(new Function2<Integer,Integer,Integer>(){

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1+v2;
			}
			
		}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
					throws Exception {
				return new Tuple2<Integer, String>(t._2, t._1);
			}
		}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Integer> call(Tuple2<Integer, String> t)
					throws Exception {
				return new Tuple2<String, Integer>(t._2, t._1);
			}
		});
		
		result.foreach(new VoidFunction<Tuple2<String,Integer>>() {
			
			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Tuple2<String, Integer> t) throws Exception {
				System.out.println(t);
			}
		});
		
		jsc.stop();
		
	}
}

3、Spark代码流程

1)、创建SparkConf对象

可以设置Application name。

可以设置运行模式。

可以设置Spark application的资源需求。

2)、创建SparkContext对象

3)、基于Spark的上下文创建一个RDD,对RDD进行处理。

4)、应用程序中要有Action类算子来触发Transformation类算子执行。

5)、关闭Spark上下文对象SparkContext。

二、Spark持久化算子

1、控制算子

1)、概念

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

2)、cache

默认将RDD的数据持久化到内存中。cache是懒执行。

注意:chche()=persist()=persist(StorageLevel.Memory_Only)

测试cache文件:

测试代码:

1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("CacheTest");
3.JavaSparkContext jsc = new JavaSparkContext(conf);
4.JavaRDD<String> lines = jsc.textFile("persistData.txt");
5.
6.lines = lines.cache();
7.long startTime = System.currentTimeMillis();
8.long count = lines.count();
9.long endTime = System.currentTimeMillis();
10.System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ 
11.(endTime-startTime));
12.
13.long countStartTime = System.currentTimeMillis();
14.long countrResult = lines.count();
15.long countEndTime = System.currentTimeMillis();
16.System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-
17.countStartTime));
18.
19.jsc.stop();

persist:

可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2“表示有副本数。

持久化级别如下:

2、cache和persist的注意事项

1)、cache和persist都是懒执行,必须有一个action类算子触发执行。

2)、cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。

3)、cache和persist算子后不能立即紧跟action算子。

4)、cache和persist算子持久化的数据当applilcation执行完成之后会被清除。

错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

3、checkpoint

checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
  • persist(StorageLevel.DISK_ONLY)与Checkpoint的区别?

1)、checkpoint需要指定额外的目录存储数据,checkpoint数据是由外部的存储系统管理,不是Spark框架管理,当application完成之后,不会被清空。

2)、cache() 和persist() 持久化的数据是由Spark框架管理,当application完成之后,会被清空。

3)、checkpoint多用于保存状态。

  • checkpoint 的执行原理:

1)、当RDD的job执行完毕后,会从finalRDD从后往前回溯。

2)、当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。

3)、Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。

  • 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
  • 使用:
1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("checkpoint");
3.JavaSparkContext sc = new JavaSparkContext(conf);
4.sc.setCheckpointDir("./checkpoint");
5.JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
6.parallelize.checkpoint();
7.parallelize.count();
8.sc.stop();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1240762.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Leetcode——121 买卖股票的最佳时机

(超时。。。。。。&#xff09;除了暴力法我是真的。。。。。。 class Solution {public int maxProfit(int[] prices) {int len prices.length;int max0;for(int i0;i<len-1;i){for(int ji1;j<len;j){int income prices[j] - prices[i];if(income>max){maxincome;…

路由的控制与转发原理

场景1&#xff1a;路由器收到数据包后&#xff0c;会根据数据包的目标IP地址&#xff0c;计算出目标网段&#xff0c;再确定终端设备的具体位置。这个过程中&#xff0c;还需要计算出接口&#xff0c;或数据包下一跳的地址。最终会生成一条路由&#xff0c;即路径&#xff0c;存…

外部中断为什么会误触发?

今天在写外部中断的程序的时候&#xff0c;发现中断特别容易受到干扰&#xff0c;我把手放在对应的中断引脚上&#xff0c;中断就一直触发&#xff0c;没有停过。经过一天的学习&#xff0c;找到了几个解决方法&#xff0c;所以写了这篇笔记。如果你的中断也时不时会误触发&…

基于JavaWeb+SSM+Vue教学辅助微信小程序系统的设计和实现

基于JavaWebSSMVue教学辅助微信小程序系统的设计和实现 源码获取入口前言主要技术系统设计功能截图Lun文目录订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 前言 1.1 概述 随着信息时代的快速发展&#xff0c;互联网的优势和普及&#xff0c;人们生活…

二、防火墙-安全策略

学习防火墙之前&#xff0c;对路由交换应要有一定的认识 1、安全策略基本概念2、匹配顺序3、缺省包过滤4、安全策略发展史4.1.基于ACL包过滤4.2.融合UTM安全策略4.3.一体化安全策略 5、Local区域安全策略5.1.针对OSPF协议配置Local区域安全策略5.2.其他协议 6、ASPF6.1.帮助FTP…

PCIE链路训练-状态跳转1

A&#xff1a;12ms超时&#xff0c;或者再任何lane上检测到Electrical Idle Exit&#xff1b; B&#xff1a; 1.发送“receiver detection”之后没有一个lane的接收逻辑被rx检测到 2.不满足条件c&#xff0c;比如两次detection出现差别&#xff1b; C&#xff1a;发送端在没…

文本分析:NLP 魔法!

一、说明 这是一个关于 NLP 和分类项目的博客。NLP 是自然语言处理&#xff0c;目前需求量很大。让我们了解如何利用 NLP。我们将通过编码来理解流程和概念。我将在本博客中介绍 BagOfWords 和 n-gram 以及朴素贝叶斯分类模型。这个博客的独特之处&#xff08;这使得它很长&…

竞赛选题 车道线检测(自动驾驶 机器视觉)

0 前言 无人驾驶技术是机器学习为主的一门前沿领域&#xff0c;在无人驾驶领域中机器学习的各种算法随处可见&#xff0c;今天学长给大家介绍无人驾驶技术中的车道线检测。 1 车道线检测 在无人驾驶领域每一个任务都是相当复杂&#xff0c;看上去无从下手。那么面对这样极其…

Oracle 的 Java SE、OpenJDK、Database 链接

1 访问主站 Oracle | Cloud Applications and Cloud Platform 2 开发者 2.1 OpenJDK (这里的不用登录&#xff0c;就可以下载) JDK Builds from Oracle 2.2 JavaSE (需要登录&#xff0c;才可以下载) Java Downloads | Oracle 2.3 DataBase (MySQL为例) MySQL :: MySQL Dow…

排序算法--快速排序

实现逻辑 ① 从数列中挑出一个元素&#xff0c;称为 “基准”&#xff08;pivot&#xff09;&#xff0c; ② 重新排序数列&#xff0c;所有元素比基准值小的摆放在基准前面&#xff0c;所有元素比基准值大的摆在基准的后面&#xff08;相同的数可以到任一边&#xff09;。在这…

DataFunSummit:2023年OLAP引擎架构峰会-核心PPT资料下载

一、峰会简介 OLAP技术是当前大数据领域的热门方向&#xff0c;该领域在各个行业都有广泛的使用场景&#xff0c;对OLAP引擎的功能有丰富多样的需求。同时&#xff0c;在性能、稳定性和成本方面&#xff0c;也有诸多挑战。目前&#xff0c;OLAP技术没有形成统一的事实标准&…

系统移植-交叉编译工具链

不同架构的机器码 与 汇编语言 都不可移植&#xff0c; 且二者一一对应 c语言中三种成分&#xff1a; 1.分号结尾的叫做语句 语句可以让CPU执行&#xff0c;可以进行预处理&#xff0c;编译等生成机器码 2.#开头的为预处理指令 不带分号 CPU无法执行 3.注释&#xff0c;…

AR道具特效制作工具

AR&#xff08;增强现实&#xff09;技术已经逐渐渗透到各个行业&#xff0c;为企业带来了全新的营销方式和用户体验。在这个背景下&#xff0c;美摄科技凭借其强大的技术实力和创新精神&#xff0c;推出了一款专为企业打造的美摄AR特效制作工具&#xff0c;旨在帮助企业轻松实…

Eclipse常用设置-乱码

在用Eclipse进行Java代码开发时&#xff0c;经常会遇到一些问题&#xff0c;记录下来&#xff0c;方便查看。 一、properties文件乱码 常用的配置文件properties里中文的乱码&#xff0c;不利于识别。 处理流程&#xff1a;Window -> Preferences -> General -> Ja…

stm32定时器输入捕获模式

频率测量 频率测量有两种方法 测频法&#xff1a;在闸门时间T内&#xff0c;对上升沿或下降沿计次&#xff0c;得到N&#xff0c;则评率fxN/T测周法&#xff1a;两个上升沿内&#xff0c;以标准频率fc计次得到N&#xff0c;则频率fx fc/N中界频率&#xff1a;测频法和测周法误…

Altium Designer学习笔记8

创建原理图元件&#xff1a; 画出原理图&#xff1a; 根据规则书画出原理图&#xff1a; 根据规则书画出封装图&#xff1a; 参照&#xff1a; 确认下过孔的内径和外径的最小允许值。

设计模式-创建型模式-工厂方法模式

一、什么是工厂方法模式 工厂模式又称工厂方法模式&#xff0c;是一种创建型设计模式&#xff0c;其在父类中提供一个创建对象的方法&#xff0c; 允许子类决定实例化对象的类型。工厂方法模式是目标是定义一个创建产品对象的工厂接口&#xff0c;将实际创建工作推迟到子类中。…

前端js语音朗读文本

<!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>语音朗读</title></head><body>&l…

js双击修改元素内容并提交到后端封装实现

前面发过一个版本了&#xff0c;后来又追加了些功能。重新发一版。新版支持select和radio。 效果图&#xff1a; 右上角带有绿标的&#xff0c;是可以修改的单元格。如果不喜欢显示绿标&#xff0c;可以传递参数时指定不显示&#xff0c;如果想改为其它颜色&#xff0c;也可以…

C++多线程学习(二):多线程通信和锁

参考引用 C11 14 17 20 多线程从原理到线程池实战代码运行环境&#xff1a;Visual Studio 2019 1. 多线程状态 1.1 线程状态说明 初始化 (lnit)&#xff1a;该线程正在被创建就绪 (Ready)&#xff1a;该线程在就绪列表中&#xff0c;等待 CPU 调度运行 (Running)&#xff1a;…