Spark Shell的简单使用

news2024/11/23 14:50:54

简介

        Spark shell是一个特别适合快速开发Spark原型程序的工具,可以帮助我们熟悉Scala语言。即使你对Scala不熟悉,仍然可以使用这个工具。Spark shell使得用户可以和Spark集群交互,提交查询,这便于调试,也便于初学者使用Spark。前一章介绍了运行Spark实例之前的准备工作,现在你可以开启一个Spark shell,然后用下面的命令连接你的集群:

spark-shell  spark://vm02:7077

格式:spark-shell  spark://host:port, 可以进入spark集群的任意一个节点
默认情况是进入到一个scala语言环境的一个交互窗口。

[hadoop@vm03 bin]$ spark-shell  spark://vm02:7077
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/21 20:06:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://vm03:4040
Spark context available as 'sc' (master = local[*], app id = local-1703160374523).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/
         
Using Scala version 2.12.18 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

以上进入spark交互窗口中,输出一些日志信息,包含指定APP ID信息。

        master = local[*], app id = local-1703160374523

local[*] 是一种运行模式,用于指定 Spark 应用程序在本地模式下运行,而 * 表示 Spark 应该使用所有可用的 CPU 核心。如果需要使用多线程运行模式需要指定运行的线程数量local[N].

加载一个简单的text文件

        在服务器上随便创建一个txt文件用于做演示

        随便造数据如下:

[hadoop@vm02 ~]$ vim text.txt

Name, Age, City, Occupation, Salary
John, 25, New York, Engineer, 80000
Alice, 30, San Francisco, Data Scientist, 90000
Bob, 28, Los Angeles, Software Developer, 85000
Eva, 22, Chicago, Student, 0
Michael, 35, Boston, Manager, 100000
Olivia, 29, Seattle, Designer, 95000
David, 31, Austin, Analyst, 88000
Sophia, 26, Denver, Teacher, 75000
Daniel, 33, Miami, Doctor, 120000
Emma, 27, Atlanta, Nurse, 70000
William, 32, Houston, Researcher, 95000
Ava, 24, Phoenix, Artist, 78000
James, 29, San Diego, Programmer, 92000
Grace, 28, Portland, Writer, 86000
Jackson, 30, Nashville, Musician, 110000
Lily, 26, Minneapolis, Chef, 89000
Ethan, 35, Detroit, Entrepreneur, 130000
Chloe, 23, Philadelphia, Student, 0
Logan, 31, Pittsburgh, Engineer, 98000
Harper, 27, Charlotte, Manager, 105000
Aiden, 28, Las Vegas, Developer, 90000
Mia, 25, Dallas, Scientist, 95000
Lucas, 30, San Antonio, Designer, 85000
Evelyn, 29, Raleigh, Teacher, 78000
Noah, 34, Orlando, Doctor, 115000
Amelia, 26, Sacramento, Analyst, 92000
Sophie, 32, Tampa, Nurse, 75000
Owen, 28, St. Louis, Researcher, 98000
Isabella, 31, Kansas City, Writer, 86000

使用spark-shell交互页面,进行读取该文件内容。

scala> val infile = sc.textFile("file:/home/hadoop/text.txt")
infile: org.apache.spark.rdd.RDD[String] = file:/home/hadoop/text.txt MapPartitionsRDD[1] at textFile at <console>:23

val infile = sc.textFile("/home/hadoop/text.txt")

        这段代码的目的是读取指定路径下的文本文件,创建一个Spark RDD(infile),该RDD包含文件中的每一行作为一个元素。这是在Spark中处理文本数据的一种常见方式。将text.txt文件中的每行作为一个RDD(Resilient Distributed Datasets)中的单独元素加载到Spark中,并返回一个名为infile的RDD。

       多副本范例

        注意当你连接到Spark的master之后,若集群中没有分布式文件系统,Spark会在集群中每一台机器上加载数据,所以要确保集群中的每个节点上都有完整数据。通常可以选择把数据放到HDFS、S3或者类似的分布式文件系统去避免这个问题。在本地模式下,可以将文件从本地直接加载,例如
        sc.textFile([filepah]),想让文件在所有机器上都有备份,请使用SparkContext类中的addFile函数,代码如下:        

import org.apache.spark.SparkFiles;
val file =sc.addFile("file:/home/hadoop/text.txt")
val inFile=sc.textFile(SparkFiles.get("text.txt"))

         addFile可以把文件分发到各个worker当中,然后worker会把文件存放在临时目录下。之后可以通过SparkFiles.get()获取文件

import org.apache.spark.SparkFiles


// 获取文件在工作节点上的本地路径
val localFilePath = SparkFiles.get("text.txt")

// 打印路径
println(s"File is distributed to: $localFilePath")

        在其他节点,可以通过  SparkFiles的get()函数获取其存储路径

         文件内容读取范例

        在读取文件的时候,需要所有节点均存在该文件,不然后报错文件不存在,本spark基于hadoop for hdfs的分布式文件系统进行演练,首先需要将文件上传到hdfs文件系统中去


[hadoop@vm02 ~]$ hdfs dfs -mkdir /hadoop 
[hadoop@vm02 ~]$ hdfs dfs -ls /
Found 3 items
drwxr-xr-x   - hadoop supergroup          0 2023-12-21 22:31 /hadoop
drwxr-xr-x   - hadoop supergroup          0 2023-12-18 10:06 /hbase
drwxr-xr-x   - hadoop supergroup          0 2023-11-28 09:33 /home
[hadoop@vm02 ~]$ hdfs dfs -put /home/hadoop/text.txt  /hadoop/
[hadoop@vm02 ~]$ hdfs dfs -ls /hadoop 
Found 1 items
-rw-r--r--   3 hadoop supergroup       1119 2023-12-21 22:31 /hadoop/text.txt

将文件上传到hdfs中去,使用first进行查看文件内容表头信息

import org.apache.spark.SparkFiles; 
val infile = sc.textFile("hdfs://vm02:8020/hadoop/text.txt") 
infile.first() 

这里的8020是hdfs的rpc端口。 

spark-shell的逻辑回归 

        在 Spark 中,逻辑回归是一种用于二分类问题的机器学习算法。尽管它的名字中包含"回归",但实际上它是一种分类算法,用于预测一个二元目标变量的概率。


scala> import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegression

scala> import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.VectorAssembler

scala> import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.{SparkSession, DataFrame}

scala> 

scala> 

scala> val spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()
23/12/22 00:15:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@13f05e8e



scala> val data = Seq(
     |   (1.0, 0.1, 0.5),
     |   (0.0, 0.2, 0.6),
     |   (1.0, 0.3, 0.7),
     |   (0.0, 0.4, 0.8)
     | )
data: Seq[(Double, Double, Double)] = List((1.0,0.1,0.5), (0.0,0.2,0.6), (1.0,0.3,0.7), (0.0,0.4,0.8))

scala> 

scala> val columns = Seq("label", "feature1", "feature2")
columns: Seq[String] = List(label, feature1, feature2)

scala> 

scala> val df: DataFrame = data.toDF(columns: _*)
df: org.apache.spark.sql.DataFrame = [label: double, feature1: double ... 1 more field]

scala> df.show()
+-----+--------+--------+
|label|feature1|feature2|
+-----+--------+--------+
|  1.0|     0.1|     0.5|
|  0.0|     0.2|     0.6|
|  1.0|     0.3|     0.7|
|  0.0|     0.4|     0.8|
+-----+--------+--------+



scala> val assembler = new VectorAssembler()
assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_dc7bc810fe30, handleInvalid=error

scala>   .setInputCols(Array("feature1", "feature2"))
res1: assembler.type = VectorAssembler: uid=vecAssembler_dc7bc810fe30, handleInvalid=error, numInputCols=2

scala>   .setOutputCol("features")
res2: res1.type = VectorAssembler: uid=vecAssembler_dc7bc810fe30, handleInvalid=error, numInputCols=2

scala> 

scala> val assembledData = assembler.transform(df)
assembledData: org.apache.spark.sql.DataFrame = [label: double, feature1: double ... 2 more fields]

scala> assembledData.show()
+-----+--------+--------+---------+
|label|feature1|feature2| features|
+-----+--------+--------+---------+
|  1.0|     0.1|     0.5|[0.1,0.5]|
|  0.0|     0.2|     0.6|[0.2,0.6]|
|  1.0|     0.3|     0.7|[0.3,0.7]|
|  0.0|     0.4|     0.8|[0.4,0.8]|
+-----+--------+--------+---------+



scala> val lr = new LogisticRegression()
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_29b7d06469ba

scala>   .setLabelCol("label")
res4: org.apache.spark.ml.classification.LogisticRegression = logreg_29b7d06469ba

scala>   .setFeaturesCol("features")
res5: org.apache.spark.ml.classification.LogisticRegression = logreg_29b7d06469ba

scala>   .setMaxIter(10)
res6: res5.type = logreg_29b7d06469ba

scala>   .setRegParam(0.01)
res7: res6.type = logreg_29b7d06469ba



scala> val lrModel = lr.fit(assembledData)
23/12/22 00:15:43 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
lrModel: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid=logreg_29b7d06469ba, numClasses=2, numFeatures=2



scala> val summary = lrModel.summary
summary: org.apache.spark.ml.classification.LogisticRegressionTrainingSummary = org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummaryImpl@4369db27

scala> println(s"Coefficients: ${lrModel.coefficients}")
Coefficients: [-4.371555225626981,-4.37155522562698]

scala> println(s"Intercept: ${lrModel.intercept}")
Intercept: 3.9343997030642823

scala> println(s"Objective History: ${summary.objectiveHistory.mkString(", ")}")
Objective History: 0.6931471805599453, 0.5954136109155707, 0.5904687934140505, 0.5901819039583514, 0.5901795791081599, 0.5901795782746598

        在进行 拟合模型的时候,会占用较高的内存,如果内存不足,会导致内存溢出而退出spark-shell会话。通过以下命令,增加算子内存

spark-shell --conf spark.executor.memory=4g

但是不能超过可用内存

free -h 

代码含义解释

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.{SparkSession, DataFrame}

此部分导入了必要的Spark MLlib类和Spark SQL类。

val spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

这创建了一个Spark会话,应用程序的名称为"LogisticRegressionExample"。

val data = Seq(
  (1.0, 0.1, 0.5),
  (0.0, 0.2, 0.6),
  (1.0, 0.3, 0.7),
  (0.0, 0.4, 0.8)
)

val columns = Seq("label", "feature1", "feature2")

val df: DataFrame = data.toDF(columns: _*)
df.show()

此部分使用示例数据创建了一个名为df的DataFrame,其中每一行表示一个数据点,具有标签("label")和两个特征("feature1"和"feature2")。show()方法用于显示DataFrame。

val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2"))
  .setOutputCol("features")

val assembledData = assembler.transform(df)
assembledData.show()

使用VectorAssembler将"feature1"和"feature2"列组合成名为"features"的单列。结果的DataFrame存储在assembledData中,并显示出来。

val lr = new LogisticRegression()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)
  .setRegParam(0.01)

此部分创建了一个逻辑回归模型(lr)并设置了一些参数,例如标签列,特征列,最大迭代次数(setMaxIter)和正则化参数(setRegParam)。

val lrModel = lr.fit(assembledData)

使用fit方法在组合数据(assembledData)上训练逻辑回归模型。

val summary = lrModel.summary
println(s"Coefficients: ${lrModel.coefficients}")
println(s"Intercept: ${lrModel.intercept}")
println(s"Objective History: ${summary.objectiveHistory.mkString(", ")}")

        此部分输出逻辑回归模型训练的各种结果。显示了系数,截距和训练过程中目标函数的历史记录。summary对象提供了有关训练摘要的其他信息。

这里使用scala 语法相当繁琐,转换为python的语法就会简单很多

python示例

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression



# 创建 Spark 会话
spark = SparkSession.builder \
    .appName("LogisticRegressionExample") \
    .master("spark://10.0.0.102:7077") \
.getOrCreate()

# 创建包含一些示例数据的 DataFrame
data = [
    (1.0, 0.1, 0.5),
    (0.0, 0.2, 0.6),
    (1.0, 0.3, 0.7),
    (0.0, 0.4, 0.8)
]

columns = ["label", "feature1", "feature2"]

df = spark.createDataFrame(data, columns)
df.show()

# 使用 VectorAssembler 将特征列合并成一个特征向量
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
assembledData = assembler.transform(df)
assembledData.show()

# 创建逻辑回归模型
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.01)

# 拟合模型
lrModel = lr.fit(assembledData)

# 查看模型的训练结果
print("Coefficients: {}".format(lrModel.coefficients))
print("Intercept: {}".format(lrModel.intercept))
print("Objective History: {}".format(lrModel.summary.objectiveHistory()))

此时可以登录到spark web上查看任务情况

http://10.0.0.102:8081/

spark web ui 的端口信息可以通过以下方式查看 

ps -ef |grep webui-port

当资源不足时,执行代码过程中没五秒钟会输出一次提示信息(不影响代码执行)

23/12/22 00:54:47 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1333433.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java中创建对象的方式有哪些?】

✅Java中创建对象的方式有哪些&#xff1f; ✅使用New关键字✅使用反射机制✅使用clone方法✅使用反序列化✅使用方法句柄✅ 使用Unsafe分配内存 ✅使用New关键字 这是我们最常见的也是最简单的创建对象的方式&#xff0c;通过这种方式我们还可以调用任意的构造函数 (无参的和有…

Spring Boot学习随笔- 第一个Thymeleaf应用(基础语法th:,request、session作用域取值)

学习视频&#xff1a;【编程不良人】2021年SpringBoot最新最全教程 第十五章、Thymeleaf Thymeleaf是一种现代化的服务器端Java模板引擎&#xff0c;专门用于Web和独立环境。Thymeleaf在有网络和无网络的环境下皆可运行&#xff0c;即可以让美工在浏览器查看页面的静态效果&am…

数组元素反序

和前面的字符串逆向输出有异曲同工之妙 第一位和最后一位交换位置&#xff0c;然后用比大小循环 那么接下来修改一下这个程序&#xff0c;我们接下来解释一下p的概念 画图解释&#xff1a; 在最前面的 定义的时候&#xff0c;我们将p&#xff08;0&#xff09;定义在了1上&…

Ps:直方图 - 统计数据

使用扩展视图或全部通道视图时&#xff0c;直方图 Histogram的下方会显示一组实时统计数据。 提示&#xff1a; 要在直方图面板控制菜单中勾选&#xff08;默认&#xff09;“显示统计数据” Show Statistics。 源 Source --整个图像 Entire Image 默认选项。显示整个图像&am…

Spring 依赖注入概述、使用以及原理解析

前言 源码在我github的guide-spring仓库中&#xff0c;可以克隆下来 直接执行。 我们本文主要来介绍依赖注入的使用示例及其原理 依赖注入 什么是依赖注入 依赖注入&#xff08;Dependency Injection&#xff0c;简称DI&#xff09;是一种设计模式&#xff0c;它用于实现对…

【MySQL学习笔记008】多表查询

1、多表关系 概述&#xff1a;项目开发中&#xff0c;在进行数据库表结构设计时&#xff0c;会根据业务需求及业务模块之间的关系&#xff0c;分析并设计表结构&#xff0c;由于业务之间相互关联&#xff0c;所以各个表结构之间也存在着各种联系&#xff0c;基本上可分为三种&a…

在linux操作系统Centos上安装服务器相关软件

如果您的服务器没有图形界面(GUI),您可以通过命令行(终端)来安装和配置Tomcat、JDK和MySQL等软件。以下是在没有图形界面GHome的 Linux 系统上安装这些软件的基本步骤: 对于CentOS Stream 9,您可以按照以下步骤在命令行上安装Tomcat、JDK 和 MySQL 数据库: 1. 安装JD…

设计模式--迭代器模式

实验18&#xff1a;迭代器模式 本次实验属于模仿型实验&#xff0c;通过本次实验学生将掌握以下内容&#xff1a; 1、理解迭代器模式的动机&#xff0c;掌握该模式的结构&#xff1b; 2、能够利用迭代器模式解决实际问题。 [实验任务]&#xff1a;JAVA和C常见数据结构迭代…

基于遗传算法特征选择及单层感知机模型的IMDB电影评论文本分类案例

基于遗传算法特征选择及单层感知机模型的IMDB电影评论文本分类案例 1.数据载入及处理2.感知机模型建立3.模型训练4.遗传算法进行特征选择注意 5.联系我们 1.数据载入及处理 import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dat…

线程的同步与互斥

抢票的例子 竞争过程 进程A被切走 进程B被切走 结论&#xff1a; 互斥 int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr); mutex: 指向要初始化的互斥锁的指针。attr: 用于设置互斥锁属性的指针&#xff0c;通常可以传入 NULL 以使用默认属性…

【贪心】最小生成树Kruskal算法Python实现

文章目录 [toc]问题描述最小生成树的性质证明 Kruskal算法时间复杂性Python实现 个人主页&#xff1a;丷从心 系列专栏&#xff1a;贪心算法 问题描述 设 G ( V , E ) G (V , E) G(V,E)是无向连通带权图&#xff0c; E E E中每条边 ( v , w ) (v , w) (v,w)的权为 c [ v ] …

【图神经网络 · 科研笔记5】异构信息网络,利用注意力选择元路径;利用进化邻域和社群实现自监督动态图嵌入,交叉监督对比学习;近期科研思维导图小汇总;

记录部分科研文献阅读相关内容【划重点】,主题“图神经网络”,仅学习使用。 🎯作者主页: 追光者♂🔥 🌸个人简介: 📝[1] CSDN 博客专家📝 🏆[2] 人工智能领域优质创作者🏆 🌟[3] 2023年城市之星领跑者TOP1(哈尔滨)🌿 🌿[4] 2022年度…

maven下载jar包失败

配置国内镜像 设置国内的仓库,比如: <!--阿里仓库--><mirror><id>alimaven</id><name>aliyun maven</name><url>https://maven.aliyun.com/repository/public/</url><mirrorOf>central</mirrorOf></mirror>…

CnosDB如何确保多步操作的最终一致性?

背景 在时序数据库中&#xff0c;资源的操作是一个复杂且关键的任务。这些操作通常涉及到多个步骤&#xff0c;每个步骤都可能会失败&#xff0c;导致资源处于不一致的状态。例如&#xff0c;一个用户可能想要在CnosDB集群中删除一个租户&#xff0c;这个操作可能需要删除租户…

Alnet网络分析与demo实例

参考自 up主的b站链接&#xff1a;霹雳吧啦Wz的个人空间-霹雳吧啦Wz个人主页-哔哩哔哩视频这位大佬的博客 Fun_机器学习,pytorch图像分类,工具箱-CSDN博客 数据集下载 http://download.tensorflow.org/example_images/flower_photos.tgz 包含 5 中类型的花&#xff0c;每种…

嵌入式开发——PWM高级定时器

学习目标 加强掌握PWM开发流程理解定时器与通道的关系掌握多通道配置策略掌握互补PWM配置策略掌握定时器查询方式掌握代码抽取优化策略掌握PWM调试方式学习内容 需求 点亮8个灯,采用pwm的方式。 定时器 通道 <

Netty-4-网络编程模式

我们经常听到各种各样的概念——阻塞、非阻塞、同步、异步&#xff0c;这些概念都与我们采用的网络编程模式有关。 例如&#xff0c;如果采用BIO网络编程模式&#xff0c;那么程序就具有阻塞、同步等特质。 诸如此类&#xff0c;不同的网络编程模式具有不同的特点&#xff0c…

【大数据】NiFi 的基本使用

NiFi 的基本使用 1.NiFi 的安装与使用1.1 NiFi 的安装1.2 各目录及主要文件 2.NiFi 的页面使用2.1 主页面介绍2.2 面板介绍 3.NiFi 的工作方式3.1 基本方式3.2 选择处理器3.3 组件状态3.4 组件的配置3.4.1 SETTINGS&#xff08;通用配置&#xff09;3.4.2 SCHEDULING&#xff0…

博弈论:理解决策背后的复杂动态

1.基本概念 博弈论是一门研究具有冲突和合作元素决策制定的数学理论。它不仅适用于经济学&#xff0c;还广泛应用于政治学、心理学、生物学等领域。博弈论的核心在于分析参与者&#xff08;称为“玩家”&#xff09;在特定情境下的策略选择&#xff0c;以及这些选择如何影响最…

工资发放 C语言xdoj92

题目描述&#xff1a; 公司财务要发工资现金&#xff0c;需要提前换取100元、50元、20元、10元、5元和1元的人民币&#xff0c; 请输入工资数&#xff0c;计算张数最少情况下&#xff0c;各自需要多少张。 输入格式&#xff1a;共一行&#xff0c;输入一个正整数。 输出格式&am…