Spark-Streaming有状态计算

news2025/1/8 12:45:49

一、上下文

《Spark-Streaming初识》中的NetworkWordCount示例只能统计每个微批下的单词的数量,那么如何才能统计从开始加载数据到当下的所有数量呢?下面我们就来通过官方例子学习下Spark-Streaming有状态计算。

二、官方例子

所属包:org.apache.spark.examples.streaming

object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    //创建微批为 1 秒的上下文
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    //指定 checkpoint 目录
    ssc.checkpoint(".")

    // 用一个 List 初始化一个 RDD
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // 在目标ip:port上创建一个ReceiverInputDStream,并对分隔测试的输入流中的单词进行计数(例如由'nc'生成)
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // 使用mapWithState更新累积计数这将给出一个由状态组成的DStream(即单词的累积计数)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream = wordDstream.mapWithState(
      StateSpec.function(mappingFunc).initialState(initialRDD))
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

三、分析

1、构建SparkConf

它是Spark应用程序的配置,用于设置Spark的各种参数。支持链式设置

new SparkConf().setMaster("local").setAppName("My app")

 一旦SparkConf对象传递给Spark,用户就不能再对其进行修改。Spark不支持在运行时修改配置

2、构建StreamingContext

它是Spark Streaming功能的主要入口点,且提供了从各种输入源创建[[org.apache.spark.streaming.dstream.DStream]] 的方法。

创建和转换DStreams后,可以分别使用start()、stop()启动和停止流计算,awaitTermination()允许当前线程通过stop()或异常等待上下文的终止。

3、设置checkpoint

StreamingContext最终还是通过SparkContext来设置checkpoint,但其实都是为各自的checkpointDir设置checkpoint路径,在有状态计算中checkpoint是必须的。

所谓有状态计算就必须要把历史状态给存储下来,spark中使用使用checkpoint来实现这个存储,每个微批的数据的计算都要更新到历史状态中。

class SparkContext(config: SparkConf) extends Logging {

  private[spark] var checkpointDir: Option[String] = None

}
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {

  private[streaming] var checkpointDir: String = {
    if (isCheckpointPresent) {
      sc.setCheckpointDir(_cp.checkpointDir)
      _cp.checkpointDir
    } else {
      null
    }
  }

}

4、初始化一个RDD

为什么要初始化一个RDD呢?我们看看下面是如何用到的。

5、创建一个ReceiverInputDStream

这里是从TCP源hostname:port创建输入流。使用TCP套接字接收数据,并使用给定的转换器将接收字节解释为对象

6、处理单词

从源码中可以看出会把这样的文本

hadoop spark flink kafka hadoop spark-streaming

处理成这样的格式

hadoop 1

spark 1

flink 1

kafka 1

hadoop 1

spark-streaming 1

6、使用mapWithState更新累积计数

该算子可以维护并更新每个key的状态。

这里用到一个新对象:StateSpec,且用到了它的两个方法,initialState和function

initialState:设置包含“mapWithState”将使用的初始状态的RDD`

function:设置实际的状态更新操作

//第1个参数:状态 key 的类别
//第2个参数:状态 value 的类别
//第3个参数:状态 数据 的类别
//第4个参数:状态 处理完要返回 的类别
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {

  // 使用state.exists()、state.get()、state.update()和state.remove()来管理状态,并返回必要的字符串
}

四、运行

运行Netcat

nc -lk 9999

新建一个窗口运行官方例子

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount cdh1 9999


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2272508.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

gesp(C++四级)(4)洛谷:B3851:[GESP202306 四级] 图像压缩

gesp(C四级)&#xff08;4&#xff09;洛谷&#xff1a;B3851&#xff1a;[GESP202306 四级] 图像压缩 题目描述 图像是由很多的像素点组成的。如果用 0 0 0 表示黑&#xff0c; 255 255 255 表示白&#xff0c; 0 0 0 和 255 255 255 之间的值代表不同程度的灰色&#xff0…

链地址法(哈希桶)

链地址法&#xff08;哈希桶&#xff09; 解决冲突的思路 开放定址法中所有的元素都放到哈希表⾥&#xff0c;链地址法中所有的数据不再直接存储在哈希表中&#xff0c;哈希表 中存储⼀个指针&#xff0c;没有数据映射这个位置时&#xff0c;这个指针为空&#xff0c;有多个数…

【通识安全】煤气中毒急救的处置

1.煤气中毒的主要症状与体征一氧化碳中毒&#xff0c;其中毒症状一般分为轻、中、重三种。 (1)轻度&#xff1a;仅有头晕、头痛、眼花、心慌、胸闷、恶心等症状。如迅速打开门窗&#xff0c;或将病人移出中毒环境&#xff0c;使之吸入新鲜空气和休息&#xff0c;给些热饮料&am…

Synthesia技术浅析(二):虚拟人物视频生成

Synthesia 的虚拟人物视频生成模块是其核心技术之一&#xff0c;能够将文本输入转换为带有同步语音和口型的虚拟人物视频。该模块如下所示&#xff1a; 1.文本输入处理 2.语音生成&#xff08;TTS, Text-to-Speech&#xff09; 3.口型同步&#xff08;Lip Syncing&#xff0…

【算法】算法初步

要学好数据结构和算法的设计与分析&#xff0c;请务必先打好C语言基础&#xff0c;因为C语言中的数据存储、内存映射、指针等等概念最接近计算机的底层原理&#xff0c;数据结构是数据在内存空间当中的组织形式&#xff0c;而算法则是提供了解决某个问题的一种思路&#xff0c;…

年会抽奖Html

在这里插入图片描述 <!-- <video id"backgroundMusic" src"file:///D:/background.mp3" loop autoplay></video> --> <divstyle"width: 290px; height: 580px; margin-left: 20px; margin-top: 20px; background: url(D:/nianhu…

LLM 实现Malleable 软件

All computer users may soon have the ability to author small bits of code. What structural changes does this imply for the production and distribution of software? 如果每个终端用户都能修改一部分代码&#xff0c; 这个将会对软件的生产和分发有何重大改变&#…

国产编辑器EverEdit - 两种删除空白行的方法

1 使用技巧&#xff1a;删除空白行 1.1 应用场景 用户在编辑文档时&#xff0c;可能会遇到很多空白行需要删除的情况&#xff0c;比如从网页上拷贝文字&#xff0c;可能就会存在大量的空白行要删除。 1.2 使用方法 1.2.1 方法1&#xff1a; 使用编辑主菜单 选择主菜单编辑 …

出租号平台网站系统源码/单合租用模式 提供用户提现功能

这是一款租号平台源码&#xff0c;采用常见的租号模式对接的易支付。目前网络上还很少见到此类类型的源码。 程序采用thinkphp6.0开发&#xff0c;前端采用layui 程序开发&#xff1a;PHPMySQL 程序演示&#xff1a;zh1.yetukeji.top, 账户 13112215717 &#xff0c;密码qq2…

C++:位与运算符

& 一&#xff0c;位与运算符的运算规则 有0则0。 二&#xff0c;判断奇偶性 %&#xff1a;优先级高&#xff0c;效率低 &&#xff1a;优先级低&#xff0c;效率高 数与1的位与运算结果为1则为奇数&#xff0c;结果为0则为偶数 三&#xff0c;获取一个数二进制的后…

第 31 章 - 源码篇 - Elasticsearch 写入流程深入分析

写入源码分析 接收与处理 请求首先会被 Netty4HttpServerTransport 接收&#xff0c;接着交由 RestController 进行路由分发。 private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {…

C语言----指针

目录 1.概念 2.格式 3.指针操作符 4.初始化 1. 将普通变量的地址赋值给指针变量 a. 将数组的首地址赋值给指针变量 b. 将指针变量里面保存的地址赋值给另一个指针变量 5.指针运算 5.1算术运算 5.2 关系运算 指针的大小 总结&#xff1a; 段错误 指针修饰 1. con…

Java高频面试之SE-09

hello啊&#xff0c;各位观众姥爷们&#xff01;&#xff01;&#xff01;本牛马baby今天又来了&#xff01;哈哈哈哈哈嗝&#x1f436; final关键字有什么作用&#xff1f; 在 Java 中&#xff0c;final 关键字有多个用途&#xff0c;它可以用于类、方法和变量。根据使用的上…

ChatGPT 主流模型GPT-4/GPT-4o mini的参数规模是多大?

微软论文又把 OpenAI 的机密泄露了&#xff1f;&#xff1f;在论文中明晃晃写着&#xff1a; o1-preview 约 300B&#xff1b;o1-mini 约 100BGPT-4o 约 200B&#xff1b;GPT-4o-mini 约 8BClaude 3.5 Sonnet 2024-10-22 版本约 175B微软自己的 Phi-3-7B&#xff0c;这个不用约…

某纪检工作委员会视频监控网络综合运维项目

随着某纪检工作委员会信息化建设的不断深入&#xff0c;网络基础设施的数量持续增加&#xff0c;对网络设备的运维管理提出了更为复杂和艰巨的要求。为了确保这些关键信息基础设施能够安全稳定地运行&#xff0c;该纪检工作委员会决定引入智能化运维管理系统&#xff0c;以科技…

显示器太薄怎么用屏幕挂灯?使用前先了解屏幕挂灯的最佳角度

人们对用眼健康的重视以及数字化办公和娱乐的普及&#xff0c;屏幕挂灯作为一种能够有效减少屏幕反光、保护眼睛的照明设备&#xff0c;受到了越来越多消费者的青睐。随着科技的进步&#xff0c;显示器设计日益轻薄&#xff0c;为我们的桌面节省了空间并带来了美观的视觉效果。…

HTTP/HTTPS ②-Cookie || Session || HTTP报头

这里是Themberfue 上篇文章介绍了HTTP报头的首行信息 本篇我们将更进一步讲解HTTP报头键值对的含义~~~ ❤️❤️❤️❤️ 报头Header ✨再上一篇的学习中&#xff0c;我们了解了HTTP的报头主要是通过键值对的结构存储和表达信息的&#xff1b;我们已经了解了首行的HTTP方法和UR…

excel快速计算周数的方法

业务中经常要通过周汇总计算&#xff0c;为方便后续汇总在源数据引入“周”列 公式&#xff1a; "W"&IF((ROW()1)/7<1,1,ROUNDUP((ROW()1)/7,0))函数释义&#xff1a; ①一周有7天&#xff0c;如果1月1号刚好是从周一开始&#xff0c;那么计算周数可以简单得…

redis各种数据类型介绍

Redis 是一种高性能的键值存储数据库&#xff0c;它支持多种数据类型&#xff0c;使得开发者可以灵活地存储和操作数据。以下是 Redis 支持的主要数据类型及其介绍&#xff1a; 1. 字符串&#xff08;String&#xff09; 字符串是 Redis 中最基本的数据类型&#xff0c;它可以存…

Python 模块,包(详解)

一. 引用变量 引用变量&#xff1a;值的传递通常可以分为两种方式&#xff0c;一种是值的传递&#xff0c;一种是引用地址传递&#xff0c;在Python中一般都是用引用地址传递 变量名和对象&#xff1a;变量名&#xff08;如 a&#xff09;和它指向的对象&#xff08;如整数 5&a…