flink sql (jdbc)如何支持where 条件下推数据库

news2025/1/19 11:04:11

背景

最近在使用 flink sql (jdbc)做离线数据同步(历史数据修复),遇到一个问题,只同步几条数据的情况下,测试环境执行竟然需要30+分钟。

进一步研究,发现where条件没有下推到数据库执行,而是全表读取(排查过程详见下面的文章)。
flink sql 执行慢问题排查(flink jdbc where 条件没有下推数据库)
flink sql 源码走读 — 解释flink jdbc where 条件为什么没有下推数据库

为了支持过滤条件下推,这里提供一些解决方案。

解决办法

提供两个解决办法:
1、使用分区字段来做过滤
2、修改源码或者自定义connector

1、使用分区字段来做过滤
关于connector配置,详见官网(
我们可以使用分区扫描的功能来实现数据过滤。在这里插入图片描述
举个例子:

create table mysql_test_12 (
ID STRING,
NAME STRING,
primary key(ID) not enforced
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://${mysql_hosts}:${mysql_port}/sit?useSSL=false&useUnicode=true&characterEncoding=UTF-8',
'username' = '${mysql_username}',
'password' = '${mysql_pass}',
'scan.fetch-size'='1000',
'table-name' = 'test_12',
'scan.partition.column' = 'ID',
'scan.partition.num' = '1',
'scan.partition.lower-bound' = '20200604',
'scan.partition.upper-bound' = '20200604'
);

create table es_test_12 (
ID STRING,
NAME STRING,
primary key(ID) not enforced
) with (
'connector' = '${es_connector}',
'hosts' = '${es_hosts}',
'username' = '${es_username}',
'password' = '${es_pass}',
'index' = 'test_12'
);

insert into es_test_12
select
   *
from mysql_test_12
;

落库执行的SQL是:

SELECT ID, NAME FROM test_12 WHERE ID BETWEEN 20200604 AND 20200604

上述方法起到数据过滤的效果,减少查询开销,加速程序执行效率,使用起来也比较简单。

但同时也会存在一些缺点
1)、分区字段目前只支持 数字、日期、时间戳,对于字符串是不支持的;
2)、between 无法使用索引;
2)、使用起来不够灵活,比如不支持 多个过滤条件、函数使用等

2、修改源码或者自定义connector
修改源码或者自定义connector原理差不多,考虑到自定义connector拓展性更强,这里选择自定义connector的方式来解决。
下面记载开发过程,主要提供一下解决思路:

新的connector type为"jdbc-custom"。

1、首先在resource 下创建目录:META-INF/service,在该目录下创建文件org.apache.flink.table.factories.Factory
在这里插入图片描述
文件内容为新建的factory类:com.custom.connector.JdbcCustomDynamicTableFactory

2、为了减少代码开发,可以直接拷贝源码 JdbcDynamicTableFactory、JdbcDynamicTableSource 到com.custom.connector目录下,并修改类名为JdbcCustomDynamicTableFactory、JdbcCustomDynamicTableSource 。

JdbcCustomDynamicTableFactory的 IDENTIFIER = “jdbc” 修改为 IDENTIFIER = “jdbc_custom”;

createDynamicTableSource()方法也做下简单修改
在这里插入图片描述
3、在JdbcDynamicTableSource#getScanRuntimeProvider() 方法中改造SQL生成逻辑,主要修改的代码如下:
在这里插入图片描述
其中customOptions 是新增的配置项,定义了"filter" 选项,在 JdbcCustomDynamicTableFactory 中定义。可以参考jdbcReadOptions定义过程。

最终使用方法如下:
在这里插入图片描述

以上通过2种方式优化了flink sql 执行效率,使过滤条件入库执行。

在项目中,本人采用了第二种方式,效果还不错。希望以上思路对你有帮助~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/398110.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

项目心得--网约车

一、RESTFULPost:新增Put:全量修改Patch:修改某个值Delete: 删除Get:查询删除接口也可以用POST请求url注意:url中不要带有敏感词(用户id等)url中的名词用复数形式url设计:api.xxx.co…

分形(Fractal)及分形维数(Fractal dimension)

文章目录1. 分形介绍2. 分形的定义3. 分形维数介绍4. 历史5. 缩放的作用(Role of scaling)6. D 不是唯一描述符7. 分形表面结构8. 例子8.8 Hausdorff dimension8.8.1 直观概念8.8.2 正式定义8.8.2.1 Hausdorff dimension8.8.2.2 Hausdorff content8.8.3 …

C++ 实现一个反射类

代码环境为VScode CMake GCC 8.1.0 首先,如何才能做到给我一个名字我就能拿到这个对象的这个值,也是比较难的一个地方,方法如下 #define OFFSET(className,fieldName) (size_t)&(((className*)0)->fieldName)这个能够得到该成员变…

数据结构3——线性表2:线性表的顺序结构

顺序结构的基本理解 定义: 把逻辑上相邻的数据元素存储在物理上相邻(占用一片连续的存储单元,中间不能空出来)的存储单元的存储结构 存储位置计算: LOC(a(i1))LOC(a(i))lLOC(a(i1))LOC(a(i))l LOC(a(i1))LOC(a(i))l L…

【pyqt】win10下使用pycharm安装并配置pyqt环境,并用其进行界面绘制

目录1.PyQt简介2.PyQt安装2.PyQt中界面绘制工具集在pycharm中的配置2.主程序引入生成的界面类1.PyQt简介 PyQt是一个基于Python语言的开源图形用户界面(GUI)框架,它是Qt跨平台应用程序框架的Python绑定,可以用于创建各种不同类型…

js 拷贝

一、浅拷贝 对数据拷贝的时候只拷贝一层,深层次的只拷贝了地址 1. (1)对于数组类型,可以使用slice(start, end)方法,返回一个新的数组。 var arr1 arr.slice(0); (备注:slice 方法一直复制到…

(Fabric 超级账本学习【2】)Fabric2.4环境下部署自己编写的链码

(Fabric 超级账本学习【2】)Fabric2.4环境下部署自己编写的链码 1、前提是搭建好了Fabric 2.4(Fabric2.x)版本的区块链网络,并在以此环境下部署自己编写的链码,如下图先进入 test-network 文件夹目录下 2、…

23年PMP考试,应试技巧二十五条(含资料)

1.一定要认真阅读答案的全部四个选项。 千万不要看到某个选项是正确的,就不看其他选项,因为可能还有更正确的选项。PMP考的是选项的相对正确程度,而不是绝对正确程度,这与英语水平考试截然不同。可能四个选项都是正确或错误的&am…

Linux进程间通信详解(最全)

进程间的五种通信方式介绍 进程间通信(IPC,InterProcess Communication)是指在不同进程之间传播或交换信息。IPC的方式通常有管道(包括无名管道和命名管道)、消息队列、信号量、共享内存、Socket(套接字&a…

项目实战典型案例20——内存长期占用导致系统慢

内存长期占用导致系统慢一:背景介绍出现的问题二:思路&方案下面是对于这三个原因的解决方案1.服务启动时分配的堆内存过小2. 具有大量大对象被创建,并且没有及时被GC回收或者由于具有引用GC无法回收3.当GC之后,虽然会清理堆内…

再获认可!腾讯安全NDR获Forrester权威推荐

近日,国际权威研究机构Forrester发布最新研究报告《The Network Analysis And Visibility Landscape, Q1 2023》(以下简称“NAV报告”),从网络分析和可视化(NAV)厂商规模、产品功能、市场占有率及重点案例等…

LeetCode刷题——回溯法

文章目录[中等]全排列[中等]全排列 II[中等]组合总和[中等]组合总和 II[中等]复原 IP 地址[中等]括号生成[中等]子集[中等]单词搜索[中等]组合[中等]全排列 原题链接题解 class Solution { public:vector<vector<int>> ans;vector<int> num;int flag[10];v…

三天吃透消息队列面试八股文

本文已经收录到Github仓库&#xff0c;该仓库包含计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点&#xff0c;欢迎star~ Github地址&#xff1a;https://github.com/…

OSPF路由协议总结

OSPF路由协议总结一 OSPF协议的三个工作步骤二 5中OSPF协议报文三 4种网络类型&#xff0c;邻居和邻接3.1 P2P3.2 P2MP3.3 Broadcase3.4 NBMA&#xff08;非广播多路访问&#xff09;四 OSPF协议的度量方式五 LS&#xff08;链路状态&#xff09; LSA&#xff08;链路状态通告&…

JMU软件20 大数据技术复习(只写了对比18提纲的变动部分)

原博主 博客主页&#xff1a;https://xiaojujiang.blog.csdn.net/ 原博客链接&#xff1a;https://blog.csdn.net/qq_43058685/article/details/117883940 本复习提纲只适用于JMU软件工程大数据课程&#xff08;ckm授课&#xff09; 具体内容参考老师提纲的考纲&#xff0c;18和…

使用Tensorflow完成一个简单的手写数字识别

Tensorflow中文手册 介绍TensorFlow_w3cschool 模型结构图&#xff1a; 首先明确模型的输入及输出&#xff08;先不考虑batch&#xff09; 输入&#xff1a;一张手写数字图&#xff08;28x28x1像素矩阵&#xff09; 1是通道数 输出&#xff1a;预测的数字&#xff08;1x10的one…

100种思维模型之信息传递思维模型-028

人与人之间存有 认知偏差和理解偏差 &#xff0c;信息在传递过程中会 衰减、失真以及再加工 &#xff01; 信息传递思维模型 &#xff0c;一个有助于 提高信息传递质量 的思维模型。下面从三个方面进行介绍&#xff0c; 何谓信息传递思维模型、信息传递思模型生活中的运…

JVM系统优化实践(9):G1垃圾回收器

您好&#xff0c;我是湘王&#xff0c;这是我的CSDN博客&#xff0c;欢迎您来&#xff0c;欢迎您再来&#xff5e;在JDK8及其之前&#xff0c;一直用的都是ParNewCMS的组合&#xff1a;ParNew负责年轻代的垃圾回收&#xff0c;而由CMS负责老年代的垃圾回收&#xff0c;但会产生…

脑机接口科普0016——独立BCI与非独立BCI

本文禁止转载&#xff01;&#xff01;&#xff01;&#xff01; 所谓的“独立BCI”与“非独立BCI”仅仅是BCI系统中的一个术语。本章主要是介绍一下这两个术语。 这两个术语是由Wolpaw在2002年提出来的。 独立BCI是指不依赖于中枢神经系统的的输出。 非独立BCI是指那种依赖…

工作5年了,你竟然还不会应用优雅停机?

事情是这样的&#xff0c;小明是一个工作五年的老程序员&#xff0c;半秃着的头已经彰显了他深不可测的技术实力。 这一天&#xff0c;小明收到了领导给过来的一个需求。 领导对小明说&#xff1a;“小明啊&#xff0c;你工作五年了&#xff0c;这个需求我交给你一个人负责很是…