Flink IntervalJoin 笔记

news2024/12/27 12:27:40

Flink Join

一、Fink IntervalJoin

1、简要说明

Flink 中基于 DataStream 的join,只能实现在同一个窗口的两个数据流进行Join。但是实际中会存在数据乱序或者延时情况,导致两个流的数据进度不一致。无法在同一个窗口内Join。

Flink 基于 KeyedStream 提供interval join机制 ,intervaljoin 连接两个KeyedStream,按照相同的Key在一个相对数据时间的时间段内进行连接。

  SingleOutputStreamOperator<Object> process = vinNS.intervalJoin(vinESD0).between(Time.milliseconds(-5), Time.milliseconds(5))
          .process(new ProcessJoinFunction<String, String, Object>() {
           @Override
           public void processElement(String s, String s2, Context context, Collector<Object> collector) throws Exception {
           System.out.println("res"+s + ":" + s2);
            collector.collect(s + ":" + s2);

           }
          });

vinNS和vinESDO 两个topic通过 keyby vin码 得到两个 KeyedStream,再进行intervalJoin操作,between方法传递两个参数 lowerBound和upperBound,用来判断右边的流可以与那个时间范围内的左边的流进行关联(leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBoundBound)即:是以右边流为基表关联左边的流。

2、简要源码
@Deprecated //表示当前接口已经不推荐使用
@PublicEvolving //当前接口实现时需要指定生产者所要传输的对象类型

1、Interval join必须指定的时间类型为EventTime,否则直接抛出 UnsupportedTimeCharacteristicException 异常
在这里插入图片描述

2、两个KeyedStream在进行IntervalJoin调用between方法(默认是闭区间,可以通过下面设置为开区间 exclusive)

在这里插入图片描述

3、使用proess 实现 ProcessJoinFunction 入参是 两个流的类型和出参的类型,IntervalJoined left.connect(right).keyBy(keySelector1,keySelector2) 因为left.connect(right)返回的是 ConnectedStreams, keySelector1/2是指demo中两个数据流的keyBy条件,类似与flinksql join中的 on换类了,现在是IntervalJoinOperator类中了,重点也就在IntervalJoinOperator中

在这里插入图片描述

在这里插入图片描述

4、IntervalJoin接口继承了AbstractUdfStreamOperator抽象类,实现了TwoInputStreamOperator和Triggerable(触发器)接口。

/** @param <K>键的类型,我们根据它来连接元素。
    @param <T1>左流元素的类型。
    @param <T2>右流中元素的类型。
    @param <OUT>用户自定义函数创建的输出类型。*/


在这里插入图片描述

IntervalJoinOperator覆盖了AbstractUdfStreamOperator(StreamOperator定义)的open、initializeState方法,它在open方法里头创建了InternalTimerService,传递的Triggerable参数为this,即自身实现的Triggerable接口;在initializeState方法里头创建了leftBuffer和rightBuffer两个MapState
在这里插入图片描述

IntervalJoinOperator实现了TwoInputStreamOperator接口定义的processElement1、processElement2方法(TwoInputStreamOperator接口定义的其他一些方法在AbstractUdfStreamOperator的父类AbstractStreamOperator中有实现);processElement1、processElement2方法内部都调用了processElement方法,只是传递的relativeLowerBound、relativeUpperBound、isLeft参数不同以及leftBuffer和rightBuffer的传参顺序不同。

在这里插入图片描述

processElement方法里头实现了intervalJoin的时间匹配逻辑,它会从internalTimerService获取currentWatermark,然后判断element是否late,如果late直接返回,否则继续往下执行;之后就是把element的value添加到ourBuffer中(对于processElement1来说ourBuffer为leftBuffer,对于processElement2来说ourBuffer为rightBuffer);之后就是遍历otherBuffer(去遍历另一条流的MapState)中的每个元素,挨个判断时间是否满足要求(即ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound)将数据输出给ProcessJoinFunction调用,ourTimestamp表示流入的数据时间,timestamp表示对应join的数据时间,不满足要求的直接跳过,满足要求的就调用collect方法(collect方法里头执行的是userFunction.processElement,即调用用户定义的ProcessJoinFunction的processElement方法);

在这里插入图片描述

在这里插入图片描述

若左右两个流都能从对方取到值,以结果 Timestamp 为两边流最大的执行processElement方法

在这里插入图片描述

之后就是计算cleanupTime,调用internalTimerService.registerEventTimeTimer注册清理该element的timer,定时的清理时间,就是当下流入的数据的时间+relativeUpperBound,当watermark大于该时间就需要清理。

前面取到值后遍历对方流的时间戳。对于左边的流 遍历规则左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s

右边的流:右边时间戳-5s<=左边流时间戳<=右边时间戳-1s

a。如果为左边流数据到达,调用processElement1方法
此时relativeUpperBound为5,relativeLowerBound为1,relativeUpperBound>0,所以定时清理时间为10+5即15s
当时间达到15s时,清除左边流数据,即看右边流在15s时,需要查找的左边流时间范围
10s<=左边流时间戳<=14s,所以watermark>15s时可清除10s的数据。
b。如果为右边流数据到达,调用processElement2方法
此时relativeUpperBound为-1,relativeLowerBound为-5,relativeUpperBound<0,所以定时清理时间为10s
当时间达到10s时,清除右边流数据,即看左边流在10s时,需要查找的右边流时间范围
11s<=右边流时间戳<=15s,所以可以清除10s的数据。
在这里插入图片描述

在这里插入图片描述

/**
@param lowerBound计算元素是否应该连接的下限
@param upperBound计算元素是否应该连接的上限
@param lowerBoundInclusive—是否包含时间戳与下限匹配的元素
@param upperBoundInclusive -是否包含时间戳与上界匹配的元素
@param udf一个用户定义的{@link ProcessJoinFunction},当两个元素被调用时*/

在这里插入图片描述

3、学习文档
1、 TypeSerializer Flink的序列化器

Flink 数据类型与序列化

2、Flink的Process Function 侧输出流

Flink的Process Function

3、AbstractUdfStreamOperator 介绍

https://zhuanlan.zhihu.com/p/466333577

4、本文参照文档

https://blog.csdn.net/weixin_34247155/article/details/87993564?ops_request_misc=&request_id=&biz_id=102&utm_term=CLEANUP_TIMER_NAME&utm_medium=distribute.pc_search_result.none-task-blog-2allsobaiduweb~default-1-87993564.142v86control,239v2insert_chatgpt&spm=1018.2226.3001.4187

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/458664.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

详论YUM仓库的部署和NFS共享服务

目录 一:YUM仓库服务 1.YUM概述 2.安装源的准备 &#xff08;1&#xff09;软件仓库的提供方式 &#xff08;2&#xff09;RPM软件包的来源 &#xff08;3&#xff09;创建Centos7软件仓库 &#xff08;4&#xff09;在软件仓库加入非官方RPM包 3.访问YUM仓库 4.安装FT…

商品页面翻页功能--购物车拓展

之前我们在mvc练习中曾经写过翻页功能&#xff0c;现在我们给购物车产品显示界面也加一个 1、把productlist中dao的sql语句做出修改&#xff0c;并传递需要用到的参数 再来一个返回product总数的方法 2、 对productlist的servlet拓展相关操作&#xff0c;准备好翻页的功能 3、…

访问图像像素

Opencv访问图像像素 预备知识: 图像矩阵的大小取决于所用的颜色模型(或者说通道数)&#xff0c;灰度图矩阵如下: 多通道图像&#xff0c;如RGB颜色模型的矩阵如下&#xff1a; 注&#xff1a;opencv的通道顺序是BGR&#xff0c;而不是RGB 访问图像中像素方法: import num…

新建虚拟机更改ip(连接xshell)

# 查看网络设备 [rootcentos79 ~]# nmcli device DEVICE TYPE STATE CONNECTION ens32 ethernet 已连接 ens32 ens33 ethernet 已连接 ens33 virbr0 bridge 已连接 virbr0 lo loopback 未托管 -- # 查看…

Unity之OpenXR+XR Interaction Toolkit实现 抓取物体

前言 我们今天来说一下如何使用XR Interaction Toolkit来实现和3D物体的交互之&#xff1a;抓取&#xff0c;简单说就是通过VR手柄拿起来一个物体。 二.准备工作 有了前两篇的配置介绍,我们就不在详细说明这些了&#xff0c;大家自行复习 Unity之OpenXRXR Interaction Toolk…

PyCharm连接远程服务器配置过程

目录 背景 一、建立远程服务器连接 1.创建远程连接 2.进行本地项目与远程项目之间的文夹路径映射 3.设置自动上传项目&#xff08;如有需要&#xff0c;可设置&#xff09; 4.验证是否连接成功&#xff08;调出服务器的文件目录&#xff09; 二、本地配置Python解释器 …

【社区图书馆】-《科技服务与价值链》总结

【为什么研究价值链】 价值链及价值链协同体系是现代产业集群的核心枢纽&#xff0c;是推进城市群及产业集群化、服务化、生态化发展的纽带。因而推进价值链协同&#xff0c;创新发展价值链协同业务科技资源体系&#xff0c;既是科技服务业创新的重要方向&#xff0c;也是重塑生…

NetApp ONTAP: 企业级数据管理软件,为无缝混合云奠定基础

为何选择 NetApp ONTAP 进行企业数据管理 NetApp ONTAP 数据管理软件可帮助您快速应对新的业务挑战&#xff0c;简化日常活动并给您的团队留下深刻印象。无论您在内部环境和云中有着怎样的数据管理需求&#xff0c;ONTAP 都能满足您。 1、支持当今的数据驱动型企业 当今的企…

升级底座、打破壁垒、消灭报销,让业财融合一触即发!

一个平台 一个入口 一站服务 以移动互联网、云计算、大数据、人工智能、5G与物联网、区块链为代表的新一代信息通信技术&#xff08;ICT&#xff09;的集群式、交互式发展&#xff0c;驱动企业进入数智化新阶。商业创新是打造企业竞争力的必然选择&#xff0c;在数字化转型大潮…

p68 内网安全-域横向 PTHPTKPTT 哈希票据传递

数据来源 ​ ​ Kerberos 协议具体工作方法&#xff0c;在域中&#xff0c;简要介绍一下&#xff1a; 客户机将明文密码进行 NTLM 哈希,然后和时间戳一起加密(使用krbtgt 密码 hash 作为密钥)&#xff0c;发送给 kdc&#xff08;域控&#xff09;&#xff0c;kdc 对用户进行检…

C语言从入门到精通第11天(数组的基本操作)

数组的基本操作 数组的概念一维数组二维数组 数组的概念 在程序设计中&#xff0c;为了方便处理数据把具有相同类型的若干变量按有序形式集合在一起&#xff0c;这些按序排列的同类数据元素的集合称为数组。 在C语言中&#xff0c;数组属于构造数据类型&#xff0c;一个数组可…

聊聊如何通过APT+AST来实现AOP功能

前言 如果有使用过spring aop功能的小伙伴&#xff0c;应该都会知道spring aop主要是通过动态代理在运行时&#xff0c;对业务进行切面拦截操作。今天我们就来实现一下如何通过APTAST在编译期时实现AOP功能。不过在此之前先科普一下APT和AST相关内容 APT&#xff08;注解处理…

Nginx的重写功能

一、常用的Nginx 正则表达式 字符涵义以及示例^匹配输入字符串的起始位置$匹配输入字符串的结束位置*匹配前面的字符零次或多次&#xff1b;如“ol*”能匹配“o”及“ol”、“oll”匹配前面的字符一次或多次&#xff1b;如“ol”能匹配“ol”及“oll”、“olll”&#xff0c;但…

智能建筑中电力监控系统的应用与产品选型

摘要&#xff1a;近几十年&#xff0c;中国现代化经济不断发展&#xff0c;计算机技术、信息技术等相关产业也取得了飞跃性的进步。随着商业、生活以及公共建筑不断提高智能管理和节能的要求&#xff0c;电力监控系统开始逐渐渗入人们的日常生活&#xff0c;发挥着不可替代的作…

Graphql中我们应该用什么姿势来实现Resolver?

Graphql中我们应该用什么姿势来实现Resolver? Graphql中我们应该用什么姿势来实现Resolver? 前言设计数据库定义 Type实现 Resolver按需组装查询语句请求数据库GraphQLResolveInfo附录 前言 我最近在用 Graphql 来弥补原先写的 RESTFUL 接口的一些短板。在实践过程中遇到…

实战Websocket

实战Websocket&#xff1a;从入门到自闭 作为前端开发人员&#xff0c;我们经常需要使用 Websocket 实现实时通信功能&#xff0c;如聊天室、实时数据展示、游戏等。近期我在一家公司实习工作中&#xff0c;也遇到了使用 Websocket 的场景&#xff0c;所以开始了解 Websocket …

第四章 使用Maven:IDEA环境

1、创建 Project2、开启自动导入 TIP 各个 IDEA 不同版本在具体操作方面存在一定差异&#xff0c;这里我们以 2019.3.3 版本为例进行演示。其它版本大家灵活变通即可。 第一节 创建父工程 创建 Project 开启自动导入 创建 Project 后&#xff0c;IDEA 会自动弹出下面提示…

有史以来第一次利用 Kubernetes RBAC 攻击后门集群

我们最近发现了有史以来第一个证据&#xff0c;表明攻击者正在野外利用 Kubernetes (K8s) 基于角色的访问控制 (RBAC) 创建后门。 攻击者还部署了 DaemonSets 来接管和劫持他们攻击的 K8s 集群的资源。我们的研究表明&#xff0c;该活动正在积极针对至少 60 个野外集群。 这…

026:Mapbox GL加载矢量切片数据源

第026个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+mapbox中加载矢量切片数据源。将矢量源添加到地图。使用其 tileset URL(mapbox:// + tileset ID)添加任何 Mapbox 托管的 tileset。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例…

python@可变对象和不可变对象@按值传递和引用传递@python运行可视化工具

文章目录 可变对象和不可变对象&#x1f388;可视化工具&#x1f388;可变对象和idegeg变量名和内存地址&#x1f388;函数调用对参数的修改&#x1f602;Note 按值传递vs引用传递note&#x1f388;如何借助函数修改外部变量的值?Note 可变对象和不可变对象&#x1f388; 在Py…