Flink SQL kafka连接器

news2024/12/25 23:49:05

版本说明

Flink和kafka的版本号有一定的匹配关系,操作成功的版本:

  • Flink1.17.1
  • kafka_2.12-3.3.1

添加kafka连接器依赖

将flink-sql-connector-kafka-1.17.1.jar上传到flink的lib目录下

下载flink-sql-connector-kafka连接器jar包

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

上传到flink的lib目录下

[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib

分发flink-connector-kafka-1.17.1.jar

xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar

启动yarn-session

[hadoop@node2 ~]$ myhadoop.sh start
[hadoop@node2 ~]$ yarn-session.sh -d

启动kafka集群

[hadoop@node2 ~]$ zk.sh start
[hadoop@node2 ~]$ kf.sh start

创建kafka主题

查看主题
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
​
如果没有ws1,则创建
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
​

普通Kafka表

'connector' = 'kafka'

进入Flink SQL客户端

[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
...
省略若干日志输出
...
Flink SQL> 

创建Kafka的映射表

CREATE TABLE t1( 
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
  'properties.group.id' = 'test',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
);

可以往kafka读数据,也可以往kafka写数据。

插入数据到Kafka表

如果没有source表,先创建source表,如果source表存在则不需要再创建。

CREATE TABLE source ( 
    id INT, 
    ts BIGINT, 
    vc INT
) WITH ( 
    'connector' = 'datagen', 
    'rows-per-second'='1', 
    'fields.id.kind'='random', 
    'fields.id.min'='1', 
    'fields.id.max'='10', 
    'fields.ts.kind'='sequence', 
    'fields.ts.start'='1', 
    'fields.ts.end'='1000000', 
    'fields.vc.kind'='random', 
    'fields.vc.min'='1', 
    'fields.vc.max'='100'
);

把source表插入t1表

insert into t1(id,ts,vc) select * from source;

如果报错

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

依然同样错误,还不行,把kafka libs目录下的kafka-clients-3.3.1.jar,把jar包发到Flink的lib目录,同时也注意重启sql-client、yarn-session也要重启(重要)

cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib

查看是否复制成功

$ ls $FLINK_HOME/lib

重启sql-client重新操作,成功如下:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed为flink实现的分区器,一个并��度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
​
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
​
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e
​

查询Kafka表

select * from t1;

报错

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord

​

重启yarn session,重新操作,成功如下:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- fixed为flink实现的分区器,一个并??度只写往kafka一个分区
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
​
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
​
Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a
​
​
Flink SQL> select * from t1;2024-06-14 11:23:38,338 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
select * from t1;
[INFO] Result retrieval cancelled.
​
Flink SQL> 
​

 

upsert-kafka表

'connector' = 'upsert-kafka'

如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。

创建upsert-kafka的映射表(必须定义主键)

CREATE TABLE t2( 
    id int , 
    sumVC int ,
    primary key (id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);

如果没有kafka名为ws2的topic,将自动被创建。

插入upsert-kafka表

insert into t2 select id,sum(vc) sumVC  from source group by id;

查询upsert-kafka表

upsert-kafka 无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过 -U,+U,+I 等符号来显示数据的变化过程。

设置显示模式

SET sql-client.execution.result-mode=tableau;

 查询t2表数据

select * from t2;

如果发现没有输出数据,原因是之前的source表已经生成到end(1000000)就不再生成数据了。

进入Flink Web UI,cancel掉所有running job,重新操作成功如下:

删除表

Flink SQL> show tables;
+------------+
| table name |
+------------+
|     source |
|         t1 |
|         t2 |
+------------+
3 rows in set
​
Flink SQL> drop table source;
Flink SQL> drop table t1;
Flink SQL> drop table t2;

创建表

CREATE TABLE source ( 
    id INT, 
    ts BIGINT, 
    vc INT
) WITH ( 
    'connector' = 'datagen', 
    'rows-per-second'='1', 
    'fields.id.kind'='random', 
    'fields.id.min'='1', 
    'fields.id.max'='10', 
    'fields.ts.kind'='sequence', 
    'fields.ts.start'='1', 
    'fields.ts.end'='1000000', 
    'fields.vc.kind'='random', 
    'fields.vc.min'='1', 
    'fields.vc.max'='100'
);
CREATE TABLE t2( 
    id int , 
    sumVC int ,
    primary key (id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);

设置显示模式

SET sql-client.execution.result-mode=tableau;

查询表

select * from t2;

 

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1905860.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI教你如何系统的学习Python

Python学习计划 第一阶段:Python基础(1-2个月) 目标:掌握Python的基本语法、数据类型、控制结构、函数、模块和包等。 学习Python基本语法:包括变量、数据类型(整数、浮点数、字符串、列表、元组、字典、…

Java求解百钱买百鸡问题(课堂实例2)

目录 💕💕引言💕💕 😍😍点关注编程梦想家(大学生版)-CSDN博客不迷路💕💕 一、问题背景----百鸡百钱_百度百科 (baidu.com) 𝑥𝑦&a…

颍川韩氏始祖,归顺大汉的弓高侯

弓高侯,听起来十分不顺当,像是域外来音似的。本人的名字更另类——颓当,词典中甚至找不到。然而,弓高曾经是河北的一个县名——弓高县,颓当曾经是匈奴的一个城——颓当城,这两个地名已经不存在了&#xff0…

python - 文件 / 永久存储:pickle / 异常处理

一.文件 利用help(open)可以看到open()函数的定义: >>> help(open) Help on built-in function open in module _io:open(file, moder, buffering-1, encodingNone, errorsNone, newlineNone, closefdTrue, openerNone) 默认打开模式是’rt’&#xff0…

spring boot(学习笔记第十二课)

spring boot(学习笔记第十二课) Spring Security内存认证&#xff0c;自定义认证表单 学习内容&#xff1a; Spring Security内存认证自定义认证表单 1. Spring Security内存认证 首先开始最简单的模式&#xff0c;内存认证。 加入spring security的依赖。<dependency>…

edge浏览器详细解析

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 Microsoft Edge ​​​​…

InvalidVersionSpecError: Invalid version spec: =2.7解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

C++ | Leetcode C++题解之第22题完全二叉树的节点个数

题目&#xff1a; 题解&#xff1a; class Solution { public:int countNodes(TreeNode* root) {if (root nullptr) {return 0;}int level 0;TreeNode* node root;while (node->left ! nullptr) {level;node node->left;}int low 1 << level, high (1 <&…

详解Java垃圾回收(GC)机制

一、为什么需要垃圾回收 如果不进行垃圾回收&#xff0c;内存迟早都会被消耗空&#xff0c;因为我们在不断的分配内存空间而不进行回收。除非内存无限大&#xff0c;我们可以任性的分配而不回收&#xff0c;但是事实并非如此。所以&#xff0c;垃圾回收是必须的。 二、哪些内…

计算机的错误计算(二十四)

摘要 计算机的错误计算&#xff08;二十一&#xff09;就案例 展示了“两个不相等数相减&#xff0c;差为0”。本节给出新的计算过程&#xff1a;不停增加计算精度直到出现非0结果。这个过程与结果表明&#xff0c;即使是专业数学软件&#xff0c;对这个问题的处理&#xff0…

JS进阶-作用域

学习目标&#xff1a; 掌握作用域 学习内容&#xff1a; 作用域局部作用域全局作用域作用域链JS垃圾回收机制拓展-JS垃圾回收机制-算法说明闭包变量提升 作用域&#xff1a; 作用域规定了变量能够被访问的"范围"&#xff0c;离开了这个"范围"变量便不能被…

论文1:多模态人类活动识别综述

论文题目&#xff1a;A Review of Multimodal Human Activity Recognition with Special Emphasis on Classification, Applications, Challenges and Future Directions 文献偏旧-2021 1、 专业词汇&#xff1a; Human activity recognition (HAR)-人类活动识别 Wearable …

Open3D 计算点云的马氏距离

目录 一、概述 1.1原理 1.2应用 二、代码实现 三、实现效果 3.1原始点云 3.2计算后点云 一、概述 1.1原理 马氏距离&#xff08;Mahalanobis Distance&#xff09;是一种度量多维数据点与数据分布中心之间距离的方法。与欧几里得距离不同&#xff0c;马氏距离考虑了数据…

树目标、抓过程、要结果

一个好的管理理念不会因为一两个成功案例而发扬&#xff0c;一定是有无数个案例验证了它的价值所在&#xff0c;既然OKR在国外已经取得成功&#xff0c;那么国内依然如此。那么OKR这么成功&#xff0c;它到底好在哪呢&#xff1f; 一、OKR是连接企业战略和落地执行的最佳方式。…

C嘎嘎:类和对象(上)

目录 面向过程和面向对象的初步认识 类的引入 类的定义 类的访问限定符及封装 访问限定符 封装 类的作用域 类的实例化 类对象模型 如何计算类对象大小 结构体内存对齐规则 this指针 this指针的引出 this指针的特性 面向过程和面向对象的初步认识 C语言是面向过程…

CentOS 6.5配置国内在线yum源和制作openssh 9.8p1 rpm包 —— 筑梦之路

CentOS 6.5比较古老的版本了&#xff0c;而还是有一些古老的项目仍然在使用。 环境说明 1. 更换国内在线yum源 CentOS 6 在线可用yum源配置——筑梦之路_centos6可用yum源-CSDN博客 cat > CentOS-163.repo << EOF [base] nameCentOS-$releasever - Base - 163.com …

尚品汇-(十二)

&#xff08;1&#xff09;数据库表结构 根据以上的需求&#xff0c;以此将SKU关联的数据库表结构设计为如下&#xff1a; base_attr_value&#xff1a;前面学的平台属性值表 我们进行关联&#xff0c;可以从分类导向平台&#xff0c;通过平台过滤商品 &#xff08;2&#xf…

利用亚马逊云科技云原生Serverless代码托管服务开发OpenAI ChatGPT-4o应用

今天小李哥继续介绍国际上主流云计算平台亚马逊云科技AWS上的热门生成式AI应用开发架构。上次小李哥分享​了利用谷歌云serverless代码托管服务Cloud Functions构建Gemini Pro API​&#xff0c;这次我将介绍如何利用亚马逊的云原生服务Lambda调用OpenAI的最新模型ChatGPT 4o。…

【NTN 卫星通信】Starlink基于终端用户的测量以及测试概述

1 概述 收集了一些starlink的资料&#xff0c;是基于终端侧部署在野外的一些测试以及测量结果。 2 低地球轨道卫星网络概述 低地球轨道卫星网络(lsn)被认为是即将到来的6G中真正实现全球覆盖的关键基础设施。本文介绍了我们对Starlink端到端网络特征的初步测量结果和观测结果&…

基于YOLOv9的脑肿瘤区域检测

数据集 脑肿瘤区域检测&#xff0c;我们直接采用kaggle公开数据集&#xff0c;Br35H 数据中已对医学图像中脑肿瘤位置进行标注 数据集我已经按照YOLO格式配置好&#xff0c;数据内容如下 数据集中共包含700张图像&#xff0c;其中训练集500张&#xff0c;验证集200张 模型训…