Struct Streaming

news2025/1/19 2:24:17

spark进行实时数据流计算时有两个工具

Spark Streaming:编写rdd代码处理数据流,可以解决非结构化的流式数据

Structured Streaming:编写df代码处理数据流,可以解决结构化和半结构化的流式数据

实时计算

实时计算,通常也称为“实时流计算”、“流式计算”

流数据处理是指实时、连续地处理数据流。数据在被产生或接收后立即处理,并不需要等待所有数据到齐。数据的处理和传输是“逐条”进行的。

处理时间:由于数据被实时处理,系统响应时间非常短,通常在毫秒或秒级。

数据量:流数据通常是无限的,数据持续不断地被生成和处理,系统需要持续运行。

公司活动需要实时查看活动效果,适合短期时间的计算

离线计算

离线计算,通常也称为“批处理”,表示那些离线批量、延时较高的静态数据处理过程。

批数据处理是指在一个预定时间内收集一批数据,然后一次性对这批数据进行处理。数据是成批处理的,而不是逐条处理。

处理时间:批处理通常不是实时的,处理的延迟可能是分钟、小时甚至更长,T+1。

数据量:批处理通常在所有数据收集完毕后进行,这意味着处理的数据集是固定大小的(如每日、每小时的数据)。数据处理完成后自动结束

固定指标计算,需要每天查看的数据内容

 有界和无界数据

  • 有界数据

    • 有起始位置,有结束位置。比如文件数据 有起始行,有结束行

    • 有明确的数据容量大小。处理数据时就能知道处理的数据大小

    • 在处理数据时,按批次处理。

    • 数据处理完成程序就结束

    • 离线计算时处理的都是有界数据

  • 无界数据

    • 有起始位置,没有结束位置,知道数据的起始位置在哪里,但是数据到哪结束不知道(因为数据在不断产生,什么时候结束不知道)

    • 流式数据都是无界数据

    • 无界数据的总量是不确定,数据是不断产生的

    • 数据有时效性 (有效期)

    • 处理无界数据时,程序时持续运行的

实现

1)需要安装ncat服务 

在线安装:yum install nc

离线安装:rpm -ivh ncat-7.93-1.x86_64.rpm

2)启动服务绑定端口号:9999(若这个端口号被占用,可以换没有占用的端口号)

Structured Streaming代码程序

  • 使用的是sparkSQl,所以在进行代码编写使用sparksql的方法进行编写

  • 使用sparksession

3)执行代码

# 读取socket产生的实时数据流
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()
# 数据流读取使用readStream
option = {
    # 指定读取的socket服务的ip地址
    'host': '192.168.88.100',
    # 指定读取的端口
    'port': 9999
}
# 将读取的流式数据转为无界表保存在dataframe
df = ss.readStream.load(format='socket', **option)

# df数据处理


# 数据的输出展示
# format 指定输出的服务  console 输出终端界面   kafka  csv   json   es
# outputMode 输出模式   append  complete  update
# awaitTermination 让程序阻塞等待,可以是实现程序持续运行获取实时数据
df.writeStream.start(format='console',outputMode='append').awaitTermination()

 4)在终端输入数据

5)查看运行结果,满足条件输出,不满足条件为空 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2214145.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面腾讯后台开发,二面挂掉了,,,

随着各厂秋招的开启,收到面试邀请的同学也越来越多。在当年和我一起找实习的同学里面,有实力较强的同学收到了腾讯后台开发的校招面试邀请。但面试不止是实力的竞争,也有很重要的运气的因素。 虽然我的同学在腾讯后台开发的二面中挂掉了&…

Mybatis中的映射文件编写原则

先来回顾一下,在Java项目中如何使用Mybatis执行SQL语句: 添加依赖:在项目中添加 MyBatis 和数据库驱动的依赖。配置 MyBatis:创建 MyBatis 的配置文件,配置数据源和 Mapper 映射。创建 POJO 类:定义与数据…

拒绝飞单,微信监控轻松搞定!

微信作为广泛使用的社交工具,其安全性和监控问题受到了广泛关注。对于企业来说,确保客户资源的安全和防止员工“飞单”是重要的管理挑战。以下是一些有效的方法和工具,可以帮助企业提高微信的安全性,防止飞单,从而保护…

LLM - 配置 ModelScope SWIFT 测试 Qwen2-VL 模型推理(Infer) 教程 (1)

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/142827217 免责声明:本文来源于个人知识与公开资料,仅用于学术交流,欢迎讨论,不支持转载。 SWIFT …

QT文件操作【记事本】

mainwindow.h核心函数 QFileDialog::getOpenFileName()QFileDialog::getSaveFileName() #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow> #include<QFileDialog> #include<QMessageBox> #include<QDebug> #include<QFile> #…

Apache Kafka的生态

Kafka 生态系统 微信公众号&#xff1a;阿俊的学习记录空间 小红书&#xff1a;ArnoZhang wordpress&#xff1a;arnozhang1994 博客园&#xff1a;arnozhang CSDN&#xff1a;ArnoZhang1994 以下是与 Kafka 集成的工具列表&#xff0c;涵盖了不同领域的工具和扩展。这些…

Jmeter如何进行多服务器远程测试?

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 JMeter是Apache软件基金会的开源项目&#xff0c;主要来做功能和性能测试&#xff0c;用Java编写。 我们一般都会用JMeter在本地进行测试&#xff0c;但是受到单…

鱼跃医疗荣获深交所信息披露工作“A”级评价

2024年10月11日&#xff0c;深圳证券交易所&#xff08;以下简称“深交所”&#xff09;发布了《关于深市上市公司2023-2024年度信息披露评价结果的通报》&#xff0c;鱼跃医疗&#xff08;002223.SZ&#xff09;凭借在信息披露质量、投资者关系管理等各方面的优异表现&#xf…

总结拓展十四:批次管理(1)

1、批次的概念 批次是指生产或采购过程中&#xff0c;为了区分不同供应商之间相同产品间的微小区别而进行的管理方式。它通常用于确保产品质量的一致性和可追溯性。批次的概念可以应用于多个领域&#xff0c;包括生产、采购、物流、销售等。 2、批次管理的概念 批次管理是指对…

SIC MOS的保护方式

SIC MOS与IGBT短路保护有所不同的原因&#xff1a; 由于SIC MOS芯片尺寸较小(散热能力较差&#xff0c;在短路情况下&#xff0c;浪涌电流会产生大量的热量)&#xff0c;SIC MOS的浪涌能力低于IGBT。SiC MOSFET 和 IGBT 的输出特性不同&#xff0c;在正常导 通状态期间&#x…

【vue3】弹幕效果实现

本次弹幕基于vue3-danmaku组件实现。 弹幕效果 1.安装插件 npm install vue3-danmaku --save 2.基础使用方法 <template><vue-danmaku v-model:danmus"danmus" loop style"height:100px; width:300px;"></vue-danmaku> </templat…

发布自己的python包

文章目录 概要模块和包发布自己的package创建目录结构发布 概要 提示&#xff1a;这里可以添加技术概要 例如&#xff1a; openAI 的 GPT 大模型的发展历程。 模块和包 在Python中&#xff0c;程序的划分可以分为三个层次&#xff1a;脚本、模块和包 script&#xff1a;独…

【人工智能】探索最强AI工具:实际应用与影响

随着人工智能&#xff08;AI&#xff09;技术的迅猛发展&#xff0c;AI工具已经深入到人们日常生活和工作的方方面面。这些工具不仅提高了生产力&#xff0c;还改变了我们解决问题和处理信息的方式。在中文互联网和国际市场中&#xff0c;众多AI工具已成为人们工作、学习、创作…

软考结构化开发 -- (耦合,内聚,设计原则,系统文档,数据字典)

文章目录 一、耦合二、内聚三、设计原则四、系统文档五、数据字典 一、耦合 模块化&#xff1a;将一个待开发的软件分解成若干个小的简单部分–模块模块独立 无直接耦合&#xff1a;指两个模块之间没有直接的关系&#xff0c;它们分别从属于不同模块的控制与调用&#xff0c;…

spring boot itext7 修改生成文档的作者、制作者、标题,并且读取相关的信息。

1、官方的example文件&#xff1a;iText GitHub itext-java-7.2.5\kernel\src\test\java\com\itextpdf\kernel\pdf\PdfStampingTest.java 2、修改代码&#xff1a; Testpublic void stamping1() throws IOException {String filename1 destinationFolder "stamping1_…

特惠电影票API接口的优势功能以及对接因素?

特惠电影票对接接口通常是指允许第三方开发者或平台通过编程方式接入电影票预订服务的API。这些接口可以提供查询电影场次、座位信息、票价、订票、支付等功能。以下是一些关键点和考虑因素&#xff0c;以及一些提供特惠电影票API接口的平台&#xff1a; 关键功能 电影场次信息…

RK3568 4G模块移远 EM05-CE

首先确保4G模块的天线是否正确安装,这一步会影响到后面测试成功与失败,购买模块时可以咨询厂家 然后就可以进行测试,首先需要关闭所有网络设备,以我的开发板为例子,确保等会使用的是4G模块的功能 ifconfig eth0 down ifconfig eth1 down ifconfig eth2 down 然后查看ifc…

java-day7

练习一&#xff1a;飞机票 需求: ​ 机票价格按照淡季旺季、头等舱和经济舱收费、输入机票原价、月份和头等舱或经济舱。 ​ 按照如下规则计算机票价格&#xff1a;旺季&#xff08;5-10月&#xff09;头等舱9折&#xff0c;经济舱8.5折&#xff0c;淡季&#xff08;11月到来…

【Spring相关技术】spring进阶-自定义请求报文转对象HttpMessageConverter

文章目录 类继承体系核心类与接口说明底层调用链完整示例步骤 1: 创建自定义的HttpMessageConverter步骤 2: 配置Spring MVC使用自定义转换器步骤 3: 使用自定义转换器 相关文献 类继承体系 默认转换器即springmvc默认的转换器&#xff0c; 用的比较多的是以下两种&#xff0c;…

生信技能61 - 获取比对后BAM文件的多项基础统计指标

获取比对后BAM文件的多项基础统计指标 1. 运行实例 采用pysam库解析bam文件,将bam文件路径作为输入参数,统计输出比对的参考基因组大小、ATCGN各碱基数量/比率、GC含量、UR reads数量、平均测序深度、总reads数量等统计指标。 python bam_statistics.py -b sample.sorted.…