【Flink-scala】DataStream编程模型之状态编程

news2024/12/18 6:55:30

DataStream编程模型之状态编程

参考:
1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出
2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器
4.【Flink-scala】DataStream编程模型之水位线
5.【Flink-scala】DataStream编程模型之延迟数据处理


文章目录

  • DataStream编程模型之状态编程
  • 前言
  • 一、状态编程相关概念
    • 1.1Flink中状态始终与特定算子相关联
    • 1.2 演示代码
    • 1.3 状态编程程序输入输出


前言

流计算分为无状态和有状态两种,无状态是观察每个独立事件,根据最后一个事件输出结果。比如传感器只关注当前的水位量,超出水位量就发生报警事件。
有状态计算则会基于多个事件输出结果。比如计算过去1小时的水位平均值,那就是状态的计算。

一、状态编程相关概念

流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态计算。

在传统的批处理中,数据的划分为块分片去完成的,每个task处理一个分片,执行完成后,把结果聚合起来就是最终的结果,这个过程中,对状态的需求还是较少的。

但对于流计算而言,它对状态有着非常高的要求,因为在流系统中,输入是一个无限制的流,会运行很长一段时间,甚至运行几天或者几个月都不会停机。在这个过程当中,就需要把状态数据很好地管理起来

1.1Flink中状态始终与特定算子相关联

分为算子状态和键控状态
在这里插入图片描述
算子状态的作用范围限定为算子任务,这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。

算子状态不能由相同或不同算子的另一个任务访问

键控状态是根据输入数据流中定义的键来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据分区到同一个算子任务中,这个任务会维护和处理这个键对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的键。因此,具有相同键的所有数据都会访问相同状态

在这里插入图片描述

1.2 演示代码

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
 
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

object StateTest {
  def main(args: Array[String]): Unit = {
    //设定执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment   
//设定程序并行度
    env.setParallelism(1) 
    //创建数据源
    val source = env.socketTextStream("localhost", 9999) 
    //指定针对数据流的转换操作逻辑
    val stockDataStream = source
      .map(s => s.split(","))
      .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))
    val alertStream = stockDataStream
      .keyBy(_.stockId)
      .flatMap(new PriceChangeAlert(10))//新建了一个PriceChangeAlert类  这里重新了flatmap方法
     // 打印输出
    alertStream.print() 
    //触发程序执行
    env.execute("state test")
  }

class PriceChangeAlert(threshold: Double) extends RichFlatMapFunction[StockPrice,(String, Double, Double)]{
    //定义状态保存上一次的价格
    lazy val lastPriceState: ValueState[Double] = getRuntimeContext
      .getState(new ValueStateDescriptor[Double]("last-price",classOf[Double]))
    override def flatMap(value: StockPrice, out: Collector[(String, Double, Double)]): Unit = {
      // 获取上次的价格
val lastPrice = lastPriceState.value()
//跟最新的价格求差值做比较
      val diff = (value.price-lastPrice).abs
      if( diff > threshold)
        out.collect((value.stockId,lastPrice,value.price))
      //更新状态
      lastPriceState.update(value.price)
    }
  }
}

代码分析:
1.传入参数,阈值
2.继承里接受一个stockPrice类型的输入,一个(String,Double,Double)三元组的输出。

String,Double,Double
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

有什么不同呢,两个double代表了两个价格:分别代表股票ID、上次价格、当前价格。

3.ValueState是Flink中用于保存单个值的状态。这里它被用来保存上一次处理的股票价格。lazy关键字意味着这个状态变量只有在第一次被使用时才会被初始化
4…getState(new ValueStateDescriptor[Double](“last-price”, classOf[Double])): 这个方法尝试从运行时上下文中检索一个名为 “last-price” 的 ValueState,如果状态不存在,它将根据提供的 ValueStateDescriptor 创建一个新的状态。

ValueStateDescriptor 包含了状态的名称(代码中是 “last-price”)和状态的值的类型(这个代码中是 Double)。
5. classOf[Double] 提供了状态的值的类型信息。
6. 重写的flatmap应该能看懂,主要是当当前价格超出阈值(代码中是10),就打印。

1.3 状态编程程序输入输出

输入:

stock_4,1602031562148,43.4
stock_1,1602036130952,39.7
stock_4,1602036131741,59.9
stock_2,1602036132184,30.1
stock_3,1602036133154,79.8
stock_0,1602036133919,9.9
stock_1,1602036134385,21.7

输出:

(stock_4,0.0,43.4)
(stock_1,0.0,39.7)
(stock_4,43.4,59.9)
(stock_2,0.0,30.1)
(stock_3,0.0,79.8)
(stock_1,39.7,21.7)

其中根据stock_id分类。

初始状态:所有stockId的最近价格都是未定义的(即null或None,在代码中表现为Double的默认值0.0,因为ValueState在初始化时未设置值)。

处理第一条记录:stock_4,1602031562148,43.4。由于没有先前的价格,不会触发输出。最近价格更新为43.4。
处理第二条记录:stock_1,1602036130952,39.7。同样,没有先前的价格,不会触发输出。最近价格更新为39.7。
处理第三条记录:stock_4,1602036131741,59.9。价格从43.4变为59.9,差异为16.5,超过阈值10,因此输出(stock_4, 43.4, 59.9)。最近价格更新为59.9。
后续记录:对于stock_2、stock_3、stock_0,由于没有先前的价格,30.1 和79.8直接列出,
但是9.9这个价格要注意
stock_0,默认值为0,这里变为9.9,没有超出阈值10,那么输出就没有。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2261457.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Gitlab服务管理和仓库项目权限管理

Gitlab服务管理 gitlab-ctl start # 启动所有 gitlab 组件; gitlab-ctl stop # 停止所有 gitlab 组件; gitlab-ctl restart # 重启所有 gitlab 组件; gitlab-ctl status …

SCAU期末笔记 - Linux系统应用与开发教程样卷解析(2024版)

我真的不理解奥,为什么会有给样卷不自带解析的,对答案都没得对,故整理一篇 样卷1 一、选择题 1、为了遍历shell脚本调用时传入的参数,需要在shell脚本中使用_____。 A.$#表示参数的个数B.S表示所有参数C.$0表示脚本名D.$1表示…

学习threejs,区域光THREE.AreaLight效果

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️THREE.AreaLight 区域光 二…

RabbitMQ消息队列的笔记

Rabbit与Java相结合 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> 在配置文件中编写关于rabbitmq的配置 rabbitmq:host: 192.168.190.132 /…

VSCode,Anaconda,JupyterNotebook

文章目录 一. 下载VSCode并安装二. 下载Anaconda并安装1. anaconda介绍2. Anaconda的包管理功能3. Anaconda的虚拟环境管理4.Jupyter Notebook5. Jupyter Notebook使用简介6. Jupyter Notebook快捷键7.Jupyter notebook的功能扩展8. Jupyter notebook和Jupyter lab的区别 三. V…

动态导出word文件支持转pdf

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、功能说明二、使用步骤1.controller2.工具类 DocumentUtil 导出样式 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 例如&#xff…

那些不属性的C语言关键字-const

大家都知道const修饰的变量是不可变的&#xff0c;但是到底是怎么实现的那&#xff0c;有方法修改只读变量的值吗&#xff0c;今天我们结合实验代码&#xff0c;分析下const关键字的实现原理 const变量 1.const修饰局部变量 int main(){const int abc 123;printf("%d\…

【Java 数据结构】List -> 给我一个接口!!!

&#x1f525;博客主页&#x1f525;&#xff1a;【 坊钰_CSDN博客 】 欢迎各位点赞&#x1f44d;评论✍收藏⭐ 目录 1. 什么是 List 2. List 常用的方法 3. List 的使用 1. 什么是 List 其实 List 是一个接口&#xff0c;它继承了 Collection 接口 下列为 List 接口中的各种…

【5G】5G的主要架构选项

最初&#xff0c;在3GPP讨论中考虑了所有可能的聚合和核心网络组合&#xff0c;共有八个架构选项。以下重点介绍option2、3、4和7。 1. 独立组网 (Standalone, SA) 架构选项 2 &#xff1a;Standalone architecture with 5G-core 特点&#xff1a; 5G核心网&#xff08;5GC, …

Ajax简单理解

Ajax 1 什么是ajax AJAXAsynchronous JavaScript and XML (异步的JavaScript和XML)AJAX不是新的编程语言&#xff0c;二十一种使用现有标准的新方法 AJAX 最大的优点是在不重新加载整个页面的情况下&#xff0c;可以与服务器交换数据并更新部分网页内容。 AJAX 不需要任何浏…

【GESP】C++二级考试大纲知识点梳理, (2)计算机网络的基本概念及分类

GESP C二级官方考试大纲中&#xff0c;共有9条考点&#xff0c;本文针对C&#xff08;2&#xff09;号知识点进行总结梳理。 &#xff08;2&#xff09;了解计算机网络的概念&#xff0c;了解计算机网络的分类&#xff08;广域网&#xff08;WAN&#xff09;、城域网&#xff0…

相机与NAS的奇妙组合,如何使用相机拍照自动上传或备份到NAS

相机与NAS的奇妙组合&#xff0c;如何使用相机拍照自动上传或备份到NAS 哈喽小伙伴们好&#xff0c;我是Stark-C~ 对于喜欢使用专业器材拍照摄影的小伙伴来说&#xff0c;想要将相机存储卡中的照片或视频导出到电脑上&#xff0c;要么是使用数据线直接和相机连接&#xff0c;…

window下的qt5.14.2配置vs2022

这里做一个笔记&#xff0c;已知qt5.14.2和vs2022不兼容&#xff0c;无法自动扫描到vs的编译器。但由于团队协作原因&#xff0c;必须使用qt5.14.2&#xff0c;并且第三方库又依赖vs2022。其实qt5.15.2是支持vs2022的&#xff0c;如果能够用qt5.15.2&#xff0c;还是建议使用qt…

QT从入门到精通(一)——Qlabel介绍与使用

1. QT介绍——代码测试 Qt 是一个跨平台的应用程序开发框架&#xff0c;广泛用于开发图形用户界面&#xff08;GUI&#xff09;应用程序&#xff0c;也支持非图形应用程序的开发。Qt 提供了一套工具和库&#xff0c;使得开发者能够高效地构建高性能、可移植的应用程序。以下是…

【协作笔记Trilium Notes Docker部署】开源协作笔记Trilium Notes本地Docker部署远程协作

文章目录 前言1. 安装docker与docker-compose2. 启动容器运行镜像3. 本地访问测试4.安装内网穿透5. 创建公网地址6. 创建固定公网地址 前言 今天分享一款在G站获得了26K的强大的开源在线协作笔记软件&#xff0c;Trilium Notes的中文版如何在Linux环境使用docker本地部署&…

app的测试范围以及web和app的测试区别

目录 图1.App的测试范围1.1功能测试1.2专项测试1.3性能测试 2.Web和App的测试区别2.1相同点2.2不同点 &#x1f44d; 点赞&#xff0c;你的认可是我创作的动力&#xff01; ⭐️ 收藏&#xff0c;你的青睐是我努力的方向&#xff01; ✏️ 评论&#xff0c;你的意见是我进步的…

数据分析实战—鸢尾花数据分类

1.实战内容 (1) 加载鸢尾花数据集(iris.txt)并存到iris_df中,使用seaborn.lmplot寻找class&#xff08;种类&#xff09;项中的异常值&#xff0c;其他异常值也同时处理 。 import pandas as pd from sklearn.datasets import load_iris pd.set_option(display.max_columns, N…

docker 拉取镜像 | 创建容器 | 容器运行

拉取镜像 拉取镜像的命令&#xff1a;docker pull name &#xff08;name换为你要拉取的镜像名&#xff09; docker pull docker.1panel.live/hispark/qiankunbp:1.0.0 docker.1panel.live/hispark/qiankunbp:1.0.0为镜像名 拉取海思的镜像&#xff1a;&#xff08;如果之前拉…

添加标签(vue3)

点击添加按钮&#xff1a; 最多添加5个 注意&#xff1a; 不只可以el-form 进行校验&#xff0c;也可以对单个el-form-item 进行校验 vue elementUI form组件动态添加el-form-item并且动态添加rules必填项校验方法-CSDN博客 el-form 里边有el-form-item &#xff0c;el-fo…

Dash for Mac 代码API文档管理软件安装

Mac分享吧 文章目录 Dash for Mac 代码API文档管理软件 效果图展示一、Dash 代码API文档管理软件 Mac电脑版——v7.3.31️⃣&#xff1a;下载软件2️⃣&#xff1a;安装软件2.1 左侧安装包拖入右侧文件夹中&#xff0c;等待安装完成&#xff0c;运行软件2.2 打开软件&#xff…