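Contents
I. Spark Streaming Overview
II. Adding Dependencies
III. Configuring log4j
1. After the dependencies are downloaded, open External Libraries on the left side of IDEA
2. Find spark-core
3. Find the org.apache.spark directory
4. Find the log4j-defaults.properties file
5. Copy the file into the resources directory and rename it
6. Edit line 19 of log4j.properties
IV. Reading a Socket Stream with Spark Streaming
1. Write the code
2. Start nc -lk
3. Run the Scala program
V. Reading Kafka Messages with Spark Streaming
1. Write the code
2. Start a producer for sparkkafkastu and send messages
3. Run the Scala code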
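I. Spark Streaming Overview
Spark Streaming is used to process streaming data. It supports many input sources, such as Kafka, Flume, Twitter, ZeroMQ, and plain TCP sockets (in recent Spark releases some of these connectors, such as Twitter and ZeroMQ, are maintained outside the core project). Once the data has been ingested, it can be processed with Spark's high-level operators such as map, reduce, join, and window. The results can be written to many destinations, such as HDFS or a database.
Spark Streaming versus Flink: Spark Streaming processes data in micro-batches, so its latency is typically on the order of seconds, whereas Flink processes records as they arrive and can reach millisecond-level latency. In that sense Flink is a true real-time stream processor and Spark Streaming is near-real-time. When choosing a stream processing framework, Spark Streaming is usually sufficient if the latency requirement is not strict.
The programming abstraction of Spark Streaming is the discretized stream, or DStream. A DStream is a sequence of RDDs, where each RDD holds the data from one time slice of the stream.
Transformations applied to a DStream are translated into operations on the underlying RDDs. For example, applying flatMap to a DStream of lines runs flatMap on each of its RDDs, producing the RDDs of a DStream of words.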
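To make the "sequence of RDDs" idea concrete, here is a minimal sketch that models a DStream with plain Scala collections. It does not use Spark at all: the type alias `Batches` and the helper `flatMapBatches` are hypothetical names invented for this illustration, and each inner `Seq` merely stands in for one RDD.

```scala
object DStreamModel {
  // Hypothetical stand-in for a DStream: each inner Seq plays the role of
  // one RDD, i.e. the data that arrived during one batch interval.
  type Batches[A] = Seq[Seq[A]]

  // A transformation on the "stream" is just the same transformation
  // applied to every batch independently.
  def flatMapBatches[A, B](stream: Batches[A])(f: A => Seq[B]): Batches[B] =
    stream.map(batch => batch.flatMap(f))

  def main(args: Array[String]): Unit = {
    val lines: Batches[String] = Seq(
      Seq("hello spark", "hello streaming"), // batch 1
      Seq("spark streaming")                 // batch 2
    )
    // Lines stream -> words stream, batch by batch
    val words = flatMapBatches(lines)(_.split("\\s+").toSeq)
    words.zipWithIndex.foreach { case (batch, i) =>
      println(s"batch ${i + 1}: ${batch.mkString(", ")}")
    }
  }
}
```

The point of the model is that batches never mix: words from batch 1 stay in batch 1. The same holds for per-batch aggregations such as reduceByKey in real Spark Streaming code.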
II. Adding Dependencies
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <spark.version>3.1.2</spark.version>
  <mysql.version>8.0.29</mysql.version>
</properties>
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>${spark.version}</version>
  </dependency>
</dependencies>
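For readers who build with sbt rather than Maven, the same dependency set could be declared roughly as follows. This is only a sketch: the original uses Maven, and the exact Scala patch version is an assumption (any 2.12.x release should match the `_2.12` artifacts above).

```scala
// build.sbt (sketch): sbt counterpart of the Maven dependencies above.
// Assumption: the Scala patch version below is illustrative only.
ThisBuild / scalaVersion := "2.12.15"

val sparkVersion = "3.1.2"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version (_2.12) to the artifact name
  "org.apache.spark" %% "spark-core"                 % sparkVersion,
  "org.apache.spark" %% "spark-sql"                  % sparkVersion,
  "org.apache.spark" %% "spark-streaming"            % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "junit"            %  "junit"                      % "4.11" % Test
)
```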
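III. Configuring log4j
1. After the dependencies are downloaded, open External Libraries on the left side of IDEA
2. Find spark-core
3. Find the org.apache.spark directory inside it
4. Find the log4j-defaults.properties file
5. Copy the file into the resources directory and rename it to log4j.properties
6. Change line 19 of log4j.properties to the following, so that only ERROR-level messages are printed to the console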
log4j.rootCategory=ERROR, console
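As an alternative to editing the properties file, the log level can also be lowered from code once the context exists. The sketch below assumes the Spark 3.1.2 dependencies from section II; the object name `LogLevelSketch` is made up for this example. Messages logged before `setLogLevel` is called (for example during context startup) will still appear.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LogLevelSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("logLevelSketch")
    val ssc = new StreamingContext(conf, Seconds(3))
    // Only ERROR and above are logged from this point on
    ssc.sparkContext.setLogLevel("ERROR")
    // Nothing is started in this sketch, so just release the resources
    ssc.stop()
  }
}
```

The properties file has the advantage of taking effect before any Spark code runs, which is why the steps above use it.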
IV. Reading a Socket Stream with Spark Streaming
1. Write the code
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    // local[2]: the socket receiver occupies one thread, so at least one more is needed for processing
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstream1")
    // Create the streaming context with a 3-second batch interval
    val streamingContext = new StreamingContext(conf, Seconds(3))
    // Data source: text lines read from the given host and port
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("lxm147", 8888)
    // Business logic: split lines into words and count each word within the batch
    val wordStream: DStream[String] = socketLineStream.flatMap(_.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    val wordCountStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)
    // Output: print the first elements of each batch to the console
    wordCountStream.print()
    // Start receiving data and block until the computation is stopped
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
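2. On the host lxm147, start a listener with nc -lk 8888 (the port must match the one used in the code)
3. Run the Scala program, then type lines into the nc session; the word counts of each 3-second batch are printed to the console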
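If no nc listener is at hand, the same word-count pipeline can be tried against pre-built batches with `queueStream`, which feeds one queued RDD into the stream per batch interval. This is a sketch under stated assumptions rather than part of the original walkthrough: the object name, the `run` helper, the 1-second batch interval, and the timeout are all choices made for this illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object QueueStreamWordCount {
  // Runs the word count over pre-built batches and returns
  // one count map per non-empty batch, in arrival order.
  def run(batchLines: Seq[Seq[String]]): Seq[Map[String, Int]] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("queueStreamWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.sparkContext.setLogLevel("ERROR")

    // Each queued RDD stands in for the lines typed into nc during one interval
    val queue = mutable.Queue[RDD[String]]()
    batchLines.foreach(lines => queue += ssc.sparkContext.parallelize(lines))

    val results = mutable.ArrayBuffer.empty[Map[String, Int]]
    ssc.queueStream(queue, oneAtATime = true)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreachRDD { rdd =>
        // foreachRDD runs on the driver, so collecting into a local buffer is safe here
        val counts = rdd.collect().toMap
        if (counts.nonEmpty) results.synchronized { results += counts }
      }

    ssc.start()
    // Assumption: a few spare seconds are enough for all queued batches to finish
    ssc.awaitTerminationOrTimeout(1000L * (batchLines.size + 5))
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    results.synchronized { results.toList }
  }

  def main(args: Array[String]): Unit =
    run(Seq(Seq("hello spark", "hello streaming"), Seq("hello again")))
      .zipWithIndex
      .foreach { case (counts, i) => println(s"batch ${i + 1}: $counts") }
}
```

Note that "hello" is counted separately in each batch: reduceByKey aggregates within one batch interval only, exactly as in the socket example.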
V. Reading Kafka Messages with Spark Streaming
1. Write the code
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")
    // Create the streaming context with a 5-second batch interval
    val streamingContext = new StreamingContext(conf, Seconds(5))
    // Kafka consumer settings: broker address, deserializers, and consumer group
    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1")
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      // Create the topic first if it does not exist:
      // kafka-topics.sh --create --zookeeper lxm147:2181 --topic sparkkafkastu --partitions 1 --replication-factor 1
      // (newer Kafka releases take --bootstrap-server lxm147:9092 instead of --zookeeper)
      ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams)
    )
    // Each ConsumerRecord carries a key and a value; only the value is used for the word count
    val wordCountStream: DStream[(String, Int)] = kafkaStream.flatMap(_.value().split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCountStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
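The consumer settings above can also be written with class literals instead of class-name strings, so that a typo in a deserializer name fails at compile time. The sketch below is one possible variant, not the original author's code. The object name `KafkaParamsSketch` is invented here, and the `auto.offset.reset` entry is an addition that only spells out Kafka's default behaviour.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaParamsSketch {
  val kafkaParams: Map[String, Object] = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092",
    // Class literals instead of fully qualified class-name strings
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1",
    // Added for illustration: "latest" is Kafka's default. A consumer group with no
    // committed offsets then only sees messages produced after it subscribes;
    // "earliest" would replay the topic from the beginning instead.
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
  )
}
```

A map like this can be passed to `ConsumerStrategies.Subscribe` in place of the one in the code above. With the "latest" setting, messages sent before the streaming job first subscribes will not show up in the counts, so it is worth sending a few more once the job is running.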
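2. Start a producer for the sparkkafkastu topic (for example, Kafka's console producer) and send messages
3. Run the Scala code; the word counts of each 5-second batch are printed to the console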
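Step 2 can also be done from Scala instead of a console tool. The following is a minimal sketch of such a producer, assuming the broker address and topic used above. The object name, the `buildProps` helper, and the sample messages are made up for this example, and the kafka-clients classes are expected to arrive transitively with spark-streaming-kafka-0-10.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object SparkKafkaStuProducer {
  // Builds the producer configuration for plain string keys and values
  def buildProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](buildProps("lxm147:9092"))
    try {
      // Sample lines; each one becomes the value of one Kafka record (no key)
      Seq("hello spark", "hello kafka", "spark streaming").foreach { line =>
        producer.send(new ProducerRecord[String, String]("sparkkafkastu", line))
      }
      // Make sure buffered records are actually sent before exiting
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```

Running this while the streaming job is active should make the sample words appear in the next batch's counts.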