16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)

news2025/1/20 20:08:25

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例:Elasticsearch
    • 1、maven依赖(java编码依赖)
    • 2、创建 Elasticsearch 表并写入数据
    • 3、连接器参数
    • 4、特性
      • 1)、Key 处理
      • 2)、动态索引
    • 5、数据类型映射
  • 二、Flink SQL示例:将kafka数据写入es
    • 1、依赖环境
    • 2、创建表并提交任务
    • 3、验证
      • 1)、创建es表
      • 2)、创建kafka表
      • 3)、提交任务
      • 4)、创建kafkatopic
      • 5)、往kafka topic中写入数据
      • 6)、查看es中的数据


本文介绍了Elasticsearch连接器的使用,并以2个示例完成了外部系统是Elasticsearch的介绍,即使用datagen作为数据源写入Elasticsearch和kafka作为数据源写入Elasticsearch中。
本文依赖环境是Flink、kafka、Elasticsearch、hadoop环境好用,如果是ha环境则需要zookeeper的环境。
本文分为2个部分,即Elasticsearch的基本介绍及示例和Elasticsearch与kafka的使用示例。

一、Table & SQL Connectors 示例:Elasticsearch

Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中(不支持读取,截至1.17版本)。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。

连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。

如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。

1、maven依赖(java编码依赖)

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7</artifactId>
  <version>3.0.1-1.17</version>
</dependency>

2、创建 Elasticsearch 表并写入数据

本示例的Elasticsearch是7.6,故需要Elasticsearch7的jar文件

flink-sql-connector-elasticsearch7_2.11-1.13.6.jar

CREATE TABLE source_table (
 userId INT,
 age INT,
 balance DOUBLE,
 userName STRING,
 t_insert_time AS localtimestamp,
 WATERMARK FOR t_insert_time AS t_insert_time
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.userId.kind'='sequence',
 'fields.userId.start'='1',
 'fields.userId.end'='5000',

 'fields.balance.kind'='random',
 'fields.balance.min'='1',
 'fields.balance.max'='100',

 'fields.age.min'='1',
 'fields.age.max'='1000',

 'fields.userName.length'='10'
);


CREATE TABLE alan_flink_es_user_idx (
 userId INT,
 age INT,
 balance DOUBLE,
 userName STRING,
 t_insert_time AS localtimestamp,
 PRIMARY KEY (userId) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://192.168.10.41:9200,http://192.168.10.42:9200,http://192.168.10.43:9200',
  'index' = 'alan_flink_es_user_idx'
);


INSERT INTO alan_flink_es_user_idx
SELECT userId,   age, balance , userName FROM source_table;

---------------------具体操作如下-----------------------------------
Flink SQL> CREATE TABLE source_table (
>  userId INT,
>  age INT,
>  balance DOUBLE,
>  userName STRING,
>  t_insert_time AS localtimestamp,
>  WATERMARK FOR t_insert_time AS t_insert_time
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='5',
>  'fields.userId.kind'='sequence',
>  'fields.userId.start'='1',
>  'fields.userId.end'='5000',
> 
>  'fields.balance.kind'='random',
>  'fields.balance.min'='1',
>  'fields.balance.max'='100',
> 
>  'fields.age.min'='1',
>  'fields.age.max'='1000',
> 
>  'fields.userName.length'='10'
> );
[INFO] Execute statement succeed.

Flink SQL> 
> 
> CREATE TABLE alan_flink_es_user_idx (
>  userId INT,
>  age INT,
>  balance DOUBLE,
>  userName STRING,
>  t_insert_time AS localtimestamp,
>  PRIMARY KEY (userId) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://192.168.10.41:9200,http://192.168.10.42:9200,http://192.168.10.43:9200',
>   'index' = 'alan_flink_es_user_idx'
> );
[INFO] Execute statement succeed.

Flink SQL> 
> 
> INSERT INTO alan_flink_es_user_idx
> SELECT userId,   age, balance , userName FROM source_table;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1163eb7a404c2678322adaa89409bcda
-----由于es的表不支持source,故不能查询,查询会报如下错误----
Flink SQL> select * from alan_flink_es_user_idx;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'elasticsearch-7' can only be used as a sink. It cannot be used as a source.

Elasticsearch结果如下图
在这里插入图片描述

3、连接器参数

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4、特性

1)、Key 处理

Elasticsearch sink 可以根据是否定义了一个主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 消息的查询。

在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。 Elasticsearch 连接器通过使用 document-id.key-delimiter 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。 某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES,ROW,ARRAY,MAP 等。 如果未指定主键,Elasticsearch 将自动生成文档 id。

有关 PRIMARY KEY 语法的更多详细信息,请参见 22、Flink 的table api与sql之创建表的DDL。

2)、动态索引

Elasticsearch sink 同时支持静态索引和动态索引。

如果你想使用静态索引,则 index 选项值应为纯字符串,例如 ‘myusers’,所有记录都将被写入到 “myusers” 索引中。

如果你想使用动态索引,你可以使用 {field_name} 来引用记录中的字段值来动态生成目标索引。 你也可以使用 ‘{field_name|date_format_string}’ 将 TIMESTAMP/DATE/TIME 类型的字段值转换为 date_format_string 指定的格式。 date_format_string 与 Java 的 DateTimeFormatter 兼容。 例如,如果选项值设置为 ‘myusers-{log_ts|yyyy-MM-dd}’,则 log_ts 字段值为 2020-03-27 12:25:55 的记录将被写入到 “myusers-2020-03-27” 索引中。

你也可以使用 ‘{now()|date_format_string}’ 将当前的系统时间转换为 date_format_string 指定的格式。now() 对应的时间类型是 TIMESTAMP_WITH_LTZ 。 在将系统时间格式化为字符串时会使用 session 中通过 table.local-time-zone 中配置的时区。 使用 NOW(), now(), CURRENT_TIMESTAMP, current_timestamp 均可以。

使用当前系统时间生成的动态索引时, 对于 changelog 的流,无法保证同一主键对应的记录能产生相同的索引名, 因此使用基于系统时间的动态索引,只能支持 append only 的流。

5、数据类型映射

Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。 Flink 为 Elasticsearch 连接器使用内置的 ‘json’ 格式。更多类型映射的详细信息,请参阅 35、Flink 的JSON Format。

二、Flink SQL示例:将kafka数据写入es

本示例是将kafka的数据通过Flink 的任务写入es中。

1、依赖环境

需要增加kafka和es相关的jar包,本示例用到如下:

flink-sql-connector-elasticsearch7_2.11-1.13.6.jar
flink-sql-connector-kafka_2.11-1.13.5.jar

2、创建表并提交任务

在flink sql中运行

CREATE TABLE alan_flink_es_kafka_user_idx (
 userId INT,
 age INT,
 balance DOUBLE,
 userName STRING,
 t_insert_time AS localtimestamp,
 PRIMARY KEY (userId) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://192.168.10.41:9200',
  'index' = 'alan_flink_es_kafka_user_idx_test'
);

CREATE TABLE alanchan_kafka_table (
    `id` INT,
    name STRING,
    age INT,
    balance DOUBLE,
    ts BIGINT, -- 以毫秒为单位的时间
    t_insert_time AS TO_TIMESTAMP_LTZ(ts,3),
    WATERMARK FOR t_insert_time AS t_insert_time - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义 watermark
) WITH (
    'connector' = 'kafka',
    'topic' = 't_kafkasource2',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
    'format' = 'csv'
);

INSERT INTO alan_flink_es_kafka_user_idx
SELECT id,   age, balance , name FROM alanchan_kafka_table;

3、验证

本示例没有特别说明,则是在flink sql cli中操作,kafka则是kafka的运行环境命令。

1)、创建es表

Flink SQL> CREATE TABLE alan_flink_es_kafka_user_idx (
>  userId INT,
>  age INT,
>  balance DOUBLE,
>  userName STRING,
>  t_insert_time AS localtimestamp,
>  PRIMARY KEY (userId) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://192.168.10.41:9200',
>   'index' = 'alan_flink_es_kafka_user_idx_test'
> );
[INFO] Execute statement succeed.

2)、创建kafka表

Flink SQL> CREATE TABLE alanchan_kafka_table (
>     `id` INT,
>     name STRING,
>     age INT,
>     balance DOUBLE,
>     ts BIGINT, -- 以毫秒为单位的时间
>     t_insert_time AS TO_TIMESTAMP_LTZ(ts,3),
>     WATERMARK FOR t_insert_time AS t_insert_time - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义 watermark
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 't_kafkasource2',
>     'scan.startup.mode' = 'earliest-offset',
>     'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>     'format' = 'csv'
> );
[INFO] Execute statement succeed.

3)、提交任务

Flink SQL> INSERT INTO alan_flink_es_kafka_user_idx
> SELECT id,   age, balance , name FROM alanchan_kafka_table;
........
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: dc19c9b904f69985d40eca372af9553a

4)、创建kafkatopic

[alanchan@server3 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_kafkasource2 --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic t_kafkasource2.

>

5)、往kafka topic中写入数据

[alanchan@server3 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource2
>1,alan,15,100,1692593500222
>2,alanchan,20,200,1692593501230
>3,alanchanchn,25,300,1692593502242
>4,alan_chan,30,400,1692593503256
>5,alan_chan_chn,500,45,1692593504270
>

6)、查看es中的数据

在这里插入图片描述
以上,完成了外部系统是Elasticsearch的介绍,使用了2个示例,即使用datagen作为数据源写入Elasticsearch和kafka作为数据源写入Elasticsearch中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/925501.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

每日一题 113路径总和||(递归)

题目 给你二叉树的根节点 root 和一个整数目标和 targetSum &#xff0c;找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1&#xff1a; 输入&#xff1a;root [5,4,8,11,null,13,4,7,2,null,null,5,1], targetSum 22…

脱离束缚:数字化工厂中ARM控制器的革命性应用!

近年来&#xff0c;中国数字经济体系已进入高速增长阶段。制造业作为中国经济高质量发展的重要支撑力量&#xff0c;在面临生产成本不断上涨、关键装备和核心零部件“受制于人”等挑战时&#xff0c;建设数字化工厂已成必然。 数字化工厂数据采集出现的问题 在数字工厂的建设…

两个步骤让图片动起来!

在当今数字时代&#xff0c;动态图片已经成为了网页设计和移动应用设计的标配之一。动态图片能够吸引用户的注意力&#xff0c;提高用户体验和页面交互性。那么&#xff0c;图片怎么动起来&#xff1f;有什么好用的方法呢&#xff1f;下面我们来一起探讨一下。 通常我们认知的动…

【分布式技术专题】「OSS中间件系列」从0到1的介绍一下开源对象存储MinIO技术架构

MinIO背景介绍 MinIO创始者是Anand Babu Periasamy, Harshavardhana&#xff08;戒日王&#xff09;等人&#xff0c; Anand是GlusterFS的初始开发者、Gluster公司的创始人与CTO&#xff0c;Harshavardhana曾经是GlusterFS的开发人员&#xff0c;直到2011年红帽收购了Gluster公…

Web安全测试(三):SQL注入漏洞

一、前言 结合内部资料&#xff0c;与安全渗透部门同事合力整理的安全测试相关资料教程&#xff0c;全方位涵盖电商、支付、金融、网络、数据库等领域的安全测试&#xff0c;覆盖Web、APP、中间件、内外网、Linux、Windows多个平台。学完后一定能成为安全大佬&#xff01; 全部…

IDEA启动两个Tomcat服务的方式 使用nginx进行反向代理 JMeter测试分布式情况下synchronized锁失效

目录 引出IDEA启动Tomcat两个端口的方式1.编辑配置2.添加新的端口-Dserver.port80833.service里面管理4.启动后进行测试 使用nginx进行反向代理反向代理多个端口运行日志查看启动关闭重启 分布式情况下synchronized失效synchronized锁代码启动tomcat两个端口nginx反向代理JMete…

实验七 Linux 内核移植

【实验目的】 掌握 Linux 内核配置和编译的基本方法 【实验环境】 ubuntu 14.04 发行版FS4412 实验平台交叉编译工具&#xff1a;arm-none-linux-gnueabi- 【注意事项】 实验步骤中以“$”开头的命令表示在 ubuntu 环境下执行&#xff0c;以“#”开头的命令表 示在开发板下…

C++信息学奥赛1139:整理药名

#include <iostream> #include <string> using namespace std; int main() {int n;// 输入整数ncin>>n;cin.ignore();string arr[n];// 循环读取n行字符串for (int i 0; i<n ;i){getline(cin,arr[i]);}for (int i 0; i<n ;i){for(int j0;j<arr[i]…

【JSDocvscode】使用JSDoc、在vscode中开启node调试、使用vscode编写运行Python程序

JSDoc JSDoc是JavaScript的一种注释语法&#xff0c;同时通过JSDoc注释也可以规避js弱类型中不进行代码提示的问题 图形展示JSDoc的效果&#xff1a; 上述没有进行JSDoc&#xff0c;然后我们a点什么 是没有任何提示的 上述就是加上 JSDoc的效果 常用的 vscode 其实内置了 js…

IBM Spectrum LSF License Scheduler

LSF License Scheduler 提供了两个版本: Basic Edition 和 Standard Edition。 LSF License Scheduler Basic Edition 随附于 LSF Standard Edition 和 Advanced Edition &#xff0c;并非旨在应用有关如何在集群或项目之间共享许可证的策略。 相反&#xff0c; LSF License S…

[JavaWeb]【十四】web后端开发-MAVEN高级

目录 一、分模块设计与开发 1.1 分模块设计 1.2 分模块设计-实践​编辑 1.2.1 复制老项目改为spring-boot-management 1.2.2 新建maven模块runa-pojo 1.2.2.1 将原项目pojo复制到runa-pojo模块 1.2.2.2 runa-pojo引入新依赖 1.2.2.3 删除原项目pojo包 1.2.2.4 在spring-…

postman 调用webservice

有个外部接口需要提供古老的webservice 格式接口。 1 设置格式 按照xml 格式设置。 2 消息体xml 封装 不加envelope: <soap:Envelope xmlns:soap"" target"_blank">http://schemas.xmlsoap.org/soap/envelope/"> <soap:Body> <soap…

硬盘中病毒是什么原因?硬盘格式化能清除病毒吗

“我的电脑中了一个非常顽固的病毒&#xff0c;朋友建议我进行硬盘格式化来彻底清除病毒。不知道是不是真的有用&#xff0c;半信半疑下进行了硬盘格式化。当我完成操作后&#xff0c;我发现有些工作文件没有备份到。这可怎么办&#xff1f;想问下大家有没有什么方法去恢复数据…

ClickHouse进阶(二):ClickHouse MergeTree表引擎及目录解析

进入正文前&#xff0c;感谢宝子们订阅专题、点赞、评论、收藏&#xff01;关注IT贫道&#xff0c;获取高质量博客内容&#xff01; &#x1f3e1;个人主页&#xff1a;含各种IT体系技术&#xff0c;IT贫道_Apache Doris,大数据OLAP体系技术栈,Kerberos安全认证-CSDN博客 &…

【VS Code插件开发】消息通信(四)

&#x1f431; 个人主页&#xff1a;不叫猫先生&#xff0c;公众号&#xff1a;前端舵手 &#x1f64b;‍♂️ 作者简介&#xff1a;前端领域优质作者、阿里云专家博主&#xff0c;共同学习共同进步&#xff0c;一起加油呀&#xff01; &#x1f4e2; 资料领取&#xff1a;前端…

ModaHub魔搭社区:WinPlan垂直大模型数据采集

WinPlan经营大脑数据手动提交 数据采集模版创建后,用户可手动提交数据 数据批量导入 1、第一步:上传Excel 如何选择Excel本系统的批量导入支持选择任意相关的Excel,映射到数据采集模版的各列,即可实现批量导入;相关Excel可以是自行维护的相关数据、或从其他业务系统导出…

【C语言】文件操作 -- 详解

一、什么是文件 磁盘上的文件是文件。 1、为什么要使用文件 举个例子&#xff0c;当我们想实现一个 “通讯录” 程序时&#xff0c;在通讯录中新建联系人、删除联系人等一系列操作&#xff0c;此时的数据存储于内存中&#xff0c;程序退出后所有数据都会随之消失。为了让通讯录…

vue-admin-template实现按钮级控制

这里记录一下使用大佬的模板vue-admin-template&#xff0c;实现按钮级别控制 实现的思路&#xff1a;用户登录之后&#xff0c;返回用户详细信息(将用户的所有权限码发送给前端)&#xff0c;然后将权限码保存在全局状态管理对象中&#xff0c;然后在组件中进行判断是否显示 最…

JAVA学习-愚见

JAVA学习-愚见 分享一下Java的学习路线&#xff0c;仅供参考【本人亲测&#xff0c;真实有效】 1、尽可能推荐较新的课程 2、大部分视频在B站上直接搜关键词就行【自学&#xff0c;B大的学生】 文章目录 JAVA学习-愚见前期准备Java基础课程练手项目 数据库JavaWeb前端基础 Vue…

Rancher2.5.9版本证书更新

一、环境 主机名IP地址操作系统rancher版本K8s-Master192.168.10.236Centos 72.5.9 二、更新证书 1、查看当前证书到期时间 2、进行证书轮换 [rootK8s-Master ~]# docker ps |grep rancher/rancher d581da2b7c4e rancher/rancher:v2.5.9 &q…