大数据(9h)FlinkSQL双流JOIN、Lookup Join

news2024/12/25 9:24:58

文章目录

  • 1、环境
  • 2、Temporal Joins
    • 2.1、基于处理时间(重点)
      • 2.1.1、设置状态保留时间
    • 2.2、基于事件时间
  • 3、Lookup Join(重点)
  • 4、Interval Joins(基于间隔JOIN)

重点是Lookup JoinProcessing Time Temporal Join,其它随意

1、环境

WIN10+IDEA2021+JDK1.8+本地MySQL8

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- FlinkSQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- JSON解析 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- 简化JavaBean书写 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>

2、Temporal Joins

2.1、基于处理时间(重点)

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hi {
    public static void main(String[] args) {
        //创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //创建流式表执行环境
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        //双流
        DataStreamSource<Tuple2<String, Integer>> d1 = env.fromElements(
                Tuple2.of("a", 2),
                Tuple2.of("b", 3));
        DataStreamSource<P> d2 = env.fromElements(
                new P("a", 4000L),
                new P("b", 5000L));
        //创建临时视图
        tbEnv.createTemporaryView("v1", d1);
        tbEnv.createTemporaryView("v2", d2);
        //双流JOIN
        tbEnv.sqlQuery("SELECT * FROM v1 LEFT JOIN v2 ON v1.f0=v2.pid").execute().print();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class P {
        private String pid;
        private Long timestamp;
    }
}

结果

+----+-------+-------+-------------+-------------+
| op |    f0 |    f1 |         pid |   timestamp |
+----+-------+-------+-------------+-------------+
| +I |     a |     2 |      (NULL) |      (NULL) |
| -D |     a |     2 |      (NULL) |      (NULL) |
| +I |     a |     2 |           a |        4000 |
| +I |     b |     3 |      (NULL) |      (NULL) |
| -D |     b |     3 |      (NULL) |      (NULL) |
| +I |     b |     3 |           b |        5000 |
+----+-------+-------+-------------+-------------+

2.1.1、设置状态保留时间

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;
import java.util.Scanner;

public class Hi {
    public static void main(String[] args) {
        //创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //创建流式表执行环境
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        //设置状态保留时间
        tbEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5L));
        //双流
        DataStreamSource<Tuple2<String, Long>> d1 = env.addSource(new AutomatedSource());
        DataStreamSource<String> d2 = env.addSource(new ManualSource());
        //创建临时视图
        tbEnv.createTemporaryView("v1", d1);
        tbEnv.createTemporaryView("v2", d2);
        //双流JOIN
        tbEnv.sqlQuery("SELECT * FROM v1 INNER JOIN v2 ON v1.f0=v2.f0").execute().print();
    }

    /** 手动输入的数据源(请输入a或b进行测试) */
    public static class ManualSource implements SourceFunction<String> {
        public ManualSource() {}

        @Override
        public void run(SourceFunction.SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) {break;}
                if (!str.equals("")) {sc.collect(str);}
            }
            scanner.close();
        }

        @Override
        public void cancel() {}
    }

    /** 自动输入的数据源 */
    public static class AutomatedSource implements SourceFunction<Tuple2<String, Long>> {
        public AutomatedSource() {}

        @Override
        public void run(SourceFunction.SourceContext<Tuple2<String, Long>> sc) throws InterruptedException {
            for (long i = 0L; i < 999L; i++) {
                Thread.sleep(1000L);
                sc.collect(Tuple2.of("a", i));
                sc.collect(Tuple2.of("b", i));
            }
        }

        @Override
        public void cancel() {}
    }
}

测试结果

2.2、基于事件时间

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hello {
    public static void main(String[] args) {
        //创建流和表的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        //创建数据流,设定水位线
        tbEnv.executeSql("CREATE TABLE v1 (" +
                "  x STRING PRIMARY KEY," +
                "  y BIGINT," +
                "  ts AS to_timestamp(from_unixtime(y,'yyyy-MM-dd HH:mm:ss'))," +
                "  watermark FOR ts AS ts - INTERVAL '2' SECOND" +
                ") WITH (" +
                "  'connector'='filesystem'," +
                "  'path'='src/main/resources/a.csv'," +
                "  'format'='csv'" +
                ")");
        tbEnv.executeSql("CREATE TABLE v2 (" +
                "  x STRING PRIMARY KEY," +
                "  y BIGINT," +
                "  ts AS to_timestamp(from_unixtime(y,'yyyy-MM-dd HH:mm:ss'))," +
                "  watermark FOR ts AS ts - INTERVAL '2' SECOND" +
                ") WITH (" +
                "  'connector'='filesystem'," +
                "  'path'='src/main/resources/b.csv'," +
                "  'format'='csv'" +
                ")");
        //执行查询
        tbEnv.sqlQuery("SELECT * " +
                "FROM v1 " +
                "LEFT JOIN v2 FOR SYSTEM_TIME AS OF v1.ts " +
                "ON v1.x = v2.x"
        ).execute().print();
    }
}

打印结果

+----+---+------------+-------------------------+--------+------------+-------------------------+
| op | x |          y |                      ts |     x0 |         y0 |                     ts0 |
+----+---+------------+-------------------------+--------+------------+-------------------------+
| +I | a | 1666540800 | 2022-10-24 00:00:00.000 | (NULL) |     (NULL) |                  (NULL) |
| +I | b | 1666540803 | 2022-10-24 00:00:03.000 |      b | 1666540802 | 2022-10-24 00:00:02.000 |
| +I | c | 1666540806 | 2022-10-24 00:00:06.000 |      c | 1666540803 | 2022-10-24 00:00:03.000 |
+----+---+------------+-------------------------+--------+------------+-------------------------+

3、Lookup Join(重点)

Lookup Join是基于Processing Time Temporal Join

详细链接:https://yellow520.blog.csdn.net/article/details/128070761

4、Interval Joins(基于间隔JOIN)

SQL

SELECT * FROM v1
LEFT JOIN v2 ON v1.x=v2.x AND
v1.y BETWEEN (v2.y - INTERVAL '2' SECOND) AND (v2.y + INTERVAL '1' SECOND)

Java

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Scanner;

import static org.apache.flink.table.api.Expressions.$;

public class Hi {
    public static void main(String[] args) {
        //创建流和表的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        //创建数据流
        DataStreamSource<String> d1 = env.addSource(new ManualSource());
        DataStreamSource<String> d2 = env.addSource(new AutomatedSource());
        //创建动态表,并声明一个额外的字段来作为处理时间字段
        Table t1 = tbEnv.fromDataStream(d1, $("x"), $("y").proctime());
        Table t2 = tbEnv.fromDataStream(d2, $("x"), $("y").proctime());
        //创建临时视图
        tbEnv.createTemporaryView("v1", t1);
        tbEnv.createTemporaryView("v2", t2);
        //执行查询
        tbEnv.sqlQuery("SELECT * FROM v1 LEFT JOIN v2 ON v1.x=v2.x AND " +
                "v1.y BETWEEN (v2.y - INTERVAL '2' SECOND) AND (v2.y + INTERVAL '1' SECOND)"
        ).execute().print();
    }

    /** 手动输入的数据源 */
    public static class ManualSource implements SourceFunction<String> {
        public ManualSource() {}

        @Override
        public void run(SourceFunction.SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) {break;}
                if (!str.equals("")) {sc.collect(str);}
            }
            scanner.close();
        }

        @Override
        public void cancel() {}
    }

    /** 自动输入的数据源 */
    public static class AutomatedSource implements SourceFunction<String> {
        public AutomatedSource() {}

        @Override
        public void run(SourceFunction.SourceContext<String> sc) throws InterruptedException {
            for (int i = 0; i < 999; i++) {
                Thread.sleep(801);
                sc.collect("a");
                sc.collect("b");
            }
        }

        @Override
        public void cancel() {}
    }
}

测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/46409.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot+Vue实现前后端分离的餐饮点餐系统

文末获取源码 开发语言&#xff1a;Java 使用框架&#xff1a;spring boot 前端技术&#xff1a;JavaScript、Vue.js 、css3 开发工具&#xff1a;IDEA/MyEclipse/Eclipse、Visual Studio Code 数据库&#xff1a;MySQL 5.7/8.0 数据库管理工具&#xff1a;phpstudy/Navicat JD…

线性表01- 数组与简易接口设计

线性表的定义 线性表: 具有n个相同类型元素的有限序列 n>0 线性表的元素特点是有索引, 可以通过索引快速查找到元素. a1是a2的前驱, a2是a1的后继 常见的线性表 数组链表栈队列哈希表 (散列表) 数组 数组是一种顺序存储的线性表, 所有的元素的内存地址是连续的. int arr…

Python用广义加性模型GAM进行时间序列分析

每当你发现一个与时间对应的趋势时&#xff0c;你就会看到一个时间序列。我们围绕广义加性模型GAM技术进行一些咨询&#xff0c;帮助客户解决独特的业务问题。研究金融市场表现和天气预报的事实上的选择&#xff0c;时间序列是最普遍的分析技术之一&#xff0c;因为它与时间有着…

matlab图像的运算有点运算、代数运算、逻辑运算和几何运算

1.图像的点运算 2.图像的代数运算 3.图像的逻辑运算 4.图像的几何运算 一、图像的点运算 图像的点运算&#xff1a;对图像中的每个像素值进行计算&#xff0c;从而改善图像显示效果的操作&#xff0c;常用于改变图像的灰度范围及分布&#xff0c;有时也被称为对比度增强和拉伸…

Arduino开发实例-DIY酒精浓度检测计

DIY酒精浓度检测计 在本文中,将详细介绍如何创建一个简单的酒精检测器。 它可以在各种应用领域中使用。市场上有许多先进的酒精传感器,价格合理,但我们在这里使用一些基本的微控制器来制作这个项目,如 Arduino、LED、蜂鸣器和 MQ3 酒精传感器。 1、MQ-3传感器介绍 MQ-3传…

外卖项目(项目优化2)11---读写分离

读&#xff1a;查询的操作 目录 一、Mysql主从复制 172 1.1Mysql主从复制_配置主库Master&从库Slave 173 配置&#xff1a;主库Master&#xff1a; 配置&#xff1a;从库Slave 二、读写分离案例 175 2.1背景 2.2Sharding-JDBC介绍 2.3读写分离案例---入门案例 17…

Magic Leap 2设计和开发幕后花絮

Magic Leap今年发布新款AR头显Magic Leap 2&#xff0c;相比于上一代Magic Leap 1&#xff0c;新品更专注于B端场景&#xff0c;自公布以来&#xff0c;Magic Leap不仅对公司策略、理念更加透明&#xff0c;也不断公开ML2产品设计背后的思考。相比于ML1&#xff0c;ML2的设计有…

里P7告诉你,接口测试真的很简单,有手就行

一、什么是接口测试&#xff1f; 所谓接口&#xff0c;是指同一个系统中模块与模块间的数据传递接口、前后端交互、跨系统跨平台跨数据库的对接。而接口测试&#xff0c;则是通过接口的不同情况下的输入&#xff0c;去对比输出&#xff0c;看看是否满足接口规范所规定的功能、…

windows bat批处理文件,实现某个软件的重启

bat批处理实现软件重启功能windows环境需要明确的概念按照启动文件xxx.exe去定位某个程序的Pid根据pid杀死某个进程根据exe文件启动某个软件bat示例&#xff0c;杀死软件进程并重启windows环境 我这里用的是win10企业版&#xff0c;在基础功能上和家庭版区别不大 需要明确的概…

19uec++多人游戏【基础AI导航】

首先把这一期的资源导入一下 创建一个球体类&#xff0c;继承于pawn类 为其添加静态组件 UPROPERTY(VisibleAnywhere, BlueprintReadOnly, Category "Components")class UStaticMeshComponent * MeshComponent; #include "Components/StaticMeshComponent.h&qu…

具备统一门户功能的内网即时通讯软件才是发展趋势

作为企业领导&#xff0c;我们最害怕的就是下属工作效率不高&#xff0c;没办法为企业带来价值&#xff0c;有时候并不一定是员工自身存在问题&#xff0c;“工欲善其事&#xff0c;必先利其器“正好说明了&#xff0c;如果我们有能力在线的员工加上强大的办公软件辅助&#xf…

如何线上登记版权?

问题一&#xff1a;为什么要登记版权&#xff1f; 告诉你一个行业秘密&#xff0c;其实可以不用登记版权&#xff0c;为什么&#xff1f;因为作品自完成就自动拥有版权&#xff0c;作者可以自己选择登记与否&#xff0c;不登记对版权也没有影响。这里可能有人要问了&#xff0…

xss-labs/level12

这一关首先尝试以下 <script>alert(xss)</script> 不废话 直接看源代码 很明显发现第一个输出点被转义了 根本无法通过script标签来完成弹窗 然后依然是四个隐藏表单 我们可以先试一试在不用抓包工具的前提下 我们能不能将某个隐藏表单给显示出来 构造如下 &l…

Docker的常用基础命令(详细讲解)

首选需要大家搭建好Docker环境&#xff0c;没有环境的可以查看前面的详细讲解安装Docker引擎的文章&#xff08;在CentOS上安装Docker引擎_征服bug的博客-CSDN博客&#xff09; 首先是安装好Docker 引擎 一&#xff0c;如何启动与停止引擎服务 #启动docker systemctl start do…

JAVA-GUI工具的编写-----请求篇

上节我们说到&#xff0c;我们制作了样子货的GUI&#xff0c;但是没有嵌入任何的按钮事件&#xff0c;并且上一次忘记加进去命令执行的确定按钮&#xff0c;让我们简单的回顾一下子吧 import javafx.application.Application; import javafx.collections.FXCollections; impor…

R在GIS中用ggmap地理空间数据分析

概要 做过O2O&#xff08;Online To Offline,在线离线/线上到线下&#xff09;的小伙伴知道&#xff0c;GIS数据需要具体到精准的位置(即经纬度)&#xff1b;对于连锁门店&#xff0c;使用GIS和其它的数据密集型服务遵循一个简单的逻辑&#xff1a;数据有助于企业节省开支&…

Vilatile底层逻辑总结

#### 增加volatile 使用JIT优化的时候 禁止出现语句重排 #### volatile是Java虚拟机提供的轻量级同步机制。 - 保证可见性 - 不保证原子性&#xff08;整体流程成功 整体流程失败&#xff09;如果要保证原子性-加 synchronized 或者直接使用 Automic 原子类 - 禁止指令重排&am…

VBA Regex 正则表达式应用介绍

. VBA正则表达式介绍 正则表达式或 RegEx 用于在字符串中查找特定的字符。 本文将展示一个 VBA RegEx 示例,并演示为什么在 VBA 中使用正则表达式如此强大。 正则表达式是一个比较大的话题,关于这方面的书很多。 同时也是一个让许多人感到害怕的话题,因为它的语法比较神秘和…

【论文笔记】ASYMMETRIC SELF-PLAY FOR AUTOMATIC GOAL DISCOVERY IN ROBOTIC MANIPULATION

【论文笔记】ASYMMETRIC SELF-PLAY FOR AUTOMATIC GOAL DISCOVERY IN ROBOTIC MANIPULATION ABSTRACT 【主要工作】用一个单一的、有目标条件的策略来解决许多机器人操作任务&#xff0c;包括对之前未见过的物体的操作。 【主要方法】对于目标发现过程采用非对称自我博弈方法…

Java 输入输出流简介和应用 (Java实现序列化工具类)

目录 简介 常用模板 实现序列化工具类 简介 应用场景 代码 简介 Java 流相关的类都封装在 java.io 包中&#xff0c;而且每个数据流都是一个对象。所有输入流类都是 InputStream 抽象类&#xff08;字节输入流&#xff09;和 Reader 抽象类&#xff08;字符输入流&#…