【Spark分布式内存计算框架——Spark Streaming】4.入门案例(下)Streaming 工作原理

news2025/1/13 3:36:03

2.3 Streaming 工作原理

SparkStreaming处理流式数据时,按照时间间隔划分数据为微批次(Micro-Batch),每批次数据当做RDD,再进行处理分析。
在这里插入图片描述
以上述词频统计WordCount程序为例,讲解Streaming工作原理。

创建 StreamingContext
当SparkStreaming流式应用启动(streamingContext.start)时,首先创建StreamingContext流式上下文实例对象,整个流式应用环境构建,底层还是SparkContext。
在这里插入图片描述

当StreamingContext对象构建以后,启动接收器Receiver,专门从数据源端接收数据,此接收器作为Task任务运行在Executor中,一直运行(Long Runing),一直接收数据。
在这里插入图片描述
从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器,一直在运行,以Task方式运行,需要1Core CPU。
在这里插入图片描述
可以从多个数据源端实时消费数据进行处理,例如从多个TCP Socket接收数据,对每批次数据进行词频统计,使用DStream#union函数合并接收数据流,演示代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 从TCP Socket 中读取数据,对每批次(时间为5秒)数据进行词频统计,将统计结果输出到控制台。
* TODO: 从多个Socket读取流式数据,进行union合并
*/
object StreamingDStreamUnion {
def main(args: Array[String]): Unit = {
// TODO: 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[4]")
// b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream01: DStream[String] = ssc.socketTextStream("node1.itcast.cn", 9999)
val inputDStream02: DStream[String] = ssc.socketTextStream("node1.itcast.cn", 9988)
// 合并两个DStream流
val inputDStream: DStream[String] = inputDStream01.union(inputDStream02)
// TODO: 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = inputDStream
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print(10)
// TODO: 5. 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

接收器接收数据
启动每个接收器Receiver以后,实时从数据源端接收数据(比如TCP Socket),也是按照时间间隔将接收的流式数据划分为很多Block(块)。
在这里插入图片描述
接收器 Receiver划分流式数据的时间间隔BlockInterval ,默认值为 200ms,通过属性【spark.streaming.blockInterval】设置。接收器将接收的数据划分为Block以后,按照设置的存储级别对Block进行存储,从TCP Socket中接收数据默认的存储级别为:MEMORY_AND_DISK_SER_2,先存储内存,不足再存储磁盘,存储2副本。

从TCP Socket消费数据时可以设置Block存储级别,演示代码如下:

// TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
"node1.itcast.cn", //
9999, //
// TODO: 设置Block存储级别为先内存,不足磁盘,副本为1
storageLevel = StorageLevel.MEMORY_AND_DISK
)

汇报接收Block报告
接收器Receiver将实时汇报接收的数据对应的Block信息,当BatchInterval时间达到以后,StreamingContext将对应时间范围内数据block当做RDD,加载SparkContextt处理数据。

在这里插入图片描述
以此循环处理流式的数据,如下图所示:
在这里插入图片描述

Streaming 工作原理总述
整个Streaming运行过程中,涉及到两个时间间隔:

  • 批次时间间隔:BatchInterval
    • 每批次数据的时间间隔,每隔多久加载一个Job;
  • Block时间间隔:BlockInterval
    • 接收器划分流式数据的时间间隔,可以调整大小哦,官方建议最小值不能小于50ms;
    • 默认值为200ms,属性:spark.streaming.blockInterval,调整设置
      在这里插入图片描述

官方案例:

BatchInterval: 1s = 1000ms = 5 * BlockInterval
每批次RDD数据中,有5个Block,每个Block就是RDD一个分区数据

从代码层面结合实际数据处理层面来看,Streaming处理原理如下,左边为代码逻辑,右边为实际每批次数据处理过程。
在这里插入图片描述
具体运行数据时,每批次数据依据代码逻辑执行。

// TODO: 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = inputDStream
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print(10)

流式数据流图如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/372610.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[数据结构]:06-队列(链表)(C语言实现)

目录 前言 已完成内容 队列实现 01-开发环境 02-文件布局 03-代码 01-主函数 02-头文件 03-QueueCommon.cpp 04-QueueFunction.cpp 结语 前言 此专栏包含408考研数据结构全部内容,除其中使用到C引用外,全为C语言代码。使用C引用主要是为了简化…

Spring Cache的使用--快速上手篇

系列文章目录 分页查询–Java项目实战篇 全局异常处理–Java实战项目篇 完善登录功能–过滤器的使用 更多该系列文章请查看我的主页哦 文章目录系列文章目录前言一、Spring Cache介绍二、Spring Cache的使用1. 导入依赖2. 配置信息3. 在启动类上添加注解4. 添加注解4.1 CacheP…

duboo+zookeeper分布式架构入门

分布式 dubbo Zookeeper 分布式系统就是若干独立计算机的集合(并且这些计算机之间相互有关联,就像是一台计算机中的C盘F盘等),这些计算对于用户来说就是一个独立的系统。 zookeeper安装 下载地址:Index of /dist/z…

MyBatis——增删改查操作的实现

开启mybatis sql日志打印 可以在日志中看到sql中执行的语句 在配置文件中加上下面这几条语句 mybatis.configuration.log-implorg.apache.ibatis.logging.stdout.StdOutImpl logging.level.com.example.demodebug查询操作 根据用户id查询用户 UserMapper: User…

RTD2169芯片停产|完美替代RTD2169芯片|CS5260低BOM成本替代RTD2169方案设计

RTD2169芯片停产|完美替代RTD2169芯片|CS5260低BOM成本替代RTD2169方案设计 瑞昱的RTD2169芯片目前已经停产了, 那么之前用RTD2169来设计TYPEC转VGA方案的产品,该如何生产这类产品?且RTD2169芯片价格较贵,芯片封装尺寸是QFN40&…

JS函数的4种调用方式

函数可以声明定义,也可以是一个表达式,函数使用关键字function定义函数被定义时,函数内部的代码不会执行函数被调用时,函数内部的代码才会执行函数有四种调用方式,每种方式的不同在于this的初始化。(this是…

HTML#1快速入门

一. 简介HTML是一门语言, 所有的网页都是用HTML编写的HTML(Hyper Text Markup Language): 超文本(超越了文本限制,除了文字信息还可以定义图片,音频,视频等)标记语言(有标签构成的语言)W3C标准: 网页主要由三部分组成(1) 结构: HTML(2) 表现: CSS(3) 行为: JavaScript二. 快速入…

optional说明

1.说明 public final class Optional<T> extends Object 可能包含或不包含非空值的容器对象。 如果一个值存在&#xff0c; isPresent()将返回true和get()将返回值。 提供依赖于存在或不存在包含值的其他方法&#xff0c;例如orElse() &#xff08;如果值不存在则返回…

印度这事真的干的挺棒的! |

来源&#xff1a;statista最近逛外网看到一张图&#xff0c;是关于印度家庭自来水供应的对比图。Crore是印度的单位千万(卢比)&#xff0c;所以他们从2019年供应3.23千万家庭&#xff0c;增长到了2022年的9.57万家庭&#xff0c;印度这事真的干的挺棒的&#xff01;一直以来印度…

【USB】windows热插拔通知接口分析

文章目录接口介绍概述过滤器介绍举例接收通知创建窗口参考文档接口介绍 概述 window提供了RegisterDeviceNotificationW方法&#xff0c;可以用来监听设备的热插拔事件。 HDEVNOTIFY RegisterDeviceNotificationW([in] HANDLE hRecipient,[in] LPVOID NotificationFilter,[in]…

Android 多种支付方式的优雅实现

场景App 的支付流程&#xff0c;添加多种支付方式&#xff0c;不同的支付方式&#xff0c;对应的操作不一样&#xff0c;有的会跳转到一个新的webview&#xff0c;有的会调用系统浏览器&#xff0c;有的会进去一个新的表单页面&#xff0c;等等。并且可以添加的支付方式也是不确…

计算机网络-网络核心(day02)

网络核心 最主要的功能&#xff1a; 数据交换的功能 转发&#xff0c;路由 主要分为线路交换&#xff0c;分组交换 线路交换 可以认为所有的电话通信都是线路交换 线路交换&#xff0c;比如打电话&#xff0c;需要先建立连接&#xff08;主机要经过哪些链路哪些交换机&#…

软件测试之因果图法

因果图法 1. 概述 因果图法是一种**利用图解法分析输入条件、输出结果的各种组合情况,**从而设计测试用例的方法. 因果图法适用于有多个输入和多个输出&#xff0c;而且输入和输入之间有相互的组合关系&#xff0c;输入和输出之间有相互的制约和依赖关系. 使用场景和判定表…

第一个 Spring MVC 注解式开发案例(初学必看)

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

HashMap存入一个键值对的中间发生了什么(浅读源码)

存入键值对就是调用HashMap.put()过程&#xff08;面试高频问题&#xff09;首先放出粗狂的总结假如是Integer类型就会先查看IntegerCache中是否存在这个数字&#xff0c;有就从缓存中调用&#xff0c;没有则创建新的Integer对象&#xff08;String类型没有这个过程&#xff0c…

【JavaSE】复习(进阶)

文章目录1.final关键字2.常量3.抽象类3.1概括3.2 抽象方法4. 接口4.1 接口在开发中的作用4.2类型和类型之间的关系4.3抽象类和接口的区别5.包机制和import5.1 包机制5.2 import6.访问控制权限7.Object7.1 toString()7.2 equals()7.3 String类重写了toString和equals8.内部类8.1…

【谷粒学院】vue、axios、element-ui、node.js(44~58)

44.前端技术-vue入门 &#x1f9e8;Vue.js 是什么 Vue (读音 /vjuː/&#xff0c;类似于 view) 是一套用于构建用户界面的渐进式框架。 Vue 的核心库只关注视图层&#xff0c;不仅易于上手&#xff0c;还便于与第三方库或既有项目整合。另一方面&#xff0c;当与现代化的工具…

RK3568编译Android11和目录讲解

文章目录 前言一、下载android11源码二、环境搭建1.增加交换内存三、编译瑞芯微原厂源码四、目录讲解总结前言 本文记录在Ubuntu18.04中编译Android11,只有编译了源码,后面才能进行驱动的开发,有兴趣的小伙伴可以和我一起学习吧! 提示:以下是本篇文章正文内容,下面案例可…

@Value注解的使用(可用于配置文件)

基本概念Value&#xff1a;注入配置文件中的内容。只要是spring的注解类&#xff08;service,compotent, dao等&#xff09;中都可以。Component&#xff1a;泛指组件&#xff0c;当组件不好归类的时候&#xff0c;可以使用这个注解进行标注。AutoWired&#xff1a;自动导入依赖…

【JAVA程序设计】(C00104)基于Springboot的家庭理财管理系统——有文档

基于Springboot的家庭理财管理系统项目简介项目获取开发环境项目技术运行截图运行视频项目简介 基于Springboot开发的家庭理财管理系统设计与实现共分为三个角色&#xff1a;系统管理员、家庭管理员、家庭用户 管理员角色包含以下功能&#xff1a; 用户管理、修改密码、角色管…