一、搭建Spark项目结构
在SparkProject模块的pom.xml文件中增加一下依赖,并等待依赖包下载完毕,如上图。
<!-- Spark及Scala的版本号 -->
<properties>
<scala.version>2.11</scala.version>
<spark.version>2.1.1</spark.version>
</properties>
<!-- Mysql组件
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.7.22.1</version>
</dependency> 的依赖 -->
<!-- Spark各个组件的依赖 -->
<dependencies>
<!-- https://mvnrepository.com/artifact/com.thoughtworks.paranamer/paranamer -->
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.7.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.spark</groupId>-->
<!-- <artifactId>spark-streaming-flume-sink_2.10</artifactId>-->
<!-- <version>1.5.2</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.8</version>
</dependency>
</dependencies>
<!-- 配置maven打包插件及打包类型 -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
二、解决无法创建scala文件问题
三、编写LoggerLevel特质
在特质 下增加如下代码
Logger.getLogger("org").setLevel(Level.ERROR)
这个时候需要导包
完整代码如下:
import org.apache.log4j.{Level, Logger}
trait LoggerLevel {
Logger.getLogger("org").setLevel(Level.ERROR)
}
四、编写getLocalSparkSession方法
以下是完整代码:
import org.apache.spark.sql.SparkSession
object SparkUnit {
/**
* 一个class参数
**/
def getLocalSparkSession(appName: String): SparkSession = {
SparkSession.builder().appName(appName).master("local[2]").getOrCreate()
}
def getLocalSparkSession(appName: String, support: Boolean): SparkSession = {
if (support) SparkSession.builder().master("local[2]").appName(appName).enableHiveSupport().getOrCreate()
else getLocalSparkSession(appName)
}
def getLocalSparkSession(appName: String, master: String): SparkSession = {
SparkSession.builder().appName(appName).master(master).getOrCreate()
}
def getLocalSparkSession(appName: String, master: String, support: Boolean): SparkSession = {
if (support) SparkSession.builder().appName(appName).master(master).enableHiveSupport().getOrCreate()
else getLocalSparkSession(appName, master)
}
def stopSpark(ss: SparkSession) = {
if (ss != null) {
ss.stop()
}
}
}