Flink第四章:水位线和窗口

news2025/1/10 10:24:24

系列文章目录

Flink第一章:环境搭建
Flink第二章:基本操作.
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口


文章目录

  • 系列文章目录
  • 前言
  • 一、水位线
  • 二、窗口
  • 二、实际案例
    • 1.自定义聚合函数
    • 2.全窗口函数
    • 3.水位线+窗口
    • 4.统计用户点击数据
    • 5.处理迟到数据
  • 总结


前言

这次博客记录一下Flink框架中的窗口和水位线.
创建以下scala文件
在这里插入图片描述


一、水位线

在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,
用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数
据的时间戳来驱动的。

水位线共有三种,以下代表做了三种水位线的创立示范
WatermarkTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

import java.time.Duration

object WatermarkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(500L)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L),
    )

    //1. 有序流的水位线生成策略
    stream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[Event]()
    .withTimestampAssigner(
      new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
      }
    ))

    //2. 无序流的水位线生成策略
    stream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
      .withTimestampAssigner(
        new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }
      ))


    //3. 自定义水位线生成策略
    stream.assignTimestampsAndWatermarks( new WatermarkStrategy[Event] {
      override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Event] = {
        new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }
      }


      override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Event] = {
        new WatermarkGenerator[Event] {
          // 定义一个延迟时间
          val delay=5000L
          //定义属性保存最大时间戳
          var maxTs: Long =Long.MinValue+delay+1



          override def onEvent(t: Event, l: Long, watermarkOutput: WatermarkOutput): Unit = {
            maxTs=math.max(maxTs,t.timestamp)
          }

          override def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit = {
            val watermark = new Watermark(maxTs-delay-1)
            watermarkOutput.emitWatermark(watermark)
          }
        }
      }
    })
  }
}

二、窗口

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想
要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这
就是所谓的“窗口”(Window)。在 Flink 中, 窗口就是用来处理无界流的核心。

这里分别是四种常用的窗口类型,以及一个简单的实现
WindowTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
        .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }))

    //stream.keyBy(_.user)
   //   .window(TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(10))) //基于事件时间的滚动窗口
  //    .window(TumblingProcessingTimeWindows.of(Time.days(1),Time.hours(-8)) ) //基于处理时间的滚动窗口
     // .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10)) ) //基于事件时间的滑动窗口
     // .window(EventTimeSessionWindows.withGap(Time.seconds(10)))  //基于事件时间的会话窗口
    //  .countWindow(10) //滚动计数窗口

    stream.map(data=>(data.user,1))
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .reduce((stats,data)=>(data._1,stats._2+data._2))
      .print()

    env.execute()
  }
}

在这里插入图片描述
案例是最简单的点击次数统计,每1秒发送一次数据,5秒进行统计一次,所以点击数相加都是5.

二、实际案例

1.自定义聚合函数

AggregateFunctionTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object AggregateFunctionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    // 统计pv和uv,输出pv/uv
    stream.keyBy(data=>true)
      .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)))
      .aggregate( new PvUv)
      .print()

    env.execute()
  }

  //实现自定义函数,用一个二元组(Long,Set)表示聚合的(pv,uv)状态
  class PvUv extends AggregateFunction[Event,(Long,Set[String]),Double]{
    override def createAccumulator(): (Long, Set[String]) = (0L,Set[String]())

    //每来一条数据,调用一次
    override def add(in: Event, acc: (Long, Set[String])): (Long, Set[String]) = (acc._1+1,acc._2+in.user)

    //返回最终计算结果
    override def getResult(acc: (Long, Set[String])): Double = acc._1.toDouble/acc._2.size

    override def merge(acc: (Long, Set[String]), acc1: (Long, Set[String])): (Long, Set[String]) = ???
  }
}

在这里插入图片描述

2.全窗口函数

FullWindowFunctionTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object FullWindowFunctionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    //测试全窗口函数,统计UV
    stream.keyBy(data=>"key")
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(new UvCountByWindows)
      .print()

    env.execute()

    class UvCountByWindows extends ProcessWindowFunction[Event,String,String,TimeWindow] {
      override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
        //使用一个Set进行去重操作
        var userSet: Set[String] = Set[String]()

        // 从elements中提取所有数据,一次放入set中去重
        elements.foreach( userSet+=_.user)
        val uv: Int = userSet.size
        //提取窗口信息包装String进行输出
        val windeEnd: Long = context.window.getEnd
        val windowStart: Long = context.window.getStart

        out.collect(s"窗口 $windowStart - $windeEnd 的uv值为:$uv")
      }
    }
  }
}

3.水位线+窗口

WatermarkWindowTest.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.time.Duration

object WatermarkWindowTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(500L)

    val stream: DataStream[Event] = env.socketTextStream("127.0.0.1", 7777)
      .map(data => {
        val fields: Array[String] = data.split(",")
        Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
      })

    stream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(
        new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }
      )).keyBy(_.user)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(  new WatermarkWindowsResult)
      .print()

    env.execute()

  }


  class WatermarkWindowsResult extends ProcessWindowFunction[Event,String,String,TimeWindow] {
    override def process(user: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
      val start: Long = context.window.getStart
      val end: Long = context.window.getEnd
      val count: Int = elements.size

      //增加水位线信息
      val currentWatermark: Long = context.currentWatermark
      out.collect(s"窗口 $start - $end ,用户 $user 的活跃度 $count,水位线现在位于:$currentWatermark")
    }
  }
}

因为咱们给窗口设置的时间是10秒,等待时间是5秒,所以需要时间戳达到15秒,窗口才会处理数据.
在这里插入图片描述
因为窗口时间是[0,10000),所以他只统计了1000和3000两个数据.
在这里插入图片描述

4.统计用户点击数据

UrlViewCountExample.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// 定义统计输出的结果数据结果

case class UrlViewCount(url:String,count:Long,windowStart:Long,windowEnd:Long)

object UrlViewCountExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    // 结合使用增量聚合函数和全窗口函数,包装统计信息

    stream.keyBy(_.url)
      .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
      .aggregate(new UrlViewCountAgg,new UrlViewCountResult)
      .print()

    env.execute()
  }
  //实现增量聚合函数,来一个数据就加1
  class UrlViewCountAgg extends AggregateFunction[Event,Long,Long] {
    override def createAccumulator(): Long = 0L

    override def add(in: Event, acc: Long): Long = acc+1

    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = ???
  }

  //实现全窗口函数
  class UrlViewCountResult extends ProcessWindowFunction[Long,UrlViewCount,String,TimeWindow] {
    override def process(url: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
      // 提取需要的数据
      val count: Long = elements.iterator.next()
      val start: Long = context.window.getStart
      val end: Long = context.window.getEnd

      //输出数据
      out.collect(UrlViewCount(url = url, count = count, windowStart = start, windowEnd = end))
    }
  }
}

在这里插入图片描述

5.处理迟到数据

ProcessLateExample.scala

package com.atguigu.chapter03

import com.atguigu.chapter02.Source.Event
import com.atguigu.chapter03.UrlViewCountExample.{UrlViewCountAgg, UrlViewCountResult}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

import java.time.Duration

object ProcessLateExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.socketTextStream("127.0.0.1", 7777)
      .map(data => {
        val fields: Array[String] = data.split(",")
        Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
      })assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(
        new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }
      ))

    //定义一个测输出流标签
    val outputTag: OutputTag[Event] = OutputTag[Event]("late-data")

    val result: DataStream[UrlViewCount] = stream.keyBy(_.url)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      //指定窗口允许等待的实践
      .allowedLateness(Time.minutes(1))
      //将迟到数据输入到侧数据窗口
      .sideOutputLateData(outputTag)
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)


    result.print("result")

    stream.print("input")

    result.getSideOutput(outputTag).print("late data")

    env.execute()

  }
}

在这里插入图片描述
可以看到当数据的时间戳达到15毫秒是 0-10秒的窗口才开始统计.我们继续添加数据.
在这里插入图片描述
我们继续添加0-10秒内的数据,窗口还是会继续计算,但是窗口最终还是会关闭,我们设置的等待时间是1分钟,所以我们将水位线推进到70秒.
在这里插入图片描述
可以看到,我们以及触发了第二个窗口计算,现在我们向关闭的0-10秒数据窗口发送数据.
在这里插入图片描述
可以看到窗口依然打开这,可以进行计算,这是因为,我们为水位线这设置了5秒的延迟,所以水位线现在到了65秒,我们发送75秒的数据,将水位线推到70秒.
在这里插入图片描述
可以看到我们将水位线推到了70秒,窗口关闭,依旧可以捕捉到迟到数据,但是无法触发窗口的计算.后续迟到结果需要我们手动加入结果中.


总结

以上就是Flink中有关窗口和水位线的操作.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/541861.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ChatGPT、GPT4、AutoGPT 和 MemoryGPT:初学者指南

人工智能 (AI) 不仅在改变行业,也在改变我们的日常生活。借助人工智能,我们可以改善我们的组织和生产力,让我们能够专注于真正重要的事情。在本文中,我们将探讨一些适用于日常生活的 AI 工具,以及它们如何帮助您保持井…

【Spring框架】--02.容器IoC、原理(手写IoC)

文章目录 3.容器:IoC3.1 IoC容器3.1.1 控制反转(IoC)3.1.2 依赖注入3.1.3 IoC容器在Spring的实现 3.2 基于XML管理Bean3.2.1搭建子模块spring6-ioc-xml3.2.2 获取bean①方式一:根据id获取②方式二:根据类型获取③方式三…

【Jmeter第三章】Jmeter给请求添加请求头

给请求加上请求头最常见的场景就是在请求头上添加token了,这里也拿添加token来举例 1、添加某个请求的请求头 1、选中HTTP请求,右键添加 2、添加请求头 2、添加公共的请求头信息 其实步骤和上面是一样的,只不过是选择:线程组…

极客的git常用命令手册

极客的git常用命令手册 1.1 权限配置篇1.1.1 创建ssh key1.1.2 本地存在多个密钥时,如何根据目标平台自动选择用于认证的密钥? 1.2 基础信息配置篇1.2.1 配置用户名1.2.2 配置用户邮箱1.2.3 设置文件名大小写区分1.2.4 设置命令行显示颜色1.2.5 检查git全…

MySQL高级_第11章_数据库的设计规范

MySQL高级_第11章_数据库的设计规范 1. 为什么需要数据库设计 2. 范 式 2.1 范式简介 在关系型数据库中,关于数据表设计的基本原则、规则就称为范式。 可以理解为,一张数据表的设计结构需要满足的某种设计标准的 级别 。要想设计一个结构合理的关…

如何用postman进行http接口测试?好好看好好学

目录 优点: 1、什么是POSTMAN 2、新建一个项目 2、新增一个用例 3、添加请求信息 4、post请求参数 5、添加头信息 6、预处理和结果检查 7、全局变量与环境变量 8、导出用例为代码 9、批量执行用例 HTTP的接口测试工具有很多,可以进行http请求…

打死都千万不要进外包...

我18年毕业于一个普通二本学校,是一个很不出名的小本科。大学期间专业知识也没有去认真的学习,所以毕业的时候就随便找了一份工作,在一个小公司做功能测试。 记得那时候薪资大概是6k左右,因为是在工厂,工作环境很差&a…

终极猜想 |欧科云链研究院揭秘货币未来形态

前言 5月18日,澎湃科技联合欧科云链研究院重磅发布的《从Web3“去美元化”看货币未来形态的终极猜想》文章,通过分析Web3.0“去美元化”的两大路径,对货币未来形态进行了前沿性的猜想。其中,Web3行业盛会Consensus2023&#xff0…

补充点【机器学习部分】

0518机器学习 身高和体重为特征,和标签训练阔以得到一个式子。 1.有监督学习: 2.训练集和测试集 3.模型学习 4.模型评估 5.基本术语 数据集:记录数据的集合 样本:每条记录关于一个事件或者对象的描述 特征:反映对…

软件安全-课后练习-格式化字符串-fmtstr2-随笔

一、准备工作 题目-百度网盘 1. 2.代码审计 只要输入不是yes或者no,就会调用到存在格式化字符串漏洞的printf函数 如果authenticated 的值为1195526213(十六进制:0x47424845),就可以得到flag。 二、思路&#xf…

10. Redis哨兵(sentinel)

10. Redis哨兵sentinel 是什么?能干嘛怎么玩(实战演示:)Redis Sentinel架构,前提说明案例步骤,不服就干重点参数项说明其他 本次案例哨兵sentinel文件通用配置sentinel26379.confsentinel26380.confsentin…

Java并发常见面试题

Java并发常见面试题总结 1、什么是线程和进程? 何为进程? 进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行程序,是一个进程从创建、运行到消亡的过程。 在Java中,当我们…

Java 的八大基本类型及其包装类型(超级详细)

Java 中有八种内置的基本数据类型,他们分别是 byte、short、int、long、float、double、char 和 boolean,其中,byte、short、int 和 long 都是用来表示整数,float 和 double 是用来表示浮点数的,那它们之间有什么区别和…

【C++】哈希/散列详细解析

前言:上篇文章介绍了unordered_set和unordered_map序列关联式容器,它们之所以效率比较高,是因为其底层使用了哈希结构。,所以这篇文章我们就来详细讲解一下哈希表。有关unordered序列关联式容器的知识,请移步至这篇文章…

单片机--USART

目录 【2】USART 【3】串口通信协议 【4】相关寄存器 串口控制寄存器 波特率寄存器 中断和状态寄存器 ​编辑 数据发送寄存器 数据接收寄存器 【5】 USART功能框图 【6】串口发送实验 实验要求 1.观察实物 2.分析原理图 3.STM32CubeMX配置 7、不定长接收 8、重定向 【1】…

2022 CCPC-final 总结

赛前 去年 CCPC-final 拿了银牌第二。赛后,我选择退役,另一位队友 George_Plover 选择继续。 今年他队友 Kieray 去组女队了,于是邀请我替补参赛。 赛前一个月,约定好每周末组队训一场(在 cf 和 qoj 上&#xff0…

Spring Boot集成Swagger2

文章目录 1.什么是Swagger22.SpringBoot集成Swagger23.Swagger2配置管理(1)对Swagger2信息进行更改(2)swagger配置扫描接口(3)配置api文档分组(分组无非就是多个Docket)(4)实体类的配置 面试题:如果我们希望Swagger在某一个环境中使用&#x…

自学黑客(网络安全),看完这篇,再去追你的黑客梦!

今天专题是替一些想入门网络安全,但还迷茫不知所措的同学解一解惑。想30天零基础入门网络安全,这些你一定要搞清楚。 一、学习网络安全容易造成的误区 1、把编程当作目的,忽略了它的工具职能 千万不要抱着“以编程为目的,再开始…

C++(2):变量和基本类型

基本内置类型 C定义了一套包括算术类型(arithmetic type)和空类型(void)在内的基本数据类型。其中算术类型包含了字符、整型数、布尔值和浮点数。空类型不对应具体的值。 算数类型 算数类型分为两类:整型&#xff0…

Cesium教程(二):Cesium默认控件详解

Cesium初始界面在默认情况下,附带了一些有用的小控件,如下图所示,可以执行一些基本的功能。 1、①Geocoder Geocoder是一种定位搜索工具,它可以定位到查询位置。默认使用微软的Bing地图,若更换其他底图可能出现查找不到…