Spark Streaming的核心功能及其示例PySpark代码

news2025/1/23 20:23:16

Spark Streaming是Apache Spark中用于实时流数据处理的模块。以下是一些常见功能的实用PySpark代码示例:

  1. 基础流处理:从TCP套接字读取数据并统计单词数量
from pyspark import `SparkContext
from pyspark.streaming import StreamingContext

# 创建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1秒的批处理间隔

# 创建一个DStream,从TCP源读取数据
lines = ssc.socketTextStream("localhost", 9999)

# 对每一行数据进行分词,映射为(word, 1)的键值对,然后按单词统计数量
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# 打印每个RDD中的前10个元素
word_counts.pprint()

# 启动流计算
ssc.start()
# 等待流计算结束
ssc.awaitTermination()

在上述代码中:

  • sc 是 SparkContext ,用于与Spark集群交互。
  • ssc 是 StreamingContext ,定义了批处理间隔。
  • lines 是一个 DStream ,从指定的TCP套接字读取数据。
  • words 对每行数据进行分词, word_counts 统计每个单词出现的次数。
  • pprint 方法打印每个批次的前10个元素。
  1. 使用窗口函数
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))

# 使用窗口函数,窗口大小为3秒,滑动间隔为1秒
windowed_word_counts = word_counts.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 3, 1)

windowed_word_counts.pprint()

ssc.start()
ssc.awaitTermination()

在这个示例中:

  • reduceByKeyAndWindow 方法用于在窗口上进行聚合操作。
  • 第一个参数是用于合并窗口内元素的函数,第二个参数是用于移除窗口外元素的函数。
  1. 状态更新
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # 启用检查点

def updateFunction(new_values, running_count):
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)

lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))

# 使用updateStateByKey进行状态更新
stateful_word_counts = word_counts.updateStateByKey(updateFunction)

stateful_word_counts.pprint()

ssc.start()
ssc.awaitTermination()

在上述代码中:

  • updateStateByKey 方法用于维护每个键的状态。
  • updateFunction 定义了如何根据新值和现有状态更新状态。
  1. 与Kafka集成
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext("local[2]", "KafkaWordCount")
ssc = StreamingContext(sc, 1)

# Kafka参数
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topics = ["test"]

# 创建Kafka输入DStream
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = kvs.map(lambda x: x[1])

words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

word_counts.pprint()

ssc.start()
ssc.awaitTermination()

在这个示例中:

  • KafkaUtils.createDirectStream 用于从Kafka主题读取数据。
  • kvs 是一个包含Kafka消息的DStream, lines 提取消息内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2281072.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JS宏进阶:正则表达式的使用

正则表达式,对于任何一门编程语言来说,都是一种非常强大的工具,主要用于搜索、编辑或操作文本和数据。因此,在JS中,也存在相应的对象new RegExp( ),在本章中,将详细介绍正则表达式在JS宏中的运用…

深度学习笔记——循环神经网络RNN

大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本文详细介绍面试过程中可能遇到的循环神经网络RNN知识点。 文章目录 文本特征提取的方法1. 基础方法1.1 词袋模型(Bag of Words, BOW)工作原…

Git进阶笔记系列(01)Git核心架构原理 | 常用命令实战集合

读书笔记:卓越强迫症强大恐惧症,在亲子家庭、职场关系里尤其是纵向关系模型里,这两种状态很容易无缝衔接。尤其父母对子女、领导对下属,都有望子成龙、强将无弱兵的期望,然而在你的面前,他们才是永远强大的…

SpringBoot读取yml配置文件一组对象数据初始化

1. yml的短横杠语法2. yml数组元素读取并初始化3. 测试结果 1. yml的短横杠语法 - 短横杠加空格,可以表示数组元素,如下配置 表示有名为apps的一组数据,数组的元素对象包含有corpId、corpSecret、appCode三个字段像server.port没有 - 表示的…

基于JAVA的校园二手商品交易平台的设计与开发

摘 要:政府政策引导与社会观念的转变使得国内大学生的创业意识逐渐提高,很多高校大学生开始自主创业。目前我国各大高校暂且还没有较为成型的针对校内学生创业者的校园网络服务平台。本文首先主要是介绍了关于java语言以及web开发的相关技术,…

深度学习核函数

一、核函数的基本概念 核函数在机器学习中具有重要应用价值,常用于支持向量机(SVM)等算法中。 核函数是面试中经常被考到的知识点,对于找工作和实际数据转换都有重要作用。 二、数据建模与核函数的作用 数据越多,可…

数据结构(三) 排序/并查集/图

目录 1. 排序 2.并查集 3.图 1.排序: 1.1 概念: 排序就是将数据按照某种规则进行排列, 具有某种顺序. 分为内排序和外排序. 内排序就是: 将数据放在内存中的排序; 外排序是: 数据太多无法在内存中排序的. 1.2 插入排序: 插入排序包含: 直接插入排序和希尔排序. (1) 直接插入…

ECCV 2024,全新激活函数!

激活函数对深度神经网络的成功可太重要了,它可以提升学习复杂关系的能力,减少过拟合,增强模型性能,与它相关的研究一直是重中之重。最近,这方向有了不少新突破。 ECCV 2024上的这篇,提出了一种可训练的高表…

小米Vela操作系统开源:AIoT时代的全新引擎

小米近日正式开源了其物联网嵌入式软件平台——Vela操作系统,并将其命名为OpenVela。这一举动在AIoT(人工智能物联网)领域掀起了不小的波澜,也为开发者们提供了一个强大的AI代码生成器和开发平台。OpenVela项目源代码已托管至GitH…

ComfyUI实现老照片修复——AI修复老照片(ComfyUI-ReActor / ReSwapper)尚待完善

AI修复老照片,试试吧,不一定好~~哈哈 2023年4月曾用过ComfyUI,当时就感慨这个工具和虚幻的蓝图很像,以后肯定是专业人玩的。 2024年我写代码去了,AI做图没太关注,没想到,现在ComfyUI真的变成了工…

YOLOv5训练自己的数据及rknn部署

YOLOv5训练自己的数据及rknn部署 一、下载源码二、准备自己的数据集2.1 标注图像2.2 数据集结构 三、配置YOLOv5训练3.1 修改配置文件3.2 模型选择 四、训练五、测试六、部署6.1 pt转onnx6.2 onnx转rknn 七、常见错误7.1 训练过程中的错误7.1.1 cuda: out of memory7.1.2 train…

C# OpenCvSharp 部署文档矫正,包括文档扭曲/模糊/阴影等情况

目录 说明 效果 模型 项目 代码 下载 参考 C# OpenCvSharp 部署文档矫正,包括文档扭曲/模糊/阴影等情况 说明 地址:https://github.com/RapidAI/RapidUnDistort 修正文档扭曲/模糊/阴影等情况,使用onnx模型简单轻量部署&#xff0c…

贪心算法(题1)区间选点

输出 2 #include <iostream> #include<algorithm>using namespace std;const int N 100010 ;int n; struct Range {int l,r;bool operator <(const Range &W)const{return r<W.r;} }range[N];int main() {scanf("%d",&n);for(int i0;i&l…

煤矿场景下安全帽检测数据集VOC+YOLO格式179张2类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;170 标注数量(xml文件个数)&#xff1a;170 标注数量(txt文件个数)&#xff1a;170 标注…

RTX 5090原型据称有24576个CUDA核心和800 W TDP -两个16针连接器

英伟达今年早些时候发布、将于1月30日上市的GeForce RTX 5090&#xff0c;有望成为最出色的显卡之一。然而&#xff0c;硬件侦探HXL发掘出了一款疑似早期原型产品。不过&#xff0c;考虑到传闻中的规格参数&#xff0c;它很有可能会成为GeForce RTX 5090 Ti或者RTX Titan Black…

哪个控制面板适合您?

如今&#xff0c;VPS云主机的控制面板在网站托管中变得越来越重要。对于网站管理者和普通用户来说&#xff0c;这类控制面板提供了一站式的管理工具&#xff0c;可以在同一个界面中处理所有网站的管理任务&#xff0c;极大地减少了多系统间重复操作的麻烦。 但随着越来越多的公…

Redis - 通用命令

目录 了解Redis客户端set 和 getRedis全局命令keys命令exists命令del命令expire命令ttl命令Redis中key的过期策略type 了解Redis客户端 想要输入Redis命令,必须先进入Redis客户端 使用redis-cli连接本机的命令行客户端 redis-cli如果想连接其他的ip和端口的客户端&#xff0…

重学SpringBoot3-WebClient配置与使用详解

更多SpringBoot3内容请关注我的专栏&#xff1a;《SpringBoot3》 期待您的点赞??收藏评论 重学SpringBoot3-WebClient配置与使用详解 1. 简介2. 环境准备 2.1 依赖配置 3. WebClient配置 3.1 基础配置3.2 高级配置3.3 retrieve()和exchange()区别 4. 使用示例 4.1 基本请求操…

HTML5 新表单属性详解

HTML5 为 <form> 和 <input> 标签引入了一系列新属性&#xff0c;极大地增强了表单的功能和用户体验。这些新属性不仅简化了开发者的工作&#xff0c;还为用户提供了更友好、更高效的交互方式。本文将详细介绍这些新属性&#xff0c;并结合代码示例帮助大家更好地理…

【专题】为2025制定可付诸实践的IT战略规划报告汇总PDF洞察(附原数据表)

原文链接&#xff1a;https://tecdat.cn/?p39055 在当今瞬息万变的商业环境中&#xff0c;制定有效的 IT 战略规划对于企业的成功与可持续发展至关重要。本报告深入探讨了制定 IT 战略规划的关键活动&#xff0c;旨在为企业和决策者提供全面且实用的指导。 Gartner的《为202…