Spark Steaming有状态转换实验

news2025/1/15 17:33:49

创建一个streaming目录

mkdir streaming

一、运行网络版的WordCount

1. 连接虚拟机后利用sudo打开hosts后加入红色方框内语句并保存:

   sudo vim /etc/hosts

  

Netcat是一个用于TCP/UDP连接和监听的Linux工具, 主要用于网络传输及调试领域。先下载:

sudo apt-get update
sudo apt-get -y install netcat-traditional

2. 启动 NetCat 服务端,并在1234端口监听

nc -lk  1234    #lsn下面输入nc –l –p 1234

  1. 使用xshell 打开一个新的选项卡,连接虚拟机。启动NetCat客户端,并连接Netcat服务端

nc localhost  1234

注意:如果客户端和服务端不在同一台机器,localhost 可以换成实际IP

  1. 在服务端输入以下字符串,并按回车,可以在客户端收到消息,并打印出来。这里注意替换学号为你个人学号。

hello 你的学号

  1. 在客户端输入字符串,并按回车,可以在服务端收到消息,并打印出来。这里注意替换学号为你个人学号。

你好  你的学号

  1. NetCat 客户端的选项卡使用quit终止客户端进程。
  2. 利用有状态操作updateStateByKey实现词频统计。

streaming目录下新建一个stateful目录,用于保存持久化的数据;接着编写独立的NetWordCountStateful.py代码

1

from pyspark import SparkContext

2

from pyspark.streaming import StreamingContext

3

sc = SparkContext("local[2]","NetworkWordCountStateful")

4

ssc = StreamingContext(sc,10)

5

6

ssc.checkpoint("file:///home/ubuntu/streaming/stateful")

7

def updateFunction(newValues, runningCount):

8

    if runningCount is None:

9

        runningCount = 0

10

    return sum(newValues, runningCount)

11

lines = ssc.socketTextStream('localhost', 1234)

12

running_counts = lines.flatMap(lambda line:line.split(' ')).map(lambda x:(x,1)).updateStateByKey(updateFunction)

13

running_counts.pprint()

14

ssc.start()

15

ssc.awaitTermination()

注意:有状态转换需要进行设置检查点。

新建一个终端,开启服务端

nc -l -p 1234

再建一个“流计算”终端,运行NetWordCountStateful.py代码:

cd $SPARK_HOME/bin

spark-submit /路径/NetWordCountStateful.py

  1. NetCat 服务端输入以下字符串,并按回车,观察Streaming WordCount的输出,并截图。

You jump I jump 1234

  1. 在客户端查看统计结果:

再次在服务端口输入以下字符串:

You and I jump 1234

回车后再次观察Streaming WordCount的输出,是否是累加后的结果。

可以用quitctrl+c停掉客户端,利用xshell的回滚来查看结果,因为回滚较快,所以在运行状态下查看结果截图较困难。

二、利用滑动窗口实现WordCount

1. 创建文件流监听目录:

mkdir logfile

cd logfile

  1. 在streaming目录下新建一个code文件夹,用于持久化数据的存储;然后新开一个终端,输入“pyspark”进入PySpark交互式环境后,输入以下代码:

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc,10)

ssc.checkpoint("file:///home/ubuntu/streaming/code")

lines = ssc.textFileStream("file:///home/ubuntu/streaming/logfile")

counts = lines.flatMap(lambda x:x.split(' ')).map(lambda x:(x,1)).reduceByKeyAndWindow(lambda x, y:x+y, lambda x,y:x-y, 30, 10)

counts.pprint()

ssc.start()

ssc.awaitTermination()

输入ssc.start()后,程序就开始自动进入循环监听状态,如下图所示:

打开一个新的shell窗口,切换到logfile目录下,创建一个log.txt文档保存,再创建一个log_new.txt文档保存,里面输入一些随意的单词,并用空格间隔开。查看监听页面,可以看到打印结果,例如下图所示:

注:log.txt输入为“a b c a b c d”,log_new.txt输入为“a b d e f a b e f”。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1968307.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

简明中医辨证施治小程序

这是一款学习中医辨证施治的好工具,将中医内科、妇科、儿科常见疾病的辨证施治进行整理,各种疾病的辨证分型、症状、治法、方剂选用等均编辑成简明的条目,一目了然,另外内含方剂学及中药学,内容包括常用方剂的出处、组…

GO goroutine状态流转

Gidle -> Grunnable newproc获取新的goroutine,并放置到P运行队列中 这也是go关键字之后实际编译调用的方法 func newproc(fn *funcval) {// 获取当前正在运行中的goroutinegp : getg()// 获取调用者的程序计数器地址,用于调试和跟踪pc : getcallerp…

量化小白也能自动化挖掘出6万+因子

最近逛某乎,碰到了这个问题:如何看待量化交易WorldQuant世坤大赛北大牛人提交了6万因子? 我的第一直觉,这肯定不是纯手工挖出来的,6w个因子,一天挖一个,节假日都不休息的话,需要164年…

轻松入门Linux—CentOS,直接拿捏 —/— <6>vim集合

一、Vim操作详解 1、linux彩蛋 输入命令python会启动Python解释器,允许你输入和执行Python代码。然后,输入import this会导入this模块,它是Python的一种彩蛋(Easter egg),然后得到下列结果 选中这段结果复…

Nacos安装教程(全网最靠谱,最简单~)

Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。 本文将详细介绍 Nacos 的安装及使用。 官方网址:Nacos官网| Nacos 配置中心 | Nacos 下载| Nacos 官方…

3.7.物体检测算法

物体检测算法 1.R-CNN ​ 首先使用启发式搜索算法来选择锚框,使用预训练模型对每个锚框抽取特征,训练一个SVM来对类别分类,最后训练一个线性回归模型来预测边缘框偏移。 ​ R-CNN比较早,所以使用的是SVM 1.1 兴趣区域(RoI)池化…

【Qwen2微调实战】LLaMA-Factory框架对Qwen2-7B模型的微调实践

系列篇章💥 No.文章1【Qwen部署实战】探索Qwen-7B-Chat:阿里云大型语言模型的对话实践2【Qwen2部署实战】Qwen2初体验:用Transformers打造智能聊天机器人3【Qwen2部署实战】探索Qwen2-7B:通过FastApi框架实现API的部署与调用4【Q…

【数据结构进阶】手撕红黑树

🔥个人主页: Forcible Bug Maker 🔥专栏: C || 数据结构 目录 🌈前言🔥红黑树的概念🔥手撕红黑树红黑树结点的定义红黑树主体需要实现的成员函数红黑树的插入findEmpty和Size拷贝构造析构函数和…

CANFD报文 位时间 理解

🍅 我是蚂蚁小兵,专注于车载诊断领域,尤其擅长于对CANoe工具的使用🍅 寻找组织 ,答疑解惑,摸鱼聊天,博客源码,点击加入👉【相亲相爱一家人】🍅 玩转CANoe&…

PCB设计——51单片机核心板布线以及原理图

首先是最小系统板,包括晶振电路,电源电路,复位电路 对应pcb板图

HTTP协议:网络通信的基石

一、引言 HTTP(HyperText Transfer Protocol),即超文本传输协议,是当今互联网世界中最为重要的协议之一。它是客户端和服务器之间进行通信的规则和标准,使得我们能够在浏览器中浏览网页、下载文件、提交表单等各种操作…

AT32F403A/421 SVPWM驱动无刷电机开环速度测试

AT32F403A/421 SVPWM驱动无刷电机开环速度测试 📌相关篇《HAL STM32F4 ARM DSP库跑SVPWM开环速度测试》 ✨本测试工程基于上面的运行例程移植而来。主要用来测试驱动无刷电机性能方面的差异。 🔖工程基于AT32_Work_Bench创建。 🔰AT32F403A和…

卷积神经网络随记

1.问题描述:一般而言,几个小滤波器卷积层的组合比一个大滤波器卷积层要好,比如层层堆叠了3个3x3的卷积层,中间含有非线性激活层,在这种排列下面,第一个卷积层中每个神经元对输入数据的感受野是3x3&#xff…

Verilog语言和C语言的本质不同点是什么?

在开始前刚好我有一些资料,是我根据网友给的问题精心整理了一份「c语言的资料从专业入门到高级教程」,点个关注在评论区回复“666”之后私信回复“666”,全部无偿共享给大家!!! 在c语言中,如果你…

7.Redis的Hash类型

Hash类型,也叫散列,其value是一个无序字典,类似于HashMap结构。 问题 String结构是将对象序列化为json字符串后存储,当需要修改对象某个字段是不是很方便。 key value…

【计算机遥感方向】SCI期刊推荐!水刊、顶刊齐聚在此,速投!

本期将为您带来五本计算机SCI 妥妥毕业神刊! IEEE TRANSACTIONS ON GEOSCIENCE AND REMOTE SENSING International Journal of Applied Earth Observation and Geoinformation INTERNATIONAL JOURNAL OF REMOTE SENSING Geocarto International RADIO SCIEN…

蔚来智驾的大模型之路:自研芯片 + 世界模型 + 群体智能

作者 |德新 编辑 |王博 7月27日上周末,蔚来举办第二届NIO IN。 李斌说,2023年的第一届NIO IN像是一个大纲,第一次对外完整展示了蔚来布局的12大技术领域。 而这届,更像第一个交付的章节。它重点展示了5项阶段性的进展&#xff…

智能电池管理,soc、soh、comsol锂电池仿真

锂离子电池,作为能源转型与电动车市场崛起的基石,正迎来研发与应用的飞跃。面对繁杂设计参数与实验盲点,电池仿真技术,尤以COMSOL为代表的多物理场仿真,精准解析电池内部机理,从微观行为到宏观性能&#xf…

LoRA:大模型的轻量级高效微调方法

文章目录 1. 模型微调的两种方式2. LoRA 实现 LoRA是一种轻量化且效果非常突出的大模型微调方法,与使用Adam微调的GPT-3 175B相比,LoRA可以将可训练参数的数量减少10000倍,并将GPU内存需求减少3倍。 paper:LoRA: Low-Rank Adapta…

二维码门楼牌管理应用平台建设:流程优化与全面考量

文章目录 前言一、工作流程优化:移动端采集与实时更新二、数据完整性与准确性保障三、效率提升与成本节约四、扩展性与未来发展五、数据安全与隐私保护六、用户培训与技术支持 前言 随着智慧城市建设的不断深入,二维码门楼牌管理应用平台作为城市管理的…