大数据——Spark Streaming

news2025/1/11 7:38:22

是什么

Spark Streaming是一个可扩展、高吞吐、具有容错性的流式计算框架。
之前我们接触的spark-core和spark-sql都是离线批处理任务,每天定时处理数据,对于数据的实时性要求不高,一般都是T+1的。但在企业任务中存在很多的实时性的任务需求,列如双十一的京东阿里都会要求做一个实时的数据大屏,显示实时订单。
在这里插入图片描述
实时计算框架对比

框架类别框架类型数据单位其他吞吐量延迟
Storm流式计算框架record的处理数据单位支持micro-batch方式一般更低
Spark批处理计算框架RDD处理数据单位支持micro-batch流式处理数据更强一般

Spark Streaming组件

  • Streaming Context
    • 一个Context启动,则不能有新的DStream建立或者添加;
    • 一个Context停止,不能重新启动;
    • 在JVM中,只能有一个Streaming Context活跃;一个Spark Context会创建一个Streaming Context;
    • Streaming Context上调用stop方法,SparkContext也会关闭,如果只想关闭Streaming Context,可以设置stop()方法里的false参数;
    • 一个SparkContext对象可以重复创建多个Streaming Context对象,但每次只能运行一个,即需要关闭一个再开下一个。
  • DStream
    • 表示一个连续的数据流;
    • DStream内部是由一系列的RDD组成;
    • DStream中的每个RDD都有确定时间间隔内的数据;
    • 对DStream的操作都转换成对DStream隐含的RDD操作;
    • 数据源:
数据源类型
基本源TCP/IP or FileSystem
高级源Kafka or Flume

Spark Streaming编码步骤

import os
# 配置spark driver和pyspark运⾏时,所使⽤的python解释器路径
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
JAVA_HOME='/root/bigdata/jdk'
SPARK_HOME = "/root/bigdata/spark"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
	sc = SparkContext("local[2]",appName="NetworkWordCount")
	#参数2:指定执⾏计算的时间间隔
	ssc = StreamingContext(sc, 1)
	#监听ip,端⼝上的上的数据
	lines = ssc.socketTextStream('localhost',9999)
	#将数据按空格进⾏拆分为多个单词
	words = lines.flatMap(lambda line: line.split(" "))
	#将单词转换为(单词,1)的形式
	pairs = words.map(lambda word:(word,1))
	#统计单词个数
	wordCounts = pairs.reduceByKey(lambda x,y:x+y)
	#打印结果信息,会使得前⾯的transformation操作执⾏
	wordCounts.pprint()
	#启动StreamingContext
	ssc.start()
	#等待计算结束
	ssc.awaitTermination()

Spark Streaming状态操作

Spark Streaming存在两种状态操作:UpdateStateByKey和Window操作。

  • updateStateByKey
    如果没有updateStateByKey,我们需要将每一秒的数据计算好放入mysql中,再用mysql进行计算,而updateStateByKey将每隔一段数据进行打包,封装成RDD,这样每个时间片段的数据之间是没有关联的。一般为以下步骤:
  1. ⾸先,要定义⼀个state,可以是任意的数据类型
  2. 其次,要定义state更新函数–指定⼀个函数如何使⽤之前的state和新值来更新state
  3. 对于每个batch,Spark都会为每个之前已经存在的key去应⽤⼀次state更新函数,⽆论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
  4. 对于每个新出现的key,也会执⾏state更新函数
  • Window
    在这里插入图片描述
    Window操作是基于窗⼝⻓度和滑动间隔来⼯作的;窗⼝的⻓度控制考虑前⼏批次数据量;默认为批处理的滑动间隔来确定计算结果的频率。
    窗口长度L是运算的数据量;
    滑动间隔G是控制每隔多长时间做一次运算。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1071694.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#,数值计算——数据建模Fitexy的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { public class Fitexy { private double a { get; set; } private double b { get; set; } private double siga { get; set; } private double sigb { get; set; } …

快速搭建Springboot项目(一)

目录 第一章、Spring Boot框架介绍1.1)Springboot是什么,有什么好处1.2)spring boot的两大策略与四大核心 第二章、快速搭建spring boot 项目2.1)idea快速创建spring boot项目2.2)pom文件内容的含义2.3)起步…

195、SpringBoot--配置RabbitMQ消息Broker的SSL 和 管理控制台的HTTPS

开启Rabbitmq的一些命令: 小黑窗输入: rabbitmq-plugins enable rabbitmq_management 启动控制台插件,就是启动登录rabbitmq控制台的页面 rabbitmq_management 代表了RabbitMQ的管理界面。 rabbitmq-server 启动rabbitMQ服务器 上面这个&…

springboot中的静态资源规则~

静态资源处理: 默认的静态资源路径为 calsspath:/META-INF/resources/ classpath:/resources/ classpath:/static/ classpath:/public/如果我们将静态资源放置上述四种路径处,那么可以通过项目根路径/静态资源名称的方式访问到,否则会访问不…

Oracle-ASM实例communication error问题处理

问题背景: Oracle数据库日志出现大量的WARNING: ASM communication error: op 0 state 0x0 (15055)错误 问题分析: 首先检查ASM实例的状态,尝试通过sqlplus / as sysasm连接asm实例,出现Connected to an idle instance连接asm实例失败 检查ASM实例的后台…

mysql面试题27:数据库中间件了解过吗?什么是sharding jdbc、mycat,并且讲讲怎么使用?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:数据库中间件了解过吗,比如sharding jdbc、mycat? 我知道的数据库中间件有以下这些: MySQL Proxy:MySQL Proxy是一个开源的数据库中间件,它位…

SSM170基于SSM的疫情物质管理系统

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用JSP技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…

怎么禁止windows server2003系统中的用户进行本地登陆

随着科技的发展,电脑已经成为人们日常生活中必不可少的工具,电脑系统也在逐步更新,这就导致了许多人对于陌生的系统都不知道应该怎么办?当我们在使用windows server2003时,如何设置用户禁止本地登陆呢?接下…

【Linux初阶】多线程1 | 页表的索引作用 线程基础

本文要点 再次理解页表,了解页表是如何利用虚拟地址进行索引,实现数据读取和传输的了解线程概念,线程的优缺点,线程异常的后果了解线程和进程的差异了解线程库及其基本调用接口(进程创建、终止、等待、控制&#xff0…

SQL sever中的视图

目录 一、视图概述: 二、视图好处 三、创建视图 法一: 法二: 四、查看视图信息 五、视图插入数据 六、视图修改数据 七、视图删除数据 八、删除视图 法一: 法二: 一、视图概述: 视图是一种常用…

如何使用 Datree 避免 Kubernetes 错误配置

Kubernetes 是一个复杂的系统,具有许多移动部件。正确的配置规则对于您的服务可靠运行至关重要。当您在没有经过全面审查过程的情况下手动编写 Kubernetes 清单时,可能会出现错误。 Datree是一个基于规则的工具,可以自动查找清单中的问题。您可以使用它来发现策略违规行为,…

常见算法-洗扑克牌(乱数排列)

常见算法-洗扑克牌(乱数排列) 1、说明 洗扑克牌的原理其实与乱数排列是相同的,都是将一组数字(例如1∼N)打乱重新排列,只不过洗扑克牌多了一个花色判断的动作而已。 初学者通常会直接想到,随…

基于 ACK Fluid 的混合云优化数据访问(一):场景与架构

作者:车漾(必嘫) 本系列文章将介绍如何基于 ACK Fluid 支持和优化混合云的数据访问场景。 概述 在 AI 和大数据时代,算力即正义,强大的算力推动了源源不断的创新。然而,企业自建的算力集群存在资源容量和…

继续改进 换一种 使用 result 想直接用CourseExtend

改 c.cid cid, 表示 c.cid 在 from timetable tt inner join teacher t on tt.tidt.tid inner join course c on tt.cidc.cid where tt.cid#{cid} 查出来了 任何赋值给 后面那个cid t.tname "teacher.tname", 表示查出来 赋值给下图那个teacher类的对应属性…

解决:yarn 无法加载文件 “C:\Users\XXXXX\AppData\Roaming\npm\yarn.ps1,因为在此系统上禁止运行脚本“ 的问题

1、问题描述: 报错的整体代码为: yarn : 无法加载文件 C:\Users\admin\AppData\Roaming\npm\yarn.ps1,因为在此系统上禁止运行脚本 // 整体的报错代码为 : yarn : 无法加载文件 C:\Users\admin\AppData\Roaming\npm\yarn.ps1&…

HarmonyOS/OpenHarmony原生应用开发-华为Serverless云端服务支持说明(一)

云端服务的实现是HarmonyOS/OpenHarmony原生应用开发的一个重要的环节,如果用户端是鸿蒙原生应用,但是服务端即云端还是基于传统的各种WEB网络框架、数据库与云服务器,那么所谓的原生应用开发实现的数据即后端服务是和以前、现在的互联网、移…

线性代数中涉及到的matlab命令-第一章:行列式

目录 1,逆序数 2,行列式定义和性质 2.1,常用特性及命令 2.2,求行列式 2.3,行列式的性质 2,行列式按行(列)展开 3,范德蒙德行列式 在学习线性代数过程中&#…

ssm172基于SSM的旅行社管理系统的设计与实现

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用JSP技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…

python每日一练(3)

🌈write in front🌈 🧸大家好,我是Aileen🧸.希望你看完之后,能对你有所帮助,不足请指正!共同学习交流. 🆔本文由Aileen_0v0🧸 原创 CSDN首发🐒 如…

安装torchtext遇到的坑及解决办法

刚开始秉着需要什么就pip install什么的原则直接pip install torchtext,结果: 把我这个环境打乱了,自作主张的删掉之前的很多包重新安装了其他版本的包而不是自适应的安装当前torch所对应的torchtext。因为这个环境比较重要也用在其他的工程…