【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序

news2025/2/22 8:52:36

DataStream编程模型之 窗口的划分-时间概念-窗口计算程序

1. 窗口的划分

1.1 窗口分为:基于时间的窗口 和 基于数量的窗口

基于时间的窗口:基于起始时间戳终止时间戳来决定窗口的大小
基于数量的窗口:根据固定的数量定义窗口 的大小

这里我看到的都是只有基于时间的窗口做的划分,没有数量的,发现运用数量窗口划分也比较少,因此很多地方都省略了。

Count Window 也有滚动窗口、滑动窗口等,可以借鉴Flink实战之CountWindowr的滚动窗口、滑动窗口WindowsAPI使用示例
接下来就开始介绍基于时间的窗口

1.2 在Flink中窗口的设定和数据本身无关,而是系统事先定义好的

窗口是Flink划分数据一个基本单位,窗口的划分方式是默认会根据自然时间划分,并且划分方式是前闭后开。
如图:窗口的划分,左闭右开
左闭右开

2.时间概念

流数据中,数据具有时间属性。Flink根据时间的产生时间把时间划分为3中类型:1.事件生成时间(Event time)2.事件接入时间 (Ingestion Time)3.事件处理时间 (Processing Time)
可以借鉴下图理解:
在这里插入图片描述

1.基站产生数据,分区传入Flink数据源
2.传入数据的时间 IngstionTime
3.划分窗口进行处理时间 ProcessingTime

2.1 事件生成时间

是每个独立事件在产生它的设备上发生的时间,这个时间通常在事件进入Flink前就已经进入到事件当中了,也就是说,事件时间是从原始的消息中提取到的。比如 Kafka消息,每个生成的消息中自带一个时间戳代表每条数据的产生时间.

2.2事件接入时间

是数据进入Flink系统的时间,它主要依赖于其数据源算子所在主机的系统时钟。理论上,接入时间处于事件时间和处理时间之间。接入时间不能处理乱序问题或者延迟数据。如果需要处理此类问题,建议使用事件时间

2.3 事件处理时间

是指数据在操作算子计算过程中获取到的所在主机时间,这个时间是由Flink系统自己提供的。这种处理时间方式实时性是最好的,但计算结果未必准确,主要用于时间计算精度要求不是特别高的计算场景,比如延时比较高的日志数据

2.4事件时间和处理时间区别

这里的时间方便了解,比如事件时间,一个在米国产生的时间,一个在中国产生的时间,这两个有时差 不一样,但是数据世界是一样的,应该是从1970计算的时间戳。
在这里插入图片描述
在Flink初始化流式运行环境时,会设置流处理的时间特性

//设置执行环境 
val env = StreamExecutionEnvironment.getExecutionEnvironment 
//把时间特性设置为“事件时间” 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 


//或者,把时间特性设置为“处理时间” 
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

3.窗口计算

3.1 窗口计算的程序结构

3.1.1.分组

分为:分组窗口和非分组窗口
看一组图:
左侧为分组窗口,右侧不分组。
分组窗口

3.2窗口的计算过程

如图
在这里插入图片描述

在窗口计算时,需要将数据流先分组 keyby,再分窗口。那么在写程序的时候也是先进行这两步,当窗口程序计算完成后(ruduce,aggregate,process等),又变为DataStream。

分组数据流程序结构如下:

dataStream.keyBy(...)         //是分组数据流
       .window(...)          //指定窗口分配器类型
      [.trigger(...)]           //指定触发器类型(可选)
      [.evictor(...)]           //指定驱逐器或者不指定(可选)
      [.allowedLateness()]    //指定是否延迟处理数据(可选)
       .reduce/fold/apply()   //指定窗口计算函数

非分组数据流程序结构如下:

dataStream.windowAll(...)      //指定窗口分配器类型
      [.trigger(...)]           //指定触发器类型(可选)
      [.evictor(...)]           //指定驱逐器或者不指定(可选)
      [.allowedLateness()]     //指定是否延迟处理数据(可选)
       .reduce/fold/apply()    //指定窗口计算函数

不分组理解为所有数据为一个窗口。 windowAll(…)

3.2 窗口分配器

窗口分配器是负责将每一个到来的元素分配给一个或者多个窗口。
Flink提供预定义窗口分配器
在这里插入图片描述
窗口分配器在程序就一行可以了。

这几个窗口可以理解为火车站/汽车站/飞机场的屏幕,假设有25条消息,一页显示10条消息。
滚动:一页显示10条消息,滚动一下,下一页 11-20,滚动 21-25.
滑动:这里涉及一次滑动步长(假设为1),1-10 滑动 ,2-11滑动,3-12 …

3.2.1滚动窗口

滚动窗口是根据固定时间或大小对数据流进行切分,且窗口和窗口之间的元素不会重叠
在这里插入图片描述
DataStream API提供了两种滚动窗口类型,
即基于事件时间的滚动窗口(TumblingEventTimeWindows)和

基于处理时间的滚动窗口(TumblingProcessingTimeWindows),

二者对应的窗口分配器分别为TumblingEventTimeWindows
和TumblingProcessingTimeWindows。

窗口的长度

窗口的长度可以用org.apache.flink.streaming.api.windowing.time.Time中的
seconds、minutes、hours和days来设置。

3.2.1.1 滚动窗口的实例

1.事件 时间 滚动 ,窗口大小5秒
关键词:TumblingEventTimeWindows

val dataStream: DataStream[T] = ...
 
//基于事件时间的滚动窗口,窗口大小为5秒钟
dataStream
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

2.基于处理时间的滚动
关键词:TumblingProcessingTimeWindows

//基于处理时间的滚动窗口,窗口大小为5秒钟
dataStream
    .keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

3.事件时间的滚动窗口,窗口大小为1小时,偏移量为15分钟
偏移量调整窗口开始时间的数字。比如就会从整点的15分,30分,45分,00分开始,允许数据进行移位,用于时效性不强的数据。

//基于事件时间的滚动窗口,窗口大小为1小时,偏移量为15分钟
dataStream
   .keyBy(...)
   .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
   .<window function>(...)

还可以使用快捷方法 timeWindow() 来定义TumblingEventTimeWindowsTumblingProcessingTimeWindows,举例如下:

dataStream
    .keyBy(...)
    .timeWindow(Time.seconds(1))
    .<window function>(...)

如果使用的是timewindow,那么就没说明是-事件时间-还是-处理时间。窗口类型就要根据程序中设置的TimeCharacteristic的值来决定。

当我们在程序中设置了env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)时,Flink会创建TumblingEventTimeWindows

当设置了env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)时,Flink会创建TumblingProcessingTimeWindows
认识英文单词就能很好的理解啦。

3.2.2滑动窗口

滑动窗口有重叠。(就是大屏幕的一个个向下滑动那种)
在这里插入图片描述

3.2.2.1 滑动窗口的实例

继续学习英文单词 slide :滑动(v) 它还有ppt幻灯片的意思。
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) 里面得有两个参数,一个是窗口大小,一个是滑动步长。

不一样的就是处理时间和事件时间。

val dataStream: DataStream[T] = ...
 
//基于事件时间的滑动窗口,窗口大小为10秒,滑动步长为5秒
dataStream
    .keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)
 

//基于处理时间的滑动窗口,窗口大小为10秒,滑动步长为5秒
dataStream
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)

这个是参数多了偏移量。

//基于处理时间的滑动窗口,窗口大小为12小时,滑动步长为1小时,偏移量为8小时
dataStream
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<window function>(...)
3.2.2.2 滑动步长与窗口大小的关系

在这里插入图片描述

3.2.3 会话窗口

会话窗口根据会话间隙(Session Gap)切分不同的窗口,当一个窗口在大于会话间隙的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。
在这里插入图片描述
接下来再来两个实例
看代码的withgap,中间的gap时间。

val input: DataStream[T] = ...
 
//基于事件时间的会话窗口,会话间隙为10分钟
input
    .keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)
 
//基于处理时间的会话窗口,会话间隙为10分钟
input
    .keyBy(...)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)

3.3窗口的计算函数

在Flink的窗口计算程序中,在确定了窗口分配器以后,接下来就要确定窗口计算函数,从而完成对窗口内数据集的计算。

Flink提供了四种类型的窗口计算函数,分别是

1. ReduceFunction、
2. AggregateFunction、
3. FoldFunction
4. ProcessWindowFunction。

根据计算原理,ReduceFunction、AggregateFunction和FlodFunction属于增量聚合函数,而ProcessWindowFunction则属于全量聚合函数(这里处理的是window 那么就是整个窗口了,就是全量了)。

3.3.1 ReduceFunction

ReduceFunction定义了对输入的两个相同类型的数据元素按照指定的计算方法进行聚合计算,然后输出类型相同的一个结果元素。

从这句话可以理解,先keyBy,再将同组的聚合计算。

接下来看代码

import java.util.Calendar
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random
 
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object ReduceWindowFunctionTest {
  def main(args: Array[String]) {
 
    //设置执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
 
    //设置程序并行度
env.setParallelism(1)
 
//设置为处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    
    //创建数据源,股票价格数据流
    val stockPriceStream: DataStream[StockPrice] = env
      //该数据流由StockPriceSource类随机生成
      .addSource(new StockPriceSource)
//确定针对数据集的转换操作逻辑
    val sumStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(1))
      .reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))
 
    //打印输出
sumStream.print()
 
    //程序触发执行
    env.execute("ReduceWindowFunctionTest")
  }
 class StockPriceSource extends RichSourceFunction[StockPrice]{
    var isRunning: Boolean = true
 
    val rand = new Random()
    //初始化股票价格
    var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
    var stockId = 0
    var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {
      while (isRunning) {
        //每次从列表中随机选择一只股票
        stockId = rand.nextInt(priceList.size)
        val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05
        priceList = priceList.updated(stockId, curPrice)
        val curTime = Calendar.getInstance.getTimeInMillis
 
        //将数据源收集写入SourceContext
        srcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
        Thread.sleep(rand.nextInt(1000))
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }
}

分析代码:
具体看这几行

   val sumStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(1))
      .reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

现根据ID分组变成Keyedstream,再分窗口,然后reduce聚合同ID的price。

使用Maven工具对程序进行编译打包,然后提交到Flink中运行,在运行日志中可以看到类似如下的输出结果:

StockPrice(stock_1,1602036130952,39.78897954489408)
StockPrice(stock_4,1602036131741,49.950455275162945)
StockPrice(stock_2,1602036132184,30.073529000410154)
StockPrice(stock_3,1602036133154,79.88817093404676)
StockPrice(stock_0,1602036133919,9.957551599687758)
StockPrice(stock_1,1602036134385,39.68343765292602)
……

3.3.2 AggregateFunction

这个单词的意思就是聚合。

Flink的AggregateFunction是一个基于中间计算结果状态进行增量计算的函数。由于是迭代计算方式,所以,在窗口处理过程中,不用缓存整个窗口的数据,所以效率执行比较高。

AggregateFunction比ReduceFunction更加通用,它定义了3个需要复写的方法:

  1. add
  2. getResult
  3. merge

其中,add()定义了数据的添加逻辑,getResult()定义了累加器计算的结果,merge()定义了累加器合并的逻辑。

1.add 函数
功能:该函数用于将输入元素添加到累加器中。在聚合过程中,每当有新的数据元素流入时,Flink 会调用此函数来更新累加器的状态。

参数:add 函数通常接收两个参数,一个是当前的累加器状态,另一个是待聚合的输入元素。

返回值:该函数返回更新后的累加器状态。

2.getResult 函数

功能:该函数用于从累加器中提取聚合结果。在窗口触发或查询结束时,Flink 会调用此函数来获取最终的聚合结果。

参数:getResult 函数通常只接收一个参数,即当前的累加器状态。

返回值:该函数返回聚合后的结果,其类型通常由 AggregateFunction 的输出类型参数指定。

3.merge 函数:

功能:该函数用于在并行执行时合并两个累加器的状态。在分布式计算环境中,同一个窗口的数据可能会分配到不同的节点上进行处理。当这些节点上的聚合操作完成后,需要将它们的累加器状态合并起来以得到全局的聚合结果。

参数:merge 函数通常接收两个参数,即两个待合并的累加器状态。

返回值:该函数返回合并后的累加器状态。

这三个函数共同构成了 Flink 中 AggregateFunction 的核心逻辑。通过实现这三个函数,用户可以定义自定义的聚合操作,以满足各种复杂的数据处理需求。

注意 除了这三个函数外,AggregateFunction 接口通常还包含一个 createAccumulator 方法,用于初始化一个新的累加器实例。该方法在聚合操作开始时被调用,并返回一个空的或初始化的累加器状态。

举例代码:

import java.util.Calendar
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random

case class StockPrice(stockId:String,timeStamp:Long,price:Double) 


object AggregateWindowFunctionTest {
 def main(args: Array[String]) { 
   // 设置执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment

   //设置程序并行度
   env.setParallelism(1)

//设置为处理时间
   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) 
   
   
   //创建数据源,股票价格数据流
   val stockPriceStream: DataStream[StockPrice] = 
   	env .addSource(new StockPriceSource)    //该数据流由StockPriceSource类随机生成
    
   stockPriceStream.print("input“)    
   
   //设定针对数据集的转换操作逻辑
   val sumStream = stockPriceStream
     .keyBy(s => s.stockId)
     .timeWindow(Time.seconds(1))
     .aggregate(new MyAggregateFunction)   //自定义的聚合函数,那么就需要实现三个方法

   //打印输出
   sumStream.print("output“) 
   //程序触发执行
   env.execute("AggregateWindowFunctionTest")
 }





 class StockPriceSource extends RichSourceFunction[StockPrice]{
   var isRunning: Boolean = true
   val rand = new Random()
   // 初始化股票价格
   
   var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)
   var stockId = 0
   var curPrice = 0.0d 


override def run(srcCtx: SourceContext[StockPrice]): Unit = {
     while (isRunning) {
       // 每次从列表中随机选择一只股票
       stockId = rand.nextInt(priceList.size)
       val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05
       priceList = priceList.updated(stockId, curPrice)
       val curTime = Calendar.getInstance.getTimeInMillis

       // 将数据源收集写入SourceContext
       srcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
       Thread.sleep(rand.nextInt(500))
     }
   }
   override def cancel(): Unit = {
     isRunning = false
   }
 }






//自定义函数
 class MyAggregateFunction extends AggregateFunction[StockPrice,(String,Double,Long),(String,Double)] {

//回忆一下:case class StockPrice(stockId:String,timeStamp:Long,price:Double) 
//返回值不要时间了,只要id 和price


 
   //创建累加器
override def createAccumulator(): (String,Double, Long) = ("",0D,0L)



//定义把输入数据累加到累加器的逻辑
   override def add(input:StockPrice,acc:(String,Double,Long))={
     (input.stockId,acc._2+input.price,acc._3+1L)  //平均价格 所以数量  +1L
   }

   //根据累加器得出结果
override def getResult(acc:(String,Double,Long)) = (acc._1,acc._2 / acc._3)

//定义累加器合并的逻辑
   override def merge(acc1:(String,Double,Long),acc2:(String,Double,Long)) = {
     (acc1._1,acc1._2+acc2._2,acc1._3+acc2._3)
   }
 }
}


这里我开始理解为了股票的三个属性(String,Double,Long),这里理解错了。

在 Apache Flink 的 AggregateFunction 接口中,当你定义一个自定义的聚合函数时,你需要指定三个类型参数:

1.输入类型(InputType):这是流中元素的类型,例子中为 StockPrice。
2.累加器类型(AccumulatorType):这是用于在聚合过程中存储中间状态的类型。例子中,这是一个三元组 (String, Double, Long),其中 String 表示股票ID(尽管这里的处理可能不是最理想的,因为通常累加器不应该包含像股票ID这样的非聚合字段),Double 表示价格的总和,Long 表示价格的数量(或说是处理了多少个价格数据点)。
3.输出类型(OutputType):这是聚合函数最终产生的结果类型。例子中,这也是一个二元组 (String, Double),其中 String 同样是股票ID(这里同样需要注意可能的逻辑问题),Double 是计算出的平均价格。

我才开始没有理解acc1._3 + acc2._3,现写如下:

acc1._1,
  acc1._2 + acc2._2, // 合并价格总和
  acc1._3 + acc2._3  // 合并价格数量

根据代码的先后顺序及运行顺序,最后执行getresult。
先聚合,最后平均。

在大多数情况下,add 方法会首先被调用,用于处理流入的数据并更新累加器状态。然后,根据并行度和数据分布,merge 方法可能会被调用以合并累加器状态。最后,在窗口触发或查询结束时,getResult 方法会被调用以提取最终的聚合结果。

代码分析完后,输出结果:

input> StockPrice(stock_2,1602040572049,29.99367518574229)
input> StockPrice(stock_2,1602040572205,30.03665296896211)
input> StockPrice(stock_2,1602040572601,30.00867347810531)
input> StockPrice(stock_0,1602040572856,9.974154737531954)
input> StockPrice(stock_1,1602040572934,19.997437804748245)
output> (stock_2,30.013000544269904)
output> (stock_1,19.997437804748245)
output> (stock_0,9.974154737531954)

3.3.3 FoldFunction

FoldFunction决定了窗口中的元素如何和一个输出类型的元素进行结合。对于每个进入窗口的元素而言,FoldFunction会被增量调用。窗口中的第一个元素将会和这个输出类型的初始值进行结合。需要注意的是,FoldFunction不能用于会话窗口和那些可合并的窗口。

//前面的代码和ReduceWindowFunctionTest程序中的代码相同,因此省略
val sumStream = stockPriceStream
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(1))
      .fold("CHINA_"){ (acc, v) => acc + v.stockId }

3.3.4 ProcessWindowFunction

前面提到的ReduceFunction和AggregateFunction都是基于中间状态实现增量计算的窗口函数,虽然已经满足绝大多数场景的需求

但是,在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,这时就需要使用到 ProcessWindowFunction,因为它能够更加灵活地支持基于窗口全部数据元素的结果计算。

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
 
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
object ProcessWindowFunctionTest {
  def main(args: Array[String]) {
 
    //设置执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
 
    //设置程序并行度
    env.setParallelism(1)
 
    //设置为处理时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
    //创建数据源,股票价格数据流
    val source = env.socketTextStream("localhost", 9999)
 

    //指定针对数据流的转换操作逻辑
    val stockPriceStream = source
      .map(s => s.split(","))
      .map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))
 

    val sumStream = stockPriceStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          //为了测试方便,这里把水位线设置为0
          .forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
            override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
          }
          )
      )
      
      .keyBy(s => s.stockId)
      .timeWindow(Time.seconds(3))
      .process(new MyProcessWindowFunction())
 
 
    //打印输出
    sumStream.print()
 
    //执行程序
    env.execute("ProcessWindowFunction Test")
  }
 
 
  class MyProcessWindowFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
    //一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
    
    override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
      //聚合,注意:整个窗口的数据保存到Iterable,里面有很多行数据
      var sumPrice = 0.0;
      elements.foreach(stock => {
        sumPrice = sumPrice + stock.price
      })
      out.collect(key, sumPrice/elements.size)
    }
  }
}

这个代码里需要注意的是 .assignTimestampsAndWatermarks 和MyProcessWindowFunction

.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0))
    .withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
      override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp
    })
)

WatermarkStrategy.forBoundedOutOfOrderness 用于创建一个处理有界乱序数据的水位线策略。参数 Duration.ofSeconds(0) 表示没有乱序,即所有数据都是按时序到达的(在实际应用中,这通常是一个简化的假设,实际数据往往会有一定程度的乱序)。withTimestampAssigner 方法用于指定如何从数据元素中提取时间戳,这里是从 StockPrice 对象的 timeStamp 字段中提取。

process 方法用于应用一个自定义的 ProcessWindowFunction。在这个例子中,MyProcessWindowFunction 是一个自定义的窗口函数,它接收一个键(股票ID)、一个上下文对象(包含窗口的元数据,如开始和结束时间)、一个包含窗口内所有元素的迭代器,以及一个用于收集输出结果的收集器。

在 MyProcessWindowFunction 的 process 方法中,代码遍历了窗口内的所有 StockPrice 元素,计算了价格的总和,并计算了平均值(总和除以元素数量)。然后,它将结果(股票ID和平均价格)收集到输出流中。

下一小节该总结触发器啦。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2248957.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚拟地址空间与物理内存(Linux系统)

个人主页&#xff1a;敲上瘾-CSDN博客 个人专栏&#xff1a;Linux学习、游戏、数据结构、c语言基础、c学习、算法 目录 问题引入 一、什么是虚拟内存 二、虚拟内存的描述与组织 三、页表的优势 四、虚拟内存区域划分 问题引入 为引入今天的话题&#xff0c;我们先来看下面…

docker-compose搭建xxl-job、mysql

docker-compose搭建xxl-job、mysql 1、搭建docker以及docker-compose2、下载xxl-job需要数据库脚本3、创建文件夹以及docker-compose文件4、坑来了5、正确配置6、验证-运行成功 1、搭建docker以及docker-compose 略 2、下载xxl-job需要数据库脚本 下载地址&#xff1a;https…

【ArcGIS Pro实操第11期】经纬度数据转化成平面坐标数据

经纬度数据转化成平面坐标数据 数据准备ArcGIS操作步骤-投影转换为 Sinusoidal1 投影2 计算几何Python 示例 另&#xff1a;Sinusoidal (World) 和 Sinusoidal (Sphere) 的主要区别参考 数据准备 数据投影&#xff1a; 目标投影&#xff1a;与MODIS数据相同&#xff08;Sinu…

【模型学习之路】PyG的使用+基于点的任务

这一篇是关于PyG的基本使用 目录 前言 PyG的数据结构 演示 图的可视化 基于点的任务 任务分析 MLP GCN 前言 对图结构感兴趣的朋友可以学一下常用的有关图结构的库&#xff1a;networkx详细介绍 networkx 库&#xff0c;探讨它的基本功能、如何创建图、操作图以及其常…

如何监控Elasticsearch集群状态?

大家好&#xff0c;我是锋哥。今天分享关于【如何监控Elasticsearch集群状态&#xff1f;】面试题。希望对大家有帮助&#xff1b; 如何监控Elasticsearch集群状态&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 监控 Elasticsearch 集群的状态对于确保…

Edify 3D: Scalable High-Quality 3D Asset Generation

Deep Imagination Research | NVIDIA 目录 一、Abstract 二、核心内容 1、多视图扩散模型 3、重建模型&#xff1a; 4、数据处理模块&#xff1a; 三、结果 1、文本到 3D 生成结果 2、图像到 3D 生成结果 3、四边形网格拓扑结构 一、Abstract NVIDIA 开发的用于高质量…

QUAD-MxFE平台

QUAD-MxFE平台 16Tx/16Rx直接L/S/C频段采样相控阵/雷达/电子战/卫星通信开发平台 概览 优势和特点 四通道MxFE数字化处理卡 使用MxFE的多通道、宽带系统开发平台 与Xilinx VCU118评估板&#xff08;不包括&#xff09;搭配使用 16个RF接收(Rx)通道&#xff08;32个数字Rx通道…

操作系统 锁——针对实习面试

目录 操作系统 锁什么是死锁&#xff1f;说说死锁产生的条件&#xff1f;死锁如何预防&#xff1f;死锁如何避免&#xff1f;银行家算法具体怎么操作&#xff1f;死锁如何解决&#xff1f;死锁会产生什么影响&#xff1f;乐观锁与悲观锁有什么区别&#xff1f; 操作系统 锁 什么…

UI设计-色彩、层级、字体、边距(一)

一.色彩&#xff1a;色彩可以影响人的心理与行动&#xff0c;具有不同的象征意义&#xff1b;有冷暖&#xff0c;轻重&#xff0c;软硬等等。 1.色彩情绪&#xff1a;最直观的视觉感受 一个活动的页面所用的颜色必须要与其内容相适应&#xff0c;让人看起来舒服。有时我们会不…

从入门到精通数据结构----四大排序(上)

目录 首言&#xff1a; 1. 插入排序 1.1 直接插入排序 1.2 希尔排序 2. 选择排序 2.1 直接选择排序 2.2 堆排序 3. 交换排序 3.1 冒泡排序 3.2 快排 结尾&#xff1a; 首言&#xff1a; 本篇文章主要介绍常见的四大排序&#xff1a;交换排序、选择排序、插入排序、归并排…

【C++第三方库】Muduo库结合ProtoBuf库搭建服务端和客户端的过程和源码

每日激励&#xff1a;“不设限和自我肯定的心态&#xff1a;I can do all things。 — Stephen Curry” 绪论​&#xff1a; 本章我将结合之前的这俩个第三方库快速上手protobuf序列化和反序列化框架和muduo网络&#xff0c;来去实现muduo库在protocol协议搭建服务端和客户端。…

Scala—Map用法详解

Scala—Map用法详解 在 Scala 中&#xff0c;Map 是一种键值对的集合&#xff0c;其中每个键都是唯一的。Scala 提供了两种类型的 Map&#xff1a;不可变 Map 和可变 Map。 1. 不可变集合&#xff08;Map&#xff09; 不可变 Map 是默认的 Map 实现&#xff0c;位于 scala.co…

文本处理之sed

1、概述 sed是文本编辑器&#xff0c;作用是对文本的内容进行增删改查。 和vim不一样&#xff0c;sed是按行进行处理。 sed一次处理一行内容&#xff0c;处理完一行之后紧接着处理下一行&#xff0c;一直到文件的末尾 模式空间&#xff1a;临时储存&#xff0c;修改的结果临…

了解网络威胁情报:全面概述

网络威胁情报 CTI 是指系统地收集和分析与威胁相关的数据&#xff0c;以提供可操作的见解&#xff0c;从而增强组织的网络安全防御和决策过程。 在数字威胁不断演变的时代&#xff0c;了解网络威胁情报对于组织来说至关重要。复杂网络攻击的兴起凸显了制定强有力的策略以保护敏…

Python 海龟绘图 turtle 的介绍

python的计算生态中包含标准库和第三方库 标准库&#xff1a;随着解释器直接安装到操作系统中的功能模块 第三方库&#xff1a;需要经过安装才能使用的功能模块 库Library 包 Package 模块Module 统称为模块 turtle 是一个图形绘制的函数库&#xff0c;是标准库&#…

学习日志017--python的几种排序算法

冒泡排序 def bubble_sort(alist):i 0while i<len(alist):j0while j<len(alist)-1:if alist[j]>alist[j1]:alist[j],alist[j1] alist[j1],alist[j]j1i1l [2,4,6,8,0,1,3,5,7,9] bubble_sort(l) print(l) 选择排序 def select_sort(alist):i 0while i<len(al…

java集合及源码

目录 一.集合框架概述 1.1集合和数组 数组 集合 1.2Java集合框架体系 常用 二. Collection中的常用方法 添加 判断 删除 其它 集合与数组的相互转换 三Iterator(迭代器)接口 3.0源码 3.1作用及格式 3.2原理 3.3注意 3.4获取迭代器(Iterator)对象 3.5. 实现…

⭐️ GitHub Star 数量前十的工作流项目

文章开始前&#xff0c;我们先做个小调查&#xff1a;在日常工作中&#xff0c;你会使用自动化工作流工具吗&#xff1f;&#x1f64b; 事实上&#xff0c;工作流工具已经变成了提升效率的关键。其实在此之前我们已经写过一篇博客&#xff0c;跟大家分享五个好用的工作流工具。…

【Jenkins】自动化部署 maven 项目笔记

文章目录 前言1. Jenkins 新增 Maven 项目2. Jenkins 配置 Github 信息3. Jenkins 清理 Workspace4. Jenkins 配置 后置Shell脚本后记 前言 目标&#xff1a;自动化部署自己的github项目 过程&#xff1a;jenkins 配置、 shell 脚本积累 相关连接 Jenkins 官方 docker 指导d…

杂7杂8学一点之多普勒效应

最重要的放在最前面&#xff0c;本文学习资料&#xff1a;B站介绍多普勒效应的优秀视频。如果上学时老师这么讲课&#xff0c;我估计会爱上上课。 目录 1. 多普勒效应 2. 多普勒效应对通信的影响 3. 多普勒效应对低轨卫星通信的影响 1. 多普勒效应 一个小石头扔进平静的湖面…