Flink学习26:触发器

news2024/12/23 10:18:23

触发器

 作用:决定何时,触发窗口计算函数,开始计算

 

每个窗口都有一个默认触发器,也可以自定义触发器。

自定义触发器

示例1:

当流中元素达到5个以后,触发窗口计算。

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

//case class StockPrice(stockId:String, timestamp: Long, price:Double)

object trigger {

  def main(args: Array[String]): Unit = {

    //create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //set parallelism
    env.setParallelism(1)

    //set process time
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

//    //for kafka connection
//    val kafkaProps = new Properties()
//
//    //kafka's attribute
//    kafkaProps.setProperty("bootstrap.servers","192.12.249.10:9092")
//
//    //set the consumer's group
//    kafkaProps.setProperty("group.id","groupq")
//
//    //create the consumer
//    val kafkaSource = new FlinkKafkaConsumer[String]("stockPrice", new SimpleStringSchema, kafkaProps)
//
//    //set offset
//    kafkaSource.setStartFromEarliest()
//
//    //auto commit offset
//    kafkaSource.setCommitOffsetsOnCheckpoints(true)
//
//    //band data source
//    val ds = env.addSource(kafkaSource)
//
//    val stockPriceStream = ds.map(s => s.split(","))
//      .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))

    //create ds
    val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23), StockPrice("stock3", 10, 888.23))

    val ds = env.fromCollection(pricesList)
//    ds.print()


    val sumedStream = ds.keyBy(s => s.stockId)
      .timeWindow(Time.seconds(10))
      .trigger(new MyTrigger(3))
      .reduce((s1, s2) => StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price))

    sumedStream.print()

    env.execute()

  }


  class MyTrigger extends Trigger[StockPrice, TimeWindow] {

    //to receive the para
    def this(maxCount:Int){
      this()
      this.maxCount = maxCount
    }

    //declare ( if reach max num ,then trigger windows)
    private var maxCount:Long = _

    //get trigger's state
    private lazy val countStateDescriptor = new ReducingStateDescriptor[Long]("count", new Sum, classOf[Long])

    //override on element
    override def onElement(t: StockPrice, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {

      //get the trigger's state
      val countState = triggerContext.getPartitionedState(countStateDescriptor)

      //state add
      countState.add(1L)

      //judge state more than max trigger num
      if(countState.get() >= this.maxCount){

        //reach max num,then clear and trigger window compute
        //clear state
        countState.clear()

        //compute
        TriggerResult.FIRE

      }else{
        TriggerResult.CONTINUE
      }
    }

    override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
      //do nothing, cause we don't need deal process Time window, but need to override func
      TriggerResult.CONTINUE
    }

    override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
      //do nothing
      TriggerResult.CONTINUE
    }

    //clear the state, when window reach max num to trigger the compute
    override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
      println("@--now, window is closeed")
      triggerContext.getPartitionedState(countStateDescriptor).clear()
    }

    //update the state,
    class Sum extends ReduceFunction[Long]{
      override def reduce(t: Long, t1: Long): Long = {
        t+t1
      }
    }
  }



}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/74272.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于数据挖掘算法的服装销售平台的设计与实现(spring+spring mvc+mybatis+mysql+maven)

目 录 摘 要 I Abstract II 目 录 III 1 绪论 1 1.1 研究背景 1 1.2 研究意义 2 1.3 国内外研究现状 2 2 相关理论和开发工具 4 2.1 数据挖掘简述 4 2.2 相关数据挖掘算法概述 4 2.2.1关联规则 4 2.2.2 聚类算法 5 2.2.3 分类算法 5 2.3 文本挖掘概述 6 2.4 开发工具 7 3系统需…

5-10人的创业团队,怎么在半个月内上线一款新产品?

5~10 人的小微型创业团队,需不需要专业的研发协作工具? 随着生产力工具的价值获得更广泛的认可,越来越多观点认为,组织结构精简、业务尚未成熟的小微型团队应该尽早引入专业研发协作工具,完成核心竞争力的蜕变。 猴子…

2022 计网复习应用题【太原理工大学】

最后一道大题 —— 应用题&#xff0c;有以下几个考点&#xff0c;原理无需懂会算就行&#xff0c;15 分 拿 10 分不难&#xff0c;建议看一下。>_< 目录 1. 判断 IP 地址类型 2. 通过 IP 地址求子网掩码 3. 求网络地址和广播地址 4. 求主机号和可用 IP 5. 双绞线的…

【Spring】一文带你搞懂Spring IOC容器

前言 本文为 【Spring】Spring IOC容器 相关知识&#xff0c;首先为大家介绍Spring IOC相关的名词概念&#xff0c;对Spring IOC进行概述&#xff0c;然后具体为大家介绍配置元数据&#xff0c;容器实例化与使用等Spring IOC相关详尽内容~ &#x1f4cc;博主主页&#xff1a;小…

【Python毕业设计】Python基于面向对象+tkinter打造学生信息管理系统 | 附源码

前言 halo&#xff0c;包子们上午好 很多学计算机的小伙伴应该都知道&#xff0c;毕业设计是一个头疼的东西 今天的话小编这边给大家准备好了一个Python基于面向对象tkinter打造学生信息管理系统 这不是毕业设计必备项目 说实话操作起来还是有那么一点点的难度的&#xff0c;但…

2023年天津医科大学临床医学院专升本专业课考试报名缴费考试安排

天津医科大学临床医学院2023年高职升本科专业课考试报考须知 一、报名条件&#xff1a; 报考2023年天津医科大学临床医学院高职升本科专业课考试的考生应符合以下条件&#xff1a; 1、符合《2023年天津市高职升本科招生工作规定》中规定的报考资格。 2、我院高职升本科专业课考…

scViewerX ActiveX 多功能文件查看器控件

scViewerX ActiveX 控件 scViewerX是一个功能强大的 ActiveX 控件&#xff0c;允许您查看、打印和转换 PLT、Adobe PDF、Autodesk DWF、CGM、Calcomp、HPGL/2、Gerber、TIF、CALS 和其他几种格式。 ScViewerX 可以将您的文件转换为多种不同的输出文件格式&#xff0c;包括 PDF、…

软件测试题库怎么样 这个刷题小程序很适合临时抱佛脚

考试刷题&#xff0c;面试找工作也要刷题&#xff1f;说到这&#xff0c;可能很多都觉得不可思议&#xff0c;这找工作&#xff0c;还得提前刷题做准备&#xff1f;其实这个现象一个都有的&#xff0c;尤其是对于技术岗来说&#xff0c;由于面试官会着重询问技术相关问题&#…

结合邻域连接法的蚁群优化(NACO)求解TSP问题附Matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

(附源码)node.js学生钟点工管理系统 毕业设计 290011

学生钟点工管理系统 摘 要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;学生钟点工管理系统当然也不能排除在外。学生钟点工管理系统是以实际运用为开发背景&#xff0c;运用…

docker(2):docker常用命令

目录帮助命令镜像命令docker imagesdocker searchdocker pulldocker rmi容器命令docker rundocker psdocker rm启动/停止/退出其他常用命令后台启动docker logsdocker topdocker inspectdocker execDocker attachdocker cp命令大全总结所有命令请查看帮助文档&#xff1a;https…

[附源码]Python计算机毕业设计SSM基于售楼系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Span 抽取和元学习能碰撞出怎样的新火花,小样本实体识别来告诉你!

作者&#xff1a;王嘉宁、汪诚愚、谭传奇、邱明辉、黄松芳、黄俊、高明 近日&#xff0c;阿里云机器学习平台PAI与华东师范大学高明教授团队、达摩院机器智能技术NLP团队合作在自然语言处理顶级会议EMNLP2022上发表基于Span和元学习的小样本实体识别算法SpanProto。这是一种面…

界面控件DevExtreme DataGrid——一个多用途的UI组件

DevExtreme拥有高性能的HTML5 / JavaScript小部件集合&#xff0c;使您可以利用现代Web开发堆栈&#xff08;包括React&#xff0c;Angular&#xff0c;ASP.NET Core&#xff0c;jQuery&#xff0c;Knockout等&#xff09;构建交互式的Web应用程序&#xff0c;该套件附带功能齐…

【视频】什么是非线性模型与R语言多项式回归、局部平滑样条、 广义相加GAM分析工资数据|数据分享...

全文链接&#xff1a;http://tecdat.cn/?p9706在这文中&#xff0c;我将介绍非线性回归的基础知识。非线性回归是一种对因变量和一组自变量之间的非线性关系进行建模的方法。最后我们用R语言非线性模型预测个人工资数据&#xff08;查看文末了解数据获取方式&#xff09;是否每…

《人月神话》(The Mythical Man-Month)看清问题的本质:如果我们想解决问题,就必须试图先去理解它...

第一章 焦油坑&#xff08;The Tar Pit&#xff09;史前史中&#xff0c;没有比巨兽在焦油坑中垂死挣扎的场面更令人震撼的了。上帝见证着恐龙、猛犸象、剑齿虎在焦油中挣扎。它们挣扎得越是猛烈&#xff0c;焦油纠缠得越紧&#xff0c;没有任何猛兽足够强壮或具有足够的技巧&a…

Linux简化版线程池

目录 一&#xff0c;线程池设计 二&#xff0c;线程池应用场景 三&#xff0c;线程池准备 1&#xff0c;包装一个锁 2&#xff0c;一个任务类 三&#xff0c;线程池 1&#xff0c;成员介绍 2&#xff0c;设计单例模式 3&#xff0c;创建线程池 4&#xff0c;线程池执…

【吴恩达机器学习笔记】十六、应用实例:图片文字识别

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 &#x1f4e3;专栏定位&#xff1a;为学习吴恩达机器学习视频的同学提供的随堂笔记。 &#x1f4da;专栏简介&#xff1a;在这个专栏&#xff0c;我将整理吴恩达机器学习视频的所有内容的笔记&…

【Linux】Zabbix5.0平台的搭建

文章目录项目背景视频展演一、Linux基础配置1、查看当前系统版本2、修改主机名3、修改 IP 地址4、配置防火墙5、关闭 SELINUX6、修改系统时间及时区7、配置 YUM 库方式 1方式 2二、安装apache1、安装 apache2、启动 apache 服务3、设置 httpd 服务开机启动4、查看服务状态5、防…

深度学习——制作自己的VOC图像分割数据集

1、数据集介绍 COCO数据集有80个类别,VOC数据集有20个类别。当这些数据集类别中没有自己需要的时候,就需要自己动手做自己的数据集了。 我自己在做数据集的时候主要使用到了labelme和labelImg两个工具。labelme主要是制作语义分割数据集(ImageSets,JPEGImages,SegmentationC…