Structured Streaming

news2025/1/25 1:42:53

目录

一、概述

(一)基本概念

(二)两种处理模型

(三)Structured Streaming和Spark SQL、Spark Streaming关系

二、编写Structured Streaming程序的基本步骤

(一)实现步骤

(二)运行测试

三、输入源

(一)File源

(二)Kafka源

(三)Socket源

(四)Rate源

四、输出操作

(一)启动流计算

(二)输出模式

(三)输出接收器


一、概述

        提供端到端的完全一致性是设计Structured Streaming 的关键目标之一,为了实现这一点,Spark设计了输入源、执行引擎和接收器,以便对处理的进度进行更可靠的跟踪,使之可以通过重启或重新处理,来处理任何类型的故障。如果所使用的源具有偏移量来跟踪流的读取位置,那么,引擎可以使用检查点和预写日志,来记录每个触发时期正在处理的数据的偏移范围;此外,如果使用的接收器是“幂等”的,那么通过使用重放、对“幂等”接收数据进行覆盖等操作,Structured Streaming可以确保在任何故障下达到端到端的完全一致性。
        Spark一直处于不停的更新中,从Spark 2.3.0版本开始引入持续流式处理模型后,可以将原先流处理的延迟降低到毫秒级别。

(一)基本概念

        Structured Streaming的关键思想是将实时数据流视为一张正在不断添加数据的表。可以把流计算等同于在一个静态表上的批处理查询,Spark会在不断添加数据的无界输入表上运行计算,并进行增量查询。

        在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发对无界表的计算并更新结果表。如图Structured Streaming编程模型。

(二)两种处理模型

1、微批处理

        Structured Streaming默认使用微批处理执行模型,这意味着Spark流计算引擎会定期检查流数据源,并对自上一批次结束后到达的新数据执行批量查询。数据到达和得到处理并输出结果之间的延时超过100毫秒。

2、持续处理模型

        Spark从2.3.0版本开始引入了持续处理的试验性功能,可以实现流计算的毫秒级延迟。在持续处理模式下,Spark不再根据触发器来周期性启动任务,而是启动一系列的连续读取、处理和写入结果的长时间运行的任务。

(三)Structured Streaming和Spark SQL、Spark Streaming关系

        Structured Streaming处理的数据跟Spark Streaming一样,也是源源不断的数据流,区别在于,Spark Streaming采用的数据抽象是DStream(本质上就是一系列RDD),而Structured Streaming采用的数据抽象是DataFrame。

        Structured Streaming可以使用Spark SQL的DataFrame/Dataset来处理数据流。虽然Spark SQL也是采用DataFrame作为数据抽象,但是,Spark SQL只能处理静态的数据,而Structured Streaming可以处理结构化的数据流。这样,Structured Streaming就将Spark SQL和Spark Streaming二者的特性结合了起来。

        Structured Streaming可以对DataFrame/Dataset应用前面章节提到的各种操作,包括select、where、groupBy、map、filter、flatMap等。

        Spark Streaming只能实现秒级的实时响应,而Structured Streaming由于采用了全新的设计方式,采用微批处理模型时可以实现100毫秒级别的实时响应,采用持续处理模型时可以支持毫秒级的实时响应。

二、编写Structured Streaming程序的基本步骤

编写Structured Streaming程序的基本步骤包括:
(1)导入pyspark模块
(2)创建SparkSession对象
(3)创建输入数据源
(4)定义流计算过程
(5)启动流计算并输出结果

        实例任务:一个包含很多行英文语句的数据流源源不断到达,Structured Streaming程序对每行英文语句进行拆分,并统计每个单词出现的频率。

(一)实现步骤

1、步骤一:导入pyspark模块

        导入PySpark模块,代码如下:

from pyspark.sql import SparkSession 
from pyspark.sql.functions import split 
from pyspark.sql.functions import explode

        由于程序中需要用到拆分字符串和展开数组内的所有单词的功能,所以引用了来自pyspark.sql.functions里面的split和explode函数。

2、步骤二:创建SparkSession对象

        创建一个SparkSession对象,代码如下:

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

3、步骤三:创建输入数据源

        创建一个输入数据源,从“监听在本机(localhost)的9999端口上的服务”那里接收文本数据,具体语句如下:    

    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

4、步骤四:定义流计算过程

        有了输入数据源以后,接着需要定义相关的查询语句,具体如下:    

    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )
    wordCounts = words.groupBy("word").count()

5、步骤五:启动流计算并输出结果

        定义完查询语句后,下面就可以开始真正执行流计算,具体语句如下:

    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(processingTime="8 seconds") \
        .start()
    query.awaitTermination()

(二)运行测试

        把上述五步的代码写入文件StructuredNetworkWordCount.py。在执行StructuredNetworkWordCount.py之前,需要启动HDFS。启动HDFS的命令如下:

start-dfs.sh

        新建一个终端(记作“数据源终端”),输入如下命令:

nc -lk 9999

        再新建一个终端(记作“流计算终端”),执行如下命令:

cd /usr/local/mycode/structuredstreaming/
spark-submit StructuredNetworkWordCount.py

        为了模拟文本数据流,可以在“数据源终端”内用键盘不断敲入一行行英文语句,nc程序会把这些数据发送给StructuredNetworkWordCount.py程序进行处理,比如输入如下数据:

apache spark
apache hadoop

        则在“流计算终端”窗口内会输出类似以下的结果信息:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+

三、输入源

(一)File源

        File源(或称为“文件源”)以文件流的形式读取某个目录中的文件,支持的文件格式为csv、json、orc、parquet、text等。需要注意的是,文件放置到给定目录的操作应当是原子性的,即不能长时间在给定目录内打开文件写入内容,而是应当采取大部分操作系统都支持的、通过写入到临时文件后移动文件到给定目录的方式来完成。

        File源的选项(option)主要包括如下几个。
(1)path:输入路径的目录,所有文件格式通用。path支持glob通配符路径,但是目录或glob通配符路径的格式不支持以多个逗号分隔的形式。
(2)maxFilesPerTrigger:每个触发器中要处理的最大新文件数(默认无最大值)。
(3)latestFirst:是否优先处理最新的文件,当有大量文件积压时,设置为True可以优先处理新文件,默认为False。
(4)fileNameOnly:是否仅根据文件名而不是完整路径来检査新文件,默认为False。如果设置
为True,则以下文件将被视为相同的文件,因为它们的文件名"dataset.txt"相同:

        这里以一个JSON格式文件的处理来演示File源的使用方法,主要包括以下两个步骤:

(1)创建程序生成JSON格式的File源测试数据

(2)创建程序对数据进行统计

1、创建程序生成JSON格式的File源测试数据 

        为了演示JSON格式文件的处理,这里随机生成一些JSON格式的文件来进行测试。代码文件spark_ss_filesource_generate.py内容如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
# 导入需要用到的模块
import os
import shutil
import random
import time

TEST_DATA_TEMP_DIR = '/tmp/'
TEST_DATA_DIR = '/tmp/testdata/'
 
ACTION_DEF = ['login', 'logout', 'purchase']
DISTRICT_DEF = ['fujian', 'beijing', 'shanghai', 'guangzhou']
JSON_LINE_PATTERN = '{{"eventTime": {}, "action": "{}", "district": "{}"}}\n‘

# 测试的环境搭建,判断文件夹是否存在,如果存在则删除旧数据,并建立文件夹
def test_setUp():
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
    os.mkdir(TEST_DATA_DIR) 

# 测试环境的恢复,对文件夹进行清理
def test_tearDown():
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
 
 
# 生成测试文件
def write_and_move(filename, data):
    with open(TEST_DATA_TEMP_DIR + filename,
              "wt", encoding="utf-8") as f:
        f.write(data)
 
    shutil.move(TEST_DATA_TEMP_DIR + filename,
                TEST_DATA_DIR + filename)

if __name__ == "__main__":
    test_setUp()
 
    for i in range(1000):
        filename = 'e-mall-{}.json'.format(i)
 
        content = ''
        rndcount = list(range(100))
        random.shuffle(rndcount)
        for _ in rndcount:
            content += JSON_LINE_PATTERN.format(
                str(int(time.time())),
                random.choice(ACTION_DEF),
                random.choice(DISTRICT_DEF))
        write_and_move(filename, content)
 
        time.sleep(1)
 
    test_tearDown()

        这段程序首先建立测试环境,清空测试数据所在的目录,接着使用for循环一千次来生成一千个文件,文件名为“e-mall-数字.json”, 文件内容是不超过100行的随机JSON行,行的格式是类似如下:

 {"eventTime": 1546939167, "action": "logout", "district": "fujian"}\n

2、创建程序对数据进行统计

        spark_ss_filesource.py”,其代码内容如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
# 导入需要用到的模块
import os
import shutil
from pprint import pprint
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType

# 定义JSON文件的路径常量
TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'

if __name__ == "__main__":
    # 定义模式,为时间戳类型的eventTime、字符串类型的操作和省份组成
    schema = StructType([
        StructField("eventTime", TimestampType(), True),
        StructField("action", StringType(), True),
        StructField("district", StringType(), True)])
 
    spark = SparkSession \
        .builder \
        .appName("StructuredEMallPurchaseCount") \
        .getOrCreate()
 
    spark.sparkContext.setLogLevel('WARN')

     lines = spark \
        .readStream \
        .format("json") \
        .schema(schema) \
        .option("maxFilesPerTrigger", 100) \
        .load(TEST_DATA_DIR_SPARK)
 
    # 定义窗口
    windowDuration = '1 minutes'
 
    windowedCounts = lines \
        .filter("action = 'purchase'") \
        .groupBy('district', window('eventTime', windowDuration)) \
        .count() \
        .sort(asc('window')) 

    query = windowedCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option('truncate', 'false') \
        .trigger(processingTime="10 seconds") \
        .start()
 
query.awaitTermination()

3、测试运行程序

        程序运行过程需要访问HDFS,因此,需要启动HDFS,命令如下:

start-dfs.sh

        新建一个终端,执行如下命令生成测试数据:

cd /usr/local/mycode/structuredstreaming/file
python3 spark_ss_filesource_generate.py

        新建一个终端,执行如下命令运行数据统计程序:

cd /usr/local/mycode/structuredstreaming/file
spark-submit spark_ss_filesource.py

        运行程序以后,可以看到类似如下的输出结果:

-------------------------------------------                                     
Batch: 0
-------------------------------------------
+---------+------------------------------------------+-----+
|district |window                                    |count|
+---------+------------------------------------------+-----+
|guangzhou|[2019-01-08 17:19:00, 2019-01-08 17:20:00]|283  |
|shanghai |[2019-01-08 17:19:00, 2019-01-08 17:20:00]|251  |
|fujian   |[2019-01-08 17:19:00, 2019-01-08 17:20:00]|258  |
|beijing  |[2019-01-08 17:19:00, 2019-01-08 17:20:00]|258  |
|guangzhou|[2019-01-08 17:20:00, 2019-01-08 17:21:00]|492  |
|beijing  |[2019-01-08 17:20:00, 2019-01-08 17:21:00]|499  |
|fujian   |[2019-01-08 17:20:00, 2019-01-08 17:21:00]|513  |
|shanghai |[2019-01-08 17:20:00, 2019-01-08 17:21:00]|503  |
|guangzhou|[2019-01-08 17:21:00, 2019-01-08 17:22:00]|71   |
|fujian   |[2019-01-08 17:21:00, 2019-01-08 17:22:00]|74   |
|shanghai |[2019-01-08 17:21:00, 2019-01-08 17:22:00]|66   |
|beijing  |[2019-01-08 17:21:00, 2019-01-08 17:22:00]|52   |
+---------+------------------------------------------+-----+

(二)Kafka源

        Kafka源是流处理最理想的输入源,因为它可以保证实时和容错。Kafka源的选项(option)包括如下几个。
(1)assign:指定所消费的Kafka主题和分区。
(2)subscribe:订阅的Kafka主题,为逗号分隔的主题列表。
(3)subscribePattern:订阅的Kafka主题正则表达式,可匹配多个主题。
(4)kafka.bootstrap.servers:Kafka服务器的列表,逗号分隔的 "host:port"列表。
(5)startingOffsets:起始位置偏移量。
(6)endingOffsets:结束位置偏移量。
(7)failOnDataLoss:布尔值,表示是否在Kafka数据可能丢失时(主题被删除或位置偏移量超出范围等)触发流计算失败。一般应当禁止,以免误报。

        在这个实例中,使用生产者程序每0.1秒生成一个包含2个字母的单词,并写入Kafka的名称为“wordcount-topic”的主题(Topic)内。Spark的消费者程序通过订阅wordcount-topic,会源源不断收到单词,并且每隔8秒钟对收到的单词进行一次词频统计,把统计结果输出到Kafka的主题wordcount-result-topic内,同时,通过2个监控程序检查Spark处理的输入和输出结果。

1、启动Kafka 

        在Linux系统中新建一个终端(记作“Zookeeper终端”),输入下面命令启动Zookeeper服务:

cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

        不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。另外打开第二个终端(记作“Kafka终端”),然后输入下面命令启动Kafka服务:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

        不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了。

        再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:

cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-topic

        再新开一个终端(记作“监控输出终端”),执行如下命令监控输出的结果文本:

cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-result-topic

2、编写生产者(Producer)程序

        代码文件spark_ss_kafka_producer.py内容如下:

#!/usr/bin/env python3
 
import string
import random
import time
 
from kafka import KafkaProduce

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
 
    while True:
        s2 = (random.choice(string.ascii_lowercase) for _ in range(2))
        word = ''.join(s2)
        value = bytearray(word, 'utf-8')
 
        producer.send('wordcount-topic', value=value).get(timeout=10)
 
        time.sleep(0.1)

        如果还没有安装Python3的Kafka支持,需要按照如下操作进行安装:

(1)首先确认有没有安装pip3,如果没有,使用如下命令安装:

apt-get install pip3

(2)安装kafka-python模块,命令如下:

pip3 install kafka-python

然后在终端中执行如下命令运行生产者程序:

cd /usr/local/mycode/structuredstreaming/kafka/
python3 spark_ss_kafka_producer.py

生产者程序执行以后,在“监控输入终端”的窗口内就可以看到持续输出包含2个字母的单词。

3、编写消费者(Consumer)程序

        代码文件spark_ss_kafka_consumer.py内容如下:

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession
 
 
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredKafkaWordCount") \
        .getOrCreate()
 
    spark.sparkContext.setLogLevel('WARN‘)

    lines = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", 'wordcount-topic') \
        .load() \
        .selectExpr("CAST(value AS STRING)")
 
    wordCounts = lines.groupBy("value").count()

    query = wordCounts \
        .selectExpr("CAST(value AS STRING) as key", "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value") \
        .writeStream \
        .outputMode("complete") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "wordcount-result-topic") \
        .option("checkpointLocation", "file:///tmp/kafka-sink-cp") \
        .trigger(processingTime="8 seconds") \
        .start()
 
    query.awaitTermination()

        在终端中执行如下命令运行消费者程序:

cd /usr/local/mycode/structuredstreaming/kafka/
/usr/local/spark/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
spark_ss_kafka_consumer.py

        消费者程序运行起来以后,可以在“监控输出终端”看到类似如下的输出结果:

sq:3
bl:6
lo:8
…

(三)Socket源

        Socket源从一个本地或远程主机的某个端口服务上读取数据,数据的编码为UTF8。因为Socket源使用内存保存读取到的所有数据,并且远端服务不能保证数据在出错后可以使用检查点或者指定当前已处理的偏移量来重放数据,所以,它无法提供端到端的容错保障。Socket源一般仅用于测试或学习用途。

        Socket源的选项(option)包括如下几个。
(1)host:主机IP地址或者域名,必须设置。
(2)port:端口号,必须设置。
(3)includeTimestamp:是否在数据行内包含时间戳。使用时间戳可以用来测试基于时间聚合的
功能。

        Socket源的实例可以参考“二、编写Structured Streaming程序的基本步骤”的StructuredNetworkWordCount.py。

(四)Rate源

        Rate源可每秒生成特定个数的数据行,每个数据行包括时间戳和值字段。时间戳是消息发送的时间,值是从开始到当前消息发送的总个数,从0开始。Rate源一般用来作为调试或性能基准测试。

        Rate源的选项(option)包括如下几个。
(1)rOwsPerSecond:每秒产生多少行数据,默认为1。
(2)rampUpTime:生成速度达到rowsPerSecond需要多少启动时间,使用比秒更精细的粒度将
会被截断为整数秒,默认为0秒。
(3)numPartitions:使用的分区数,默认为Spark的默认分区数。

        Rate源会尽可能地使每秒生成的数据量达到rowsPerSecond,可以通过调整numPartitions以尽快达到所需的速度。这几个参数的作用类似一辆汽车从0加速到100千米/小时并以100千米/小时进行巡航的过程,通过增加“马力”(numPartitions),可以使得加速时间(rampUpTime)更短。

        代码文件spark_ss_rate.py内容如下:

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession
 
 
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("TestRateStreamSource") \
        .getOrCreate()
 
    spark.sparkContext.setLogLevel('WARN‘)

    lines = spark \
        .readStream \
        .format("rate") \
        .option('rowsPerSecond', 5) \
        .load()
 
    print(lines.schema)
 
    query = lines \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option('truncate', 'false') \
        .start()
 
    query.awaitTermination()

        在Linux终端中执行如下命令执行spark_ss_rate.py:

cd /usr/local/mycode/structuredstreaming/rate/
spark-submit spark_ss_rate.py

        上述命令执行后,会得到类似如下的结果:

StructType(List(StructField(timestamp,TimestampType,true),StructField(value,LongType,true)))
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+ 

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2018-10-01 15:42:38.595|0    |
|2018-10-01 15:42:38.795|1    |
|2018-10-01 15:42:38.995|2    |
|2018-10-01 15:42:39.195|3    |
|2018-10-01 15:42:39.395|4    |
+-----------------------+-----+

四、输出操作

(一)启动流计算

        DataFrame/Dataset的.writeStream()方法将会返回DataStreamWriter接口,接口通过.start()真正启动流计算,并将DataFrame/Dataset写入到外部的输出接收器,DataStreamWriter接口有以下几个主要函数:

(1)format:接收器类型。
(2)outputMode:输出模式,指定写入接收器的内容,可以是Append模式、Complete模式或Update模式。
(3)queryName:查询的名称,可选,用于标识查询的唯一名称。
(4)trigger:触发间隔,可选,设定触发间隔,如果未指定,则系统将在上一次处理完成后立即检查新数据的可用性。如果由于先前的处理尚未完成导致超过触发间隔,则系统将在处理完成后立即触发新的查询。

(二)输出模式

        输出模式用于指定写入接收器的内容,主要有以下几种:
(1)Append模式:只有结果表中自上次触发间隔后增加的新行,才会被写入外部存储器。这种模式一般适用于“不希望更改结果表中现有行的内容”的使用场景。
(2)Complete模式:已更新的完整的结果表可被写入外部存储器。
(3)Update模式:只有自上次触发间隔后结果表中发生更新的行,才会被写入外部存储器。这种模式与Complete模式相比,输出较少,如果结果表的部分行没有更新,则不会输出任何内容。当查询不包括聚合时,这个模式等同于Append模式。

        不同的流计算查询类型支持不同的输出模式,二者之间的兼容性如下表所示。

查询类型支持的输出模式备注
聚合查询在事件时间字段上使用水印的聚合Append
Complete
Update
Append模式使用水印来清理旧的聚合状态
其他聚合Complete
Update
连接查询Append
其他查询Append
Update
不支持Complete模式,因为无法将所有未分组数据保存在结果表内

(三)输出接收器

        系统内置的输出接收器包括File接收器、Kafka接收器、Foreach接收器、Console接收器、Memory接收器等,其中,Console接收器和Memory接收器仅用于调试用途。有些接收器由于无法保证输出的持久性,导致其不是容错的。Spark内置的输出接收器的详细信息如下表所示。

接收器支持的输出模式选项容错
File接收器Appendpath:输出目录的路径必须指定是。数据只会被处理一次
Kafka接收器Append
Complete
Update
选项较多,具体可查看Kafka对接指南是。数据至少被处理一次
Foreach接收器Append
Complete
Update
依赖于ForeachWriter的实现
Console接收器Append
Complete
Update
numRows:每次触发后打印多少行,默认为20;
truncate:如果行太长是否截断,默认为“是”
Memory接收器Append
Complete
否。在Complete输出模式下,重启查询会重建全表

        以File接收器为例,这里把“二、编写Structured Streaming程序的基本步骤”的实例修改为使用File接收器,修改后的代码文件为StructuredNetworkWordCountFileSink.py:

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql.functions import length

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCountFileSink") \
        .getOrCreate()
 
    spark.sparkContext.setLogLevel('WARN')
 
 
    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load() 

     words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )
 
    all_length_5_words = words.filter(length("word") == 5)
 
    query = all_length_5_words \
        .writeStream \
        .outputMode("append") \
        .format("parquet") \
        .option("path", "file:///tmp/filesink") \
        .option("checkpointLocation", "file:///tmp/file-sink-cp") \
        .trigger(processingTime="8 seconds") \
        .start() 
    query.awaitTermination()

        在Linux系统中新建一个终端(记作“数据源终端”),输入如下命令:

nc -lk 9999

        再新建一个终端(记作“流计算终端”),执行如下命令执行StructuredNetworkWordCountFileSink.py:

cd /usr/local/mycode/structuredstreaming
spark-submit StructuredNetworkWordCountFileSink.py

        为了模拟文本数据流,可以在数据源终端内用键盘不断敲入一行行英文语句,并且让其中部分英语单词长度等于5。

        由于程序执行后不会在终端输出信息,这时可新建一个终端,执行如下命令查看File接收器保存的位置:

cd /tmp/filesink
ls

        可以看到以parquet格式保存的类似如下的文件列表:

part-00000-2bd184d2-e9b0-4110-9018-a7f2d14602a9-c000.snappy.parquet
part-00000-36eed4ab-b8c4-4421-adc6-76560699f6f5-c000.snappy.parquet
part-00000-dde601ad-1b49-4b78-a658-865e54d28fb7-c000.snappy.parquet
part-00001-eedddae2-fb96-4ce9-9000-566456cd5e8e-c000.snappy.parquet
_spark_metadata

        可以使用strings命令查看文件内的字符串,具体如下:

strings part-00003-89584d0a-db83-467b-84d8-53d43baa4755-c000.snappy.parquet

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1446138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jsp计算机线上教学系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 JSP 计算机线上教学系统是一套完善的java web信息管理系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发,数据库为Mysql5…

WebSocketServer方法里注入不了其他类

请直接看原文: WebSocketServer无法注入其他对象的问题 - 知乎 (zhihu.com) WebSocket服务无法使用自动注入解决方法_websocket sever不可以直接注入吧-CSDN博客 ------------------------------------------------------------------------------------------------------…

2.11日学习打卡----初学RocketMQ(二)

2.11日学习打卡 一. RocketMQ整合springboot 首先配置pom.xml文件 <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>annotationProcessor</scope></dependency><dependency>…

Java图形化界面编程——处理位图 笔记

2.8.3 处理位图 ​ 如果仅仅绘制一些简单的几何图形&#xff0c;程序的图形效果依然比较单调 。 AWT 也允许在组件上绘制位图&#xff0c; Graphics 提供了 drawlmage() 方法用于绘制位图&#xff0c;该方法需要一个Image参数一一代表位图&#xff0c;通过该方法就可 以绘制出…

苹果Mac键盘如何将 F1 到 F12 取消按Fn

苹果电脑安装了Win10操作系统之后&#xff0c;F1到F12用不了怎么办的解决方法。本文将介绍一些解决方法&#xff0c;帮助您解决无法使用F1到F12功能键的问题。 使用 Mac系统的人都知道&#xff0c;Mac系统默认是没有开启 F1-F12 的使用的&#xff0c;平时我们使用的系统都可以使…

【C语言】实现双向链表

目录 &#xff08;一&#xff09;头文件 &#xff08;二&#xff09; 功能实现 &#xff08;1&#xff09;初始化 &#xff08;2&#xff09;打印链表 &#xff08;3&#xff09; 头插与头删 &#xff08;4&#xff09;尾插与尾删 &#xff08;5&#xff09;指定位置之后…

降噪和音频修复 iZotope RX 7 Advanced

iZotope RX 7 Advanced 是一款功能强大的音频修复和增强软件&#xff0c;它能够帮助用户轻松应对各种音频问题&#xff0c;提供全面的工具和技术来优化和改善音频质量。 首先&#xff0c;iZotope RX 7 Advanced 具有出色的降噪功能。无论是背景噪音、杂音还是其他干扰因素&…

【Java EE初阶十二】网络编程TCP/IP协议(二)

1. 关于TCP 1.1 TCP 的socket api tcp的socket api和U大片的socket api差异很大&#xff0c;但是和前面所讲的文件操作很密切的联系 下面主要讲解两个关键的类&#xff1a; 1、ServerSocket&#xff1a;给服务器使用的类&#xff0c;使用这个类来绑定端口号 2、Socket&#xf…

服务治理中间件-Eureka

目录 简介 搭建Eureka服务 注册服务到Eureka 简介 Eureka是Spring团队开发的服务治理中间件&#xff0c;可以轻松在项目中&#xff0c;实现服务的注册与发现&#xff0c;相比于阿里巴巴的Nacos、Apache基金会的Zookeeper&#xff0c;更加契合Spring项目&#xff0c;缺点就是…

论文阅读:《Deep Learning-Based Human Pose Estimation: A Survey》——Part 1:2D HPE

目录 人体姿态识别概述 论文框架 HPE分类 人体建模模型 二维单人姿态估计 回归方法 目前发展 优化 基于热图的方法 基于CNN的几个网络 利用身体结构信息提供构建HPE网络 视频序列中的人体姿态估计 2D多人姿态识别 方法 自上而下 自下而上 2D HPE 总结 数据集…

GEE:最小距离(minimumDistance)回归教程(样本点、特征添加、训练、精度、参数优化)

作者:CSDN @ _养乐多_ 对于分类问题,这个输出通常是一个类别标签 ,而对于回归问题,输出通常是一个连续的数值。回归可以应用于多种场景,包括预测土壤PH值、土壤有机碳、土壤水分、碳密度、生物量、气温、海冰厚度、不透水面积百分比、植被覆盖度等。 本文将介绍在Google…

位运算+leetcode(1)

基础 1.基础知识 以下都是针对数字的二进制进行操作 >> 右移操作符<< 左移操作符~ 取反操作符 & 有0就是0&#xff0c;全一才一 | 有一才一 &#xff0c;全0才0^ 相同为0&#xff0c;相异为1 异或( ^ )运算的规律 a ^ 0 a a ^ a 0a ^ b ^ c a ^ (b …

2本对微服务拆分有帮助的书

迁移到云原生应用架构 可在线观看的免费书籍 https://pivotal.io/platform-as-a-service/migrating-to-cloud-native-application-architectures-ebook 微服务架构设计模式 世界十大架构师之一&#xff1a;克里斯理查森著

【51单片机】LED点阵屏(江科大)

9.1LED点阵屏 1.LED点阵屏介绍 LED点阵屏由若干个独立的LED组成,LED以矩阵的形式排列,以灯珠亮灭来显示文字、图片、视频等。 2.LED点阵屏工作原理 LED点阵屏的结构类似于数码管,只不过是数码管把每一列的像素以“8”字型排列而已。原理图如下 每一行的阳极连在一起,每一列…

如何在C# Windows Forms应用程序中实现控件之间的连接线

帮我实现绘图工具多个控件连接线&#xff0c;请用c#代码实现 实现绘图工具中多个控件之间的连接线功能&#xff0c;可以通过以下几个步骤来进行&#xff1a; 定义连接线的数据模型&#xff1a;首先需要定义一个模型来表示连接线&#xff0c;这个模型应该包含起点和终点的坐标。…

【Effective Objective - C 2.0】——读书笔记(四)

文章目录 二十三、通过委托与数据源协议进行对象间通信二十四、将类的实现代码分散到便于管理的数个分类之中二十五、总是为第三方的分类名称加前缀二十六、切勿在分类里面声明属性二十七、使用“class-continuation分类”隐藏实现细节二十八、通过协议提供匿名对象 二十三、通…

【Chrono Engine学习总结】4-vehicle-4.1-vehicle的基本概念

由于Chrono的官方教程在一些细节方面解释的并不清楚&#xff0c;自己做了一些尝试&#xff0c;做学习总结。 1、基本介绍 Vehicle Overview Vehicle Mannel Vehicle的官方demo 1.1 Vehicle的构型 一个车辆由许多子系统构成&#xff1a;悬挂、转向、轮子/履带、刹车/油门、动…

书生谱语-大语言模型测试demo

课程内容简介 通用环境配置 开发机 InterStudio pip 换源 临时使用镜像源安装&#xff0c;如下所示&#xff1a;some-package 为你需要安装的包名 pip install -i https://mirrors.cernet.edu.cn/pypi/web/simple some-package设置pip默认镜像源&#xff0c;升级 pip 到最新…

力扣1122. 数组的相对排序(哈希表)

Problem: 1122. 数组的相对排序 文章目录 题目描述思路及解法复杂度Code 题目描述 思路及解法 1.利用arr2创建一个无序映射&#xff08;map集合&#xff09;&#xff0c;以其中的元素作为键&#xff0c;值默认设置为0&#xff1b; 2.扫描arr1数组统计arr2元素在其中的个数(将个…