Flink之SQL client使用案例

news2024/9/27 12:17:40

Flink的执行模式有以下三种:

前提是我们已经开启了yarnsession的进程,在下图中可以看到启动的id也就是后续任务需要通过此id进行认证,以及任务分配的master主机。

这里启动时候会报错一个ERROR:org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed

查阅资料得知:

该错误是因为,kerberos认证失败,cdh6,并没有启动kerberos。所以该错误可以忽略。但是如果已经开启动了kerberos,这个问题就要解决了。

我们这里没有开启Kerberos,所以这个报错我么可以不管。

Session Mode:会话模式

会话模式需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所有提交的作业会竞争集群中的资源。适合任务规模小,执行时间短的大量作业。

Flink的作业执行环境会一直保留在集群上,直到会话被显式终止。这样,可以提交多个作业,它们可以共享相同的集群资源和状态,从而实现更高的效率和资源利用。

bin/flink run -yid application_1723708102500_0009  examples/batch/WordCount.jar

重要的是要添加 -yid 这个参数,不添加这个参数会执行不成功,会报错找不到执任务的cluster。

脚本执行参数:

-n(--container):TaskManager的数量。(1.10 已经废弃)

-s(--slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。

-jm:JobManager的内存(单位MB)。

-q:显示可用的YARN资源(内存,内核);

-tm:每个TaskManager容器的内存(默认值:MB)

-nm:yarn 的appName(现在yarn的ui上的名字)。  

-d:后台执行。

提交flink任务:

bin/flink run examples/batch/WordCount.jar

Per-Job Mode:单作业模式,我们也是更多的使用这种模式,这个模式会将我们的资源更合理的规划使用。

每个Flink应用程序作为一个独立的作业被提交和执行。

每次提交的Flink应用程序都会创建一个独立的作业执行环境,该作业执行环境仅用于执行该特定的作业。

作业完成后,作业执行环境会被释放,集群关闭,资源释放

bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

常用参数:

--p 程序默认并行度

下面的参数仅可用于 -m yarn-cluster 模式

--yjm JobManager可用内存,单位兆

--ynm YARN程序的名称

--yq 查询YARN可用的资源

--yqu 指定YARN队列是哪一个

--ys 每个TM会有多少个Slot

--ytm 每个TM所在的Container可申请多少内存,单位兆

--yD 动态指定Flink参数

-yd 分离模式(后台运行,不指定-yd, 终端会卡在提交的页面上)

Application Mode:应用模式

应用模式算是前2种模式的升级,前2种模式中,Flink程序代码是在客户端执行,然后客户端提交给JobManager,客户端需要占用大量网络带宽。

应用模式需要为每一个提交的应用单独启动一个JobManager(应用程序在JobManager执行),也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager关闭。

application 模式使用 bin/flink run-application 提交作业;通过 -t 指定部署环境,目前 application 模式支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application);并支持通过 -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。

带有 JM 和 TM 内存设置的命令提交,这种方式提交之后会带对应服务器的HDFS的WebUI页面多出一个wordcount_01的文件,该文件记录了程序运行的结果

./bin/flink run-application -t yarn-application \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlinkWordCount" \

./examples/batch/WordCount.jar --output hdfs://ddp54:8020/wordcount_01

在上面例子 的基础上自己设置 TaskManager slots 个数为3,以及指定并发数为3:

./bin/flink run-application -t yarn-application -p 3 \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlinkWordCount" \

-Dtaskmanager.numberOfTaskSlots=3 \

./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_52

指定并发还可以使用 -Dparallelism.default=3,而且社区目前倾向使用 -D+通用配置代替客户端命令参数(比如 -p)。所以这样写更符合规范:

./bin/flink run-application -t yarn-application \

-Dparallelism.default=3 \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlinkWordCount" \

-Dtaskmanager.numberOfTaskSlots=3 \

./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_53

以上三种模式就先简述这些,其实还有很多参数没有用到,我们更多的只需要用到第二种pre-job的模式即可。

Yarn-session模式开启成功后,我们进入SQL-Client界面,在这个界面我们可以写SQL来实现系统之间的交互,我接下来以MySQL与Kafka的交互为例:

首先是要在MySQL数据库创建一些库和表当作source数据源:

CREATE TABLE src_mysql_order(
 order_id BIGINT,
 store_id BIGINT,
 sales_amt double,
 PRIMARY KEY (`order_id`)
);

CREATE TABLE src_mysql_order_detail(
 order_id BIGINT,
 store_id BIGINT,
 goods_id BIGINT,
 sales_amt double,
 PRIMARY KEY (order_id,store_id,goods_id)
);

CREATE TABLE dim_store(
 store_id BIGINT,
 store_name varchar(100),
 PRIMARY KEY (`store_id`)
);

CREATE TABLE dim_goods(
 goods_id BIGINT,
 goods_name varchar(100),
 PRIMARY KEY (`goods_id`)
);

CREATE TABLE dwa_mysql_order_analysis (
   store_id BIGINT,
   store_name varchar(100),
   sales_goods_distinct_nums bigint,
   sales_amt double,
   order_nums bigint,
   PRIMARY KEY (store_id,store_name)
);

Source:在MySQL中创建完成之后我们要在SQL client界面进行映射在这里以src_mysql_order表为例,执行成功如以下界面:

CREATE TABLE src_mysql_order(

 order_id BIGINT,

 store_id BIGINT,

 sales_amt double,

 PRIMARY KEY (`order_id`) NOT ENFORCED

) WITH (

 'connector' = 'mysql-cdc',

 'hostname' = 'xxx',

 'port' = '3306',

 'username' = 'xxx',

 'password' = 'xxx',

 'database-name' = 'xxx',

 'table-name' = 'xxx',

 'scan.incremental.snapshot.enabled' = 'false'

);

Sink:对MySQL做完source映射之后,我们要将MySQL的数据导入到Kafka,因此我们也要做一些Kafka表的映射,执行成功界面如下:

CREATE TABLE ods_kafka_order (

 order_id BIGINT,

 store_id BIGINT,

 sales_amt double,

 PRIMARY KEY (`order_id`) NOT ENFORCED

) WITH (

 'connector' = 'upsert-kafka',

 'topic' = 'Kafka主题',

 'properties.bootstrap.servers' = 'Kafka集群的IP+端口号',

  'key.format' = 'json',

 'value.format' = 'json'

);

两张表都映射完成之后,我们先在MySQL添加一些测试用例:

insert into src_mysql_order values

(20221210001,10000,50),

(20221210002,10000,20),

(20221210003,10001,10);

接下来就将MySQL与Kafka实现交互,即将MySQL数据插入到Kafka作业中:

insert into ods_kafka_order_2 select * from src_mysql_order;

在这个过程中,有可能会报错:

这个报错是找不到表的元数据信息,我这里是将表名写错了,这个是比较庆幸的,但是还有一种原因就是:没有MySQLCDC或者Kafka的依赖,导致连接的元数据信息无法保存到catalog中,因此我们就需要添加MySQLCDC和Kafka的连接依赖:

进入到Flink安装路径的lib目录下:使用 rz 指令将依赖jar包上传,上传完毕之后使用 scp 指令远程复制给集群的其它机器,我们的是ddp54、ddp55:

scp -r lib/flink-sql-connector-kafka-1.16.2.jar  root@ddp54:$PWD/lib

scp -r lib/flink-sql-connector-kafka-1.16.2.jar  root@ddp55:$PWD/lib

Jar包上传完之后,我们在基础平台将Flink集群重启

集群重启之后,我们重新开启一个yarnsession进程来执行后续提交的任务。

进入yarn的web页面来查看进程启动的状况。

接下来我们重走一遍MySQL的source和Kafka的sink流程,走完之后进入SQL client界面执行交互指令,即MySQL数据插入到Kafka,执行完成之后没有报错,但是查看flink的web页面发现并没有作业在执行或执行完成,于是查看日志得知:问题是MySQL的系统时间跟所在地区时间不匹配导致的,我们可以在命令行进行时区的设置,也可以在配置文件中进行时区的设置,我选择了在my.cnf配置文件中进行时区的更改:在[mysqld]下添加默认时区设置即可,与此同时,MySQL也要开启binlog日志,可以保障数据一致性,主要用于复制和数据恢复。配置完成之后重启MySQL服务。

开启binlog日志

# 服务ID
server-id=1
# binlog 配置 只要配置了log_bin地址 就会开启
log_bin = /var/lib/mysql/mysql_bin
# 日志存储天数 默认0 永久保存
# 如果数据库会定期归档,建议设置一个存储时间不需要一直存储binlog日志,理论上只需要存储归档之后的日志
expire_logs_days = 30
# binlog最大值
max_binlog_size = 1024M
# 规定binlog的格式,binlog有三种格式statement、row、mixad,默认使用statement,建议使用row格式
binlog_format = ROW
# 在提交n次事务后,进行binlog的落盘,0为不进行强行的刷新操作,而是由文件系统控制刷新日志文件,如果是在线交易和账有>关的数据建议设置成1,如果是其他数据可以保持为0即可
sync_binlog = 1

 查看日志得知是MySQL的时区问题导致任务提交不成功

在 my.cnf 对时区和binlog日志进行修改

上边的MySQL配置完成之后,需要重启MySQL服务

docker restart mysql

接下来在SQL client界面再次执行指令:

insert into ods_kafka_order select * from src_mysql_order;

打开Flink的web界面,发现Flink的作业任务正在执行:

我们在SQL client界面查询MySQL的数据表信息:

SET sql-client.execution.result-mode=tableau; 

select * from src_mysql_order;

可以查看插入到MySQL的数据信息和数据的更新信息[Flink中 +I 代表插入数据 ; +U 代表更新数据 ; -U代表撤回数据]

与此同时,我们去Kafka查看数据是否到来,通过Kafka Tool查看到数据已经成功到Kafka。

至此我们实现了MySQL到Kafka的实时数据的接入以及在这个过程中遇到的一些问题以及解决办法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2052845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

风电场风机安全监测系统解决方案

建设背景 随着风电产业的快速发展,风力发电已成为一种重要的清洁能源形式。风电场中的风塔是支撑风力发电机组的重要结构,其安全稳定运行对于风电场的正常运营和发电效率至关重要。然而,风塔常常面临风载、震动、腐蚀等多种外部因素的影响&a…

一键切换全球优质Linux 系统软件源及 Docker 源,轻松安装 Docker —— 适配广泛、零门槛、超强功能的开源脚本!

概述 linuxMirrors开源脚本为 GNU/Linux 系统用户提供了强大的工具,帮助用户轻松更换系统软件源并安装 Docker。脚本适配了多种国内外镜像站,经过测试具备良好的下载速度和 IPv6 兼容性,并且还包括了中国大陆教育网镜像站的选项。无需技术背景,文档提供了详尽的操作指引和常…

telegraf、influxdb、grafana安装配置及后端监听器操作

InfluxDB(时序数据库),常用的一种使用场景:监控数据统计。 grafana,用作监控页面的前端展示。 telegraf,数据采集器。 ITG及快捷启动百度网盘:百度网盘 链接: 提取码: 0000 其他地址链接&am…

pycharm2023.1破解

下载解压文件,文件夹 /jetbra 复制电脑某个位置 注意: 补丁所属文件夹需单独存放,且放置的路径不要有中文与空格,以免 Pycharm 读取补丁错误。 点击进入 /jetbra 补丁目录,再点击进入 /scripts 文件夹,双…

JAVA中的网络编程巨详解(2w字)

在学习 Java 网络编程之前,我们先来了解什么是计算机网络。 计算机网络是指两台或更多的计算机组成的网络,在同一个网络中,任意两台计算机都可以直接通信,因为所有计算机都需要遵循同一种网络协议。 下面是一张简化的网络拓扑图…

【Unity开发】几种空值判断的性能测试

【Unity开发】几种空值判断的性能测试) 项目优化过程中,一个非常细节的优化,就是在项目数据处理过程中,会用大量的null和“”空值的判断,参考了一些网友说的性能差别很大,是不是真的需要优化的问题&#xf…

Kafka【一】Windows下安装单节点Kafka

① 下载 下载软件安装包:kafka_2.12-3.6.1.tgz,下载地址:https://kafka.apache.org/downloads 这里的3.6.1,是Kafka软件的版本。截至到2023年12月24日,Kafka最新版本为3.6.1。2.12是对应的Scala开发语言版本。Scala2…

html+css+js实现盒子

效果图&#xff1a; 代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><title>禁止打开盖子</title><style>* {box-sizing: border-box;-webkit-font-smoothing: antialiased;t…

OAuth2-0协议安全学习

有一个问题困扰了很久很久&#xff0c;翻来覆去无法入眠&#xff0c;那就是OAuth2.0有什么安全问题啊 OAuth2.0是一种常用的授权框架&#xff0c;它使网站和 Web 应用程序能够请求对另一个应用程序上的用户帐户进行有限访问&#xff0c;在全世界都有广泛运用 OAuth2.0简介 O…

pygame开发课程系列(6): 游戏优化与发布

第六章 游戏优化与发布 在游戏开发过程中&#xff0c;优化性能和正确发布是至关重要的步骤。本章将探讨如何提升游戏性能&#xff0c;以及如何将游戏打包成独立的可执行文件&#xff0c;以便于分发和使用。 6.1 性能优化 优化游戏性能可以提升用户体验&#xff0c;确保游戏…

非标零部件加工:满足个性化需求的关键

在现代制造业中&#xff0c;非标零部件加工正逐渐成为满足个性化需求的关键环节。随着各行各业对产品独特性和定制化的要求不断提高&#xff0c;传统的标准零部件已经无法完全满足市场的多样化需求。时利和将分享关于非标零部件加工是如何满足个性化需求的。 非标零部件加工的核…

如何恢复火狐浏览器中丢失的书签记录?

如何恢复火狐浏览器中丢失的书签记录&#xff1f; 在数字时代&#xff0c;网络浏览器不仅是获取信息的窗口&#xff0c;更承载着个人习惯与数据&#xff0c;火狐浏览器&#xff08;Firefox&#xff09;以其强大的自定义功能和对用户隐私的重视而广受欢迎&#xff0c;书签的丢失…

资深研发的心愿:PostgreSQL未来若能加入这些功能,将更臻完善

我们已经与 PostgreSQL 和其他数据库一起工作了一段时间。在数据库管理领域&#xff0c;PostgreSQL 以其稳健性和灵活性脱颖而出。然而&#xff0c;随着开发人员在现代应用程序开发的复杂性中航行&#xff0c;还有一些额外功能可以简化他们的工作流。以下是我希望 PostgreSQL 具…

18Canvas 组件

18 Canvas 组件 Tkinter 的 Canvas 组件是一个灵活的绘图区域&#xff0c;允许我们在其中绘制图形、文本和图像。它支持各种绘图操作&#xff0c;如画线、画矩形、画圆形等。 Canvas 组件属性 width 和 height: Canvas的宽度和高度&#xff0c;可以是像素值或以字符为单位。…

【C++】unordered_set和unordered_map的封装(哈希)

&#x1f308;个人主页&#xff1a;秦jh_-CSDN博客&#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/qinjh_/category_12575764.html?spm1001.2014.3001.5482 ​ 目录 key和pair 仿函数hash 迭代器 operator HashTable.h my_unordered_map.h my_unordered_se…

【自动驾驶】控制算法(一)绪论与前期准备

写在前面&#xff1a; &#x1f31f; 欢迎光临 清流君 的博客小天地&#xff0c;这里是我分享技术与心得的温馨角落。&#x1f4dd; 个人主页&#xff1a;清流君_CSDN博客&#xff0c;期待与您一同探索 移动机器人 领域的无限可能。 &#x1f50d; 本文系 清流君 原创之作&…

白盒测试-发送请求

发送请求-怎么操作&#xff1f; 先创建发送请求对象mockmvc--用mockmvc对象发送请求&#xff08;包含请求url,请求头&#xff0c;请求参数等&#xff09; 用到的源码是mockmvc源码--其中perform方法&#xff0c;他的入参是接口类 用mockmvc对象发送请求&#xff0c;代码是mock…

【机器学习第11章——特征选择与稀疏学习】

机器学习第11章——特征选择与稀疏学习 11.特征选择与稀疏学习11.1子集搜索与评价子集搜索子集评价 11.2 过滤式选择11.3 包裹式选择11.4 嵌入式选择11.5 稀疏表示与字典学习稀疏表示字典学习 11.6 压缩感知 11.特征选择与稀疏学习 11.1子集搜索与评价 特征&#xff1a;描述物…

全国计算机二级Python学习笔记

格式化操作符辅助指令: 格式化输出16进制&#xff0c;十进制&#xff0c;八进制整数 %x — hex 十六进制 %d — dec 十进制 %o — oct 八进制 turtle.setup()函数用于启动一个图形窗口&#xff0c;它有四个参数 turtle.setup(width, height, startx, starty) 分别是&…

kali实用工具之ettercap

ettercap最初是交换局域网的嗅探器&#xff0c;但在开发的过程中&#xff0c;它获得了越来越多的功能&#xff0c;从而使其转变为强大而灵活的中间人攻击工具。它支持许多协议&#xff08;甚至是加密协议&#xff09;的主动和被动解剖&#xff0c;并包括许多用于网络和主机分析…