Spark: From Beginner to Master
Environment Setup
Preparation
Create the installation directory
mkdir /opt/soft
cd /opt/soft
Download Scala
wget https://downloads.lightbend.com/scala/2.13.10/scala-2.13.10.tgz -P /opt/soft
Extract Scala
tar -zxvf scala-2.13.10.tgz
Rename the Scala directory
mv scala-2.13.10 scala-2
Download Spark
wget https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3-scala2.13.tgz -P /opt/soft
Extract Spark
tar -zxvf spark-3.4.0-bin-hadoop3-scala2.13.tgz
Rename the Spark directory
mv spark-3.4.0-bin-hadoop3-scala2.13 spark3
Edit the environment variables
vim /etc/profile
export JAVA_HOME=/opt/soft/jdk8
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export ZOOKEEPER_HOME=/opt/soft/zookeeper
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_INSTALL=${HADOOP_HOME}
export HADOOP_MAPRED_HOME=${HADOOP_HOME}
export HADOOP_COMMON_HOME=${HADOOP_HOME}
export HADOOP_HDFS_HOME=${HADOOP_HOME}
export YARN_HOME=${HADOOP_HOME}
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HIVE_HOME=/opt/soft/hive3
export HCAT_HOME=/opt/soft/hive3/hcatalog
export SQOOP_HOME=/opt/soft/sqoop-1
export FLUME_HOME=/opt/soft/flume
export HBASE_HOME=/opt/soft/hbase2
export PHOENIX_HOME=/opt/soft/phoenix
export SCALA_HOME=/opt/soft/scala-2
export SPARK_HOME=/opt/soft/spark3
export SPARKPYTHON=/opt/soft/spark3/python
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HCAT_HOME/bin:$SQOOP_HOME/bin:$FLUME_HOME/bin:$HBASE_HOME/bin:$PHOENIX_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SPARKPYTHON
source /etc/profile
Local mode
Scala / Java
Start
spark-shell
Web UI: http://spark01:4040
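Inside the shell, sc (SparkContext) and spark (SparkSession) are already created; a quick sanity check, as a minimal sketch:
sc.parallelize(1 to 100).reduce(_ + _) // returns 5050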
Exit
:quit
pyspark
Start
pyspark
Web UI: http://spark01:4040
Exit
quit() or Ctrl-D
Submitting an application in local mode
Run the following from the Spark installation directory:
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
- --class: the main class of the program to run; replace it with the main class of your own application (see the minimal sketch after this list)
- --master local[2]: the deployment target; the default is local mode, and the number is how many virtual CPU cores to allocate
- spark-examples_2.13-3.4.0.jar: the JAR containing the application class; in practice, use the JAR you packaged yourself
- 10: an argument passed to the program's entry point, here used to set the number of tasks for the application
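As a sketch of what "your own application" could look like, here is a minimal Scala object (the names com.example.MySparkApp, the JAR name my-app.jar, and the logic are placeholders, not taken from the original examples); package it into a JAR and point --class at it:
package com.example

import org.apache.spark.{SparkConf, SparkContext}

// hypothetical minimal application; submit with e.g.
// spark-submit --class com.example.MySparkApp --master local[2] my-app.jar 10
object MySparkApp {
  def main(args: Array[String]): Unit = {
    // the first program argument (the 10 above) is used here as the number of partitions
    val slices = if (args.nonEmpty) args(0).toInt else 2
    val sc = new SparkContext(new SparkConf().setAppName("MySparkApp"))
    // count the even numbers in 1..1000, spread over `slices` partitions
    val count = sc.parallelize(1 to 1000, slices).filter(_ % 2 == 0).count()
    println(s"even numbers: $count")
    sc.stop()
  }
}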
Standalone mode
Edit the core configuration file
In the conf directory:
cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native
export SPARK_MASTER_HOST=spark01
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633
Edit the workers file
cp workers.template workers
vim workers
spark01
spark02
spark03
Configure event logging and the history server
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
Rename the start/stop scripts (to avoid clashing with Hadoop's start-all.sh / stop-all.sh)
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc
Refresh the environment variables on the other nodes
source /etc/profile
Start
start-spark.sh
start-history-server.sh
Web UI
http://spark01:6633
http://spark01:18080
Submit a job to the cluster
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
HA mode
Edit the core configuration file
In the conf directory:
cd /opt/soft/spark3/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/opt/soft/jdk8
export HADOOP_HOME=/opt/soft/hadoop3
export HADOOP_CONF_DIR=/opt/soft/hadoop3/etc/hadoop
export JAVA_LIBRARY_PATH=/opt/soft/hadoop3/lib/native
SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=spark01:2181,spark02:2181,spark03:2181
-Dspark.deploy.zookeeper.dir=/spark3"
export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_CORES=4
export SPARK_MASTER_WEBUI_PORT=6633
(spark.deploy.zookeeper.dir is a znode path inside ZooKeeper that stores Master recovery state; no HDFS directory needs to be created for it.)
Edit the workers file
cp workers.template workers
vim workers
spark01
spark02
spark03
Configure event logging and the history server
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://lihaozhe/spark-log
hdfs dfs -mkdir /spark-log
vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://lihaozhe/spark-log"
Rename the start/stop scripts (to avoid clashing with Hadoop's start-all.sh / stop-all.sh)
mv sbin/start-all.sh sbin/start-spark.sh
mv sbin/stop-all.sh sbin/stop-spark.sh
Distribute to the other nodes
scp -r /opt/soft/spark3 root@spark02:/opt/soft
scp -r /opt/soft/spark3 root@spark03:/opt/soft
scp -r /etc/profile root@spark02:/etc
scp -r /etc/profile root@spark03:/etc
Refresh the environment variables on the other nodes
source /etc/profile
Start
start-spark.sh
start-history-server.sh
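Note that start-spark.sh starts a Master only on the node where it is executed (plus the Workers listed in workers), so for actual failover you also need at least one standby Master. A common approach, assuming spark02 acts as the standby, is to run on spark02:
start-master.sh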
Web UI
http://spark01:6633
http://spark01:18080
Submit a job to the cluster
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10
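With HA enabled, the client can list every candidate Master in the --master URL so that it fails over to whichever one is active, for example:
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark01:7077,spark02:7077,spark03:7077 \
./examples/jars/spark-examples_2.13-3.4.0.jar \
10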
Submit a job to YARN
bin/spark-submit --master yarn \
--class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.13-3.4.0.jar 10
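By default this submits in client deploy mode, with the driver running in the local spark-submit process; to run the driver inside the YARN cluster instead, add --deploy-mode cluster:
bin/spark-submit --master yarn --deploy-mode cluster \
--class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.13-3.4.0.jar 10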
spark-code
spark-core
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lihaozhe</groupId>
<artifactId>spark-code</artifactId>
<version>1.0.0</version>
<name>${project.artifactId}</name>
<description>My wonderful Scala app</description>
<inceptionYear>2010</inceptionYear>
<licenses>
<license>
<name>My License</name>
<url>http://....</url>
<distribution>repo</distribution>
</license>
</licenses>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.13.10</scala.version>
<spark-core_2.13.version>3.4.0</spark-core_2.13.version>
<hadoop.version>3.3.5</hadoop.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<lombok.version>1.18.26</lombok.version>
<java-testdata-generator.version>1.1.2</java-testdata-generator.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.9.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-tools.testing</groupId>
<artifactId>specs_2.10</artifactId>
<version>1.6.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.13</artifactId>
<version>3.2.15</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.binarywang</groupId>
<artifactId>java-testdata-generator</artifactId>
<version>${java-testdata-generator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<!--spark core-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>${spark-core_2.13.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.6</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
hdfs-conf
Place the HDFS client configuration files core-site.xml and hdfs-site.xml in the resources directory so that the HDFS nameservice (hdfs://lihaozhe) used in the examples below can be resolved.
rdd
Building an RDD from a collection (parallelized collections)
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class JavaDemo01 {
    public static void main(String[] args) {
        String appName = "rdd";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            // build an RDD from a parallelized collection
            JavaRDD<Integer> distData = sc.parallelize(data);
        }
    }
}
import org.apache.spark.{SparkConf, SparkContext}

object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    val appName: String = "rdd"
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    // basic Spark configuration
    val conf = new SparkConf().setAppName(appName)
    // run locally
    conf.setMaster("local")
    // create the SparkContext (Spark context)
    val sc = new SparkContext(conf)
    val data = Array(1, 2, 3, 4, 5)
    // build an RDD from a parallelized collection
    val distData = sc.parallelize(data)
  }
}
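The example above only constructs the RDD; a minimal follow-up, reusing the sc and distData names from the Scala code, that actually triggers a job:
// reduce is an action, so this launches a Spark job and sums the elements
val total = distData.reduce(_ + _)
println(total) // 15
sc.stop()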
Building an RDD from a file
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class JavaDemo02 {
    public static void main(String[] args) {
        String appName = "rdd";
        // SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
        // basic Spark configuration
        SparkConf conf = new SparkConf().setAppName(appName);
        // run locally
        conf.setMaster("local");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // build an RDD from a file on HDFS
            JavaRDD<String> lines = sc.textFile("hdfs://lihaozhe/data/words.txt");
            JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
            int totalLength = lineLengths.reduce((a, b) -> a + b);
            System.out.println(totalLength);
        }
    }
}
import org.apache.spark.{SparkConf, SparkContext}

object ScalaDemo02 {
  def main(args: Array[String]): Unit = {
    val appName: String = "rdd"
    // val conf = new SparkConf().setAppName(appName).setMaster("local")
    // basic Spark configuration
    val conf = new SparkConf().setAppName(appName)
    // run locally
    conf.setMaster("local")
    // create the SparkContext (Spark context)
    val sc = new SparkContext(conf)
    // build an RDD from a file on HDFS
    val distFile = sc.textFile("hdfs://lihaozhe/data/words.txt")
    // iterate and print each line
    distFile.foreach(println)
  }
}
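Building on the file-based RDD above (same assumed HDFS path), a classic word count in Scala as a further sketch:
import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // basic Spark configuration, run locally as in the demos above
    val conf = new SparkConf().setAppName("word-count")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    // split each line into words, pair each word with 1, then sum the counts per word
    val counts = sc.textFile("hdfs://lihaozhe/data/words.txt")
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.foreach(println)
    sc.stop()
  }
}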