Flink双流(join)

news2025/1/11 11:19:15

 一、介绍

Join大体分类只有两种:Window Join和Interval Join

Window Join有可以根据Window的类型细分出3种:Tumbling(滚动) Window Join、Sliding(滑动) Window Join、Session(会话) Widnow Join。

        🌸Window 类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作。

        🌸Interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理,目前Stream join的结果是数据的卡尔积。

二、Window Join

✨Tumbling Window Join

        执行翻滚窗口联接时,具有公共键和公告翻滚窗口的所有元素将成对组合联接,并传递JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射。

        如图所示,我们定义了一个为2毫秒的翻滚窗口,结果窗口的形式为[0,1]、[2,3]..............该图显示了每个窗口中所以元素的成对组合,这些元素将传递给JoinFunction。注意在翻滚窗口[6,7]中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦结合的元素。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 ...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

✨Sliding Window Join

        在执行滑动窗口联接时,具有公共键和公共滑动窗口的所以元素将作为成对组合联接,并传递JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!请注意,某些元素可能会联接到一个滑动窗口中,但不会联接到另一个滑动窗口中!

        在本例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,从而产生滑动窗口[-1,0],[1,2],[2,3]...........x轴下方的连续元素时传递给每个滑动窗口的Join Function的元素。在这里,你还可以看到,例如在窗口[2,3]中,橙色②和绿色③连接,但在窗口[1,2]中没有与任何对象连接。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

✨Session Window Join

        在执行会话窗口联接时,具有相同键(当“组合”满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出

        这里,我们定义一个会话窗口连接,其中每个会话被至少1毫秒的时间分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三个会话中,绿色流中没有元素,所以⑧和⑨没有连接!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 ...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

三、Interval Join

        前面学习的Window Join必须要在一个Window中进行Join,那如果没有Window如何处理呢?interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

也就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且,流B的元素的时间戳 ≤ 流A的元素时间戳

 

在上面的示例中,我们将两个流“orange”和“green”连接起来,其下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是可以应用.lowerBoundExclusive()和.upperBoundExclusive来更改行为orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(first + "," + second);
        }
    });

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1462457.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mac 安装H3C iNode + accessClient mac版

一、下载安装 官网下载地址 https://www.h3c.com/cn/Service/Document_Software/Software_Download/IP_Management/ 可以使用文末参考博文中的账号 yx800 密码 01230123登录下载 选择版本 下载 下载 H3C_iNode_PC_7.3_E0626.zip 文件后&#xff0c;解压下载到的PC端压缩包…

linux单机巡检脚本并发送邮箱的巡检报告

#!/bin/bash # Author: HanWei # Date: 2020-03-16 09:56:57 # Last Modified by: HanWei # Last Modified time: 2020-03-16 11:06:31 # E-mail: han_wei_95163.com #!/bin/bash #安装mail yum -y install mailx#主机信息每日巡检IPADDR$(ifconfig eth0|grep inet addr|aw…

2023数据要素市场十大关键词

2023数据要素市场十大关键词 导读 2023年即将过去。一年之前&#xff0c;《中共中央国务院关于构建数据基础制度更好发挥数据要素作用的意见》&#xff08;简称“数据二十条”&#xff09;正式对外发布&#xff0c;为数据要素市场的建设举旗定向。 图片 2023年是“数据二十条…

python自动化接口测试

前几天&#xff0c;同组姐妹说想要对接口那些异常值进行测试&#xff0c;能否有自动化测试的方法。仔细想了一下&#xff0c;工具还挺多&#xff0c;大概分析了一下&#xff1a; 1、soapui:可以对接口参数进行异常值参数化&#xff0c;可以加断言&#xff0c;一般我们会加http…

Zabbix 6.2.1 安装

目录 1、监控介绍 监控的重要性 网站的可用性 监控范畴 如何监控 2、Zabbix 介绍 zabbix 简介 zabbix 主要功能 zabbix 监控范畴 Zabbix 监控组件 zabbix 常见进程 zabbix agentd 工作模式 zabbix 环境监控中概念 3、搭建LNMP 拓扑规划 安装MySQL 安装 Nginx …

【Vulkan Tutorials 01】【环境搭建】三角形例子

Development Environment&#xff08;开发环境&#xff09; 1. 安装Vulkan SDK 官网 2. 安装cmake和minGW 2.1 cmake 官网 双击可执行文件&#xff0c;然后直接安装&#xff0c;注意环境变量选择设置&#xff0c;否则需要自己操作。 2.2 minGW 官网 下载如下图所示&am…

Qt应用-天气预报实例

本文讲解Qt实现天气预报实例。 实现的功能 网络实时获取和显示6天的天气参数并绘制温度趋势曲线; 测试当前网络连接情况; 获得当前的IP地址的行政位置信息; 设计界面如下: 创建保存天气数据的类 #ifndef WEATHERDATA_H #define WEATHERDATA_H #include <QString>…

如何使用ArcGIS Pro生成等高线

无论在制图还是规划中&#xff0c;经常会使用到等高线&#xff0c;大多数情况下&#xff0c;从网上获取的高程数据都是DEM文件&#xff0c;我们可以通过ArcGIS Pro来生成等高线&#xff0c;这里为大家介绍一下生成方法&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的…

【转载】企业资产收集与脆弱性检查工具

简介 云图极速版是针对拥有攻击面管理需求的用户打造的 SaaS 应用&#xff0c;致力于协助用户管理互联网资产攻击面的 SaaS 化订阅服务产品。可实现对备案域名、子域名、IP、端口、服务、网站、漏洞、安全风险等场景进行周期性监控&#xff0c;支持多维度分析攻击面。利用可视化…

下一代自动化爬虫神器--playwright,所见即所得,配合逆向不要太香!!!

文章目录 1.Playwright介绍2.与 Selenium 和 pyppeteer 相比&#xff0c;Playwright 具有以下几个区别和优势3.在爬虫中使用 Playwright 的好处4.环境安装5.屏幕录制6.保留记录cookie信息7.playwright代码编写详解1.第一个Playwright脚本&#xff08;1&#xff09;同步模式&…

小米标准模组+MCU 快速上手开发(二)——之模组串口调试

小米标准模组MCU 开发笔记之固件调试 背景技术名词简介● 小米IoT开发者平台● 小米IoT 模组● 固件● OTA● CRC32 固件双串口调试● MHCWB6S-IB 模组资料下载● MHCWB6S-IB 模组管脚图● 上电调试 背景 小米标准模组MCU的开发过程中&#xff0c;由于部分官方资料较为古早&am…

解决MobaXterm网络错误连接超时问题

报错页面&#xff1a; 报错原因&#xff1a; ①网络断开了 ②网络端口&#xff0c;端口号改变 解决办法&#xff1a; ①重新连接网络按R ②固定端口号 第一步&#xff1a;编辑------>虚拟机网络编辑器&#xff08;我的Linux在虚拟机里&#xff09; 第二步&#xff1a;用…

【触想智能】工业平板知识分享|选购工业平板电脑需要注意的7大事项

工业平板电脑是一种将显示器、工控主板、触摸屏和其他电子设备整合在一起的电子产品。它广泛应用于工业控制和自动化领域。 在购买工业平板电脑时&#xff0c;需要考虑一些关键性因素&#xff0c;以确保工业平板电脑是安全可靠、运行稳定的。那么我们在购买工业平板电脑的时候&…

js设计模式:计算属性模式

作用: 将对象中的某些值与其他值进行关联,根据其他值来计算该值的结果 vue中的计算属性就是很经典的例子 示例: let nowDate 2023const wjtInfo {brithDate:1995,get age(){return nowDate-this.brithDate}}console.log(wjtInfo.age,wjt年龄)nowDate 1console.log(wjtInf…

5 原型模式 Prototype

1.模式定义: 指原型实例指定创建对象的种类&#xff0c;并且通过拷贝这些原型创建新的对象 2.应用场景&#xff1a; 当代码不应该依赖于需要复制的对象的具体类时&#xff0c;请使用Prototype模式。 Spring源码中的应用 org.springframework.beans.factory.support.AbstractB…

飞天使-k8s知识点24-kubernetes实操9-数据存储2配置存储

文章目录 高级存储pvc生命周期 配置存储secret 高级存储 前面已经学习了使用NFS提供存储&#xff0c;此时就要求用户会搭建NFS系统&#xff0c;并且会在yaml配置nfs。由于kubernetes支持的存储系统有很多&#xff0c;要求客户全都掌握&#xff0c;显然不现实。为了能够屏蔽底层…

排序算法1:冒泡排序、快速排序、插入排序

排序算法&#xff1a;交换类排序&#xff0c;插入类排序、选择类排序、归并类排序 交换类排序&#xff1a;冒泡排序、快速排序 一、冒泡排序 #include <stdio.h> #include <stdlib.h> #include <time.h> typedef int ElemType; typedef struct{ElemType *e…

C++面试宝典第31题:有效的数独

题目 判断一个9 x 9的数独是否有效。只需要根据以下规则,验证已经填入的数字是否有效即可。 1、数字1-9在每一行只能出现一次。 2、数字1-9在每一列只能出现一次。 3、数字1-9在每一个以粗实线分隔的3 x 3宫内只能出现一次。 下图是一个部分填充的有效的数独,数独部分空格内已…

Django学习笔记-HTML实现MySQL的图片上传

1.django项目编写index.html代码 创建form表单,路由指向upload,请求方式post,enctype设置"multipart/form-data", post请求添加{% csrf_token %},编写两个input,上传和提交 2.添加upload路由 3.views中创建upload 1).获取上传的文件,没有上传则返回"没有指定…

面试答疑03

1、登录鉴权怎么做的&#xff1f;为什么采用jwt的方式&#xff1f;有什么好处&#xff1f; Java登录鉴权常见的实现方式包括**CookieSession、HTTP Basic Authentication、ServletJDBC**等。 在Java的Web应用中&#xff0c;登录鉴权是确认用户身份的关键环节。一个常用的传统…