Flink之Table API SQL连接器

news2025/1/17 22:06:30

连接器

  • Table API & SQL连接器
    • 1.概述
    • 2.支持连接器
  • DataGen连接器
    • 1.概述
    • 2.SQL客户端执行
    • 3.Table API执行
  • FileSystem连接器
    • 1.创建FileSystem映射表
    • 2.创建source数据源表
    • 3.写入数据
    • 4.解决异常
    • 5.查询fileTable
    • 6.查看HDFS
  • Kafka连接器
    • 1.添加kafka连接器依赖
    • 2.重启yarn-session、sql-client
    • 3.创建Kafka映射表
    • 4.创建source数据源表
    • 5.插入Kafka表
    • 6.查询Kafka表
  • Upsert Kafka连接器
    • 1.添加kafka连接器依赖
    • 2.重启yarn-session、sql-client
    • 3.创建upsert-kafka的映射表
    • 4.创建source数据源表
    • 5.插入upsert-kafka表
    • 6.查询upsert-kafka表
  • JDBC连接器
    • 1.添加JDBC连接器依赖
    • 2.重启flink集群和sql-client
    • 3.创建JDBC表
    • 4.创建JDBC映射表
    • 5.写入数据
    • 6.查询数据
  • MongoDB连接器
    • 1.添加依赖
    • 2.示例
    • 3.验证
  • Elasticsearch连接器
    • 1.添加依赖
    • 2.示例
    • 3.验证

Table API & SQL连接器

1.概述

Flink的Table API 和 SQL 程序可以连接到其他外部系统,以读写批处理和流式表。表源提供对存储在外部系统(例如数据库、键值存储、消息队列或文件系统)中的数据的访问。表接收器将表发送到外部存储系统。根据源和接收器的类型,它们支持不同的格式,例如CSV、Avro、Parquet 或 ORC。

2.支持连接器

Flink原生支持各种连接器,以下是所有可用的连接器。

名称版本数据源数据接收器
文件系统有界和无界扫描流式接收器,批处理接收器
Elasticsearch6.x & 7.x不支持流式接收器,批处理接收器
Opensearch1.x & 2.x不支持流式接收器,批处理接收器
Apache Kafka0.10+无界扫描流式接收器,批处理接收器
Amazon DynamoDB不支持流式接收器,批处理接收器
Amazon Kinesis Data Streams无界扫描流式接收器
Amazon Kinesis Data Firehose不支持流式接收器
JDBC有界扫描,查找流式接收器,批处理接收器
Apache HBase1.4.x & 2.2.x有界扫描,查找流式接收器,批处理接收器
Apache Hive支持的版本无界扫描,有界扫描,查找流式接收器,批处理接收器
MongoDB有界扫描,查找流式接收器,批处理接收器

注意:这些连接器可以使用FlinkTable API操作,也可以使用Flink SQL客户端操作。

DataGen连接器

1.概述

DataGen 连接器允许按数据生成规则进行读取。

每个列,都有两种生成数据的方法:
1.随机生成器:

默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary, string (类型)可以指定长度。它是无界的生成器。

2.序列生成器:

可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。

连接器参数说明:

参数描述
connector参数必须,指定要使用的连接器,这里是 ‘datagen’
rows-per-second可选参数,默认值10000,每秒生成的行数,用以控制数据发出速率
fields.#.kind可选参数,默认值random,指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’
fields.#.min可选参数随机生成器的最小值,适用于数字类型
fields.#.max可选参数随机生成器的最大值,适用于数字类型
fields.#.length可选参数,默认值100,随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string
fields.#.start可选参数,序列生成器的起始值
fields.#.end可选参数,序列生成器的结束值

2.SQL客户端执行

在 Flink SQL客户端中创建DataGen表

Flink SQL> CREATE TABLE datagen (
 id INT,
 name STRING,
 age INT,
 ts AS localtimestamp,
 WATERMARK FOR ts AS ts
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.f_sequence.kind'='sequence',
 'fields.id.start'='1',
 'fields.id.end'='50',
 'fields.age.min'='1',
 'fields.age.max'='150',
 'fields.name.length'='10'
)

查询表

Flink SQL> show tables;
+------------+
| table name |
+------------+
|  datagen   |
+------------+
1 row in set

执行查询

Flink SQL> select * from datagen;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                     f82aac1d2e |          97 |
| +I |           2 |                     09bc2c62a6 |          75 |
| +I |           3 |                     9e3e0cca2f |         146 |
| +I |           4 |                     05bca80edc |          61 |
| +I |           5 |                     93bfca82f7 |          54 |

3.Table API执行

使用Table API操作,需要引入相关依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--负责Table API和下层DataStream API的连接支持-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--在本地的集成开发环境里运行Table APISQL的支持-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

在程序中通过Table API操作DataGen SQL连接器

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建表
        tableEnv.executeSql("CREATE TABLE datagen (\n" +
                " id INT,\n" +
                " name STRING,\n" +
                " age INT,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='50',\n" +
                " 'fields.age.min'='1',\n" +
                " 'fields.age.max'='150',\n" +
                " 'fields.name.length'='10'\n" +
                ");");

        // 执行查询
        tableEnv.executeSql("show tables;").print();
        Table select = tableEnv.from("datagen").select($("id"), $("name"), $("age"));
        // 打印
        select.execute().print();
    }

控制台执行结果如下:

+------------+
| table name |
+------------+
|    datagen |
+------------+
1 row in set
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                     f82aac1d2e |          97 |
| +I |           2 |                     09bc2c62a6 |          75 |
| +I |           3 |                     9e3e0cca2f |         146 |
| +I |           4 |                     05bca80edc |          61 |

FileSystem连接器

文件系统连接器,不需要添加额外的依赖。相应的 jar 包可以在 Flink 工程项目的 /lib 目录下找到。从文件系统中读取或者向文件系统中写入行时,需要指定相应的 format。

1.创建FileSystem映射表

文件系统连接器允许从本地或分布式文件系统进行读写。文件系统表可以定义为:

CREATE TABLE fileTable( id int, name string, age int )
WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://node01:9000/flink/fileTable',
  'format' = 'json'
);

2.创建source数据源表

CREATE TABLE datagen (
 id INT,
 name  string,
 age  INT
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.id.kind'='sequence',
 'fields.id.start'='1',
 'fields.id.end'='1000',
 'fields.age.min'='1',
 'fields.age.max'='100',
 'fields.name.length'='10'
);

3.写入数据

Flink SQL> insert into fileTable select * from datagen;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.DialectFactory

4.解决异常

在执行插入操作出现如下异常:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.DialectFactory

解决方案有两种:

1.由于flink/lib目录下存在flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar导致,因此删除该Jar包

 mv lib/flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar  lib/flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar.back

重启Flink与SQL客户端,然后执行

Flink SQL> insert into fileTable select * from datagen;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c449c029599125c92da98514b512b0de

2.更换flink/lib目录下的flink-table-planner-loader-1.17.0.jar

通常会使用flink-sql-connector-hive连接器,所以不可能任意删除的,故先恢复

 mv lib/flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar.back  lib/flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar

注意:只有在使用Hive方言或HiveServer2时需要移动Jar操作,是Hive集成的推荐设置。

[root@node01 flink]# mv lib/flink-table-planner-loader-1.17.0.jar opt/

[root@node01 flink]# mv opt/flink-table-planner_2.12-1.17.0.jar lib/

重启Flink与SQL客户端,然后执行

Flink SQL> insert into fileTable select * from datagen;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7a2bc2fcb7407cec33530fb086f4242f

5.查询fileTable


Flink SQL> select * from fileTable where id=1;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                     6a296add45 |           8 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row

6.查看HDFS

在这里插入图片描述

Kafka连接器

Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。

1.添加kafka连接器依赖

下载连接器:flink-sql-connector-kafka

将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录

cp ./flink-sql-connector-kafka-1.17.0.jar /usr/local/program/flink/lib

2.重启yarn-session、sql-client

bin/start-cluster.sh

bin/sql-client.sh

3.创建Kafka映射表

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  --当列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读 
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'testTopic',
  'properties.bootstrap.servers' = 'node01:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  --消息体时使用的格式
  'format' = 'json',
  --fixed为flink实现的分区器,一个并行度只写往kafka一个分区
  'sink.partitioner' = 'fixed'
);

查询表信息

Flink SQL> show tables;
+------------+
| table name |
+------------+
| KafkaTable |
+------------+
1 row in set

Flink SQL> desc KafkaTable;
+-----------+--------------+------+-----+-------------------------------+-----------+
|      name |         type | null | key |                        extras | watermark |
+-----------+--------------+------+-----+-------------------------------+-----------+
|   user_id |       BIGINT | TRUE |     |                               |           |
|   item_id |       BIGINT | TRUE |     |                               |           |
|  behavior |       STRING | TRUE |     |                               |           |
|        ts | TIMESTAMP(3) | TRUE |     |     METADATA FROM 'timestamp' |           |
| partition |       BIGINT | TRUE |     |              METADATA VIRTUAL |           |
|    offset |       BIGINT | TRUE |     |              METADATA VIRTUAL |           |
+-----------+--------------+------+-----+-------------------------------+-----------+
7 rows in set

4.创建source数据源表

CREATE TABLE datagen (
 user_id  INT,
 item_id  INT,
 behavior STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.user_id.kind'='sequence',
 'fields.user_id.start'='1',
 'fields.user_id.end'='1000',
 'fields.item_id.min'='1',
 'fields.item_id.max'='1000',
 'fields.behavior.length'='10'
);

5.插入Kafka表

Flink SQL> insert into KafkaTable(user_id,item_id,behavior) select * from datagen;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e4679f9e14823079d2ce355d592cae93

6.查询Kafka表

Flink SQL> select * from KafkaTable;
+----+----------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+
| op |              user_id |              item_id |                       behavior |                      ts |            partition |               offset |
+----+----------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+
| +I |                    1 |                   92 |                     c3baec3158 | 2023-07-11 23:22:45.564 |                    0 |                    0 |
| +I |                    2 |                  516 |                     804b7f09a2 | 2023-07-11 23:22:45.565 |                    0 |                    1 |
| +I |                    3 |                  784 |                     9940556819 | 2023-07-11 23:22:45.566 |                    0 |                    2 |
| +I |                    4 |                   62 |                     053ec345db | 2023-07-11 23:22:45.566 |                    0 |                    3 |
| +I |                    5 |                  375 |                     b4aa55a998 | 2023-07-11 23:22:45.566 |                    0 |                    4 |
| +I |                    6 |                  507 |                     b31794a773 | 2023-07-11 23:22:45.566 |                    0 |                    5 |

Upsert Kafka连接器

Upsert Kafka 连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。

1.作为source

upsert-kafka连接器生产changelog流,其中每条数据记录代表一个更新或删除事件。数据记录中的 value被解释为同一 key的最后一个value的UPDATE,如果不存在相应的key,则该更新被视为INSERT。

changelog流中的数据记录被解释为UPSERT,也称为INSERT/UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE 消息。

2.作为sink

upsert-kafka连接器可以消费changelog流。它会将 INSERT/UPDATE_AFTER数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。

Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

1.添加kafka连接器依赖

下载连接器:flink-sql-connector-kafka

将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录

cp ./flink-sql-connector-kafka-1.17.0.jar /usr/local/program/flink/lib

2.重启yarn-session、sql-client

bin/start-cluster.sh

bin/sql-client.sh

3.创建upsert-kafka的映射表

注意:必须定义主键

CREATE TABLE kafka( 
    user_id int , 
    item_id int ,
    behavior STRING,
    primary key (item_id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node01:9092',
  'topic' = 'upsertTopic',
  'key.format' = 'json',
  'value.format' = 'json'
)

4.创建source数据源表

CREATE TABLE datagen (
 user_id  INT,
 item_id  INT,
 behavior STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.user_id.kind'='sequence',
 'fields.user_id.start'='1',
 'fields.user_id.end'='1000',
 'fields.item_id.min'='1',
 'fields.item_id.max'='1000',
 'fields.behavior.length'='10'
);

5.插入upsert-kafka表

注意:当使用 GROUP BY 子句时,SELECT 子句中可以出现的列只能是分组键列或聚合函数应用的列。

insert into kafka 
select sum(user_id) as user_id, item_id, behavior 
from datagen 
group by item_id, behavior;

6.查询upsert-kafka表

upsert-kafka无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过-U+U+I等符号来显示数据的变化过程。


Flink SQL> select * from kafka;
+----+-------------+-------------+--------------------------------+
| op |     user_id |     item_id |                       behavior |
+----+-------------+-------------+--------------------------------+
| +I |          73 |         362 |                     bc998a7d48 |
| +I |          74 |         945 |                     2237c26098 |
| +I |          75 |         755 |                     2537957698 |
| -U |          70 |         604 |                     6dce4c7081 |
| +U |          76 |         604 |                     288d5d98f7 |
| +I |          77 |         228 |                     a6b76e7a0d |
| +I |          78 |         568 |                     5dcd3a80f3 |
| -U |          65 |         623 |                     9e2c52dec9 |
| +U |          79 |         623 |                     ba177640f7 |
| +I |          80 |         699 |                     9fa8198fbd |
| +I |          81 |         489 |                     2061468e63 |
| +I |          82 |         124 |                     8d2405e54c |
| +I |          83 |         115 |                     34d418f37c |

JDBC连接器

JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。

Flink将数据写入外部数据库时,如果使用DDL中定义的主键,则连接器以upsert模式与外部系统交换 UPDATE/DELETE消息,否则连接器以append模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。

在upsert模式下,Flink会根据主键插入新行或更新现有行,这样可以保证幂等性。

在追加模式下,Flink将所有记录解释为INSERT消息,如果底层数据库中发生了主键或唯一约束违反,则INSERT操作可能会失败。

为了保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。

这里使用MySQL JDBC连接器为例。

1.添加JDBC连接器依赖

下载:flink-connector-jdbc

下载:MySQL驱动包:mysql-connector-j

上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下

cp ./flink-connector-jdbc-3.1.0-1.17.jar /usr/local/program/flink/lib

cp ./mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar /usr/local/program/flink/lib

2.重启flink集群和sql-client

bin/start-cluster.sh

bin/sql-client.sh

3.创建JDBC表

在MySQL中的demo数据库中创建表

CREATE TABLE `flinkTable` (
  `id` bigint(20) NOT NULL,
  `name` varchar(20) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

4.创建JDBC映射表

在 Flink SQL 中注册一张 MySQL 表

CREATE TABLE mysqlTable(
  id BIGINT,
  name STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://node01:3306/demo?useUnicode=true&characterEncoding=UTF-8',
    'username' = 'root',
    'password' = '123456',
	-- 连接到 JDBC 表的名称
    'table-name' = 'flinkTable',
    -- 最大重试超时时间,以秒为单位且不应该小于 1 秒
    'connection.max-retry-timeout' = '60s',
    --维表缓存的最大行数,若超过该值,则最老的行记录将会过期。
    'sink.buffer-flush.max-rows' = '500',
    --flush 间隔时间,超过该时间后异步线程将 flush 数据。
    'sink.buffer-flush.interval' = '2s',
    --查询数据库失败的最大重试次数
    'sink.max-retries' = '3',
    --用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度
    'sink.parallelism' = '1'
);

5.写入数据

Flink SQL> insert into mysqlTable values(1,'flink',20);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 78eb69d8f16fd239bbb24ea4e13aac5c

6.查询数据

Flink SQL> select * from mysqlTable ;
+----+----------------------+--------------------------------+-------------+
| op |                   id |                           name |         age |
+----+----------------------+--------------------------------+-------------+
| +I |                    1 |                          flink |          20 |
+----+----------------------+--------------------------------+-------------+
Received a total of 1 row

MongoDB连接器

MongoDB 连接器提供了从 MongoDB 中读取和写入数据的能力。

连接器可以在 upsert 模式下运行,使用在 DDL 上定义的主键与外部系统交换 UPDATE/DELETE 消息

如果 DDL 上没有定义主键,则连接器只能以 append 模式与外部系统交换 INSERT 消息且不支持消费 UPDATE/DELETE 消息

1.添加依赖

使用Table API操作,需要额外引入MongoDB连接器依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb</artifactId>
            <version>1.0.1-1.17</version>
        </dependency>

2.示例

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建source数据源表
        tableEnv.executeSql("CREATE TABLE datagen (\n" +
                " id STRING,\n" +
                " name STRING,\n" +
                " age INT \n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='10',\n" +
                " 'fields.age.min'='1',\n" +
                " 'fields.age.max'='150',\n" +
                " 'fields.name.length'='10'\n" +
                ");");

		// 创建mongodb映射表
        tableEnv.executeSql("CREATE TABLE tb_mongodb (\n" +
                "  _id STRING,\n" +
                "  name STRING,\n" +
                "  age INT,\n" +
                "  PRIMARY KEY (_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'mongodb',\n" +
                "   'uri' = 'mongodb://IP:27017',\n" +
                "   'database' = 'my_db',\n" +
                "   'collection' = 'users'\n" +
                ");");

        // 执行查询
        tableEnv.executeSql("show tables;").print();

        tableEnv.executeSql(" insert into tb_mongodb select * from datagen;").print();
        Table tb_mongodb = tableEnv.from("tb_mongodb").select($("_id"), $("name"), $("age"));
        // 打印
        tb_mongodb.execute().print();
    }

3.验证

查看MongoDB
在这里插入图片描述

Elasticsearch连接器

Elasticsearch连接器允许将数据写入到 Elasticsearch 引擎的索引中。

连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息

如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息

1.添加依赖

有2个版本

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6</artifactId>
  <version>3.0.1-1.17</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7</artifactId>
  <version>3.0.1-1.17</version>
</dependency>

注意:需要引入flink-json,否则将出现如下异常

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath.
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

2.示例

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建表
        tableEnv.executeSql("CREATE TABLE datagen (\n" +
                " id STRING,\n" +
                " name STRING,\n" +
                " age INT \n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='10',\n" +
                " 'fields.age.min'='1',\n" +
                " 'fields.age.max'='150',\n" +
                " 'fields.name.length'='10'\n" +
                ");");


        tableEnv.executeSql("CREATE TABLE tb_es (\n" +
                "  id STRING,\n" +
                "  name STRING,\n" +
                "  age BIGINT,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-7',\n" +
                "  'hosts' = 'http://IP:9200',\n" +
                "  'index' = 'users'\n" +
                ");");

        // 执行查询
        tableEnv.executeSql("show tables;").print();
		// 插入数据
        tableEnv.executeSql(" insert into tb_es select * from datagen;").print();
    }

注意:由于es的表不支持source,故不能查询,查询会报如下错误

Caused by: org.apache.flink.table.api.ValidationException: Connector 'elasticsearch-7' can only be used as a sink. It cannot be used as a source.

3.验证

使用ElasticSearch Head查看连接查看

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1197269.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微软和Red Hat合体:帮助企业更方便部署容器

早在2015年&#xff0c;微软就已经和Red Hat达成合作共同为企业市场开发基于云端的解决方案。时隔两年双方在企业市场的多个方面开展更紧密的合作&#xff0c;今天两家公司再次宣布帮助企业更方便地部署容器。 双方所开展的合作包括在微软Azure上部署Red Hat OpenShift&#xf…

实战Leetcode(四)

Practice makes perfect&#xff01; 实战一&#xff1a; 这个题由于我们不知道两个链表的长度我们也不知道它是否有相交的节点&#xff0c;所以我们的方法是先求出两个链表的长度&#xff0c;长度长的先走相差的步数&#xff0c;使得两个链表处于同一起点&#xff0c;两个链…

运筹说 第102期 | 非线性规划—制约函数法

通过上期学习&#xff0c;大家已经了解了非线性规划中约束极值问题的最优性条件。本期小编将为大家介绍约束极值问题的求解方法&#xff1a;制约函数法&#xff0c;包括概念以及最基本的两种制约函数法&#xff1a;罚函数法、障碍函数法等内容。 制约函数法是通过构造某种制约函…

tomcat下载与使用教程

1. tomcat下载 官网&#xff1a;https://tomcat.apache.org/ 镜像地址&#xff1a;https://mirrors.huaweicloud.com/apache/tomcat/ 1、选择一个版本下载&#xff0c;官网下载速度缓慢&#xff0c;推荐镜像 2、对压缩包进行解压&#xff0c;无需进行安装&#xff0c;解压放…

PyTorch技术和深度学习——三、深度学习快速入门

文章目录 1.线性回归1&#xff09;介绍2&#xff09;加载自由泳冠军数据集3&#xff09;从0开始实现线性回归模型4&#xff09;使用自动求导训练线性回归模型5&#xff09;使用优化器训练线性回归模型 2.使用torch.nn模块构建线性回归模型1&#xff09;使用torch.nn.Linear训练…

智能指针,c++11,单例,类型转换

c11 unique_ptr 防拷贝 shared_ptr / weak_ptr: 引用计数,支持拷贝 面试 手写shared_ptr 各种ptr的特性对比, 不会问定制删除器和weak_ptr,但是问shared_ptr时,可以往这边延展. 单例 保证一写数据在一个进程中,只有一份,并且方便访问修改. 饿汉模式 在main函数之前就创…

Java中的多态究竟是什么?

目录 一.概念二.使用条件三.重写1.概念2.使用条件3.与重载对比4.举例5.为什么需要重写1.重写规则 2.静态绑定--重载3.动态绑定--重写 四.向上转型第一种传参方式&#xff1a;直接赋值第二种传参方式&#xff1a;通过传参优缺点 五.向下转型举例缺点 六.多态的优缺点优点缺点 一…

【Python 千题 —— 基础篇】账号登录

题目描述 题目描述 简易登录系统。你的账号密码分别是 “student”&#xff0c;“123456”&#xff1b;请使用 if-else 设计一个简易登录系统&#xff0c;输入账号密码。登陆成功输出 “Welcome !”&#xff0c;登录失败输出 “Login failed !” 输入描述 输入账号和密码。…

分类预测 | Matlab实现PSO-LSTM粒子群算法优化长短期记忆神经网络的数据多输入分类预测

分类预测 | Matlab实现PSO-LSTM粒子群算法优化长短期记忆神经网络的数据多输入分类预测 目录 分类预测 | Matlab实现PSO-LSTM粒子群算法优化长短期记忆神经网络的数据多输入分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现PSO-LSTM粒子群算法优化长短…

vue3+vite搭建后台项目-1 引入element-plus 中文包,打包时报错问题

vue3vite搭建后台项目-1 引入element-plus 中文包,打包时报错问题 终端报错 If theelement-pluspackage actually exposes this module, try adding a new declaration (.d.ts) file containing are moduleelement-plus/dist/locale/zh-cn.mjsdec import zhCn fromelement-plus…

VS c++多文件编译

前言&#xff1a;记录下我这个菜鸡学习的过程&#xff0c;如有错误恳请指出&#xff0c;不胜感激&#xff01; 1.简单多文件编译调试 文件目录&#xff1a; 编译&#xff1a; -g选项是告诉编译器生成调试信息&#xff0c;这样可以在程序崩溃或出现错误时更容易地进行调试。这…

思维模型 多看效应

本系列文章 主要是 分享 思维模型&#xff0c;涉及各个领域&#xff0c;重在提升认知。越熟悉&#xff0c;越喜欢。 1 多看效应的应用 1.1 多看效应在广告和营销领域的应用 1 可口可乐之歌 可口可乐公司在 20 世纪 60 年代推出了“可口可乐之歌”广告&#xff0c;这个广告通…

华为ensp:交换机接口划分vlan

现在要把 e0/0/1 接口放入vlan1 e0/0/2 接口放入vlan2 e0/0/3 接口放入vlan3 默认所有接口都在vlan1所以 e0/0/0 接口不用动 1.创建vlan 进入系统视图模式 直接输入 vlan 编号 即可创建对应vlan vlan 编号 vlan 2 创建vlan2 vlan 3 创建vlan3 2.将接口进入vlan…

【java:牛客每日三十题总结-5】

java:牛客每日三十题总结 总结如下 总结如下 -Xmx&#xff1a;最大堆大小 -Xms&#xff1a;初始堆大小 -Xmn:年轻代大小 -XXSurvivorRatio&#xff1a;年轻代中Eden区与Survivor区的大小比值 年轻代5120m&#xff0c; Eden&#xff1a;Survivor3&#xff0c;Survivor区大小102…

【Git】的分支和标签的讲解及实际应用场景

目录 一、讲解 1. 环境讲述 2. 应用原因 3. 分支标签的区别 二、分支 1. 命令 2. 场景应用 三、标签 1. 命令 2. 标签规范 3. 应用场景 每篇一获 一、讲解 1. 环境讲述 当软件从开发到正式环境部署的过程中&#xff0c;不同环境的作用如下&#xff1a; 开发环境&a…

电脑清灰涂硅脂后电脑CPU温度不降反升

目录 一.问题描述二.问题解决三.拆机注意事项四.影响散热的主要因素说明1.通风差2.硅脂材料差3.硅脂涂抹方式错误 一.问题描述 电脑型号&#xff1a;暗影精灵5 测温工具&#xff1a;硬件狗狗&#xff08;只要是测温软件都可以&#xff0c;比如omen hub和Core Temp…&#xff0…

LeetCode146.LRU缓存

写了一个小时&#xff0c;终于把示例跑过了&#xff0c;没想到啊提交之后第19/22个测试用例没过 我把测试用例的输出复制在word上看看和我的有什么不同&#xff0c;没想到有18页的word&#xff0c;然后我一直检查终于找出了问题&#xff0c;而且这个bug真的太活该了&#xff0c…

Sprint Boot 学习路线 4

微服务 Spring Microservices是一个框架&#xff0c;它使用Spring框架更容易地构建和管理基于微服务的应用程序。微服务是一种架构风格&#xff0c;其中一个大型应用程序被构建为一组小型、独立可部署的服务。每个服务具有明确定义的职责&#xff0c;并通过API与其他服务通信。…

CCNA课程实验-13-PPPoE

目录 实验条件网络拓朴需求 配置实现基础配置模拟运营商ISP配置ISP的DNS配置出口路由器OR基础配置PC1基础配置 出口路由器OR配置PPPOE拨号创建NAT(PAT端口复用) PC1测试结果 实验条件 网络拓朴 需求 OR使用PPPoE的方式向ISP发送拨号的用户名和密码&#xff0c;用户名&#xf…

前端面试题之vue篇

vue基础 vue的基本原理 当一个Vue实例创建时&#xff0c;Vue会遍历data中的属性&#xff0c;用Object.defineProperty(Vue使用proxy)转换为getter/setter&#xff0c;并且在内部追踪相关依赖&#xff0c;在属性被访问和修改时通知变化。每个组件实例都有相应的watcher程序实例…