Spark的学习-02

news2024/11/26 8:43:47

Spark Standalone集群的安装

 
 

架构:普通分布式主从架构 主:Master:管理节点:管理从节点、接客、资源管理和任务 调度,等同于YARN中的ResourceManager 从:Worker:计算节点:负责利用自己节点的资源运行主节点 分配的任务 功能:提供分布式资源管理和任务调度,基本上与YARN是一致的

pi的测试

 
 

/opt/installs/spark/bin/spark-submit --master yarn /opt/installs/spark/examples/src/main/python/pi.py 200

上面我们已经在bigdata01上面安装好了Anaconda ,所以接下来不需要再安装,分发到bigdata02、bidata03即可

#上传,或者同步:
xsync.sh /opt/modules/Anaconda3-2021.05-Linux-x86_64.sh   #(分发给02、03)
# 添加执行权限
chmod u+x Anaconda3-2021.05-Linux-x86_64.sh  #(不添加也行, 直接执行在目录下执行即可)
# 执行
sh ./Anaconda3-2021.05-Linux-x86_64.sh  

# 过程
#第一次:【直接回车,然后按q】
   Please, press ENTER to continue
   >>>
#第二次:【输入yes】
 Do you accept the license terms? [yes|no]
 [no] >>> yes
#第三次:【输入解压路径:/opt/installs/anaconda3】
 [/root/anaconda3] >>> /opt/installs/anaconda3
 #第四次:【输入yes,是否在用户的.bashrc文件中初始化
Anaconda3的相关内容】
 Do you wish the installer to initialize  Anaconda3
   by running conda init? [yes|no]
   [no] >>> yes
   
  ----- 步骤和上面一样
  
  刷新环境变量:
# 刷新环境变量
source /root/.bashrc
# 激活虚拟环境,如果需要关闭就使用:conda deactivate
conda activate
配置环境变量:
# 编辑环境变量
vi /etc/profile
# 添加以下内容
# Anaconda Home
export ANACONDA_HOME=/opt/installs/anaconda3
export PATH=$PATH:$ANACONDA_HOME/bin
制作软链接:
# 刷新环境变量
source /etc/profile

# 创建软连接
ln -s /opt/installs/anaconda3/bin/python3 /usr/bin/python3

重新再解压spark安装包
# 解压安装
cd /opt/modules
tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /opt/installs
# 重命名
cd /opt/installs
mv spark-3.1.2-bin-hadoop3.2 spark-standalone

我们上面安装好的 Spark的名字是 Spark-local ,上面也创建了软连接,所以现在需要把上面的软链接删除

# 删除上面创建的软连接
rm -rf spark  
# 重新建立新的
ln -s spark-standalone spark

1、修改 spark-env.sh配置文件:

        先启动hdfs

        接着在hdfs上创建目录

# 创建程序运行日志的存储目录
hdfs dfs -mkdir -p /spark/eventLogs/

cd /opt/installs/spark/conf
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
# 22行:申明JVM环境路径以及Hadoop的配置文件路径
export JAVA_HOME=/opt/installs/jdk
export HADOOP_CONF_DIR=/opt/installs/hadoop/etc/hadoop
# 60行左右
export SPARK_MASTER_HOST=bigdata01 # 主节点所在的地址
export SPARK_MASTER_PORT=7077 #主节点内部通讯端口,用于接收客户端请求
export SPARK_MASTER_WEBUI_PORT=8080 #主节点用于供外部提供浏览器web访问的端口
export SPARK_WORKER_CORES=1     # 指定这个集群总每一个从节点能够使用多少核CPU
export SPARK_WORKER_MEMORY=1g   #指定这个集群总每一个从节点能够使用多少内存
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=8081
export SPARK_DAEMON_MEMORY=1g  # 进程自己本身使用的内存
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://bigdata01:9820/spark/eventLogs/ -Dspark.history.fs.cleaner.enabled=true"
# Spark中提供了一个类似于jobHistoryServer的进程,就叫做HistoryServer, 用于查看所有运行过的spark程序

2、spark-defaults.conf:Spark属性配置文件

mv spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf

# 末尾
spark.eventLog.enabled           true
spark.eventLog.dir              hdfs://bigdata01:9820/spark/eventLogs
spark.eventLog.compress              true

3、workers:从节点地址配置文件

mv workers.template workers
vim workers
# 末尾处删掉localhost,添加以下内容
bigdata01
bigdata02
bigdata03

4、log4j.properties:日志配置文件

mv log4j.properties.template log4j.properties
vim log4j.properties
# 19行:修改日志级别为WARN
log4j.rootCategory=WARN, console
#  log4j的5种 级别  debug --> info --> warn --error -->fatal

接着在第二台和第三台上,创建软链接

# 同步新压缩好的spark-standalone
xsync.sh /opt/installs/spark-standalone/
# 同步软连接
xsync.sh /opt/installs/spark

启动:

启动master:
cd /opt/installs/spark
sbin/start-master.sh
启动所有worker:
sbin/start-workers.sh
如果你想启动某一个worker
sbin/start-worker.sh

启动日志服务:
sbin/start-history-server.sh

要想关闭某个服务,将start换为stop

standalone集群启动情况:

master页面:

日志监控页面(18080):

4040和8080端口页面的区别

当有任务进行中的时候,就可以启动4040端口,若此任务并没有执行完毕,集群中又启动了新的端口,就会再启动一个4041端口.....可以一直累加下去

4040端口中任务的执行一结束,就无法再运行此窗口

当启动master等的时候,就会启动8080端口

所以我们在本地模式的时候,是无法启动8080端口的,只能启动4040

此处运行的程序,其实就是4040端口中正在执行的进程,当这个正在running的任务结束后,显示执行完成时,4040端口就打不开了

spark三种模式下4040与8080端口的启动情况:

本地模式:只能启动4040端口 (因为8080端口要启动master才能看到)

standalone(集群)模式: 可以看到4040还可以看到8080端口

Yarn模式:4040、8080都看不到

Spark 4040端口各个模块的作用

Job:

各个界面作用: Job:显示当前这个程序的所有Job,一个程序可以有多个Job Spark中不是所有的代码都会触发Job的产生和运行 所有RDD的转换是不会立即产生job,运行Task任务的,这种模式称为Lazy模式:避免在内存中构建RDD,但是你不用只有遇到了需要使用数据的代码操作才会产生job,触发Task任务的运行 能触发job任务生成的目前有: saveAsTextFile foreach

Stages

 
 

Stages:显示当前这个程序的所有Stage,一个Job可以有多个 Stage

Stage 可以理解为多个算子组成的阶段,到底有多少个Stage,取决于算子是否会触发shuffle过程。假如有两个触发shuffle过程的算子,整个程序可以切为三个阶段。

当一个Job被触发运行的时候,Spark底层会根据回溯算法构建这个job的执行计划图,即DAG图

每个Job都会有1个DAG图,在构建的时候会根据计算过程中是否要产生shuffle来划分Stage 不产生Shuffle的操作就在同一个Stage中执行,产生Shuffle的操作,会传递到另外一个Stage中执行 最终每个Stage中的操作会转换为对应的Task来执行

每个黑点表示一个RDD

每个矩形框中的RDD的转换都是在内存中完成的

曲线代表经过了Shuffle,灰色代表没有执行,因为之前执行的

Executors

 
 

显示当前这个程序的运行进程的信息

每个Spark程序都由两种进程组成:一个Driver和多个Executors

Driver进程:负责解析程序,构建DAG图,构建Stage,构建、调度、监控Task任务的运行 Executor进程:负责运行程序中的所有Task任务

Storage:显示当前这个程序在内存缓存的数据信息 。

Environment:显示当前这个程序所有的配置信息。

Spark-submit提交

# 提交程序的语法
# spark-submit [可选的选项] Python文件 Python文件中用到的参数
spark-submit --master local[2] / spark://bigdata01:7077 / yarn \
……
hdfs://bigdata01:9820/spark/app/pyspark_core_word_args.py /spark/wordcount/input /spark/wordcount/output

spark-submit中各个参数的意义

其实就是在将 提交命令的 [options] 可以写什么。

--master:用于指定程序运行的模式,5种模式,本地模式、
    Standalone、yarn、Mesos、K8s
    本地模式:--master local
    Standalone模式:--master spark://master:7077
    YARN模式:--master yarn
    作用等同于代码中:setMaster
--deploy-mode:用于指定Driver进程运行位置 【重点,后面展开讲】
--name:用于指定程序的名称,作用等同于代码中:setAppName
--jars:用于指定一些额外的jar包,例如读写MySQL时候需要用到MySQL的驱动包
--conf:用于指定当前程序运行的额外的一些配置,作用等同于代码中:set

Driver资源选项

Driver资源选项:主要用于构建一个非RDD的操作
--driver-memory:指定Driver进程能够使用的内存大小,默认是1G
--driver-cores:指定Driver进程能够使用的CPU核数,默认是1Core
--supervise:指定如果Driver故障,就自动重启

executor可以使用的参数:

运行这个程序的一个进程
需要的资源,资源都是来自于从节点
 --executor-cores 4
 --executor-memory 16
参数解释:
--executor-memory:指定每个Executor能够使用多少内存
--executor-cores:指定每个Executor能够使用多少CPU
--total-executor-cores:Standalone集群模式,指定所有Executor总共使用的CPU核数,用于间接指定Executor的个数
--num-executors:YARN集群模式,直接指定Executor的个数
--queue:指定提交程序到哪个队列中运行

加载顺序:优先级:代码中配置【set】 > 参数选项【--conf】 > 配置文件【公共配置:spark-defualt.conf】

实战测试:

将下面代码在pycharm中写好,然后直接拖拽到虚拟机中指定的路径下

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
import os
import sys

"""
-------------------------------------------------
   Description :        TODO:用于实现词频统计
   SourceFile  :        04.pyspark_core_wordcount_hdfs_args
-------------------------------------------------
"""

if __name__ == '__main__':
    # todo:0-设置系统环境变量
    # os.environ['JAVA_HOME'] = 'D:/jdk1.8.0_241'
    # os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.0'
    # os.environ['PYSPARK_PYTHON'] = 'D:/Anaconda/python.exe'
    # os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/Anaconda/python.exe'
    # os.environ['HADOOP_USER_NAME'] = 'root'

    # todo:1-构建SparkContext
    # 甚至 任务的名字都可以不写,让提交任务的时候指定
    conf = SparkConf().setAppName("SparkSubmitApp")
        # .setMaster("local[2]")\

    sc = SparkContext(conf=conf)

    # todo:2-数据处理:读取、转换、保存
    # step1: 读取数据:SparkContext对象负责读取文件,用传递的第二个参数作为程序的输入地址
    input_rdd = sc.textFile(sys.argv[1])
    # 输出第一行
    # print(input_rdd.first())
    # 打印总行数
    # print(input_rdd.count())

    # step2: 处理数据
    rs_rdd = input_rdd\
            .filter(lambda line: len(line.strip()) > 0)\
            .flatMap(lambda line: line.strip().split(" "))\
            .map(lambda word: (word, 1))\
            .reduceByKey(lambda tmp,item: tmp+item)

    # step3: 保存结果
    # 打印结果
    rs_rdd.foreach(lambda x: print(x))
    # 结果保存到文件中:路径不能提前存在,将第二个参数作为输出路径
    rs_rdd.saveAsTextFile(sys.argv[2])

    # todo:3-关闭SparkContext
    sc.stop()

开始编写命令,提交任务

注意:我们前面已经将spark的软连接链接到了 standalone(集群)上,所以需要先把之前的软连接删除掉,重现创建新的,指向本地

# 删除原来的软连接
rm -rf /opt/installs/spark
# 创建新的软连接指向
ln -s /opt/installs/spark-local /opt/installs/spark

本地(local)

spark-submit \
--master local[2] \
/home/_pytSparkDemo04-yuancheng.py \
/home/data.txt \
/home/output02

和上面一样,也需要将软连接再连回来

# 删除原来的软连接
rm -rf /opt/installs/spark
# 创建新的软连接指向
ln -s /opt/installs/spark-standalone /opt/installs/spark

集群(standalone)

spark-submit \
--master spark://bigdata01:7077 \
/home/_pytSparkDemo04-yuancheng.py \
hdfs://bigdata01:9820/spark/wordcount/input \
hdfs://bigdata01:9820/spark/wordcount/jiqun01

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2237615.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux相关概念和易错知识点(20)(dentry、分区、挂载)

目录 1.dentry (1)路径缓存的原因 (2)dentry的结构 ①多叉树结构 ②file和dentry之间的联系 ③路径概念存在的意义 2.分区 (1)为什么要确认分区 (2)挂载 ①进入分区 ②被挂…

《Linux运维总结:基于银河麒麟V10+ARM64架构CPU部署redis 6.2.14 TLS/SSL哨兵集群》

总结:整理不易,如果对你有帮助,可否点赞关注一下? 更多详细内容请参考:《Linux运维篇:Linux系统运维指南》 一、简介 Redis 哨兵模式是一种高可用性解决方案,它通过监控 Redis 主从架构,自动执行故障转移,从而确保服务的连续性。哨兵模式的核心组件包括哨兵(Sentine…

vue3实现一个无缝衔接、滚动平滑的列表自动滚屏效果,支持鼠标移入停止移出滚动

文章目录 前言一、滚动元素相关属性回顾一、实现分析二、代码实现示例:2、继续添加功能,增加鼠标移入停止滚动、移出继续滚动效果2、继续完善 前言 列表自动滚屏效果常见于大屏开发场景中,本文将讲解用vue3实现一个无缝衔接、滚动平滑的列表自…

腾讯云nginx SSL证书配置

本章教程,记录在使用腾讯云域名nginx证书配置SSL配置过程。 一、nginx配置 域名和证书,替换成自己的即可。证书文件可以自定义路径位置。服务器安全组或者防火墙需要开放80和443端口。 server {#SSL 默认访问端口号为 443listen 443 ssl; #请填写绑定证书的域名server_name c…

RabbitMQ的DLX(Dead-Letter-Exchange 死信交换机,死信交换器,死信邮箱)(重要)

RabbitMQ的DLX 1、RabbitMQ死信队列2、代码示例2.1、队列过期2.1.1、配置类RabbitConfig(关键代码)2.1.2、业务类MessageService2.1.3、配置文件application.yml2.1.4、启动类2.1.5、配置文件2.1.6、测试 2.2、消息过期2.2.1、配置类RabbitConfig2.2.2、…

陪诊问诊APP开发实战:基于互联网医院系统源码的搭建详解

时下,开发一款功能全面、用户体验良好的陪诊问诊APP成为了医疗行业的一大热点。本文将结合互联网医院系统源码,详细解析陪诊问诊APP的开发过程,为开发者提供实用的开发方案与技术指导。 一、陪诊问诊APP的背景与功能需求 陪诊问诊APP核心目…

FreeRTOS 21:递归互斥信号量

递归信号量,见文知义,递归嘛,就是可以重复获取调用的,本来按照信号量的特性,每获取一次可用信号量个数就会减少一个,但是递归则然, 对于已经获取递归互斥量的 任务可以重复获取该递归互斥量&…

算法|牛客网华为机试41-52C++

牛客网华为机试 上篇:算法|牛客网华为机试21-30C 文章目录 HJ41 称砝码HJ42 学英语HJ43 迷宫问题HJ44 SudokuHJ45 名字的漂亮度HJ46 截取字符串HJ48 从单向链表中删除指定值的节点HJ50 四则运算HJ51 输出单向链表中倒数第k个结点HJ52 计算字符串的编辑距离 HJ41 称砝…

mysql5安全审计

安装插件 插件需要严格与数据库版本适配,不然安装过程中会出现问题 解压插件 cd 插件所在路径unzip audit-plugin-mysql-5.7-1.1.7-921-linux-x86_64.zip#查看mysql默认插件目录 mysql> SHOW GLOBAL VARIABLES LIKE plugin_dir;# 将插件移动到mysql默认插件目…

MySQL 安装与配置

MySQL 安装与配置 MySQL 安装 MySQL 一般分为社区版和商业版,我们使用的是社区版(因为免费)。MySQL 安装的教程在网上有很多,此处就不再进行进行赘述,这里推荐两篇文章:如何在 Windows11 中安装 MySQL 8.…

Flink安装和Flink CDC实现数据同步

一,Flink 和Flink CDC 1, Flink Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。 中文文档 Apache Flink Documentation | Apache Flink 官方文档 :https://flink.apache.org Flink 中文社区…

车机版 Android Audio 框架笔记

车机版Android Audio 框架涉及的知识点很多,在工作中涉及的功能板块也及其繁杂,后面我会根据工作中的一些实际遇到的实例,逐步拆解 Android Audio的知识点,这里从网上整理了一些思维导图,可以做为未来的一个研究方向&a…

ubuntu 22.04 镜像源更换

双11抢了个云服务器,想要整点东西玩玩,没想到刚上来就不太顺利 使用sudo apt update更新软件,然后发生了如下报错 W: Failed to fetch http://mirrors.jdcloudcs.com/ubuntu/dists/jammy/InRelease 理所当然想到可能是镜像源连接不是很好&…

浅谈Agent

目录 什么是大模型 Agent ? 大模型Agent 有哪些部分组成? 规划(Planning) Planning类型 不依赖反馈的计划 基于反馈的计划 拆解子目标和任务分解方法 COT TOT GOT LLMP 反思和完善 ReAct(融合推理与执行的能力) Reflexion(动态…

NAT网络工作原理和NAT类型

NAT基本工作流程 通常情况下,某个局域网中,只有路由器的ip是公网的,局域网中的设备都是内网ip,内网ip不具备直接与外部应用通信的能力。 处于内网的设备如何借助NAT来实现访问外网的应用? 对于开启了NAT功能的局域网…

Jenkins插件使用问题总结

Git Push插件 插件介绍 主要是用于git推送代码到远程仓库中使用,插件地址 pipeline中使用 官方说明中只有一句代码gitPush(gitScm: scm, targetBranch: env.BRANCH_NAME, targetRepo: origin) 流水线语法中也做的不齐全所以一开始我老是设置错,导致代…

GPT-5 终于来了 —— 人们的预期与现实

高智慧人工智能的两面性,利用AI和被AI利用 前言:人工智能的热度持续升温,似乎已无处不在,但大家对它的感知却并不显著。这种状况有点像美国 2024 年的总统大选,投票前人们彼此不清楚支持谁,直到最终计票才发…

微服务透传日志traceId

问题 在微服务架构中,一次业务执行完可能需要跨多个服务,这个时候,我们想看到业务完整的日志信息,就要从各个服务中获取,即便是使用了ELK把日志收集到一起,但如果不做处理,也是无法完整把一次业…

Matlab实现鲸鱼优化算法优化随机森林算法模型 (WOA-RF)(附源码)

目录 1.内容介绍 2.部分代码 3.实验结果 4.内容获取 1内容介绍 鲸鱼优化算法(Whale Optimization Algorithm, WOA)是受座头鲸捕食行为启发而提出的一种新型元启发式优化算法。该算法通过模拟座头鲸围绕猎物的螺旋游动和缩小包围圈的方式,在…

【学习笔记】网络设备(华为交换机)基础知识 10 —— 信息中心 ① 简介

提示:学习华为交换机信息中心的概述( 包括信息中心的概念、功能、以及信息的分类、分级、和输出 ) ;还包括信息中心常用的命令 ( 使能信息中心、命名信息通道、配置信息过滤、清除统计信息、查看信息中心相关信息的命令…