背景
最近在使用 flink sql (jdbc)做离线数据同步(历史数据修复),遇到一个问题,只同步几条数据的情况下,测试环境执行竟然需要30+分钟。
进一步研究,发现where条件没有下推到数据库执行,而是全表读取(排查过程详见下面的文章)。
flink sql 执行慢问题排查(flink jdbc where 条件没有下推数据库)
flink sql 源码走读 — 解释flink jdbc where 条件为什么没有下推数据库
为了支持过滤条件下推,这里提供一些解决方案。
解决办法
提供两个解决办法:
1、使用分区字段来做过滤
2、修改源码或者自定义connector
1、使用分区字段来做过滤
关于connector配置,详见官网()
我们可以使用分区扫描的功能来实现数据过滤。
举个例子:
create table mysql_test_12 (
ID STRING,
NAME STRING,
primary key(ID) not enforced
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://${mysql_hosts}:${mysql_port}/sit?useSSL=false&useUnicode=true&characterEncoding=UTF-8',
'username' = '${mysql_username}',
'password' = '${mysql_pass}',
'scan.fetch-size'='1000',
'table-name' = 'test_12',
'scan.partition.column' = 'ID',
'scan.partition.num' = '1',
'scan.partition.lower-bound' = '20200604',
'scan.partition.upper-bound' = '20200604'
);
create table es_test_12 (
ID STRING,
NAME STRING,
primary key(ID) not enforced
) with (
'connector' = '${es_connector}',
'hosts' = '${es_hosts}',
'username' = '${es_username}',
'password' = '${es_pass}',
'index' = 'test_12'
);
insert into es_test_12
select
*
from mysql_test_12
;
落库执行的SQL是:
SELECT
ID
,NAME
FROMtest_12
WHEREID
BETWEEN 20200604 AND 20200604
上述方法起到数据过滤的效果,减少查询开销,加速程序执行效率,使用起来也比较简单。
但同时也会存在一些缺点:
1)、分区字段目前只支持 数字、日期、时间戳,对于字符串是不支持的;
2)、between 无法使用索引;
2)、使用起来不够灵活,比如不支持 多个过滤条件、函数使用等
2、修改源码或者自定义connector
修改源码或者自定义connector原理差不多,考虑到自定义connector拓展性更强,这里选择自定义connector的方式来解决。
下面记载开发过程,主要提供一下解决思路:
新的connector type为"jdbc-custom"。
1、首先在resource 下创建目录:META-INF/service,在该目录下创建文件org.apache.flink.table.factories.Factory
文件内容为新建的factory类:com.custom.connector.JdbcCustomDynamicTableFactory
2、为了减少代码开发,可以直接拷贝源码 JdbcDynamicTableFactory、JdbcDynamicTableSource 到com.custom.connector目录下,并修改类名为JdbcCustomDynamicTableFactory、JdbcCustomDynamicTableSource 。
JdbcCustomDynamicTableFactory的 IDENTIFIER = “jdbc” 修改为 IDENTIFIER = “jdbc_custom”;
createDynamicTableSource()方法也做下简单修改
3、在JdbcDynamicTableSource#getScanRuntimeProvider() 方法中改造SQL生成逻辑,主要修改的代码如下:
其中customOptions 是新增的配置项,定义了"filter" 选项,在 JdbcCustomDynamicTableFactory 中定义。可以参考jdbcReadOptions定义过程。
最终使用方法如下:
以上通过2种方式优化了flink sql 执行效率,使过滤条件入库执行。
在项目中,本人采用了第二种方式,效果还不错。希望以上思路对你有帮助~~