大数据(9h)FlinkSQL之Lookup Join

news2025/1/11 21:06:40

文章目录

  • 概述
  • pom.xml
  • MySQL建表
    • 对应Flink的建表SQL
  • Lookup Join
    • FlinkSQL
    • 完整Java代码

概述

  • lookup join通常是 查询外部系统的数据 来 充实FlinkSQL的主表
    例如:事实表 关联 维度表,维度表在外部系统(如MySQL)
  • 要求:
    1个表具有处理时间属性(基于Processing Time Temporal Join语法)
    语法上,和一般JOIN比较,多了FOR SYSTEM_TIME AS OF
    另1个表由连接器(a lookup source connector)支持
  • Lookup Cache
    默认情况下,不启用Lookup Cache
    可设置lookup.cache.max-rowslookup.cache.ttl参数来启用
    启用Lookup Cache后,Flink会先查询缓存,缓存未命中才查询外部数据库
    启用缓存可加快查询速,但缓存中的记录未必是最新的
SQL参数说明
connector连接器,可以是jdbckafkafilesystem
driver数据库驱动
lookup.cache.ttlLookup Cache中每行数据 的 最大 存活时间
lookup.cache.max-rowsLookup Cache中的最大行数

当 缓存的行数>lookup.cache.max-rows 时,将清除存活时间最久的记录
缓存中的行 的存货时间 超过lookup.cache.ttl 也会被清除

pom.xml

环境:WIN10+IDEA+JDK1.8+Flink1.13+MySQL8

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
    <mysql.version>8.0.31</mysql.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- FlinkSQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'format'='csv' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'format'='json' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'connector' = 'jdbc' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'driver' = 'com.mysql.cj.jdbc.Driver' -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- JSON解析 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <!-- 简化JavaBean书写 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>

MySQL建表

DROP DATABASE IF EXISTS db0;
CREATE DATABASE db0;
CREATE TABLE db0.tb0 (
  a VARCHAR(255) PRIMARY KEY,
  b INT(3),
  c BIGINT(5),
  d FLOAT(3,2),
  e DOUBLE(4,2),
  f DATE DEFAULT '2022-10-24',
  g TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
INSERT db0.tb0 (a,b,c,d,e) VALUES
('aa',1,11,1.11,11.11),
('bb',2,22,2.22,22.22),
('cc',3,33,3.33,33.33);
SELECT * FROM db0.tb0;

对应Flink的建表SQL

SQL

CREATE TEMPORARY TABLE temp_tb0 (
  a STRING,
  b INT,
  c BIGINT,
  d FLOAT,
  e DOUBLE,
  f DATE,
  g TIMESTAMP,
  PRIMARY KEY(a) NOT ENFORCED)
WITH (
  'lookup.cache.max-rows' = '2',
  'lookup.cache.ttl' = '30 second',
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://localhost:3306/db0',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'tb0'
)

测试代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Hello {
    public static void main(String[] args) {
        //创建流和表的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        //创建表,连接MySQL表
        tbEnv.executeSql("CREATE TEMPORARY TABLE temp_tb0 (\n" +
                "  a STRING,\n" +
                "  b INT,\n" +
                "  c BIGINT,\n" +
                "  d FLOAT,\n" +
                "  e DOUBLE,\n" +
                "  f DATE,\n" +
                "  g TIMESTAMP,\n" +
                "  PRIMARY KEY(a) NOT ENFORCED)\n" +
                "WITH (\n" +
                "  'lookup.cache.max-rows' = '2',\n" +
                "  'lookup.cache.ttl' = '30 second',\n" +
                "  'connector' = 'jdbc',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/db0',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'table-name' = 'tb0'\n" +
                ")");
        //执行查询,打印
        tbEnv.sqlQuery("SELECT * FROM temp_tb0").execute().print();
    }
}

测试结果打印

+----+----+---+----+------+-------+------------+----------------------------+
| op |  a | b |  c |    d |     e |          f |                          g |
+----+----+---+----+------+-------+------------+----------------------------+
| +I | aa | 1 | 11 | 1.11 | 11.11 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | bb | 2 | 22 | 2.22 | 22.22 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | cc | 3 | 33 | 3.33 | 33.33 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
+----+----+---+----+------+-------+------------+----------------------------+

Lookup Join

FlinkSQL

SELECT * FROM v
JOIN t
  FOR SYSTEM_TIME AS OF v.y
  ON v.x=t.a

完整Java代码

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Scanner;

import static org.apache.flink.table.api.Expressions.$;

public class Hi {
    public static void main(String[] args) {
        //创建流和表的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env);
        //创建左表
        DataStreamSource<String> d = env.addSource(new ManualSource());
        Table tb = tbEnv.fromDataStream(d, $("x"), $("y").proctime());
        tbEnv.createTemporaryView("v", tb);
        //创建右表(维度表)
        tbEnv.executeSql("CREATE TEMPORARY TABLE t ( " +
                "  a STRING, " +
                "  b INT, " +
                "  c BIGINT, " +
                "  d FLOAT, " +
                "  e DOUBLE, " +
                "  f DATE, " +
                "  g TIMESTAMP, " +
                "  PRIMARY KEY(a) NOT ENFORCED) " +
                "WITH ( " +
                "  'lookup.cache.max-rows' = '2', " +
                "  'lookup.cache.ttl' = '30 second', " +
                "  'connector' = 'jdbc', " +
                "  'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "  'url' = 'jdbc:mysql://localhost:3306/db0', " +
                "  'username' = 'root', " +
                "  'password' = '123456', " +
                "  'table-name' = 'tb0' " +
                ")");
        //执行查询,打印
        tbEnv.sqlQuery("SELECT * FROM v " +
                "JOIN t " +
                "  FOR SYSTEM_TIME AS OF v.y " +
                "  ON v.x=t.a").execute().print();
    }

    /** 手动输入的数据源 */
    public static class ManualSource implements SourceFunction<String> {
        public ManualSource() {}

        @Override
        public void run(SourceFunction.SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) {break;}
                if (!str.equals("")) {sc.collect(str);}
            }
            scanner.close();
        }

        @Override
        public void cancel() {}
    }
}

测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/45651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中国多媒体与网络教学学报杂志社中国多媒体与网络教学学报编辑部2022年第9期目录

多媒体信息技术《中国多媒体与网络教学学报》投稿&#xff1a;cn7kantougao163.com 采油工程探索式虚拟仿真实验教学实践——以有杆抽油系统实验为例 窦祥骥 ;何岩峰 ;张少辉 ;王相 ;徐慧 ; 1-5 人体寄生虫课程网络虚拟实验环境的构建及其应用研究 周蕾;贺帅;李晓琳;席…

深入理解mysql执行的底层机制

MySql系列整体栏目 内容链接地址【一】深入理解mysql索引本质https://blog.csdn.net/zhenghuishengq/article/details/121027025【二】深入理解explain以及索引优化https://blog.csdn.net/zhenghuishengq/article/details/124552080【三】深入理解mysql事务本质https://blog.cs…

3dmax如何进行网络渲染?网渲云渲染渲染农场怎么用?

渲染本身是将3d模型转换为2d图像的一个过程&#xff0c;而网络渲染就是把3d模型放在云端进行完成&#xff0c;而本地我们只需要等待结果就好。而云渲染也就是网渲的标准称呼&#xff0c;两个是一个意思。 那怎么进行网络渲染呢&#xff1f; 首先我们需要下载网络渲染客户端&a…

03-Docker-Docker镜像的分层概念

目录 一、镜像是什么 二、UnionFS&#xff08;联合文件系统&#xff09; 三、Docker镜像加载原理 四、将容器生成为镜像Commit命令 一、镜像是什么 是一种轻量级、可执行的独立软件包&#xff0c;包含运行某个软件所需的所有内容&#xff0c;我们把应用程序和配置依赖打包好…

TextBox文本框与PasswordBox密码框水印

在开发一个软件和网页的时候&#xff0c;都会有一个功能&#xff0c;那就是登陆功能&#xff0c;有了登陆那就一定需要用户输入账号和密码&#xff0c;我们在写登陆页面都会想到使用TextBox和PasswordBox去完成这两个功能&#xff0c;但是有一个问题&#xff0c;那就是如果你使…

java EE初阶 — 线程的状态

文章目录1.状态的基本认识2.观察线程的所有状态3.线程状态和状态转移4.多线程的意义1.状态的基本认识 NEW 创建了 Thread 对象&#xff0c;但是还没调用 start&#xff08;内核里还没有创建对应的PCB&#xff09;TERMINATED 表示内核中的 PCB 已经执行完毕了&#xff0c;但是 …

zabbix监控触发器与报警动作

目录 一、环境准备 1、搭建zabbix基础环境 2、创建被监控主机 二、触发器概念 三、创建触发器 1、创建触发器步骤 2、触发器表达式 &#xff08;1&#xff09;表达式格式 &#xff08;2&#xff09;表达式函数 3、配置触发器 四、创建报警动作 1、设置邮箱服务器 …

学生选课系统

项目描述 通过项目背景的分析以及了解到现在学校面临的问题&#xff0c;特别需要一个选课管理系统保证学生信息以及各种课程成绩的准确性和实效性&#xff0c;通过利用计算机的高速计算和快速的统计分析&#xff0c;保证学生信息的最新记录。从教职工的角度老考虑&#xff0c;…

网络套接字编程(UDP协议)

文章目录预备知识socket&#xff08;网络套接字&#xff09;编程接口简单的UDP网络程序增加多用户可以互相通信预备知识 网络字节序 大端存储&#xff1a;数据的高字节内容保存在内存的低地址处&#xff0c;数据的低字节内容保存在内存的高地址处 小端存储&#xff1a;数据的高…

婚纱预订小程序开发,商家线上展示平台

婚纱代表着纯洁与忠贞&#xff0c;也是爱情永恒的见证者&#xff0c;穿上洁白的婚纱嫁给自己心爱的人是每个女生的梦想&#xff0c;婚纱对于每一个女生来说都有着重要的意义&#xff0c;所以选择一件美丽且适合的婚纱非常重要&#xff0c;因此人们在选择婚纱时会花费很多的时间…

MySQL数据库之索引

目录 一、MySQL索引简介 二、索引的作用 1、优点 2、缺点 三、创建索引的原则依据 四、索引的分类和创建 1、普通索引 2、唯一索引&#xff08;创建唯一键即创建唯一索引&#xff09; 3、主键索引&#xff08;和创建主键的方式一样&#xff09; 4、组合索引&#xff…

Python标准库之copy

1. copy标准库简介 Python 中赋值语句不复制对象&#xff0c;而是在目标和对象之间创建绑定 (bindings) 关系。对于自身可变或者包含可变项的集合对象&#xff0c;我们有时会需要生成其副本用于改变操作&#xff0c;进而避免改变原对象。 2. copy常用函数 2.1 copy.copy(x) …

R语言用ARIMA模型滑动时间窗口识别网络流量时间序列异常值

全文链接&#xff1a;http://tecdat.cn/?p30597最近我们被要求解决时间序列异常检验的问题。有客户在使用大量的时间序列。这些时间序列基本上是每10分钟进行一次的网络测量&#xff0c;其中一些是周期性的&#xff08;即带宽&#xff09;&#xff0c;而另一些则不是&#xff…

Android Jetpack Compose——一个简单的聊天界面

Jetpack Compose——聊天界面前言效果视频引入RowColumnTextImage聊天界面效果左边布局右边布局插入数据总结前言 目前声明式UI已经成为前端开发趋势&#xff0c;除了一开始的跨端开发React,Flutter等以及Web支持外&#xff0c;后续Android和IOS平台也相继推出声明式开发&…

零基础快速开发Vue图书管理系统—登录注册篇(一)

零基础快速开发Vue图书管理系统—登录注册篇&#xff08;一&#xff09; 一、图书管理系统项目功能 二、项目技术选型 前端主要采用&#xff1a;Vue3.x (vuex/vue-router)、Ant Design Vue、Axios等服务端主要采用&#xff1a;Node.js、Koa、Mongoose等数据库主要采用&#x…

Docker安装Redis集群失败经历汇总

在程序员的开发过程中&#xff0c;Redis可以说基本上是必不可少的缓存中间件。不管是二进制包还是docker安装Redis的文章在网上都是数不胜数。我之前自己玩Redis的时候基本不是二进制包安装就是docker安装&#xff0c;也没有尝试过集群方式。每次需要的时候&#xff0c;网上百度…

DataFrame转化为json的方法教程

网络上有好多的教程&#xff0c;讲得不太清楚和明白&#xff0c;我用实际的例子说明了一下内容&#xff0c;附档代码&#xff0c;方便理解和使用 DataFrame.to_json(path_or_bufNone, orientNone, date_formatNone, double_precision10, force_asciiTrue, date_unitms, defau…

考研数据结构大题整合_组一(ZYL组)

考研数据结构大题整合 目录考研数据结构大题整合一、ZYL组ZYL组一ZYL组二ZYL组三ZYL组四ZYL组五ZYL组六ZYL组七ZYL组八一、ZYL组 ZYL组一 1.一棵树有度为i的结点ni 个(i1,2,3,…m), 求叶结点的个数.&#xff08;10分&#xff09; 2、已知带权连通图G(V,E)的邻接表如图所示&am…

rubbitmq 图形界面使用 常用六种通信模式 Simple-Work-fanout-direct-topic-headers

阿里云服务器添加rubbitmq需要开启端口:rabbitmq阿里云服务器开放端口号 Rubbitmq地址: 服务器地址:15672 1.简单模式Simple 一个生产者、一个消费者&#xff0c;不需要设置交换机&#xff08;使用默认的交换机&#xff09; 2.工作队列模式Work Queue 一个生产者、多个消费者&a…

windows操作系统双网卡问题处理办法

windows操作系统双网卡问题处理办法&#xff08;详解&#xff09;一、命令说明二、处理办法1.设置外网网关为默认网关2.查看当前路由表3.删除缺省路由4.添加访问外网的缺省路由5.添加访问内网的路由信息一、命令说明 显示 IP 路由表的信息 route print显示 IP 路由表中以 192…