spark介绍及简单使用

news2024/9/28 13:24:36

简介

        Spark是由加州大学伯克利分校AMPLab(AMP实验室)开发的开源大数据处理框架。起初,Hadoop MapReduce是大数据处理的主流框架,但其存在一些限制,如不适合迭代算法、高延迟等。为了解决这些问题,Spark在2010年推出,提供了高效的内存计算和更灵活的数据处理方式。

使用场景:

        批处理:

        Spark支持大规模的批处理任务,通过弹性的分布式计算能力,能够处理海量数据。

        交互式查询:

         Spark提供了Spark SQL,使得用户能够使用SQL语言进行交互式查询,方便数据分析师和数据科学家进行数据探索。

        流处理:

         Spark Streaming模块允许实时处理数据,支持复杂的流处理应用。

        机器学习:

         MLlib是Spark的机器学习库,支持分布式机器学习,适用于大规模数据集的训练和预测。

        图处理:

         GraphX是Spark的图处理库,用于处理图数据结构,支持图算法的并行计算。

        技术竞品:

        Hadoop MapReduce:Spark的前身,仍然是大数据领域的主流框架之一,但相对而言,Spark更灵活、性能更好。

        Apache Flink: 与Spar一个流处理和批处理框架,强调事件时间处理和精确一次性语义。

        Apache Storm: 专注于实时流处理,适用于需要低延迟的应用场景。

        Apache HBase: 针对NoSQL存储,适用于需要实时读写的大数据场景。

优劣势:

        Spark的优势:

        高性能: Spark的内存计算引擎可以显著提高计算速度,特别适用于迭代算法和复杂的数据处理任务。

        易用性: 提供了丰富的API,包括Java、Scala、Python和R等,使得开发者能够使用熟悉的编程语言进行大数据处理。

        统一的处理框架: Spark支持批处理、交互式查询、流处理、机器学习和图处理等多种数据处理模式,为用户提供了统一的编程接口。

        生态系统: Spark生态系统包括Spark SQL、MLlib、GraphX等库,丰富的生态系统支持广泛的数据处理应用。

ec433488fc544c4fa7b488592d7c1188.png

        Spark的劣势:

        资源消耗: 由于使用内存计算,Spark对内存的需求较大,需要足够的硬件资源支持。

        学习曲线: 对于初学者而言,学习Spark可能需要一定的时间,尤其是对于复杂的数据处理任务。

        实时性: 尽管Spark Streaming支持实时处理,但相较于专注于实时处理的框架,实时性可能稍逊一筹。

在选择大数据处理框架时,需要考虑具体的业务需求和场景,综合考虑各个框架的优劣势来做出合适的选择。

spark的shell使用

        本文在hadoop for spark 集群环境下进行演示,当你启动集群的所有工作程序包括spark程序在内,可以使用spark-shell指令在任意一个节点进入到spark交互命令行中

        spark-shell 后置参数解释

- -I <file>:预加载<file>,强制逐行解释。

- --master MASTER_URL:指定Spark的主节点URL,可以是spark://host:port, mesos://host:port, yarn, k8s://https://host:port, 或者 local。

- --deploy-mode DEPLOY_MODE:指定驱动程序的部署模式,可以是本地("client")或者集群中的工作节点("cluster")。

- --class CLASS_NAME:指定应用程序的主类(适用于Java / Scala应用程序)。

- --name NAME:指定应用程序的名称。

- --jars JARS:指定要包含在驱动程序和执行器类路径中的jar文件,用逗号分隔。

- --packages:指定要包含在驱动程序和执行器类路径中的maven坐标的jar文件,用逗号分隔。

- --exclude-packages:指定在解析--packages提供的依赖项时要排除的groupId:artifactId,用逗号分隔。

- --repositories:指定要搜索--packages给出的maven坐标的额外远程仓库,用逗号分隔。

- --py-files PY_FILES:指定要放在PYTHONPATH上的.zip, .egg, 或 .py文件,用逗号分隔。

- --files FILES:指定要放在每个执行器的工作目录中的文件,用逗号分隔。

- --archives ARCHIVES:指定要解压到每个执行器的工作目录中的归档文件,用逗号分隔。

- --conf, -c PROP=VALUE:指定Spark的配置属性。

- --properties-file FILE:指定要从中加载额外属性的文件路径。

- --driver-memory MEM:指定驱动程序的内存(例如1000M, 2G)。

- --driver-java-options:指定要传递给驱动程序的额外Java选项。

- --driver-library-path:指定要传递给驱动程序的额外库路径。

- --driver-class-path:指定要传递给驱动程序的额外类路径。

- --executor-memory MEM:指定每个执行器的内存(例如1000M, 2G)。

- --proxy-user NAME:指定提交应用程序时要模拟的用户。

- --help, -h:显示帮助信息并退出。

- --verbose, -v:打印额外的调试输出。

- --version:打印当前Spark的版本。

        进入spark交互页面,这里有三个方法进入spark的交互环境,不同的语言环境,其提示符也有所不同。

##默认scala语言环境
spark-shell --master local

##使用python语言环境
pyspark

##使用R语言环境
sparkR

12f7f9399d904b7a951928296cd6bedc.png

spark-shell中的使用范例 

        在/home/hadoop 目录下创建一个wordcount.txt,文件内容如下。

        821a814c6bda466c815ee276eaed7851.png

spark-shell进入scala交互页面

读取文件内容、统计内容行数、取首行数据。


scala> val textFile = sc.textFile("file:///home/hadoop/wordcount.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/wordcount.txt MapPartitionsRDD[1] at textFile at <console>:23

scala> textFile.count()
res0: Long = 3                                                                  

scala> textFile.first()
res1: String = hello you

         scala在使用方法上还是和java有几分类似。在linux的交互行上,也可以实现像idea上的联想功能

scala> val textFile = sc.textFile("file:///home/hadoop/wordcount.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/wordcount.txt MapPartitionsRDD[3] at textFile at <console>:23


scala> textFile.
++                         countApprox             getCheckpointFile    mapPartitionsWithEvaluator   reduce             toDebugString                
aggregate                  countApproxDistinct     getNumPartitions     mapPartitionsWithIndex       repartition        toJavaRDD                    
barrier                    countAsync              getResourceProfile   max                          sample             toLocalIterator              
cache                      countByValue            getStorageLevel      min                          saveAsObjectFile   toString                     
canEqual                   countByValueApprox      glom                 name                         saveAsTextFile     top                          
cartesian                  dependencies            groupBy              partitioner                  setName            treeAggregate                
checkpoint                 distinct                id                   partitions                   sortBy             treeReduce                   
cleanShuffleDependencies   filter                  intersection         persist                      sparkContext       union                        
coalesce                   first                   isCheckpointed       pipe                         subtract           unpersist                    
collect                    flatMap                 isEmpty              preferredLocations           take               withResources                
collectAsync               fold                    iterator             productArity                 takeAsync          zip                          
compute                    foreach                 keyBy                productElement               takeOrdered        zipPartitions                
context                    foreachAsync            localCheckpoint      productIterator              takeSample         zipPartitionsWithEvaluator   
copy                       foreachPartition        map                  productPrefix                toDF               zipWithIndex                 
count                      foreachPartitionAsync   mapPartitions        randomSplit                  toDS               zipWithUniqueId   

        定义好一个参数的路径时,可以使用TAB键进行联想,后面就会弹出可使用的相关函数。函数的命令及其功能,在博主看来甚至和SQL相似,只是使用方法上不同。

Spark在ideal中的使用

        通过idea创建一个maven项目

 

编辑pom.xml增加spark相关依赖

<dependencies>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>

        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>

        <!-- Spark Streaming -->

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>

        <!-- Spark MLib -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>

        <!-- Spark GraphX -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

点击右侧mven进行刷新 

 新建项目文件目录

         这里简单理解,第一个是存放代码的资源目录,第二个是存放配置文件。第三第四是测试类的,这里创建为第一个 

         在java目录下创建一个java class

创建类名,第一个要大写

 创建远程运行环境

点击远程开发

        

新建一个SSH链接

这里需要保证远程服务器的防火墙等相关配置关闭的。

  点击检查链接并继续,然后进入创建的java class 中进行编写代码

这里需要安装一个spakr插件,可以使代码能在服务器上运行

 spark的RDD简单应用

        写一个单词统计的代码,做为简单的。

在src/main路径下创建一个wordcount.txt文件,并键入以下内容        

hello you
hello he
hello me

创建一个WordCount 名称的class         

package sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WordCount {

    public static void main(String[] args) throws Exception{
//        配置 Spark 应用:
//这里创建了一个 SparkConf 对象,设置了应用名为 "WordCount",并且指定在本地模式下运行,使用一个本地线程。
        SparkConf sparkConf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local[1]");

//使用 SparkConf 创建了 JavaSparkContext 对象,该对象是 Spark 的 Java API 入口点。
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

//        从文本文件中读取数据,并创建一个包含每行文本的 RDD。
        JavaRDD<String> linesRDD = sc.textFile("src/main/wordcount.txt");

//使用 flatMap 转换操作,将每行文本切分为单词,形成一个包含所有单词的 RDD。
        JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                List<String> list = Arrays.asList(words);
                return list.iterator();
            }
        });

//使用 mapToPair 转换操作,将每个单词映射为键值对,其中键是单词,值是1。
        JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                System.out.println("正在处理的单词是:" + word);
                return new Tuple2<>(word, 1);
            }
        });
//使用 reduceByKey 转换操作,对相同键的值进行累加,实现单词频次的统计。
        JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

//使用 foreach 操作,遍历统计结果并打印每个单词及其频次。
    retRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tuple2) throws Exception {
                System.out.println(tuple2);
            }
        });


        sc.stop();
    }

}

        通过以上在spark 的java代码可以看出,使用java写程序时一件相当繁琐的事情。后面会主要一pyspark给大家spark的应用。

        注:这里在运行后会出现很多红色高亮信息,这些并不影响程序的正常运行 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1323454.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Hutool--DFA 敏感词工具类

使用hutool的dfa工具类可以很好的帮助我们来实现敏感词过滤的功能&#xff0c;下面从用例入手来逐步地去j简单了解一下dfa工具类。 字典树 DFA算法的核心是建立了以敏感词为基础的许多敏感词树&#xff08;字典树&#xff09;。 它的基本思想是基于状态转移来检索敏感词。 字…

AI 绘画 | Stable Diffusion 去除照片马赛克

前言 本篇文章教会你如何让几秒钟去除图片中的马赛克,还是依托于Stable Diffusion WEB UI的强大扩展插件ControlNet,快来学起来吧,如果有问题请在评论区留言。 教程 选择大模型 首先在图生图,选择一个写实风格的大模型(我这里选择是majicMIX realistic 麦橘写实_v7.saf…

Apache RocketMQ 5.0 腾讯云落地实践

Apache RocketMQ 发展历程回顾 RocketMQ 最早诞生于淘宝的在线电商交易场景&#xff0c;经过了历年双十一大促流量洪峰的打磨&#xff0c;2016年捐献给 Apache 社区&#xff0c;成为 Apache 社区的顶级项目&#xff0c;并在国内外电商&#xff0c;金融&#xff0c;互联网等各行…

内网渗透测试基础——内网信息收集

内网渗透测试基础——内网信息收集 在内网渗透测试环境中&#xff0c;有很多设备和防护软件&#xff0c;例如Bit9、ArcSight、Maniant等。它们通过收集目标内网的信息&#xff0c;洞察内网网络拓扑结构&#xff0c;找出内网中最薄弱的环节。信息收集的深度&#xff0c;直接关系…

Java 数据结构篇-实现堆的核心方法与堆的应用(实现 TOP-K 问题:最小 k 个数)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 堆的说明 2.0 堆的成员变量及其构造方法 3.0 实现堆的核心方法 3.1 实现堆的核心方法 - 获取堆顶元素 peek() 3.2 实现堆的核心方法 - 下潜 down(int i) 3.3 实…

C/C++ 使用 MySQL API 进行数据库操作

C/C 使用 MySQL API 进行数据库操作 一、前言 随着信息时代的到来&#xff0c;数据库的应用日益广泛&#xff0c;MySQL 作为开源的关系型数据库管理系统&#xff0c;被广大开发者所喜爱。在 C/C 程序中&#xff0c;我们可以通过 MySQL 提供的 API 接口来连接数据库&#xff0…

研发管理-代码管理篇

前言&#xff1a; 工作了这些年&#xff0c;工作了三家公司&#xff0c;也用过主流的代码管理平台&#xff0c;比如SVN&#xff0c;git系列&#xff08;gitlib,gitee&#xff09;,各有优点&#xff0c;我个人比较喜欢SVN&#xff0c;多人协作的代码管理难免会有代码冲突&#…

【算法】红黑树

一、红黑树介绍 红黑树是一种自平衡二叉查找树&#xff0c;是在计算机科学中用到的一种数据结构&#xff0c;典型的用途是实现关联数组。 红黑树是在1972年由Rudolf Bayer发明的&#xff0c;当时被称为平衡二叉B树&#xff08;symmetric binary B-trees&#xff09;。后来&am…

【C语言】SCU安全项目2-BufBomb

目录 关键代码解读&#xff1a; getxs() getbuf() test() 核心思路 具体操作1 具体操作2 前段时间忙于强网杯、英语4级和一些其他支线&#xff0c;有点摸不清头绪了&#xff0c;特别是qwb只有一个输出&#xff0c;太过坐牢&#xff0c;决定这个安全项目做完后就继续投身…

LED恒流调节器FP7126:引领LED照明和调光的新时代(调光电源、汽车大灯)

目录 一、FP7126概述 二、FP7126功能 三、应用领域 随着科技的进步&#xff0c;LED照明成为了当代照明产业的主力军。而在LED照明的核心技术中&#xff0c;恒流调节器是不可或缺的组成部分。今天&#xff0c;我将为大家介绍一款重要的恒流调节器FP7126&#xff0c;适用于LED…

useConsole的封装,vue,react,htmlscript标签,通用

之前用了接近hack的方式实现了console的封装&#xff0c;目标是获取console.log函数的执行&#xff08;调用栈所在位置&#xff09;所在的代码行数。 例如以下代码&#xff0c;执行window.mylog(1)时候&#xff0c;console.log实际是在匿名的箭头函数()>{//这里执行的} con…

基础知识回顾:安装 NGINX 开源版和 NGINX Plus

原文作者&#xff1a;Robert Haynes of F5 原文链接&#xff1a;基础知识回顾&#xff1a;安装 NGINX 开源版和 NGINX Plus 转载来源&#xff1a;NGINX 中文官网 NGINX 唯一中文官方社区 &#xff0c;尽在 nginx.org.cn 如今&#xff0c;NGINX 仍然是全球最受欢迎的 web 服务器…

【nice-slam】基于RGB-D类型SLAM的定位与重建(史上最详细nice-slam资料汇总)

【NICE-SLAM】基于RGB-D类型SLAM的定位与重建 1. 总结2. 论文2. 1 算法核心流程小姐2.2 论文摘要2.3 Dataset result2.3.1 Replica Dataset result2.3.2 ScanNet Dataset result2.3.3 Multi-room Apartment result2.3.4 Co-fusion Dataset (Robustness to Dynamic Objects) res…

Leetcode—2828.判别首字母缩略词【简单】

2023每日刷题&#xff08;六十五&#xff09; Leetcode—2828.判别首字母缩略词 实现代码 class Solution { public:bool isAcronym(vector<string>& words, string s) {int i 0;int len1 words.size();int len2 s.size();if(len1 ! len2) {return false;}for(a…

Achronix提供由FPGA赋能的智能网卡(SmartNIC)解决方案来打破智能网络性能极限

作者&#xff1a;Achronix 随着人工智能/机器学习&#xff08;AI/ML&#xff09;和其他复杂的、以数据为中心的工作负载被广泛部署&#xff0c;市场对高性能计算的需求持续飙升&#xff0c;对高性能网络的需求也呈指数级增长。高性能计算曾经是超级计算机这样一个孤立的领域&a…

使用Python编写简单网络爬虫实例:爬取图片

&#x1f34e;个人主页 &#x1f3c6;个人专栏&#xff1a;日常聊聊 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 ​编辑 简介 步骤 1. 安装依赖库 2. 创建目录 3. 发送HTTP请求并解析页面 4. 查找图片标签并下载图片 注意事项 结语 我的其他博客 简介 网络爬虫是一种…

CSP-S2019提高组day1-T2:括号树

题目链接 [CSP-S2019] 括号树 题目描述 本题中合法括号串的定义如下&#xff1a; () 是合法括号串。如果 A 是合法括号串&#xff0c;则 (A) 是合法括号串。如果 A&#xff0c;B 是合法括号串&#xff0c;则 AB 是合法括号串。 本题中子串与不同的子串的定义如下&#xff…

vscode颜色主题插件one dark Pro安装

1.点击扩展图标→搜索“one dark Pro”→第一个点击安装 2.安装成功后&#xff0c;不要忘了点击设置颜色主题 3.看下效果&#xff1a;

【日积月累】sql执行语句优化

目录 sql执行语句优化 1.前言2.sql执行语句优化2.1语句注意类1.避免使用 * 查询(全表查询)2.限制查询返回数3.小数据集驱动大数据集4.group by 优化5.尽量使用数值替代字符串类型6.使用varchar代替char7.批量插入性能提升 3.误操作导致索引失效1.避免查询条件字符串没有加2.避…

JVS低代码和智能BI(自助式数据分析)12.19更新功能说明

低代码更新功能 新增: 1、表单组件&#xff1a;标题、分割线、按钮等非数据组件增加小程序端隐藏设置&#xff1b; 隐藏设置允许开发者对表单组件中的非数据组件进行隐藏&#xff0c;例如&#xff0c;可能只想展示表单的部分内容&#xff0c;或者希望在特定条件下显示或隐藏…