16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

news2025/1/10 23:43:22

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例:JDBC
    • 1、maven依赖(java编码依赖)
    • 2、创建 JDBC 表
      • 1)、创建jdbc表,并插入、查询
      • 2)、批量插入表数据
      • 3)、JDBC 表在时态表关联中作为维表
    • 3、连接器参数
    • 4、已弃用的配置
      • 5、特性
      • 1)、键处理
      • 2)、分区扫描
      • 3)、Lookup Cache
      • 4)、幂等写入
    • 5、JDBC Catalog
      • 1)、JDBC Catalog 的使用
      • 2)、JDBC Catalog for PostgreSQL
      • 3)、JDBC Catalog for MySQL
    • 6、数据类型映射


本文简单的介绍了flink sql读取外部系统的jdbc示例(每个示例均是验证通过的,并且具体给出了运行环境的版本)。
本文依赖环境是hadoop、kafka、mysql环境好用,如果是ha环境则需要zookeeper的环境。

一、Table & SQL Connectors 示例:JDBC

1、maven依赖(java编码依赖)

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>3.1.0-1.17</version>
</dependency>


在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:
在这里插入图片描述
驱动jar需放在flink的安装目录lib下,且需要重启服务。
本示例jar包有

flink-connector-jdbc_2.11-1.13.6.jar
mysql-connector-java-5.1.5.jar 或
mysql-connector-java-6.0.6.jar(1.17版本中使用的mysql驱动,用上面mysql驱动有异常信息)

2、创建 JDBC 表

JDBC table 可以按如下定义,以下示例中包含创建表、批量插入以及left join的维表。

1)、创建jdbc表,并插入、查询

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);

-------------------具体事例----------------------------------
-- 在 Flink SQL 中注册一张 MySQL 表 'user'
CREATE TABLE Alan_JDBC_User_Table (
  id BIGINT,
  name STRING,
  age INT,
  balance DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.10.44:3306/test',
   'table-name' = 'user'
);

-- mysql中的数据
mysql> select * from user;
+----+-------------+------+---------+-----------------------+------------+
| id | name        | age  | balance | email                 | pwd        |
+----+-------------+------+---------+-----------------------+------------+
|  1 | aa6         |   61 |   60000 | 6@163.com             | 123456     |
|  2 | aa4         |   71 |   70000 | 7@163.com             | 7123       |
|  4 | test        | NULL |    NULL | NULL                  | NULL       |
|  5 | test2       | NULL |    NULL | NULL                  | NULL       |
|  7 | alanchanchn |   19 |     800 | alan.chan.chn@163.com | vx         |
|  8 | alanchan    |   19 |     800 | alan.chan.chn@163.com | sink mysql |
+----+-------------+------+---------+-----------------------+------------+
6 rows in set (0.00 sec)

---------在flink sql中建表并查询--------
Flink SQL> CREATE TABLE Alan_JDBC_User_Table (
>   id BIGINT,
>   name STRING,
>   age INT,
>   balance DOUBLE,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://192.168.10.44:3306/test',
>    'table-name' = 'user'
> );
[INFO] Execute statement succeed.

Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op |                   id |                           name |         age |                        balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                            aa6 |          61 |                        60000.0 |
| +I |                    2 |                            aa4 |          71 |                        70000.0 |
| +I |                    4 |                           test |      (NULL) |                         (NULL) |
| +I |                    5 |                          test2 |      (NULL) |                         (NULL) |
| +I |                    7 |                    alanchanchn |          19 |                          800.0 |
| +I |                    8 |                       alanchan |          19 |                          800.0 |
+----+----------------------+--------------------------------+-------------+--------------------------------+
Received a total of 6 rows

2)、批量插入表数据

-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;

---------创建数据表----------------------
CREATE TABLE source_table (
 userId INT,
 age INT,
 balance DOUBLE,
 userName STRING,
 t_insert_time AS localtimestamp,
 WATERMARK FOR t_insert_time AS t_insert_time
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.userId.kind'='sequence',
 'fields.userId.start'='1',
 'fields.userId.end'='5000',

 'fields.balance.kind'='random',
 'fields.balance.min'='1',
 'fields.balance.max'='100',

 'fields.age.min'='1',
 'fields.age.max'='1000',

 'fields.userName.length'='10'
);

-- 从另一张表 "source_table" 将数据写入到 JDBC 表中
INSERT INTO Alan_JDBC_User_Table
SELECT userId,  userName, age, balance FROM source_table;

-- 查看 JDBC 表中的数据
select * from Alan_JDBC_User_Table;

---------------flink sql中查询----------------------------------
Flink SQL> INSERT INTO Alan_JDBC_User_Table
> SELECT userId,  userName, age, balance FROM source_table;
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e91cd3c41ac20aaf8eab79f0094f9e46


Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op |                   id |                           name |         age |                        balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                     ead5352794 |         513 |                            4.0 |
| +I |                    2 |                     728297a8d9 |         410 |                           35.0 |
| +I |                    3 |                     643c2226cd |         142 |                           80.0 |
......
-------------验证mysql中的数据是否写入,此处只查总数----------------
mysql> select count(*) from user;
+----------+
| count(*) |
+----------+
|     2005 |
+----------+
1 row in set (0.00 sec)

3)、JDBC 表在时态表关联中作为维表

-- 1、创建 JDBC 表在时态表关联中作为维表
CREATE TABLE Alan_JDBC_User_Table (
  id BIGINT,
  name STRING,
  age INT,
  balance DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.10.44:3306/test',
   'table-name' = 'user'
);
-----2、查询表中的数据(实际数据是之前测试的结果)   -----
Flink SQL> select * from Alan_JDBC_User_Table;
+----+----------------------+--------------------------------+-------------+--------------------------------+
| op |                   id |                           name |         age |                        balance |
+----+----------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                     ead5352794 |         513 |                            4.0 |
| +I |                    2 |                     728297a8d9 |         410 |                           35.0 |
| +I |                    3 |                     643c2226cd |         142 |                           80.0 |
| +I |                    4 |                     6115f11f01 |         633 |                           69.0 |
| +I |                    5 |                     044ba5fa2f |          74 |                           71.0 |
| +I |                    6 |                     98a112dc87 |         729 |                           54.0 |
| +I |                    7 |                     705326a369 |         846 |                           99.0 |
| +I |                    8 |                     532692924f |         872 |                           79.0 |
| +I |                    9 |                     b816802948 |         475 |                           67.0 |
| +I |                   10 |                     06906bebb2 |         109 |                           57.0 |
......

-----3、创建事实表,以kafka表作为代表   -----
CREATE TABLE Alan_KafkaTable_3 (
    user_id BIGINT, -- 用户id
    item_id BIGINT, -- 商品id
    action STRING,  -- 用户行为
    ts     BIGINT,  -- 用户行为发生的时间戳
    proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 事件时间
    WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND  -- 在eventTime上定义watermark
) WITH (
  'connector' = 'kafka',
  'topic' = 'testtopic',
  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

-----4、发送kafka消息,同时观察事实表中的数据   -----
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic testtopic
>1,1001,"login",1692593500222
>2,1002,"p_read",1692593502242
>

Flink SQL> select * from Alan_KafkaTable_3;

+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| op |              user_id |              item_id |                         action |                   ts |                proctime |              event_time |
+----+----------------------+----------------------+--------------------------------+----------------------+-------------------------+-------------------------+
| +I |                    1 |                 1001 |                          login |        1692593500222 | 2023-08-22 05:33:38.830 | 2023-08-22 05:39:54.439 |
| +I |                    2 |                 1002 |                         p_read |        1692593502242 | 2023-08-22 05:33:38.833 | 2023-08-22 05:40:41.284 |
Query terminated, received a total of 2 rows



-----5、以jdbc的维表进行关联查询事实表数据-----
SELECT
  kafkamessage.user_id, 
  kafkamessage.item_id,
  kafkamessage.action,  
  jdbc_dim_table.name,
  jdbc_dim_table.age,
  jdbc_dim_table.balance
FROM Alan_KafkaTable_3 AS kafkamessage 
LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;

Flink SQL> SELECT
>   kafkamessage.user_id, 
>   kafkamessage.item_id,
>   kafkamessage.action,  
>   jdbc_dim_table.name,
>   jdbc_dim_table.age,
>   jdbc_dim_table.balance
> FROM Alan_KafkaTable_3 AS kafkamessage 
> LEFT JOIN Alan_JDBC_User_Table FOR SYSTEM_TIME AS OF kafkamessage.proctime AS jdbc_dim_table ON kafkamessage.user_id = jdbc_dim_table.id;

+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| op |              user_id |              item_id |                         action |                           name |         age |                        balance |
+----+----------------------+----------------------+--------------------------------+--------------------------------+-------------+--------------------------------+
| +I |                    1 |                 1001 |                          login |                     ead5352794 |         513 |                            4.0 |
| +I |                    2 |                 1002 |                         p_read |                     728297a8d9 |         410 |                           35.0 |


  • java
    该部分示例仅仅是以java实现创建表及查询,简单示例。
// 注册名为 “jdbcOutputTable” 的JDBC表
		String sinkDDL = "create table jdbcOutputTable (" + 
									 "id bigint not null, " + 
									 "name varchar(20) , " + 
									 "age int ,"+
									 "balance bigint,"+
									 "pwd varchar(20),"+
									 "email varchar(20) , PRIMARY KEY (id) NOT ENFORCED" +
									 ") with (" + 
									 " 'connector.type' = 'jdbc', "				+ 
									 " 'connector.url' = 'jdbc:mysql://192.168.10.44:3306/test', " + 
									 " 'connector.table' = 'user', " + 
									 " 'connector.driver' = 'com.mysql.jdbc.Driver', "				+ 
									 " 'connector.username' = 'root', " + 
									 " 'connector.password' = '123456' )";

		tenv.executeSql(sinkDDL);
		
		String sql = "SELECT *  FROM jdbcOutputTable ";
		String sql2 = "SELECT *  FROM jdbcOutputTable  where name like '%alan%'";
		Table table = tenv.sqlQuery(sql2);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();
		env.execute();

//运行结果
(
  `id` BIGINT NOT NULL,
  `name` VARCHAR(20),
  `age` INT,
  `balance` BIGINT,
  `pwd` VARCHAR(20),
  `email` VARCHAR(20)
)


15> (true,+I[7, alanchanchn, 19, 800, vx, alan.chan.chn@163.com])
15> (true,+I[8, alanchan, 19, 800, sink mysql, alan.chan.chn@163.com])

3、连接器参数

在这里插入图片描述
在这里插入图片描述

4、已弃用的配置

这些弃用配置已经被上述的新配置代替,而且最终会被弃用。请优先考虑使用新配置。
在这里插入图片描述

5、特性

1)、键处理

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。

在 upsert 模式下,Flink 将根据主键判断插入新行或者更新已存在的行,这种方式可以确保幂等性。为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。

有关 PRIMARY KEY 语法的更多详细信息,请参见 22、Flink 的table api与sql之创建表的DDL。

2)、分区扫描

为了在并行 Source task 实例中加速读取数据,Flink 为 JDBC table 提供了分区扫描的特性。

如果下述分区扫描参数中的任一项被指定,则下述所有的分区扫描参数必须都被指定。这些参数描述了在多个 task 并行读取数据时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。

scan.partition.lower-bound 和 scan.partition.upper-bound 用于决定分区的起始位置和过滤表中的数据。如果是批处理作业,也可以在提交 flink 作业之前获取最大值和最小值。

  • scan.partition.column:输入用于进行分区的列名。
  • scan.partition.num:分区数。
  • scan.partition.lower-bound:第一个分区的最小值。
  • scan.partition.upper-bound:最后一个分区的最大值。

3)、Lookup Cache

JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。

默认情况下,lookup cache 是未启用的,你可以将 lookup.cache 设置为 PARTIAL 参数来启用。

lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。
默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。
当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。 当缓存命中最大缓存行 lookup.partial-cache.max-rows 或当行超过 lookup.partial-cache.expire-after-write 或 lookup.partial-cache.expire-after-access 指定的最大存活时间时,缓存中的行将被设置为已过期。 缓存中的记录可能不是最新的,用户可以将缓存记录超时设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。

所以要做好吞吐量和正确性之间的平衡。

默认情况下,flink 会缓存主键的空查询结果,你可以通过将 lookup.partial-cache.cache-missing-key 设置为 false 来切换行为。

4)、幂等写入

如果在 DDL 中定义了主键,JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语句。upsert 语义指的是如果底层数据库中存在违反唯一性约束,则原子地添加新行或更新现有行,这种方式确保了幂等性。

如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。

除了故障恢复场景外,数据源(kafka topic)也可能随着时间的推移自然地包含多个具有相同主键的记录,这使得 upsert 模式是用户期待的。

由于 upsert 没有标准的语法,因此下表描述了不同数据库的 DML 语法:
在这里插入图片描述

5、JDBC Catalog

JdbcCatalog 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。

目前,JDBC Catalog 有两个实现,即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。

// Postgres Catalog & MySQL Catalog 支持的方法
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);
其他的 Catalog 方法现在尚不支持。

1)、JDBC Catalog 的使用

本小节主要描述如果创建并使用 Postgres Catalog 或 MySQL Catalog。
本处描述的版本是flink 1.17,flink1.13版本只支持postgresql,在1.13版本中执行会出现如下异常:

[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Catalog for 'org.apache.flink.connector.jdbc.dialect.MySQLDialect@1bc49bc5' is not supported yet.

JDBC catalog 支持以下参数:

  • name:必填,catalog 的名称。

  • default-database:必填,默认要连接的数据库。

  • username:必填,Postgres/MySQL 账户的用户名。

  • password:必填,账户的密码。

  • base-url:必填,(不应该包含数据库名)
    对于 Postgres Catalog base-url 应为 “jdbc:postgresql://:” 的格式。
    对于 MySQL Catalog base-url 应为 “jdbc:mysql://:” 的格式。

  • sql

---需要将mysql-connector-java-6.0.6.jar、flink-connector-jdbc-3.1.0-1.17.jar放在flink的lib目录,并重启flink集群
CREATE CATALOG alan_catalog WITH(
    'type' = 'jdbc',
    'default-database' = 'test',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://192.168.10.44:3306'
);


USE CATALOG alan_catalog;
---------------------------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = '123456',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.


Flink SQL> show CATALOGS;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in set

Flink SQL> use CATALOG alan_catalog;
[INFO] Execute statement succeed.

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "my_catalog";
String defaultDatabase = "mydb";
String username        = "...";
String password        = "...";
String baseUrl         = "..."

JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("my_catalog", catalog);

// 设置 JdbcCatalog 为会话的当前 catalog
tableEnv.useCatalog("my_catalog");
  • yaml
execution:
    ...
    current-catalog: alan_catalog  # 设置目标 JdbcCatalog 为会话的当前 catalog
    current-database: test

catalogs:
   - name:alan_catalog
     type: jdbc
     default-database: test
     username: ...
     password: ...
     base-url: ...

2)、JDBC Catalog for PostgreSQL

  • PostgreSQL 元空间映射
    除了数据库之外,postgreSQL 还有一个额外的命名空间 schema。一个 Postgres 实例可以拥有多个数据库,每个数据库可以拥有多个 schema,其中一个 schema 默认名为 “public”,每个 schema 可以包含多张表。 在 Flink 中,当查询由 Postgres catalog 注册的表时,用户可以使用 schema_name.table_name 或只有 table_name,其中 schema_name 是可选的,默认值为 “public”。

因此,Flink Catalog 和 Postgres 之间的元空间映射如下:
在这里插入图片描述
Flink 中的 Postgres 表的完整路径应该是 “..<schema.table>”。如果指定了 schema,请注意需要转义 <schema.table>。

这里提供了一些访问 Postgres 表的例子:

-- 扫描 'public' schema(即默认 schema)中的 'test_table' 表,schema 名称可以省略
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 扫描 'custom_schema' schema 中的 'test_table2' 表,
-- 自定义 schema 不能省略,并且必须与表一起转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

3)、JDBC Catalog for MySQL

  • MySQL 元空间映射
    MySQL 实例中的数据库与 MySQL Catalog 注册的 catalog 下的数据库处于同一个映射层级。一个 MySQL 实例可以拥有多个数据库,每个数据库可以包含多张表。 在 Flink 中,当查询由 MySQL catalog 注册的表时,用户可以使用 database.table_name 或只使用 table_name,其中 database 是可选的,默认值为创建 MySQL Catalog 时指定的默认数据库。

因此,Flink Catalog 和 MySQL catalog 之间的元空间映射如下:
在这里插入图片描述
Flink 中的 MySQL 表的完整路径应该是 “<catalog>.<db>.<table>”。

这里提供了一些访问 MySQL 表的例子(在版本1.17中完成):

-- 扫描 默认数据库(test)中的 'person' 表
select * from alan_catalog.test.person;
select * from test.person;
select * from person;

-- 扫描 'cdhhive' 数据库中的 'version' 表,
select * from alan_catalog.cdhhive.version;
select * from cdhhive.version;
select * from version;

---------------具体操作详见下文------------------
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.

Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = '123456',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in set

Flink SQL> select * from alan_catalog.test.person;

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 测试修改go语言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再试一试 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     测试go语言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows

Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.

Flink SQL> select * from test.person;

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 测试修改go语言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再试一试 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     测试go语言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows

Flink SQL> use alan_catalog.test;
[INFO] Execute statement succeed.

Flink SQL> select * from person;

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 测试修改go语言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再试一试 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     测试go语言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows



Flink SQL> select * from alan_catalog.cdhhive.version;

+----+----------------------+--------------------------------+--------------------------------+
| op |               VER_ID |                 SCHEMA_VERSION |                VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          2.1.1 |     Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row

Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.

Flink SQL> select * from cdhhive.version;

+----+----------------------+--------------------------------+--------------------------------+
| op |               VER_ID |                 SCHEMA_VERSION |                VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          2.1.1 |     Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row

Flink SQL> use alan_catalog.cdhhive;
[INFO] Execute statement succeed.

Flink SQL> select * from version;

+----+----------------------+--------------------------------+--------------------------------+
| op |               VER_ID |                 SCHEMA_VERSION |                VERSION_COMMENT |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    1 |                          2.1.1 |     Hive release version 2.1.1 |
+----+----------------------+--------------------------------+--------------------------------+
Received a total of 1 row

6、数据类型映射

Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、Derby 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。

在这里插入图片描述
在这里插入图片描述
以上,简单的介绍了flink sql读取外部系统的jdbc示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/919684.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用NXP GUI GUIDER生成的GUI移植到雅特力MCU平台过程详解(ST/GD/国民/极海通用)

好记性不如烂笔头&#xff0c;既然不够聪明&#xff0c;就乖乖的做笔记&#xff0c;温故而知新。 本文档用于本人对知识点的梳理和记录 一、前言 上一篇我们有介绍NXP GUI Guider工具如何制作和调试GUI&#xff0c;GUI神器 NXP GUI GUIDER开发工具入门教程https://blog.csdn.n…

【云计算】Docker特别版——前端一篇学会

docker学习 文章目录 一、下载安装docker&#xff08;一&#xff09;Windows桌面应用安装&#xff08;二&#xff09;Linux命令安装 二、windows注册登录docker三、Docker的常规操作(一)、基本的 Docker 命令(二)、镜像操作(三)、容器的配置(四)、登录远程仓库 四、镜像管理(一…

视频转音频mp3怎么弄?

视频转音频mp3怎么弄&#xff1f;在很多人看来&#xff0c;音频就是视频中的一部分&#xff0c;其实这时是一定道理的&#xff0c;视频是一种包含图像和有声音的多媒体文件&#xff0c;没有声音的视频是不完美的。时代发展到现在&#xff0c;短视频已经融入了我们生活的方方面面…

ElementUI中的日历组件加载无效的问题

在ElementUI中提供了一个日历组件。在某些场景下还是比较有用的。只是在使用的时候会有些下坑&#xff0c;大家要注意下。   官网提供的信息比较简介。我们在引入到项目中使用的时候可以能会出现下面的错误提示。 Unknown custom element: <el-calendar> - did you …

【Spring】Spring循环依赖

目录 什么是循环依赖问题 循环依赖具体是怎么解决的 具体的解决步骤&#xff1a; 通俗实例&#xff1a; 严谨的循环依赖解决图例 为什么使用的是三级缓存&#xff0c;二级缓存不够用吗&#xff1f; 什么是循环依赖问题 Spring的循环依赖是指在Bean之间存在相互依赖关…

【教程】手把手教你Termius去除登录并解除限制,非常简单!

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhang.cn] 如果不想关注我&#xff0c;但还想看本文&#xff0c;就去这里吧&#xff0c;无限制&#xff1a;【教程】手把手教你Termius去除登录并解除限制&#xff0c;非常简单&#xff01; - 小锋学长生活大爆炸 这里以Mac…

C语言:整型提升

一、什么是整型提升 C语言的整型算术运算至少是以缺省整型类型的精度来进行的。 为了达到这个精度&#xff0c;算术运算表达式中的 字符型char 和 短整型short 需要被转换为普通整型&#xff0c;这种转换成为整型提升。 二、整型提升的意义 表达式的整型运算需要在CPU相应的运算…

基于Jenkins自动打包并部署docker环境

目录 1、安装docker-ce 2、阿里云镜像加速器 3、构建tomcat 基础镜像 4、构建一个Maven项目 实验环境 操作系统 IP地址 主机名 角色 CentOS7.5 192.168.200.111 git git服务器 CentOS7.5 192.168.200.112 Jenkins git客户端 jenkins服务器 CentOS7.5 192.168…

SpringSecurity5.7.10整合Mybatisplus3.5.3.2自定义用户登录逻辑

1、SpringSecurity5.7.0案例功能概述 本案例中采用的是最新版本的springSecurity5.7.10,在就的版本中丢弃了WebSecurityConfigurerAdapter的使用,在使用上发生了很多的变化,本案例中参数变化及具体的使用方式,希望对你有所帮助。 本案例使用springboot2.7.14整合springSe…

【C++】虚函数

2023年8月23日&#xff0c;周三上午 目录 虚函数在派生类中重写虚函数纯虚函数 示例程序 虚函数 在函数返回值前面加上关键字virtual虚函数必须在类中声明&#xff0c;否则会报错“[Error] virtual outside class declaration” class Base { public:virtual void func(); /…

HDLBits-Verilog学习记录 | Verilog Language-Basics(2)

文章目录 9.Declaring wires | wire decl10. 7458 chip 9.Declaring wires | wire decl problem:Implement the following circuit. Create two intermediate wires (named anything you want) to connect the AND and OR gates together. Note that the wire that feeds the …

Easy Rules规则引擎(2-细节篇)

目录 一、序言二、规则引擎参数配置实例1、skipOnFirstAppliedRules示例(1) FizzRule(2) BuzzRule(3) FizzBuzzRule(4) onFizzBuzzRule(5) FizzBuzzRulesLauncher 2、skipOnFirstNonTriggeredRule示例3、skipOnFirstFailedRule示例 三、组合规则1、UnitRuleGroup组合规则2、Act…

Docker容器与虚拟化技术:Docker compose部署LNMP

目录 一、理论 1.LNMP架构 2.背景 3.Dockerfile部署LNMP 3.准备Nginx镜像 4.准备MySQL容器 5.准备PHP镜像 6.上传wordpress软件包 7.编写docker-compose.yml 8.构建与运行docker-compose 9.启动 wordpress 服务 10.浏览器访问 11.将运行中的 docker容器保存为 doc…

react +Antd Cascader级联选择使用接口数据渲染

1获取接口数据并将数据转换成树形数组 useEffect(() > {axios.get(/接口数据, {params: {“请求参数”},}).then((res) > {console.log(res);const getTreeData (treeData, pid) > {// 把数据转化为树型结构let tree [];let currentParentId pid || 0;for (let i …

并发式编程的相关知识--notify和wait、CompletableFuture

1.notify和wait方法的使用 1.1快递到站通知 说明&#xff1a;快递的地点在上海&#xff0c;离快递发货地相聚500km&#xff0c;每次进行改变&#xff0c;快递将会前进100km,当快递前进100km时&#xff0c;需要通知快递当前的位置&#xff0c;当快递到达目的地时&#xff0c;需…

【学习FreeRTOS】第15章——FreeRTOS队列集

1.队列集简介 一个队列只允许任务间传递的消息为同一种数据类型&#xff0c;如果需要在任务间传递不同数据类型的消息时&#xff0c;那么就可以使用队列集&#xff0c;作用&#xff1a;用于对多个队列或信号量进行“监听”&#xff0c;其中不管哪一个消息到来&#xff0c;都可…

Docker拉取并配置Grafana

Linux下安装Docker请参考&#xff1a;Linux安装Docker 安装准备 新建挂载目录 /opt/grafana/data目录&#xff0c;准备用来挂载放置grafana的数据 /opt/grafana/plugins目录&#xff0c;准备用来放置grafana的插件 /opt/grafana/config目录&#xff0c;准备用来挂载放置graf…

containerd上基于dockerfile无特权构建镜像打包工具kaniko

目录 一、kaniko是什么 二、kaniko工作原理 三、kanijo工作在Containerd上 基于serverless的考虑&#xff0c;我们选择了kaniko作为镜像打包工具&#xff0c;它是google提供了一种不需要特权就可以构建的docker镜像构建工具。 一、kaniko是什么 kaniko 是一种在容器或 Kube…

Ubuntu18.04 交叉编译curl-7.61.0

下载 官方网址是&#xff1a;curl 安装依赖库 如果需要curl支持https协议&#xff0c;需要先交叉编译 openssl,编译流程如下&#xff1a; Ubuntu18.04 交叉编译openssl-1.1.1_我是谁&#xff1f;&#xff1f;的博客-CSDN博客 解压 # 解压&#xff1a; $tar -xzvf curl-7.61.…

2023年 Java 面试八股文(25w字)

目录 一.Java 基础面试题1.Java概述Java语言有哪些特点&#xff1f;Java和C有什么关系&#xff0c;它们有什么区别&#xff1f;JVM、JRE和JDK的关系是什么&#xff1f;**什么是字节码?**采用字节码的好处是什么?Oracle JDK 和 OpenJDK 的区别是什么&#xff1f; 2.基础语法Ja…