【头歌实训】PySpark Streaming 入门

news2024/11/17 3:49:09

文章目录

  • 第1关:SparkStreaming 基础 与 套接字流
    • 任务描述
    • 相关知识
      • Spark Streaming 简介
      • Python 与 Spark Streaming
      • Python Spark Streaming API
      • Spark Streaming 初体验(套接字流)
    • 编程要求
    • 测试说明
    • 答案代码
  • 第2关:文件流
    • 任务描述
    • 相关知识
      • 文件流概述
      • Python 与 Spark Streaming 文件流
      • Spark Streaming 文件流初体验
    • 编程要求
    • 测试说明
    • 答案代码
  • 第3关:RDD 队列流
    • 任务描述
    • 相关知识
      • 队列流概述
      • Python 与 Spark Streaming 队列流
      • Spark Streaming 队列流初体验
    • 编程要求
    • 测试说明
    • 答案代码

第1关:SparkStreaming 基础 与 套接字流

任务描述

本关任务:使用 Spark Streaming 实现词频统计。

相关知识

为了完成本关任务,你需要掌握:

  1. Spark Streaming 简介;
  2. Python 与 Spark Streaming;
  3. Python Spark Streaming API;
  4. Spark Streaming 初体验(套接字流)。

Spark Streaming 简介

Spark Streaming 是 Spark 的核心组件之一,为 Spark 提供了可拓展、高吞吐、容错的流计算能力。如下图所示,Spark Streaming 可整合多种输入数据源,如 Kafka、Flume、HDFS,甚至是普通的 TCP 套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。

img

Spark Streaming 的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经 Spark 引擎以类似批处理的方式处理每个时间片数据,执行流程如下图所示。

img

Spark Streaming 最主要的抽象是 DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。在内部实现上,Spark Streaming 的输入数据按照时间片(如 1 秒)分成一段一段的 DStream,每一段数据转换为 Spark 中的 RDD,并且对 DStream 的操作都最终转变为对相应的 RDD 的操作。例如,下图展示了进行单词统计时,每个时间片的数据(存储句子的 RDD)经 flatMap 操作,生成了存储单词的 RDD。整个流式计算可根据业务的需求对这些中间的结果进一步处理,或者存储到外部设备中。

Python 与 Spark Streaming

在 Python 中使用 Spark Streaming 只需要下载 pyspark 扩展库即可,命令如下:

pip install pyspark

键入命令后,等待下载完成。

img

出现如上图所示,则表示安装完成。

创建 Spark Streaming 的上下文对象:

方式一

# 导入包
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# 通过 SparkConf() 设置配置参数
sparkConf = SparkConf()
sparkConf.setAppName('demo')
sparkConf.setMaster('local[*]')
# 创建 spark 上下文对象
sc = SparkContext(conf=sparkConf)
# 创建 Spark Streaming 上下文对象
# 参数1:spark 上下文对象
# 参数2:读取间隔时间(秒)
ssc = StreamingContext(sc, 5)

方式二

# 导入包
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建 spark 上下文对象
sc = SparkContext("local[*]", "demo")
# 创建 Spark Streaming 上下文对象
# 参数1:spark 上下文对象
# 参数2:读取间隔时间(秒)
ssc = StreamingContext(sc, 5)

Python Spark Streaming API

pyspark 库中有很多丰富的 API 提供使用,下面将介绍常用的一些 API。

Spark Streaming 核心 API

名称释义
StreamingContext(sparkContext[, …])Spark Streaming 功能的主要入口点。
DStream(jdstream、ssc、jrdd_deserializer)离散流 (DStream) 是 Spark Streaming 中的基本抽象,是表示连续数据流的 RDD 的连续序列(相同类型)。

Spark Streaming 操作 API

名称释义
StreamingContext.addStreamingListener(…)添加一个 [[org.apache.spark.streaming.scheduler.StreamingListener]] 对象,用于接收与流相关的系统事件。
StreamingContext.awaitTermination([timeout])等待执行停止。
StreamingContext.awaitTerminationOrTimeout([timeout])等待执行停止。
StreamingContext.checkpoint(directory)设置上下文以定期检查 DStream 操作以实现主控容错。
StreamingContext.getActive()返回当前活动的 StreamingContext 或无。
StreamingContext.getActiveOrCreate(……)要么返回活动的 StreamingContext(即当前已启动但未停止),要么从检查点数据重新创建 StreamingContext 或使用提供的 setupFunc 函数创建新的 StreamingContext。
StreamingContext.remember(duration)在此上下文中设置每个 DStreams 以记住它在最后给定持续时间内生成的 RDD。
StreamingContext.sparkContext返回与此 StreamingContext 关联的 SparkContext。
StreamingContext.start()开始执行流。
StreamingContext.stop([stopSparkContext,…])停止流的执行,可选择确保所有接收到的数据都已处理。
StreamingContext.transform(dstreams,……)创建一个新的 DStream,其中每个 RDD 都是通过在 DStream 的 RDD 上应用函数来生成的。
StreamingContext.union(*dstreams)从多个相同类型和相同幻灯片时长的 DStream 创建一个统一的 DStream。

输入与输出 API

名称释义
StreamingContext.binaryRecordsStream(……)创建一个输入流,用于监控与 Hadoop 兼容的文件系统中的新文件,并将它们作为具有固定长度记录的平面二进制文件读取。
StreamingContext.queueStream(rdds[, …])从 RDD 或列表的队列中创建输入流。
StreamingContext.socketTextStream(hostname, port)从 TCP 源主机名创建输入:端口。
StreamingContext.textFileStream(directory)创建一个输入流,用于监视与 Hadoop 兼容的文件系统中的新文件并将它们作为文本文件读取。
DStream.pprint([num])打印此 DStream 中生成的每个 RDD 的前 num 个元素。
DStream.saveAsTextFiles(prefix[, suffix])将此 DStream 中的每个 RDD 保存为文本文件,使用元素的字符串表示。

常用的转换与操作 API

名称释义
DStream.count()返回一个新的 DStream,其中每个 RDD 都有一个元素,该元素是通过计算此 DStream 的每个 RDD 生成的。
DStream.countByValue()返回一个新的 DStream,其中每个 RDD 包含此 DStream 的每个 RDD 中每个不同值的计数。
DStream.filter(F)返回一个新的 DStream,仅包含满足条件的元素。
DStream.flatMap(f[,preservesPartitioning])通过对该 DStream 的所有元素应用一个函数,然后将结果展平,返回一个新的 DStream。
DStream.flatMapValues(F)通过将 flatmap 函数应用于此 DStream 中每个键值对的值而不更改键,返回一个新的 DStream。
DStream.foreachRDD(func)对这个 DStream 中的每个 RDD 应用一个函数。
DStream.groupByKey([numPartitions])通过在每个 RDD 上应用 groupByKey 返回一个新的 DStream。
DStream.join(other[,numPartitions])通过在这个 DStream 和其他DStream 的 RDD 之间应用 ‘join’ 返回一个新的DStream。
DStream.map(f[,preservesPartitioning])通过对 DStream 的每个元素应用一个函数来返回一个新的 DStream。
DStream.mapValues(F)通过对该 DStream 中每个键值对的值应用映射函数返回一个新的 DStream,而不更改键。
DStream.reduce(func)返回一个新的 DStream,其中每个 RDD 具有通过减少此 DStream 的每个 RDD 生成的单个元素。
DStream.reduceByKey(func[,numPartitions])通过对每个 RDD 应用 reduceByKey 来返回一个新的 DStream。
DStream.updateStateByKey(updateFunc[, …])返回一个新的“状态” DStream,其中每个键的状态通过对键的先前状态和键的新值应用给定函数来更新。

Spark Streaming 初体验(套接字流)

下面让我们快速了解一下简单的 Spark Streaming 程序是什么样的,假设我们要计算从侦听 TCP 套接字的数据服务器接收到的文本数据中的字数,实现一个流式的 WordCount 计算程序。

第一步,导入包

打开右侧命令行窗口,等待连接后,在主目录下创建文件 test.py,导入 Spark Streaming 所需要的包。

touch test.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

第二步,创建上下文对象

首先,我们导入StreamingContext,它是所有流功能的主要入口点。创建一个具有多个执行线程的本地 StreamingContext,批处理间隔为 20 秒。

sc = SparkContext("local[*]", "demo")
# 每 20 秒读取一次
ssc = StreamingContext(sc, 20) 

第三步,指定数据流

使用这个上下文,我们可以创建一个表示来自 TCP 源的流数据的 DStream,指定为主机名(例如:localhost)和端口(例如:7777)。

lines = ssc.socketTextStream("localhost", 7777)

第四步,分词统计与输出

接下来,我们要按空格(根据数据流的情况来)将行拆分为单词。

words = lines.flatMap(lambda line: line.split(" "))

在这种情况下,每一行将被拆分为多个单词,单词流表示为 wordsDStream。接下来,我们要计算这些单词,输出结果到屏幕。

pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()

第五步,启动与停止

请注意,当执行这些行时,Spark Streaming 仅设置它在启动时将执行的计算,并且尚未开始真正的处理。在所有转换设置完成后才开始处理,所以我们最后调用。

# 开始执行流
ssc.start()
# 等待计算终止
ssc.awaitTermination()

启动前我们需要先新开一个命令行窗口用于创建数据流服务器发送端。点击右侧 + 号,新增一个命令行窗口,启动数据流服务器。

nc -l -p 7777

必须先启动数据流服务器,然后再开始执行程序。

回到刚刚的代码窗口,启动程序,开始监听。启动后,我们切换到数据流服务器窗口,输入如下单词:

hello python
hello spark
hello spark streaming

代码窗口界面结果输出如下:

,

当我们在数据流服务器窗口再次输入和上面一样的单词时,发现结果没有进行累加,如下所示:

,

这是由于我们并没有实现更新的操作,我们需要使用 updateStateByKey(func) 方法对其进行累加统计,其参数为一个函数,也就是根据传入的这个函数来实现状态更新功能。

当我们使用累加器时还需借助 checkpoint() 方法设置检查点,告知累加器其检查区域,其参数为一个字符串,指定为保存检查点的目录,如果指定目录未存在,则会自动创建。

具体实现方式如下代码所示:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)
sc = SparkContext("local[*]", "demo")
# 设置输入日志等级
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 20)
# 设置检查点
ssc.checkpoint("file:///usr/local/word_log")
# 指定监听端口
lines = ssc.socketTextStream("localhost", 7777)
# 进行词频统计
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
# 调用累加器
wordCounts = pairs.updateStateByKey(updateFunction)
# 输出到屏幕
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

**运行程序,注意,先启动数据流服务器。**输入如下数据两次,请在第一次数据输出到屏幕上后再输入第二次:

hello python
hello spark
hello spark streaming

第一次结果如下:

,

第二次结果如下:

,

从结果中可以看出,我们已经实现了从套接字流中读取数据并完成词频统计。

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码,执行程序,读取套接字流数据,按空格进行分词,完成词频统计。补充代码,将词频统计的输出内容存储到 /data/workspace/myshixun/project/step1/result 文件中。

代码文件目录: /data/workspace/myshixun/project/step1/work.py

套接字流相关信息:

  • 地址:localhost
  • 端口:8888
  • 输入数据:

程序启动后(5s),请在 60 秒内写入数据,如果需要调整时间,你可以通过修改代码文件中 ssc.awaitTermination(timeout=60) timeout 指定时间。

It is believed that the computer is bringing the world into a brand new era. 
At the time the computer was invented, scientists, marveling at its calculating speed, 
felt that they had created a miracle.
Nowadays, the function of the computer is no longer confined to calculation; 
It permeates peoples daily lives and has become an inseparable part of human society.

输入内容后,注意按回车。

检查点存放本地目录:/root/mylog/

请在程序运行完成后再点击评测,否则会影响评测结果。

小贴士:

  • pprint() 方法中可以设置数据输出显示的数量。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

先写入代码

#!/usr/local/bin/python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


sc = SparkContext("local[*]", "work")

ssc = StreamingContext(sc, 10)

###################### Begin ######################
# 设置检查点
ssc.checkpoint("/root/mylog/")
# 指定监听端口
lines = ssc.socketTextStream("localhost", 8888)
# 进行词频统计
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
# 调用累加器
wordCounts = pairs.updateStateByKey(updateFunction)
# 输出到屏幕
wordCounts.pprint()

# 保存输出内容到指定文件中
wordCounts.saveAsTextFiles("/data/workspace/myshixun/project/step1/result","txt")

###################### End ######################

ssc.start()

ssc.awaitTermination(timeout=60)

 

在第一个命令行窗口执行,启动数据流服务器

mkdir -p /root/mylog/
cd /root/mylog/
nc -l -p 8888

启动程序,开始监听后,打开另一个命令行窗口执行

cd /data/workspace/myshixun/project/step1/
chmod 777 work.py
python work.py # 现在开始运行代码文件,请在 60 秒内写入下面数据

回到第一个命令行窗口下把下面数据粘贴上去,再打一个回车

It is believed that the computer is bringing the world into a brand new era. 
At the time the computer was invented, scientists, marveling at its calculating speed, 
felt that they had created a miracle.
Nowadays, the function of the computer is no longer confined to calculation; 
It permeates peoples daily lives and has become an inseparable part of human society.

再去另一个命令行窗口就可以看到正在统计词频了

第2关:文件流

任务描述

本关任务:使用 Spark Streaming 实现文件目录监听,完成词频统计。

相关知识

为了完成本关任务,你需要掌握:

  1. 文件流概述;
  2. Python 与 Spark Streaming 文件流;
  3. Spark Streaming 文件流初体验。

文件流概述

文件流就是数据从一个地方流到另一个地方,像一块大蛋糕一样把一个大的文件分成一块一块的流过去就叫文件流。其中流分为输入流与输出流,输入流指从外界向我们的程序中移动的方向,因此是用来获取数据的流,作用就是读操作。输出流与之相反,从程序向外界移动的方向,用来输出数据的流,作用就是写操作。流是单向的,输入用来读,输出用来写。

img

那么我们为什么需要流呢?

  • 当外部设备与内存中的数据规模不一致,内存小,外部设备大,如果内存大小只有 1G ,但从磁盘读 2G,不能一次读完,这时就需要流。
  • 当外部设备与内存处理数据的能力不一致,内存处理数据快,外部设备慢,内存给磁盘写了 1G ,磁盘可能需要 5 秒去处理写数据,其他事件就会受到影响,这时就需要流。
  • 当读取或者写入大文件时数据会推挤在内存中,导致效率低(内存数据多,导致执行时间变长),这时就需要流。

Python 与 Spark Streaming 文件流

Spark 支持从兼容 HDFS API 的文件系统中读取数据,创建数据流。在 Python 中使用 Spark Streaming 文件流十分简单,通过 textFileStream() 方法就可以对创建文件流。

在 Python 中创建 Spark Streaming 文件流:

# 导入包
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# 通过 SparkConf() 配置参数
sparkConf = SparkConf()
sparkConf.setAppName('demo')
sparkConf.setMaster('local[*]')
# 创建 spark 上下文对象
sc = SparkContext(conf=sparkConf)
# 创建 Spark Streaming 上下文对象
# 参数1:spark 上下文对象
# 参数2:读取间隔时间(秒)
ssc = StreamingContext(sc, 10)
# 指定目录或文件,创建文件流
ssc.textFileStream("xxxxxx")

Spark Streaming 文件流初体验

通过对文件流及其创建方法的了解,我们现在通过实际的文件流案例来学习 Spark Streaming 读取文件流的具体实现。

打开右侧命令行窗口,创建一个目录 test,并在里面创建两个子文件 log1.txtlog2.txt,用于模拟数据。

cd /root
mkdir test
cd /root/test
touch log1.txt log2.txt

创建完成后,我们在新建的两个文件 log1.txtlog2.txt 中任意写入一些数据。

echo -e "hello python \nhello spark streaming \nI love big data!" > /root/test/log1.txt
echo -e "hello python \nhello spark streaming \nI love big data!" > /root/test/log2.txt

下面我们就进入 python shell 界面,创建文件流。

python

进入后,出现如下界面:

,

第一步,指定监听目录 /root/test,创建 Spark Streaming 文件流

# 导入包
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建 spark 上下文对象
sc = SparkContext("local[*]","demo")
# 创建 Spark Streaming 上下文对象
ssc = StreamingContext(sc, 10)
# 指定 /root/test 目录,创建文件流
lines = ssc.textFileStream("/root/test")

第二步,数据处理

完成对文件流中相关数据的处理。

lines.pprint()

第三步,启动与停止

ssc.start()
ssc.awaitTermination()

运行后发现,并没有输出我们之前写入到文件 log1.txtlog2.txt中的内容。

,

原因是,程序启动后只会只监听 /root/test 目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件,即使你对其进行更新操作。

现在我们点击 + 号新增一个命令行窗口,验证是否真的如此。打开新窗口后,切换到监听目录中,创建一个新文件 log3.txt,任意写入一些数据。

cd /root/test
vi log3.txt
hello python
hello spark

此时我们返回程序运行窗口,稍作等待,查看输出内容,发现刚刚创建的文件 log3.txt 其中的内容输出到了屏幕上。

,

现在我们来测试更新操作是否会被读取到,对程序运行前创建的文件 log2.txt 进行更新操作,任意追加一些内容。

cd /root/test
vi log2.txt
I like ping!

,

此时我们返回程序运行窗口,稍作等待,查看输出内容,发现对 log2.txt 文件追加的内容并没有输出到屏幕上。

,

说明程序运行前监听目录下的文件并不会被识别。

文件流的扩展知识:

  • 可以提供 POSIX glob 模式,例如:hdfs://namenode:8040/logs/2017/*,在这里,DStream 将包含与该模式匹配的目录中的所有文件。也就是说:它是目录的模式,而不是目录中的文件。
  • 所有文件必须采用相同的数据格式。
  • 文件根据其修改时间而非创建时间被视为时间段的一部分。
  • 一旦处理完毕,在当前窗口中对文件的更改不会导致文件被重新读取,即:更新被忽略。
  • 目录下的文件越多,扫描更改所需的时间就越长——即使没有文件被修改。
  • 如果使用通配符来标识目录,例如:hdfs://namenode:8040/logs/2016-*,重命名整个目录以匹配路径,则会将该目录添加到受监视目录列表中。只有目录中修改时间在当前窗口内的文件才会包含在流中。
  • 调用FileSystem.setTimes() 修复时间戳是一种在以后的窗口中拾取文件的方法,即使它的内容没有改变。

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码,执行程序,读取文件流数据,按空格进行分词,完成词频统计。补充代码,将词频统计的输出内容存储到 /data/workspace/myshixun/project/step2/result 文件中。

代码文件目录: /data/workspace/myshixun/project/step2/work.py

文件流相关信息:

  • 监听目录:/root/file_stream (需要自行创建)
  • 文件名称:words.txt (需要自行创建)
  • 文件内的数据:

程序启动后(5s),请在 60 秒内创建文件并写入数据,如果需要调整时间,你可以通过修改代码文件中 ssc.awaitTermination(timeout=60) timeout 指定时间。

Hiding behind the loose dusty curtain, a teenager packed up his overcoat into the suitcase.
He planned to leave home at dusk though there was thunder and lightning outdoors.
As a result, his score in each exam never added up to over 60, his name is LiMing.

输入内容后,注意保存退出。

检查点存放本地目录:/root/mylog2/

小贴士:

  • pprint() 方法中可以设置数据输出显示的数量。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

#!/usr/local/bin/python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


sc = SparkContext("local[*]", "work")

ssc = StreamingContext(sc, 10)

###################### Begin ######################
# 设置检查点
ssc.checkpoint("/root/mylog2/")
# 指定监听端口
lines = ssc.textFileStream("/root/test")
# 进行词频统计
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
# 调用累加器
wordCounts = pairs.updateStateByKey(updateFunction)
# 输出到屏幕
wordCounts.pprint()

# 保存输出内容到指定文件中
wordCounts.saveAsTextFiles("/data/workspace/myshixun/project/step2/result","txt")

###################### End ######################

ssc.start()

ssc.awaitTermination(timeout=60)

 

在第一个命令行窗口执行

mkdir -p /root/test/
mkdir -p /root/mylog2/
cd /data/workspace/myshixun/project/step2/
chmod 777 work.py
python work.py # 现在开始运行代码文件,请在 60 秒内创建文件并写入下面数据

再打开一个命令行窗口创建文件并写入下面数据

vim /root/test/words.txt

把下面数据粘贴上去

Hiding behind the loose dusty curtain, a teenager packed up his overcoat into the suitcase.
He planned to leave home at dusk though there was thunder and lightning outdoors.
As a result, his score in each exam never added up to over 60, his name is LiMing.

再去另一个命令行窗口就可以看到正在统计词频了

第3关:RDD 队列流

任务描述

本关任务:使用 Spark Streaming 实现队列流,完成词频统计。

相关知识

为了完成本关任务,你需要掌握:

  1. 队列流概述;
  2. Python 与 Spark Streaming 队列流;
  3. Spark Streaming 队列流初体验。

队列流概述

队列是无须的或共享的消息。使用队列消息传递,可以创建多个消费者来从点对点消息传递通道接收消息。当通道传递消息时,任何消费者都可能收到消息。消息传递系统的实现确定哪个消费者实际接收消息。Queuing 通常与无状态应用程序一起使用。无状态应用程序不关心顺序,但它们确实需要识别或删除单个消息的能力,以及尽可能扩展并行消耗的能力。

img

相比之下,流是严格有序的或独占的消息传递。使用流消息传递,始终只有一个消费者使用消息传递通道。消费者接收从通道发送的消息,其顺序与消息的写入顺序一致。Streaming 通常与有状态的应用程序一起使用。有状态应用程序关心消息顺序及其状态。消息的顺序决定有状态应用程序的状态。当发生无序消费时,排序将影响应用程序,需要处理逻辑的正确性。

Python 与 Spark Streaming 队列流

为了使用测试数据测试 Spark Streaming 应用程序,还可以基于 RDD 队列创建 DStream,使用 streamingContext.queueStream(queueOfRDDs). 每个推入队列的 RDD 都会被视为 DStream 中的一批数据,像流一样处理。

img

在 Python 中创建 Spark Streaming 队列流:

# 导入包
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# 通过 SparkConf() 配置参数
sparkConf = SparkConf()
sparkConf.setAppName('demo')
sparkConf.setMaster('local[*]')
# 创建 spark 上下文对象
sc = SparkContext(conf=sparkConf)
# 创建 Spark Streaming 上下文对象
# 参数1:spark 上下文对象
# 参数2:读取间隔时间(秒)
ssc = StreamingContext(sc, 10)
# 创建列表(RDD)
rddQueue = ["Hello python", "Hello spark", "Hello spark streaming"]
# 创建队列流
inputStream = ssc.queueStream(rddQueue)

Spark Streaming 队列流初体验

通过对队列流及其创建方法的了解,我们现在通过一个案例来学习 Spark Streaming 读取队列流的具体实现。

打开右侧命令行窗口,等待连接后,进入 python shell 界面,创建队列流。

python

进入后,出现如下界面:

,

第一步,创建 Spark Streaming 队列流

# 导入包
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建 spark 上下文对象
sc = SparkContext("local[*]","demo")
# 创建 Spark Streaming 上下文对象
ssc = StreamingContext(sc, 10)
# 创建列表(RDD)
rddQueue = ["Hello python", "Hello spark", "Hello spark streaming"]
# 创建队列流
inputStream = ssc.queueStream(rddQueue)

第二步,数据处理

完成对队列流中相关数据的处理。

inputStream.pprint()

第三步,启动与停止

ssc.start()
# 检测到没有数据流输入后就会停止
ssc.stop()

运行后发现,并没有一次输出所有的数据,而是依次的进行输出处理。

img

img

这就是 Spark Streaming 队列流的特性,我们在使用时需要注意。

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码,根据所给出的 rdd 列表,创建队列流,按空格进行分词,完成词频统计,使用 pprint() 输出结果。

词频统计要求:

  • 对数据按照 26 个字母进行扁平化统计,例如:('g', 10)
  • 过滤掉所有为 '' 的值。

检查点存放本地目录:/root/mylog3/

小贴士:

  • pprint() 方法中可以设置数据输出显示的数量。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

mkdir -p /root/mylog3/
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


# 累加器(状态更新)
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


sc = SparkContext("local[*]", "work")

ssc = StreamingContext(sc, 5)

# rdd 列表
rdd = ["My father is a basketball fan, he watches the NBA match when he is free.",
            "Because of the effect from my father, I fell in love with basketball when I was very small.",
            " So when I go to middle school, I join the basketball team in my class",
            " I meet many friends who have the same love for basketball.",
            " We will play basketball after class or sometimes in the weekend, we will play the match with other team."]


###################### Begin ######################
# 设置检查点
ssc.checkpoint("/root/mylog3/")

# 创建队列流
inputStream = ssc.queueStream([sc.parallelize([line]) for line in rdd])

# 按空格进行分词
words = inputStream.flatMap(lambda line: line.split(" "))

# 过滤掉空字符串
words_filter = words.filter(lambda word: word != '')

# 按字母进行扁平化统计
words_flatMap = words_filter.flatMap(lambda word: [(letter, 1) for letter in word.lower()])

# 使用 updateStateByKey 进行状态更新
wordCnt = words_flatMap.updateStateByKey(updateFunction)

# 输出结果
wordCnt.pprint()

###################### End ######################

ssc.start()

time.sleep(30)

ssc.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1338981.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于采样的自动驾驶规划算法 - PRM,RRT,RRT*,CL-RRT

本文将讲解PRM,RRT,RRT*自动驾驶规划算法原理,不正之处望读者指正 0 前言 机器人运动规划的基本任务:从开始位置到目标位置的运动 (1)如何躲避构型空间出现的障碍物 (2)如何满足机器…

【回溯】0-1背包Python实现

文章目录 [toc]问题描述形式化描述 回溯法时间复杂性Python实现 个人主页:丷从心 系列专栏:回溯法 问题描述 给定 n n n种物品和一背包,物品 i i i的重量是 w i w_{i} wi​,其价值为 v i v_{i} vi​,背包的容量为 c …

第27关 在K8s集群上使用Helm3部署最新版本v2.10.0的私有镜像仓库Harbor

------> 课程视频同步分享在今日头条和B站 大家好,我是博哥爱运维。 在前面的几十关里面,博哥在k8s上部署服务一直都是用的docker hub上的公有镜像,对于企业服务来说,有些我们是不想把服务镜像放在公网上面的; 同时…

记一次史诗级灾难的恢复方法:笨蛋edge把我收藏夹清空了

先在文件管理器中打开隐藏文件和文件扩展名的按钮 这一点可以去网上搜搜,教程非常多。 打开edge,关闭收藏夹的同步按钮 根据下图所示路径找到文件 Bookmarks 15060是我的用户名,各位要找到以自己的用户名命令的文件夹哈。 按顺序执行&…

金三银四,软件测试面试题总结,offer稳稳的。。。

前言 前面看到了一些面试题,总感觉会用得到,但是看一遍又记不住,所以我把面试题都整合在一起,都是来自各路大佬的分享,为了方便以后自己需要的时候刷一刷,不用再到处找题,今天把自己整理的这些…

【AI】计算机视觉VIT文章(Transformer)源码解析

论文:Dosovitskiy A, Beyer L, Kolesnikov A, et al. An image is worth 16x16 words: Transformers for image recognition at scale[J]. arXiv preprint arXiv:2010.11929, 2020 源码的Pytorch版:https://github.com/lucidrains/vit-pytorch 0.前言 …

绝缘电阻测试仪的测量范围有多少?它的测量方法是什么?

绝缘电阻测试仪广泛应用于设备检测和故障排除。它广泛应用于电力检测行业。甚至可以说,电力设备离不开绝缘电阻测试仪设备。对于许多经验丰富的电力测试工人来说,绝缘电阻测试仪的常规测量范围和方法应该非常清楚。在本文中,我们将向一些新的…

分享免费视频素材网站,第三弹

今天继续给大家分享免费视频素材网站,整理不易,觉得内容不错的话,可以点赞收藏一下哦~ 1.livelybg 一个免费视频素材小站,资源很少很少,但均为很少见的科幻、超现实、赛博朋克风格素材!&#xf…

Linux账号和权限管理

目录 前言 一、管理用户账号 1、Linux系统中用户账号类型 2、用户标识UID的分类 3、用户账号文件 4、用户账号的初始配置文件 5、用户账号的管理命令 5.1 useradd 5.2 usermod 5.3 passwd 5.4 userdel 二、管理组账号 1、Linux系统中组账号类型 2、组标识号GID的…

blackbox黑盒监控部署(k8s内)tensuns专用

一、前言 部署在k8s中需要用到deployment、configmap、service服务 二、部署 创建存放yaml的目录 mkdir /opt/blackbox-exporter && cd /opt/blackbox-exporter 编辑blackbox配置文件,使用configmap挂在这 vi configmap.yaml apiVersion: v1 kind: Confi…

移动app软件开发50个创意

那些需要努力工作才能赚钱的日子已经一去不复返了。现在,可以通过软件app赚钱。好吧,人类完全依赖智能手机这并不是谎言,无论是订购项目还是呼叫某人提供各种服务。因此,投资移动软件app开发是一个好主意。然而,移动软…

【51单片机系列】DS1302时钟模块

本文是关于DS1302时钟芯片的相关介绍。 文章目录 一、 DS1302时钟芯片介绍二、DS1302的使用2.1、DS1302的控制寄存器2.2、DS1302的日历/时钟寄存器2.3、片内RAM2.4、DS1302的读写时序 三、SPI总线介绍四、DS1302使用示例 一、 DS1302时钟芯片介绍 DS1302是DALLAS公司推出的涓流…

【教程】使用ipagurd打包与混淆Cocos2d-x的Lua脚本

文章目录 摘要引言正文1. 准备工作2. 使用ipaguard处理Lua文件3. 运行ipagurd进行混淆代码加密具体步骤测试和配置阶段IPA 重签名操作步骤4. IPA重签名与发布 总结 摘要 本文将介绍如何使用ipagurd工具对Cocos2d-x中的Lua脚本进行打包与混淆,以及在iOS应用开发中的…

IDEA2023创建web项目

一、新建项目 点击File->New->Project...,如果是第一次创建项目则单击New Project 二、添加Web Application 建好的样子 把web移动到main目录下同时改名为webapp 三、不存在Add Framework Support添加Web Application 如何存在Add Framework Support&#…

React快速入门之交互性

响应事件 创建事件处理函数 处理函数名常以handle事件名命名 function handlePlayClick() {alert(Playing);}传递事件处理函数 函数名、匿名两种方式&#xff01; function PlayButton() {function handlePlayClick() {alert(Playing);}return (<Button handleClick{handl…

信息科技成“新课标”重点,家长必须要懂!

日新月异的当下&#xff0c;人工智能无疑是与生活最为密切相关的核心词语。或许在不远的将来&#xff0c;技术含量低的重复性工作将会被机器取代。甚至有人认为&#xff0c;现在的小学生&#xff0c;大概多数会在未来从事目前尚未发明出来的工作。 近年来&#xff0c;我国的教…

微信消息撤回拦截:x64dbg反汇编实现揭秘

在数字世界中&#xff0c;信息传递的速度快如闪电&#xff0c;但也常常伴随着一些遗憾。微信作为我们日常生活中最常用的通讯工具之一&#xff0c;其撤回功能让许多人在发出信息后有了后悔的机会。然而&#xff0c;有时候我们却希望能够拦截这些即将被撤回的信息。通过x64dbg反…

YOLOv7+Pose姿态估计+tensort部署加速

YOLOv7是一种基于深度学习的目标检测算法&#xff0c;它能够在图像中准确识别出不同目标的位置和分类。而姿态估计pose和tensort则是一种用于实现人体姿态估计的算法&#xff0c;可以对人体的关节位置和方向进行精准的检测和跟踪。 下面我将分点阐述YOLOv7姿态估计posetensort…

用 Unity 实现的安检模拟小游戏源码,通过安检设备 (扫描仪) 检查乘客的随身物品 根据禁止名单对乘客做出判断是否允许通行

介绍 用 Unity 实现的安检模拟小游戏 软件版本 Unity 2019.4.9f1 (64-bit) Visual Studio 2019 游戏玩法 在游戏中你将扮演一名安全检查员 通过安检设备 (扫描仪) 检查每位乘客的随身物品 根据禁止名单对乘客做出判断&#xff1a;允许通行或者下令逮捕 游戏效果 游戏截图…

maven工具的搭建以及使用

文章目录 &#x1f412;个人主页&#x1f3c5;JavaEE系列专栏&#x1f4d6;前言&#xff1a;&#x1f380;首先进行maven工具的搭建&#x1f993;1.[打开下载 maven 服务器官网](http://maven.apache.org)&#x1fa85;2.解压之后&#xff0c;配置环境变量&#x1f3e8;3.打开设…