Flink学习——状态编程

news2024/12/25 12:32:46

目录

一、Flink中的状态

二、状态编程

(一)ValueState案例——判断传感器的数据

1.代码实现

2.端口进行传输数据

3.运行结果

(二)ListState

(三)MapState案例——比较学生每次考试成绩

1.代码实现

2.端口传输学生成绩

3.运行结果

(四)ReducingState


一、Flink中的状态

        在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。

        例如,监测温度的变化趋势的时候,如果现在的温度与上一秒的温度不一样,就说明处于不同的状态。

二、状态编程

(一)ValueState案例——判断传感器的数据

1.代码实现

import source.SensorReading
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("ant168", 7777) //加载集合数据源*/

    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    val alterStream: DataStream[(String, Double, Double)] = dataStream.keyBy(_.id)
      .flatMap(new ChangeAlert)
    alterStream.print()

    env.execute()
  }
}

class ChangeAlert extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  // TODO 定义状态对象,保存上一次的温度值
  // TODO "last-temp"是指给当前状态在运行程序的上下文中起名  后面的classOf[Double]是要指明last-temp的类型
  // TODO 关键字要改为lazy,等到使用的时候才创建对象
  lazy val lastState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))

  // TODO 判断机器是否为第一次开启
  lazy val firstState: ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("first-start", classOf[Int]))

  // lastState中有value()——取数和update()——更新 两个方法
  override def flatMap(value: SensorReading, out: Collector[
    (String, Double, Double)]): Unit = {
    // 首先,机器开启,判断是否为第一次开启,如果不是第一次开启,就进行温度判断,如果是第一次开启,就默认温度为0.0
    val firstValue: Int = firstState.value()
    val lastValue: Double = lastState.value()
    val dif: Double = (lastValue - value.temperature).abs
    if (firstValue != 0) {
      // 取到上一次状态中的值
      // TODO 对比这一次的值与上一次的温度的差
      if (dif > 10)
      // 如果差值>10,就输出
        out.collect((value.id, lastValue, value.temperature))
    } else {
      // 如果机器第一次开启,就更新机器状态
      firstState.update(1)
      if (dif > 10)
        out.collect((value.id, lastValue, value.temperature))
    }
    // 每次新来一个温度值,原有的温度状态就要更新
    lastState.update(value.temperature)
  }
}

2.端口进行传输数据

3.运行结果

(二)ListState

class MyRichFunction extends RichFlatMapFunction[SensorReading, String] {
  lazy val listState: ListState[String] = getRuntimeContext.getListState(new ListStateDescriptor[String]("liststate", classOf[String]))

  override def flatMap(value: SensorReading, out: Collector[String]): Unit = {
    val strings: lang.Iterable[String] = listState.get()
    listState.add("hello")
    val ls = new util.ArrayList[String]()
    ls.add("html")
    ls.add("flink")
    listState.addAll(ls)
    listState.update(ls)
  }
}

(三)MapState案例——比较学生每次考试成绩

1.代码实现

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

import java.util.Map
import java.util

/**
 * 状态编程判断学生成绩
 */
object StateTest2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("ant168", 7777)
    // TODO 对成绩进行清洗
    val dataStream: DataStream[Test] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      Test(arr(0).trim, arr(1).trim, arr(2).trim.toDouble)
    })

    val scoreStream: DataStream[(String, String, Double, Double)] = dataStream
.keyBy(data => (data.id, data.subject))
      .flatMap(new MyTest)

    scoreStream.print()
    env.execute("state test")
  }
}

// 定义一个样例类,表示测试结果
case class Test(id: String, subject: String, score: Double)

class MyTest extends RichFlatMapFunction[Test, (String, String, Double, Double)] {
  lazy val mapState: MapState[(String, String), Double] = getRuntimeContext
    .getMapState(new MapStateDescriptor[(String, String), Double]("map-temp", classOf[(String, String)], classOf[Double]))

  override def flatMap(value: Test, 
                       out: Collector[(String, String, Double, Double)]): Unit = {
    // 放入第一次的成绩
    val previousScore: Double = Option(mapState.get(value.id, value.subject)).getOrElse(0.0)
    mapState.put((value.id, value.subject), previousScore)
    // 获取第二次的成绩
    val iter: util.Iterator[Map.Entry[(String, String), Double]] = mapState.iterator()
    while (iter.hasNext) {
      val unit: Map.Entry[(String, String), Double] = iter.next()
      val key: (String, String) = unit.getKey// 第一次考试的id,subject
      val value1: Double = unit.getValue// 第一次开始的score
      val dif: Double = (previousScore - value.score).abs
      if (dif >= 10) {
        out.collect((key._1, key._2, previousScore, value.score))
      }
      mapState.put((key._1, key._2), value.score)
    }
  }
}

2.端口传输学生成绩

3.运行结果

(四)ReducingState

class MyRichFunction extends RichFlatMapFunction[SensorReading, String] {
  lazy val reducingState: ReducingState[SensorReading] = getRuntimeContext
.getReducingState(new ReducingStateDescriptor[SensorReading]("reducestate", 
new MyReduceFunction2, classOf[SensorReading]))

  override def flatMap(value: SensorReading, out: Collector[String]): Unit = {
    val reading: SensorReading = reducingState.get()
    reducingState.add(reading)
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/544194.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

0Ω的电阻作用

0欧姆电阻即电阻标值为0欧姆的电阻,多用于PCB设计等方面,是一种理想电阻。那0欧姆电阻是表示没有电阻吗?当然不是,0欧姆电阻的阻值不是0欧姆,只是接近0欧姆。 1、调试方便或者兼容设计:可以选择器件、功能…

网络安全--XXE漏洞利用思路

一、XXE 是什么 介绍 XXE 之前,我先来说一下普通的 XML 注入,这个的利用面比较狭窄,如果有的话应该也是逻辑漏洞 如图所示: 既然能插入 XML 代码,那我们肯定不能善罢甘休,我们需要更多,于是出…

Python环境安装,操作MySQL数据脚本

安装Python 下载地址Python Releases for macOS | Python.org下载安装包点击安装执行python3命令查看安装及版本 安装插件 安装PyMySQL python3 -m pip install PyMySQL 编写脚本 创建文件selectMysql.py #!/usr/bin/python import pymysql.cursors def updateuser(user_…

chatgpt赋能Python-python3_4

Python3-4: 编程领域的瑰宝 简介 Python3-4 是一种开源解释型高级编程语言,具有简单易学、可读性强、语法简洁的特点。它由谷歌公司所开发,在全球范围内被广泛应用于Web开发、人工智能、科学计算、数据分析等领域。 优势 1. 语法简洁 Python3-4采用…

今天面试招了个20K的人,从腾讯出来的果然都有两把刷子···

现在找个会自动化测试的人真是难呀,10个里面有8个写了会自动化,但一问就是三不知 公司前段时间缺人,也面了不少测试,前面一开始瞄准的就是中级的水准,也没指望来大牛,提供的薪资在15-20k,面试的…

实施基于零信任网络安全的设备控制

零信任安全是一种数据保护策略,除非系统管理员进行彻底验证,否则网络边界内外的所有设备和实体都不受信任。Device Control Plus 可帮助管理员为其网络实施和自动化零信任安全协议,以确保对来自未经批准的外围设备的所有端点数据提供最佳保护…

嵌入式软件测试笔记1 | 简单说明 嵌入式系统认识和测试目标

1 | 简单说明 & 嵌入式系统认识和测试目标 1 为什么看这个?2 一些说明3 主要内容是什么?4 嵌入式系统测试的目标4.1 测试的任务4.2 最终目标4.3 测试过程4.4 通用元素 5 嵌入式系统的一些基础 1 为什么看这个? 一直在间断性的学习和了解…

chatgpt赋能Python-python3_6怎么调整字体大小

Python3.6 是一种广泛使用的编程语言,可以帮助人们创建各种各样的应用程序。不过,当我们在使用 Python3.6 编写程序时,有时会遇到一些困难,比如如何调整字体大小。那么,今天我们就来看看如何应对这个问题。 如何在 Py…

(十二)centos7案例实战——swap虚拟内存配置

前言 在实际生产环境中,我们的服务器由于内存配置资源有限,会遇到一些线上服务宕机或者内存溢出等问题,那么如何解决这些问题呢,一方面我们要确认问题的具体原因,通过排查自身应用服务的问题,一方面增加我…

Linux 之 supervisor安装和使用

Supervisor 官网 一、介绍 Supervisor有四个组件: 1. supervisord 运行Supervisor的后台服务,它用来启动和管理那些你需要Supervisor管理的子进程,响应客户端发来的请求,重启意外退出的子进程,将子进程的stdout和s…

Bean的自动装配

目录结构 导入pom.xml依赖包 <dependencies><!-- https://mvnrepository.com/artifact/org.springframework/spring-webmvc --><dependency><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId><vers…

工赋开发者社区 | Gartner发布2023年十大数据和分析趋势

来源&#xff1a;Gartner Gartner&#xff08;技术研究和分析机构&#xff09;近日公布了2023年十大数据和分析&#xff08;D&A&#xff09;趋势&#xff0c;可帮助企业领导者通过预测变化将不确定性转化为新的商机。 正文开始 Gartner在Gartner数据与分析峰会上介绍了企…

诞生两年,这个产品便成为腾讯安全的“秘密武器”

腾讯既是企业服务产品的服务商又是使用者&#xff0c;很多产品最原始的出发点最早只是为了解决腾讯自身某一个需求&#xff0c;经过不断地发展完善和业务场景锤炼&#xff0c;最终从进化成一个成熟的企服产品。本系列文章讲述的是这样一组Made in Tencent故事&#xff0c;这是系…

2023年 - 我们遇到的那些面试题 (1) - 面试技巧和功能测试篇

前言 最近收到了很多粉丝反馈的面试题 。。 有主观题 &#xff0c;有功能测试题&#xff0c;有python编程题 &#xff0c;有自动化测试题&#xff0c;有数据库题&#xff0c;linux等。。 本文作为《测试面试宝典》内容&#xff0c;将面试题以及部分参考答案开放出来。。 1、…

只要做好这布成为测试经理不是梦!

之前说了太多的测试技术和测试用例设计方法&#xff0c;猛地发现有点“偏科“了。今天我们放松一些&#xff0c;泡一杯茶&#xff0c;一起来聊一聊测试策略吧。 当然&#xff0c;文章脉络肯定是咱们老三样&#xff1a;什么是测试策略&#xff0c;为什么要制定测试策略&#xf…

3年测试经验,跳进腾讯,3面终获20K的Offer...

前言 时间过得飞快&#xff0c;一代又一代就这么成长了起来&#xff0c;曾经的95后备受争议&#xff0c;如今的95后进入社会&#xff0c;扮演者各行角色&#xff0c;成为了行业顶梁柱&#xff0c;今天&#xff0c;要分享的是自己的成长经历。今年24岁&#xff0c;毕业之后进入…

5th-Generation Mobile Communication Technology(六)

目录 一、5G/NR 1、 快速参考&#xff08;Quick Reference&#xff09; 2、5G Success 3、5G Challenges 4、Qualcomm Videos 二、PHY and Protocol 1、Frame Structure 2、Numerology 3、Waveform 4、Frequency Band 5、BWP 6、Synchronization 7、Beam Management 8、CSI Fra…

UE4 监听游戏窗口最小化事件

UE4 监听游戏窗口最小化事件 结论&#xff1a; 先说结论&#xff1a;Windows相关事件在UE4中引擎部分也会处理&#xff0c;包括窗口创建&#xff0c;销毁&#xff0c;最大化最小化&#xff0c;窗口尺寸改变等。通常&#xff0c;每个事件与Windows一样都是WM_***的样子表示&…

静态程序分析学习心得 tai-e

0x00 前言 经过将近2个月的时间&#xff0c;看完了b站上南大的静态程序分析课程&#xff0c;并且完成了其oj上的作业&#xff0c;在这里记录一下在做题过程中&#xff0c;遇到的一些坑点&#xff0c;文章不会贴源码&#xff0c;只记录一下思路&#xff0c;因此大家可以放心阅读…

aac怎么转化为mp3?4个超简易转换方法推荐给大家!

aac怎么转化为mp3&#xff1f;音乐是人类灵魂的表达&#xff0c;只有懂得欣赏音乐的人才能领略到生活的美好与价值。除了运动、看电影等&#xff0c;聆听音乐也是小伙伴们调剂生活的一个好方式。很多小伙伴都有在网上下载音乐的习惯&#xff0c;通过我们也都知道音乐的格式种类…