Spark+实例解读

news2025/1/21 2:51:31

第一部分 Spark入门

学习教程:Spark 教程 | Spark 教程

Spark 集成了许多大数据工具,例如 Spark 可以处理任何 Hadoop 数据源,也能在 Hadoop 集群上执行。大数据业内有个共识认为,Spark 只是Hadoop MapReduce 的扩展(事实并非如此),如Hadoop MapReduce 中没有的迭代查询和流处理。然而Spark并不需要依赖于 Hadoop,它有自己的集群管理系统。更重要的是,同样数据量,同样集群配置,Spark 的数据处理速度要比 Hadoop MapReduce 快10倍左右。

Spark 的一个关键的特性是数据可以在内存中迭代计算,提高数据处理的速度。虽然Spark是用 Scala开发的,但是它对 Java、Scala、Python 和 R 等高级编程语言提供了开发接口。

第二部分 SparkCore

2 RDD

2.1 转换算子-map

map是将RDD的数据一条条处理,返回新的RDD

# 定义方法
def add(data):
    return data*10
print(rdd.map(add).collect)
# 定义lamabda表达式
rdd.map(lambda data:data*10)
2.2 转换算子-flatMap

flatMap对RDD执行map操作,然后执行解除嵌套操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)
    }.reduceByKey(_ + _).map { case ((feature, label), num) =>
      (feature, List((label, num))) //feature,label,cnt
    }.reduceByKey(_ ::: _).mapValues { x =>
      val size_entro = x.map(_._2).sum
      val res = x.map(_._2.toDouble / size_entro).map { t =>
        -t * (Math.log(t) / Math.log(2))
      }.sum
      size_entro * res
    }.mapValues { x => x / size }.map(_._2).sum

2.3转换算子-reduceByKey

针对KV型RRDD自动按照key进行分组,然后按照提供的聚合逻辑,对组内数据value完成聚合操作

rdd.reduceByKey(func)

      val clickStat = joinDf
        .where(F.col("active_type")==="click")
        .rdd
        .map(row => {
          val mapInfo = Option(row.getMap[String,Double](row.fieldIndex(feat)))
          mapInfo match {
            case Some(x) => x
            case _ => null
          }
        })
        .filter(_!=null)
        .flatMap(x=>x)
        .reduceByKey(_+_)
2.4 转换算子-mapValues

针对二元元组RDD,对其内部的二元元组的value进行map操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)
    }.reduceByKey(_ + _).map { case ((feature, label), num) =>
      (feature, List((label, num))) //feature,label,cnt
    }.reduceByKey(_ ::: _).mapValues { x =>
      val size_entro = x.map(_._2).sum
      val res = x.map(_._2.toDouble / size_entro).map { t =>
        -t * (Math.log(t) / Math.log(2))
      }.sum
      size_entro * res
    }.mapValues { x => x / size }.map(_._2).sum
2.5 转换算子-groupBy

将RDD的数据进行分组

rdd.groupBy(func)

rdd = sc.parallelize([('a',1),('a',11),('b',1)])
# 通过这个函数确认按照谁来分组(返回谁即可)
print(rdd.groupBy(lambda x:x[0]).collect())
print(rdd.groupBy(lambda x:x[0]).collect())
# 结果为:
​
    val userContentListHis = spark.thriftSequenceFile(inpath_his, classOf[LongVideoUserContentStat])
      .map(l=>{
        (l.getUid,l.getContent_properties.get(0).getId)
      }).toDF("uid", "docid")
      .groupBy($"uid")
2.6 转换算子-filter

过滤想要的数据进行保存

rdd = sc.parallelize([1,2,3,4,5,6])
rdd.filter(lamdba x:x%2 == 1) # 只保留奇数
    val treatmentUser = spark.read.option("header", false).option("sep", "\t").csv(inpath)
      .select("_c0")
      .withColumnRenamed("_c0", "userid")
      .withColumn("flow", getexpId($"userid"))
      .filter($"flow" >= start and $"flow" <= end)
      .select("userid")
      .dropDuplicates()
2.7 转换算子-其他算子
distinct算子
rdd.distinct() 一般不写去重分区
    val userContentHis = hisPathList.map(path =>{
      val hisData = spark.thriftSequenceFile(path, classOf[LongVideoUserContentStat])
      println(s"hisData ==>${hisData.count()}")
      hisData
    }).reduce(_ union _).distinct().repartition(partition)
union算子 
2个rdd合并成一个rdd:rdd.union(other_rdd)
只合并不去重  rdd的类型不同也是可以合并的
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,2,3,4])
rdd3 = rdd1.union(rdd2)
2.8 算子面试题
1.groupByKey和reduceByKey的区别:
groupByKey仅仅有分组功能而已,reduceByKey除了分组还有聚合作用,是一个分组+聚合一体化的算子. 分组前先聚合再shuffle,预聚合,被shuffle的数据极大的减少,提升了性能.数据量越大,reduceByKey的性能优势也就越大.
​
2.rdd的分区数怎么查看? 
通过getNumPartitions API查看,返回int
​
3.Transformation和Action的区别:
转换算子的返回值100%是rdd,而Action算子不一定.转换算子是懒加载的,只有遇到Action才会执行
​
4.哪两个算子不经过Driver直接输出?
foreach 和 saveAsTextFile

3 RDD的持久化

3.1 RDD的持久化

rdd是过程数据 rdd进行相互迭代计算,执行开启时,新的RDD生成,老的RDD消失

3.2 RDD的缓存

val rawLog = profilePushLogReader(spark, date, span).persist()
3.3 RDD的checkPoint

也是将RDD的数据保存起来,仅支持磁盘存储,被认为是安全的, 不保留血缘关系

3.4 缓存面试题

4 案例

4.1 搜素引擎日志分析案例
4.2
4.3 ....
4.4 计算资源面试题
1.如何尽量提升任务计算的资源?
计算cpu核心和内存量,通过--executor-memory指定executor内存,通过--executor-cores指定executor的核心数
​

5 广播变量 累加器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1957792.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

精准客户从何而来?一招让你的客户源源不断!

你们是否还在为找不到精准客户而烦恼&#xff1f; 今天&#xff0c;我要分享一招非常实用的技巧&#xff0c;让你也能拥有源源不断的客源&#xff01; 首先&#xff0c;我们需要深入了解自己的目标客户。他们是谁&#xff1f;他们的需求是什么&#xff1f;并利用大数据洞察客…

Cocos Creator文档学习记录

Cocos Creator文档学习记录 一、什么是Cocos Creator 官方文档链接&#xff1a;Hello World | Cocos Creator 百度百科&#xff1a;Cocos Creator_百度百科 Cocos Creator包括开发和调试、商业化 SDK 的集成、多平台发布、测试、上线这一整套工作流程&#xff0c;可多次的迭…

【前端 19】使用Vue-CLI脚手架构建Vue2项目

使用Vue CLI构建Vue 2项目 引言 Vue.js 是一个构建用户界面的渐进式JavaScript框架&#xff0c;以其轻量级和易用性受到前端开发者的广泛喜爱。Vue CLI&#xff08;Vue Command Line Interface&#xff09;是一个基于Vue.js进行快速开发的完整系统&#xff0c;提供了零配置的项…

Amazon Bedrock + Amazon DynamoDB 数据设计与建模

一、Amazon DynamoDB简介 在当今数字化转型的浪潮中&#xff0c;企业对数据处理能力的需求日益增长&#xff0c;为了应对大规模数据和高并发访问的挑战&#xff0c;选择一款合适的数据库解决方案变得尤为重要。 Amazon DynamoDB&#xff0c;作为亚马逊云科技提供的一种完全托…

【人工智能】Transformers之Pipeline(六):图像分类(image-classification)

目录 一、引言 二、图像分类&#xff08;image-classification&#xff09; 2.1 概述 2.2 技术原理 2.3 应用场景 2.4 pipeline参数 2.4.1 pipeline对象实例化参数 2.4.2 pipeline对象使用参数 2.4 pipeline实战 2.5 模型排名 三、总结 一、引言 pipeline&#x…

文远知行IPO,L4的梦还是L2给的

文&#xff5c;刘俊宏 编&#xff5c;王一粟 随着萝卜快跑在武汉初步验证了Robotaxi商业闭环&#xff0c;L4自动驾驶的公司们终于迎来了“黎明的曙光”。 7月27日&#xff0c;文远知行向美国SEC&#xff08;证券交易委员会&#xff09;提交了招股书&#xff0c;以超越Moment…

Flask Bootstrap #2 - MVC架构

Reference https://medium.com/%E5%B7%A5%E7%A8%8B%E9%9A%A8%E5%AF%AB%E7%AD%86%E8%A8%98/%E4%BD%BF%E7%94%A8-python-flask-%E5%BB%BA%E7%AB%8B%E7%B6%B2%E7%AB%99-353e449a9bc8 1 MVC架构 在 Flask Boostrap #1 - 安装Flask 透过 app.route() 建立路由是 flask API 常见的…

实验2-4-1 求1到N的和*--sum记得累加啊!

//实验2-4-1 求1到N的和//计算序列 1 2 3 ... 的前N项之和。#include<stdio.h> #include<math.h> int main(){int N,sum0;scanf("%d",&N);for(int a1;a<N;a){ suma;//sum进行累加&#xff01;&#xff01;&#xff01;&#xff01;&#xff01…

VS2019编译和使用gtest测试(C++)

目录 一、首先下载gtest开源 二、使用gtest 一、首先下载gtest开源 https://pan.baidu.com/s/15m62KAJ29vNe1mrmAcmehA 提取码&#xff1a;vfxz 下载下来解压到文件夹&#xff0c;再在文件夹里面新建一个build文件夹&#xff0c;如下&#xff1a; 再安装cmake&#xff0c;…

Cocos Creator2D游戏开发(6)-飞机大战(4)-敌机产生

敌机产生&玩家发射子弹 敌机产生: 创建一个空节点 创建一个敌机预制体 把敌机图片拖入预制体内 使用代码生成敌机 让敌机动起来 创建一个预制体enemy_prefab双击预制体enemy_prefab,然后拖入一个敌机图片,设置好方向和尺寸,一定要记得保存然后关闭(场景编辑器里面的保存)…

【前端 18】安装Node.js

Node.js 安装指南 在今天的博客中&#xff0c;我们将一起探讨如何在您的计算机上安装Node.js。Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境&#xff0c;它允许你在服务器端运行 JavaScript 代码。无论您是前端开发者希望探索全栈开发&#xff0c;还是后端开发者寻…

【ROS 最简单教程 002/300】ROS 环境安装 (虚拟机版): Noetic

&#x1f497; 有遇到安装问题可以留言呀 ~ 当时踩了挺多坑&#xff0c;能帮忙解决的我会尽力 &#xff01; 1. 安装操作系统环境 Linux ❄️ VM / VirtualBox Ubuntu20.04 &#x1f449; 保姆级图文安装教程指路&#xff0c;有经验的话 可以用如下资源自行安装 ITEMREFERENCE…

Excel模拟计算演示-以矩阵乘计算密度为例

Excel模拟计算演示-以矩阵乘计算密度为例 1.参考链接2.CUDA_Occupancy_Calculator截图3.矩阵乘计算密度模拟计算的操作步骤及效果 安装好CUDA之后,/usr/local/cuda-12.1/tools/CUDA_Occupancy_Calculator.xls里会看到"TABLE(,B17)"这样的表达式,原来是模拟计算的结果…

Stable Diffusion 提示词攻略

一、提示词作用 提示词所做的工作是缩小模型出图的解空间&#xff0c;即缩小生成内容时在模型数据里的检索范围&#xff0c;而非直接指 定作画结果。 提示词的效果也受模型的影响&#xff0c;有些模型对自然语言做特化训练&#xff0c;有些模型对单词标签对做特化训练&#xf…

Lumos学习王佩丰Excel第八讲:IF函数逻辑判断

本节课与数学无关&#xff0c;与逻辑强相关。这节课对理工科&#xff0c;尤其是对有计算机基础的同学们会很友好。 一、使用IF函数 1、IF函数的基本用法 函数语法&#xff1a;IF(logical_test,[value_if_true],[value_if_false]) logical_test&#xff1a;判断条件 [value…

现在有什么赛道可以干到退休?

最近&#xff0c;一则“90后无论男女都得65岁以后退休”的消息在多个网络平台流传&#xff0c;也不知道是真是假&#xff0c;好巧不巧今天刷热点的时候又看到一条这样的热点&#xff1a;现在有什么赛道可以干到退休&#xff1f; 点进去看了几条热评&#xff0c;第一条热评说的…

品牌控价:维护市场秩序的关键策略

在当今竞争激烈的市场环境中&#xff0c;品牌控价成为了品牌发展的重要环节。品牌在拓展销售渠道时&#xff0c;为确保自身的形象与利益&#xff0c;通常会为经销商设定出货价和建议零售价。然而&#xff0c;部分经销商为追求短期利益&#xff0c;在电商平台上进行低价引流&…

Ollama怎么启动.gguf 大模型

环境&#xff1a; Llama3-8B 问题描述&#xff1a; Ollama怎么启动.gguf 大模型 解决方案&#xff1a; 要使用 Ollama 启动 .gguf 大模型&#xff0c;你可以按照以下步骤操作&#xff1a; 创建 Modelfile&#xff1a;首先&#xff0c;创建一个名为 Modelfile 的文件&…

【C++BFS算法】886. 可能的二分法

本文涉及的点 CBFS算法 LeetCod886. 可能的二分法 给定一组 n 人&#xff08;编号为 1, 2, …, n&#xff09;&#xff0c; 我们想把每个人分进任意大小的两组。每个人都可能不喜欢其他人&#xff0c;那么他们不应该属于同一组。 给定整数 n 和数组 dislikes &#xff0c;其…