43、Flink 的 Window Join 详解

news2025/1/11 22:51:36
1.Window Join
a)概述

Window join 作用在两个流中有相同 key 且处于相同窗口的元素上,窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。

两个流中的元素在组合之后,会被传递给用户定义的 JoinFunctionFlatJoinFunction,可以用它们输出符合 join 要求的结果。

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

注意

  • 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
  • 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为 [5, 10) 窗口中的元素在 join 之后的 timestamp 为 9。
b)滚动 Window Join

使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunctionFlatJoinFunction

行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!

在这里插入图片描述

如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ... 的窗口,将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction

注意:滚动窗口 [6,7] 将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
c)滑动 Window Join

当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunctionFlatJoinFunction,当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!

注意:在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。

在这里插入图片描述

本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction 的元素;图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3] 中 join,但没有与窗口 [1,2] 中任何元素 join。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
d)会话 Window Join

使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunctionFlatJoinFunction,这个操作同样是 inner join,如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!

在这里插入图片描述

定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction,而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1710275.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

stream-实践应用-统计分析

背景 业务部门提供了一个数据&#xff0c;数据甚至不是excel类型的&#xff0c;是data.txt&#xff0c;每一行都是一个数据&#xff0c;需要对此数据进行统计分析 统计各个月份的销量 因为直接获取resources下的data.txt&#xff0c;所以要借助输入流进行获取数据&#xff0c;再…

sqli-labs---第三关

1、判断什么类型注入 ?id1 正常显示 ?id1 &#xff08;报错&#xff1a;1) LIMIT 0,1&#xff09; ?id1 正常显示 ?id1#(报错&#xff1a;1) LIMIT 0,1) 可知闭合方式为) 2、查看列数 ?id1) order by 3 -- (没有报错) ?id1) order by 4 -- (报错) 说明有3列 3、使用联合查…

Scrapy框架简单介绍及Scrapy项目编写详细步骤(Scrapy框架爬取豆瓣网站示例)

引言 Scrapy是一个用Python编写的开源、功能强大的网络爬虫框架&#xff0c;专为网页抓取和数据提取设计。它允许开发者高效地从网站上抓取所需的数据&#xff0c;并通过一系列可扩展和可配置的组件来处理这些数据。Scrapy框架的核心组成部分包括&#xff1a; Scrapy Engine&…

window本地部署Dify

Dify与之前的MaxKB不同&#xff0c;MaxKB可以实现基础的问答以及知识库功能&#xff0c;但是如果要开发一个Agent&#xff0c;或者工作流就还是需要额外开发&#xff0c;而Dify 是一个开源 LLM 应用开发平台。其直观的界面结合了 AI 工作流、RAG 管道、代理功能、模型管理、可观…

python制作一个批量更新文件名称的工具

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一.前言 二.实现 三.使用效果 一.前言 随着数字化时代的到来&#xff0c;文件管理和处理变…

14.微信小程序之地理定位功能

目录 1.地理定位介绍 1.1 申请开通 1.2 使用方法 2.拒绝授权后的解决方案 3.开通腾讯位置服务 4.LBS 逆地址解析 1.地理定位介绍 小程序地理定位是指通过小程序开发平台提供的 API&#xff0c;来获取用户的地理位置信息。用户在使用小程序时&#xff0c;可以授权小程序获…

【LabVIEW FPGA入门】同步C系列模块

1.同步使用循环定时器VI计时循环速率的系列模块 数字模块SAR ADC 模块多路复用模块 数字通道可以在一个时钟周期内执行。模拟通道需要多个时钟周期。 同步模拟模块的每个通道有一个 ADC&#xff0c;采集的数据在通道之间没有明显的偏差。多路复用模块使用多路复用器通过单个 A…

解决:error: failed to push some refs to ‘https://gitee.com/***/***.git‘(高效快速)

解决方案&#xff1a; git pull --rebase origin master 具体原因&#xff1a; 主要原因是gitee(github)中的README.md文件不在本地代码目录中 要执行git pull --rebase origin master命令将README.md拉到本地 然后就可以执行git push啦 写在最后&#xff1a; 要是问题得到…

MySQL简单测试和安装

MySQL 的特点 1、MySQL 性能卓越、服务稳定&#xff0c;很少出现异常宕机。 2、MySQL开放源代码且无版权制约&#xff0c;自主性及使用成本低。 3、MySQL历史悠久(版本众多)&#xff0c;用户使用活跃&#xff0c;遇到问题可以寻求帮助。 4、MySQL体积小(相对大型关系型数据库)…

你还不知道宠物空气净化器的五大好处?难怪家里总有异味和猫毛!

养猫是一件非常令人愉快的事情&#xff0c;猫咪的陪伴能带给我们无尽的欢乐。然而&#xff0c;随着时间的推移&#xff0c;许多养猫的朋友会发现一个问题&#xff0c;那就是家中的猫毛和异味问题。其实&#xff0c;解决这些问题的关键就在于选择一款高效的宠物空气净化器。今天…

嵌入式学习——3——多点通信

1、套接字选项&#xff08;socket options&#xff09; int getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen); int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen); 功能&#xff1a;获取或设置套接…

二叉树——基础知识详解

前言&#xff1a; 经过前面的学习&#xff0c;我们接下来要开始二叉树的学习&#xff0c;因二叉树有难度&#xff0c;为了方便讲解以及各位的理解&#xff0c;本节知识会分成不同的小节进行学习&#xff0c;在本阶段只学习初阶的二叉树&#xff08;堆&#xff0c;二叉数基本知识…

多分支拓扑阻抗匹配

最近测试信号质量&#xff0c;发现在有过冲、振铃等问题的时候大部分硬件工程师喜欢直接调大匹配电阻或者减小驱动电流&#xff0c;虽然这种操作是有效果的&#xff0c;但是我认为应该还可以更严谨的计算下&#xff0c;而不是选几个电阻多次尝试&#xff0c;显得不是很专业。 …

SOLIDWORKS正版一年多少钱 2024版报价

SOLIDWORKS软件作为一款优秀的三维设计工具&#xff0c;以其强大的功能和优质的设计工具&#xff0c;为设计师们提供了前所未有的便利。SOLIDWORKS三维设计软件是一款多科学集成软件&#xff0c;它在产品开发和制造方面发挥着重要作用。 作为整个SOLIDWORKS产品开发解决方案套件…

boost asio异步服务器(2)实现伪闭包延长连接生命周期

闭包 在函数内部实现一个子函数&#xff0c;子函数的作用域内能访问外部函数的局部变量。闭包就是能够读取其他函数内部变量。但是由于闭包会使得函数中的变量都被保存在内存中&#xff0c;内存消耗很大&#xff0c;所以不能滥用闭包&#xff0c;否则会造成程的性能问题&#x…

Discourse 使用 DiscourseConnect 来进行用户数据同步

我们都知道 Discourse 的用户管理和设置都高度依赖电子邮件。 如果 Discourse 没有设置电子邮件 SMTP 的话&#xff0c;作为管理员是没有办法对用户邮箱进行修改并且通过验证的。 可以采取的办法是通过 Discourse 的 DiscourseConnect 来进行用户同步。 根据官方的说法&…

Golang原生http实现中间件

Golang原生http实现中间件 中间件&#xff08;middleware&#xff09;&#xff1a;常被用来做认证校验、审计等 大家常用的Iris、Gin等web框架&#xff0c;都包含了中间件逻辑。但有时我们引入该框架显得较为繁重&#xff0c;本文将介绍通过golang原生http来实现中间件操作。全…

vue复习选择题2

1. 下面哪一个方法可以实现判断元素的class属性是否含有pp样式&#xff1f; &#xff08;A&#xff09; A. hasClass(“pp”)B. is(“pp”)C. attr(“class”,“pp”)D. toggleClass(“pp”) [!NOTE] 当涉及到在 jQuery 中判断元素的类属性时&#xff0c;下面是各个选项的作用…

2024/5/28 P1247 取火柴游戏

取火柴游戏 题目描述 输入 k k k 及 k k k 个整数 n 1 , n 2 , ⋯ , n k n_1,n_2,\cdots,n_k n1​,n2​,⋯,nk​&#xff0c;表示有 k k k 堆火柴棒&#xff0c;第 i i i 堆火柴棒的根数为 n i n_i ni​&#xff1b;接着便是你和计算机取火柴棒的对弈游戏。取的规则如下&…

AI批量剪辑视频素材,高效混剪快速出片/矩阵发布,一键管理自媒体账号。

今天给大家分享一个超级好用的办公神器。特别是玩矩阵的企业&#xff0c;这款工具高效解决短视频剪辑问题。 这款软件可以帮你快速生产出1000条视频内容,而且还能把内容同步到多个平台账号上&#xff0c;多平台矩阵发布。 这款系统真的太棒了! 不仅操作简单,而且功能超强大。 …