大数据分析与应用实验任务十一

news2025/1/16 20:52:42

大数据分析与应用实验任务十一

实验目的

  • 通过实验掌握spark Streaming相关对象的创建方法;

  • 熟悉spark Streaming对文件流、套接字流和RDD队列流的数据接收处理方法;

  • 熟悉spark Streaming的转换操作,包括无状态和有状态转换。

  • 熟悉spark Streaming输出编程操作。

实验任务

一、DStream 操作概述
  1. 创建 StreamingContext 对象

    登录 Linux 系统后,启动 pyspark。进入 pyspark 以后,就已经获得了一个默认的 SparkConext 对象,也就是 sc。因此,可以采用如下方式来创建 StreamingContext 对象:

    from pyspark.streaming import StreamingContext 
    sscluozhongye = StreamingContext(sc, 1)
    

    image-20231207112253827

    如果是编写一个独立的 Spark Streaming 程序,而不是在 pyspark 中运行,则需要在代码文件中通过类似如下的方式创建 StreamingContext 对象:

    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 1)
    print("创建成功,lzy防伪")
    

    image-20231207112652285

二、基本输入源
  1. 文件流
  • 在 pyspark 中创建文件流

    首先,在 Linux 系统中打开第 1 个终端(为了便于区分多个终端,这里记作“数据源终端”),创建一个 logfile 目录,命令如下:

    cd /root/Desktop/luozhongye/
    mkdir streaming 
    cd streaming 
    mkdir logfile
    

    image-20231207112923323

    其次,在 Linux 系统中打开第二个终端(记作“流计算终端”),启动进入 pyspark,然后,依次输入如下语句:

    from pyspark import SparkContext 
    from pyspark.streaming import StreamingContext 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    

image-20231207113305405

  • 采用独立应用程序方式创建文件流

    #!/usr/bin/env python3 
    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    print("2023年12月7日lzy")
    

    保存该文件,并执行以下命令:

    cd /root/Desktop/luozhongye/streaming/logfile/ 
    spark-submit FileStreaming.py
    

image-20231207114014647

  1. 套接字流
  • 使用套接字流作为数据源

    新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/NetworkWordCount.py”,在NetworkWordCount.py 中输入如下内容:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
    	if len(sys.argv) != 3:
    		print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)
    		exit(-1)
    	sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    	ssc = StreamingContext(sc, 1)
    	lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    	counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    	counts.pprint()
    	ssc.start()
    	ssc.awaitTermination()
    

    使用如下 nc 命令生成一个 Socket 服务器端:

    nc -lk 9999
    

    新建一个终端(记作“流计算终端”),执行如下代码启动流计算:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208002212790

  • 使用 Socket 编程实现自定义数据源

    新建一个代码文件“/root/Desktop/luozhongye/streaming/socket/DataSourceSocket.py”,在 DataSourceSocket.py 中输入如下代码:

    #!/usr/bin/env python3 
    import socket
    
    # 生成 socket 对象
    server = socket.socket()
    # 绑定 ip 和端口
    server.bind(('localhost', 9999))
    # 监听绑定的端口
    server.listen(1)
    while 1:
    	# 为了方便识别,打印一个“I’m waiting the connect...”
    	print("I'm waiting the connect...")
    	# 这里用两个值接收,因为连接上之后使用的是客户端发来请求的这个实例
    	# 所以下面的传输要使用 conn 实例操作
    	conn, addr = server.accept()
    	# 打印连接成功
    	print("Connect success! Connection is from %s " % addr[0])
    	# 打印正在发送数据
    	print('Sending data...')
    	conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())
    	conn.close()
    	print('Connection is broken.')
    print("2023年12月7日lzy")
    

    执行如下命令启动 Socket 服务器端:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit DataSourceSocket.py
    

    新建一个终端(记作“流计算终端”),输入以下命令启动 NetworkWordCount 程序:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208003303167

  1. RDD 队列流

    Linux 系统中打开一个终端,新建一个代码文件“/root/Desktop/luozhongye/ streaming/rddqueue/ RDDQueueStream.py”,输入以下代码:

    #!/usr/bin/env python3 
    import time
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
        print("")
    	sc = SparkContext(appName="PythonStreamingQueueStream")
    	ssc = StreamingContext(sc, 2)
    	# 创建一个队列,通过该队列可以把 RDD 推给一个 RDD 队列流
    	rddQueue = []
    	for i in range(5):
    		rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
    	time.sleep(1)
    	# 创建一个 RDD 队列流
    	inputStream = ssc.queueStream(rddQueue)
    	mappedStream = inputStream.map(lambda x: (x % 10, 1))
    	reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
    	reducedStream.pprint()
    	ssc.start()
    	ssc.stop(stopSparkContext=True, stopGraceFully=True)
    

    下面执行如下命令运行该程序:

    cd /root/Desktop/luozhongye/streaming/rddqueue 
    /usr/local/spark/bin/spark-submit RDDQueueStream.py
    

image-20231208004439462

三、转换操作
  1. 滑动窗口转换操作

    对“套接字流”中的代码 NetworkWordCount.py 进行一个小的修改,得到新的代码文件“/root/Desktop/luozhongye/streaming/socket/WindowedNetworkWordCount.py”,其内容如下:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
    	if len(sys.argv) != 3:
    		print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
    		exit(-1)
    	sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")
    	ssc = StreamingContext(sc, 10)
    	ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/socket/checkpoint")
    	lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    	counts = lines.flatMap(lambda line: line.split(" ")) \
    		.map(lambda word: (word, 1)) \
    		.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
    	counts.pprint()
    	ssc.start()
    	ssc.awaitTermination()
    

为了测试程序的运行效果,首先新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:

   cd /root/Desktop/luozhongye/streaming/socket/ 
   nc -lk 9999

然后,再新建一个终端(记作“流计算终端”),运行客户端程序 WindowedNetworkWordCount.py,命令如下:

   cd /root/Desktop/luozhongye/streaming/socket/ 
   /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999

在数据源终端内,连续输入 10 个“hadoop”,每个 hadoop 单独占一行(即每输入一个 hadoop就按回车键),再连续输入 10 个“spark”,每个 spark 单独占一行。这时,可以查看流计算终端内显示的词频动态统计结果,可以看到,随着时间的流逝,词频统计结果会发生动态变化。

image-20231208005821701

  1. updateStateByKey 操作

    在“/root/Desktop/luozhongye/streaming/stateful/”目录下新建一个代码文件 NetworkWordCountStateful.py,输入以下代码:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
    	if len(sys.argv) != 3:
    		print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
    		exit(-1)
    	sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    	ssc = StreamingContext(sc, 1)
    	ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")
    	# RDD with initial state (key, value) pairs
    	initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
    
    
    	def updateFunc(new_values, last_sum):
    		return sum(new_values) + (last_sum or 0)
    
    
    	lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    	running_counts = lines.flatMap(lambda line: line.split(" ")) \
    		.map(lambda word: (word, 1)) \
    		.updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    	running_counts.pprint()
    	ssc.start()
    	ssc.awaitTermination()
    

    新建一个终端(记作“数据源终端”),执行如下命令启动 nc 程序:

    nc -lk 9999
    

    新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:

    cd /root/Desktop/luozhongye/streaming/stateful 
    /usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999
    

image-20231208010814959

四、把 DStream 输出到文本文件中

下面对之前已经得到的“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStateful.py”代码进行简单的修改,把生成的词频统计结果写入文本文件中。

修改后得到的新代码文件“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStatefulText.py”的内容如下:

#!/usr/bin/env python3 
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
	if len(sys.argv) != 3:
		print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=
		sys.stderr)
		exit(-1)
	sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
	ssc = StreamingContext(sc, 1)
	ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")
	# RDD with initial state (key, value) pairs 
	initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])


	def updateFunc(new_values, last_sum):
		return sum(new_values) + (last_sum or 0)


	lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
	running_counts = lines.flatMap(lambda line: line.split(" ")) \
		.map(lambda word: (word, 1)) \
		.updateStateByKey(updateFunc, initialRDD=initialStateRDD)
	running_counts.saveAsTextFiles("file:///root/Desktop/luozhongye/streaming/stateful/output")
	running_counts.pprint()
	ssc.start()
	ssc.awaitTermination()

新建一个终端(记作“数据源终端”),执行如下命令运行nc 程序:

cd /root/Desktop/luozhongye/streaming/socket/ 
nc -lk 9999

新建一个 Linux 终端(记作“流计算终端”),执行如下命令提交运行程序:

cd /root/Desktop/luozhongye/streaming/stateful 
/usr/local/spark/bin/spark-submit NetworkWordCountStatefulText.py localhost 9999

image-20231208012123002

实验心得

通过本次实验,我深入理解了Spark Streaming,包括创建StreamingContext、DStream等对象。同时,我了解了Spark Streaming对不同类型数据流的处理方式,如文件流、套接字流和RDD队列流。此外,我还熟悉了Spark Streaming的转换操作和输出编程操作,并掌握了map、flatMap、filter等方法。最后,我能够自定义输出方式和格式。总之,这次实验让我全面了解了Spark Streaming,对未来的学习和工作有很大的帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1294028.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探索Spring事件监听机制的奇妙世界

文章目录 什么是Spring事件监听机制主要组件内置的事件监听类自定义事件监听类总结 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 什么是Spring事件监听机制 Spring事件监听机制是Spr…

【网络安全】零日漏洞(0day)是什么?如何防范零日攻击?

零日攻击是利用零日漏洞&#xff08;0day&#xff09;对系统或软件应用发动的网络攻击&#xff0c;近年来&#xff0c;零日攻击威胁在日益增长且难以防范&#xff0c;零日攻击已成为企业网络信息安全面临的最严峻的威胁之一。 文章目录 What is a zero-day attack&#xff1f;…

计算机操作系统4

1.什么是进程同步 2.什么是进程互斥 3.进程互斥的实现方法(软件) 4.进程互斥的实现方法(硬件) 5.遵循原则 6.总结&#xff1a; 线程是一个基本的cpu执行单元&#xff0c;也是程序执行流的最小单位。 调度算法&#xff1a;先来先服务FCFS、短作业优先、高响应比优先、时间片…

用 Bytebase 做数据库 schema 迁移

数据库 schema 迁移指修改管理数据库结构的变更&#xff0c;包括为数据库添加视图或表、更改字段类型或定义新约束。Bytebase 提供了可视化 GUI 方便迁移数据库 schema&#xff0c;本教程将展示如何使用 Bytebase 为 schema 迁移配上 SQL 审核&#xff0c;自定义审批流&#xf…

浅谈城镇老旧小区改造中电动汽车充电桩应用探讨

摘要&#xff1a;《2020 年政府工作报告》指出&#xff1a;加强新型基础设施建设&#xff0c;发展新一代信息网络&#xff0c;拓展 5G 应用&#xff0c;建设数据中心&#xff0c;增加充电桩、换电站等设施&#xff0c;推广新能源汽车&#xff0c;激发新消费需求、助力产业升级。…

【天线了解】2.WTW天线了解与使用

注意网段&#xff1a;&#xff08;计算机与设备同一网段才可以通信&#xff09; 1.LS28接收机使用的网段是192.168.16.X&#xff0c;所以电脑应该同样设置 2.WTW天线使用网段192.168.98.X 0.WTW使用原理 1.计算机控制LS28&#xff08;接收机&#xff09;&#xff0c;WTW天线。 …

面试官:说说webpack proxy工作原理?为什么能解决跨域?

面试官&#xff1a;说说webpack proxy工作原理&#xff1f;为什么能解决跨域? 一、是什么 webpack proxy&#xff0c;即webpack提供的代理服务 基本行为就是接收客户端发送的请求后转发给其他服务器 其目的是为了便于开发者在开发模式下解决跨域问题&#xff08;浏览器安全…

Minio保姆级教程

转载自&#xff1a;www.javaman.cn Minio服务器搭建和整合 1、centos安装minio 1.1、创建安装目录 mkdir -p /home/minio1.2、在线下载minio #进入目录 cd /home/minio #下载 wget https://dl.minio.io/server/minio/release/linux-amd64/minio1.3、minio配置 1.3.1、添加…

为什么越来越多的网站使用https,有什么好处?什么是https加密协议?

首先回答“什么是https加密协议&#xff1f;” HTTPS&#xff08;HyperText Transfer Protocol Secure&#xff09;是一种通过加密传输数据的安全版本的HTTP协议。它使用了SSL&#xff08;Secure Sockets Layer&#xff09;或TLS&#xff08;Transport Layer Security&#xf…

代码混淆技术探究与工具选择

代码混淆技术探究与工具选择 引言 在软件开发中&#xff0c;保护程序代码的安全性是至关重要的一环。代码混淆&#xff08;Obfuscated code&#xff09;作为一种常见的保护手段&#xff0c;通过将代码转换成难以理解的形式来提升应用被逆向破解的难度。本文将介绍代码混淆的概…

LainChain 原理解析:结合 RAG 技术提升大型语言模型能力

摘要&#xff1a;本文将详细介绍 LainChain 的工作原理&#xff0c;以及如何通过结合 RAG&#xff08;Retrieval-Aggregated Generation&#xff09;技术来增强大型语言模型&#xff08;如 GPT 和 ChatGPT 等&#xff09;的性能。我们将探讨 COT、TOT、RAG 以及 LangChain 的概…

千万别碰SLAM,会变得不幸--下阙

0.书接上回 之前的工作内容总结&#xff1a; 1.学习了回环检测的流程&#xff0c;还学习了DLoopDetector算法。 2.修改了vins-mono将匹配和回环到的图片进行保存。 3.找到了一个不是办法的办法来代替pr曲线&#xff0c;指定范围作真值。 4.大致了解了DTW地磁匹配算法&#xff…

SLAM算法与工程实践——SLAM基本库的安装与使用(3):Pangolin库

SLAM算法与工程实践系列文章 下面是SLAM算法与工程实践系列文章的总链接&#xff0c;本人发表这个系列的文章链接均收录于此 SLAM算法与工程实践系列文章链接 下面是专栏地址&#xff1a; SLAM算法与工程实践系列专栏 文章目录 SLAM算法与工程实践系列文章SLAM算法与工程实践…

脉冲压缩及MATLAB仿真

文章目录 前言一、脉冲压缩二、MATLAB 仿真1、LFM 脉冲压缩匹配滤波实现测距①、MATLAB 源码②、仿真结果1) LFM 时域波形2) LFM 频域波形3) 两个未分辨目标的合成回波信号4) 脉冲压缩检测距离 2、去协处理仿真①、MATLAB 源码②、仿真结果1) 未压缩回波信号&#xff0c;3个目标…

白皮书 | 分布式存储发展白皮书(2023)

12月1日&#xff0c;在2023云原生产业大会上&#xff0c;中国信通院云大所联合华为、戴尔科技、IBM等分布式存储产业方阵成员单位共同发布《分布式存储发展白皮书&#xff08;2023年&#xff09;》 一、数据智能的需求 &#xff08;一&#xff09;大模型训练需要海量的非结构…

转转闲鱼链接后台搭建教程+完整版源码

最新仿二手闲置链接源码 后台一键生成链接&#xff0c;后台管理教程&#xff1a;解压源码&#xff0c;修改数据库config/Congig 不会可以看源码里有教程 下载程序&#xff1a;https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3 后台一键生成链接&#xff0c;后台管理教…

【二分查找】LeetCode2141: 同时运行 N 台电脑的最长时间

作者推荐 贪心算法LeetCode2071:你可以安排的最多任务数目 本文涉及的基础知识点 二分查找算法合集 题目 你有 n 台电脑。给你整数 n 和一个下标从 0 开始的整数数组 batteries &#xff0c;其中第 i 个电池可以让一台电脑 运行 batteries[i] 分钟。你想使用这些电池让 全…

【微软技术栈】发布自己造的轮子 -- 创建Nuget包(分布操作)

目录 1、您的项目 2、创建 .nuspec 文件 3、一张图片胜过一千个拉取请求 4、包括自述文件 MD 文件 5、构建软件包 6、将包部署到 Nuget.Org 7、手动上传软件包 8、自动化和脚本化部署 9、我们如何构建和部署 ErrLog.IO Nuget 包 10、Nuget统计数据 11、最后的思考 创建 Nuget 包…

Hiera实战:使用Hiera实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整策略设置混合精度&#xff0c;DP多卡&#xff0c;EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试完整的代码 在上…

【XILINX】ERROR:Place:1136 - This design contains a global buffer instance

记录一个ISE软件使用过程中遇到的问题及解决方案。 芯片&#xff1a;spartan6 问题 ERROR:Place:1136 - This design contains a global buffer instance, , driving the net,>, that is driving the following (first 30) non-clock load pins. This is not a recommended…