Flink Join操作

news2025/1/10 20:57:04

目录

DataStream API(函数编程)

window Join

join

coGroup

interval Join

Table API(flink sql)

Reguler Join (常规join)

inner join

left join / right join

full join

interval join

lookup join

Window Join

INNER/LEFT/RIGHT/FULL OUTER 


DataStream API(函数编程)

window Join

join

对处于同一窗口的数据进行join

时间类型:processTime、eventTime

问题:1、不在同一窗口的数据无法join,

           2、只能inner join

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>) // 左侧key值
    .equalTo(<KeySelector>) // 右侧key值
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)) // 开窗方式 tumbing/sliding/session
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

coGroup

coGroup是join的底层方法,通过coGroup可以实现inner/left/right/full 四种join

时间类型:processTime、eventTime

问题:不在同一窗口的数据无法join

interval Join

 为了解决window join的问题:处于不同窗口的数据无法join

时间类型:eventTime

interval join :根据左流的数据的时间点,左右各等待一段右流时间,在此范围内进行join

问题:只能是以左流为时间线,因此只支持inner join

latedata.coGroup(stream)
    .where(a->a.getStr("a"))
    .equalTo(a->a.getStr("a"))
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply(new CoGroupFunction<JSONObject, JSONObject, Object>() {
           @Override
           public void coGroup(Iterable<JSONObject> iterable, Iterable<JSONObject> iterable1, Collector<Object> collector) throws Exception {
                            
     }
})

Table API(flink sql)

Reguler Join (常规join)

默认没有时间范围,全局都可以join

可以设置数据过期时间

tableEnv.getConfig().setIdleStateRetention(xx)

设置过期时间后以西四种join 数据过期方式各有不同

inner join

inner join 左流右流,创建后进入过期倒计时

SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id

left join / right join

left: 左流创建后进入过期倒计时,但是成功join一次后,就会重置过期时间

left: 右流创建后进入过期倒计时,但是成功join一次后,就会重置过期时间

SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id

full join

左、右流创建后进入过期倒计时,但是成功join一次后,就会重置过期时间

SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id

interval join

作为DataStreamApi升级版的interval join,sql版本的支持处理时间语义和事件事件语义

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

lookup join

效果等同于cdc,但是每次过来一条数据都会去数据库进行一次查询关联、效率很差

但是可以设置缓存机制,如果用过一次后会缓存指定的时间,但是在缓存期间内就不会实时同步mysql的数据了。此时就和regular join 一样了

因此lookup join 试用场景为字典数据需要变化,但是变化的时间不需要实时变化,有点延迟也可以。

应用场景不多

关键语句

FOR SYSTEM_TIME AS OF o.proc_time

lookup.cache.max-rows

optional(none)Integer

The max number of rows of lookup cache, over this value, the oldest rows will be expired. Lookup cache is disabled by default. See the following Lookup Cache section for more details

最多缓存多少条

lookup.cache.ttl

optional(none)Duration

The max time to live for each rows in lookup cache, over this time, the oldest rows will be expired. Lookup cache is disabled by default. See the following Lookup Cache section for more details.

缓存数据ttl

1 DAY 

1 HOUR

CREATE TEMPORARY TABLE Orders (
  id INT,
  order_id INT,
  total INT,
  proc_time as procetime()
) WITH (
  'connector' = 'kafka',
  ...
);

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers',
  'lookup.cache.max-rows' = '10',
  'lookup.cache.ttl' = '1 hour'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

Window Join

窗口join,必须对表进行TVF开窗才能使用

table(tumple(table tablegreen,descriptor(rt),interval '5' minutes))

时间类型:processTime、eventTime

INNER/LEFT/RIGHT/FULL OUTER 

SELECT ...
FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations applied windowing TVF
ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/463546.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

摄像头录像软件哪款好用?这款就很不错

案例&#xff1a;录屏时怎么录电脑摄像头&#xff1f; 【很多时候我不仅仅需要录制电脑屏幕&#xff0c;还需要在录屏时录入人像&#xff0c;可是我的录屏软件不支持录制人脸。有没有好用的电脑录屏带人脸的屏幕录制工具介绍&#xff1f;】 无论是教学、游戏直播、软件开发、…

HTML + CSS + JS 利用邮编查询 API 实现邮编查询工具

引言 邮政编码是地址信息的重要组成部分&#xff0c;可以帮助快递公司、物流公司等对地址进行快速、准确的识别和派送。因此&#xff0c;邮编查询工具应用在许多业务场景中都有广泛的应用&#xff0c;例如&#xff1a;电商平台、物流公司、金融机构等。通过使用邮编查询 API&a…

fiddler在请求头添加指定信息

下图所示&#xff0c;设置完请求头信息后&#xff0c;点击右上角Actions按钮的Run Filterset now保存信息&#xff0c;即可。

centos7部署FastDFS服务

一、安装需要的相关依赖 yum -y install make cmake gcc gcc-c 因为我的服务器已经安装了gcc&#xff0c;所以略去 使用gcc -v查看版本 yum -y install zip unzip 安装性能事件通知库 yum -y install libevent 安装nginx依赖 yum -y install libevent yum -y install zli…

MITA触摸屏维修WP4053米塔工控机控制屏维修

MITA-TEKNIK米塔触摸屏维修工控机工控屏控制器维修DISPLAY 2COM全系列型号 Mita-Teknik触摸屏维修常见故障&#xff1a;上电无显示&#xff0c;运行报故障&#xff0c;无法与电脑通讯&#xff0c;触摸无反应&#xff0c;触控板破裂&#xff0c;触摸玻璃&#xff0c;上电黑屏&a…

自动化工具 接口自动化测试引擎

一、前言&#xff1a; 1、解决痛点&#xff1a;接口自动化测试用例需要人去开发、去维护。 2、实现第一性原理&#xff1a;根据定义的测试策略自动生成接口测试用例。 二、引擎优势&#xff1a; 1、提升人效&#xff1a;降低传统方式中接口测试开发与维护的工作量。 2、覆盖更…

输电线路在线监拍设备的国网协议对接方案

疫情过后&#xff0c;尤其山东淄博电力建设的发展&#xff0c;电网规模的不断研发&#xff0c;高压远距离架空输电线路日益增多&#xff0c;在复杂地形条件下的电网建设和设备维护工作也越来越多。 如何解决4G模块控制的功耗&#xff0c;和信号传输方面有一套完整的方案。基于合…

【网络安全】红队基础免杀

引言 本文主要介绍“反射型 dll 注入”及“柔性加载”技术。 反射型 dll 注入 为什么需要反射型 dll 注入 常规的 dll 注入代码如下&#xff1a; int main(int argc, char *argv[]) {HANDLE processHandle;PVOID remoteBuffer;wchar_t dllPath[] TEXT("C:\\experime…

从源码全面解析LinkedBlockingQueue的来龙去脉

一、引言 并发编程在互联网技术使用如此广泛&#xff0c;几乎所有的后端技术面试官都要在并发编程的使用和原理方面对小伙伴们进行 360 的刁难。 二、使用 对于阻塞队列&#xff0c;想必大家应该都不陌生&#xff0c;我们这里简单的介绍一下&#xff0c;对于 Java 里面的阻塞…

【 SpringBoot 统⼀功能处理 】

文章目录 引言一、⽤户登录权限效验Spring 拦截器拦截器实现原理扩展&#xff1a;统⼀访问前缀添加 二、统⼀异常处理三、统⼀数据返回格式四、ControllerAdvice 源码分析 引言 接下来是 Spring Boot 统⼀功能处理模块&#xff0c;是 AOP 的实战环节&#xff0c;要实现的课程⽬…

轨道交通信号系统的可靠性与安全性

01.引言 城市轨道交通系统作为大容量公共交通工具&#xff0c;其安全性直接关系到广大乘客的生命安全&#xff0c;所以要求城市轨道交通系统在如此高的运行密度下&#xff0c;还要保证安全和高效率的运行。而信号系统作为保证列车安全、正点、便捷、舒适、高密度不间断运行的重…

Filter 过滤器基本内容及案例改进

举个例子 假设在Web资源中&#xff0c;A资源要写5行代码&#xff0c;而B资源也要写一模一样的5行代码&#xff0c;这时就把这些代码都提取出来&#xff0c; 在过滤器里写这些代码&#xff0c;因为访问任何资源都要经过过滤器&#xff0c;在过滤器走一遍就可以&#xff0c;而不用…

性能优化之20个 Linux 服务器性能调优技巧

Linux是一种开源操作系统&#xff0c;它支持各种硬件平台&#xff0c;Linux服务器全球知名&#xff0c;它和Windows之间最主要的差异在于&#xff0c;Linux服务器默认情况下一般不提供GUI(图形用户界面)&#xff0c;而是命令行界面&#xff0c;它的主要目的是高效处理非交互式进…

【22】核心易中期刊推荐——人工智能与识别图像处理与应用

🚀🚀🚀NEW!!!核心易中期刊推荐栏目来啦 ~ 📚🍀 核心期刊在国内的应用范围非常广,核心期刊发表论文是国内很多作者晋升的硬性要求,并且在国内属于顶尖论文发表,具有很高的学术价值。在中文核心目录体系中,权威代表有CSSCI、CSCD和北大核心。其中,中文期刊的数…

网络编程代码实例:多进程版

文章目录 前言代码仓库内容代码&#xff08;有详细注释&#xff09;server.cclient.cMakefile 结果总结参考资料作者的话 前言 网络编程代码实例&#xff1a;多进程版。 代码仓库 yezhening/Environment-and-network-programming-examples: 环境和网络编程实例 (github.com)E…

商品如果要在美国商超出售,那么如何申请美国条形码呢?

美国条码注册是指向美国条码协会提出条码申请&#xff0c;通过条码协会的审核批准后&#xff0c;条码可以印在产品上使用。条码是进入各大商场&#xff0c;超市的身份证&#xff0c;企业有条形码&#xff0c;一是可以提高企业产品的知名度&#xff1b;二是增加产品的防伪力度&a…

TypeScript与JavaScript

目录 一、什么是javascript 二、TypeScript&#xff1a;静态类型检查器 1、类型化的 JavaScript 超集 1.1、句法 1.2、类型 1.3、运行时行为 1.4、擦除类型 2、学习 JavaScript 还是 TypeScript 一、什么是javascript JavaScript&#xff08;也称为 ECMAScript&#xf…

为何电商这么难做…...你是否忽略了这个问题?

物流时效是影响买家体验的重要环节&#xff0c;物流服务优劣也是买家网上购物时的重要参考依据。但电商企业对于快递公司的时效承诺、服务质量基本处于被动接受的状况&#xff0c;直到买家投诉才知道快递公司服务缺失&#xff0c;若买家不投诉也没法主动知道大量的订单是否按约…

Notes/Domino 11.0.1FP7以及在NAS上安装Domino等

大家好&#xff0c;才是真的好。 目前HCL在还是支持更新的Notes/Domino主要是三个版本&#xff0c;V10、11和12&#xff0c;这不,上周HCL Notes/Domino 11.0.1居然推出了FP7补丁包程序。 从V10.0.1开始&#xff0c;Domino的FP补丁包程序主要是用来修复对应主要版本中的一些问…

TCP FACK 与 RACK

3 个 dupacks 触发 fast retransmit 是一个经典启发算法&#xff0c;但在引入 SACK 之后仍然计数 SACKed 数量 > 3 触发 fast retransmit 似乎就没理由了。即使把 reordering 算进去&#xff0c;一个距离 una 很远的 seg 被 SACKed&#xff0c;也足以判定丢包了&#xff0c;…