Flink 客户端操作命令及可视化工具

news2024/11/24 8:56:35

Flink提供了丰富的客户端操作来提交任务和与任务进行交互。下面主要从Flink命令行、Scala ShellSQL ClientRestful APIWeb五个方面进行整理。

Flink安装目录的bin目录下可以看到flinkstart-scala-shell.shsql-client.sh等文件,这些都是客户端操作的入口。
[点击并拖拽以移动] ​

flink 常见操作:可以通过 -help 查看帮助

run 运行任务

-d:以分离模式运行作业
-c:如果没有在jar包中指定入口类,则需要在这里通过这个参数指定;
-m:指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager,可以说是yarn集群名称;
-p:指定程序的并行度。可以覆盖配置文件中的默认值;
-s:保存点savepoint的路径以还原作业来自(例如hdfs:///flink/savepoint-1537);

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID dce7b69ad15e8756766967c46122736f

就可以看到我们提交的JobManager,默认是一个并发。
[点击并拖拽以移动] ​

点进去就可以看到详细的信息
[点击并拖拽以移动] ​

点击左侧TaskManager —Stdout能看到具体输出的日志信息。
[点击并拖拽以移动] ​

或者查看TaskManager节点的log目录下的*.out文件,也能看到具体的输出信息。
[点击并拖拽以移动] ​

list 查看任务列表

-mjobmanager<arg>作业管理器(主)的地址连接。

[root@hadoop1 flink-1.10.1]# bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
09.07.2020 16:44:09 : dce7b69ad15e8756766967c46122736f : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Stop 停止任务

需要指定jobmanagerip:protjobId。如下报错可知,一个job能够被stop要求所有的source都是可以stoppable的,即实现了 StoppableFunction接口。

[root@hadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 dce7b69ad15e8756766967c46122736f
Suspending job "dce7b69ad15e8756766967c46122736f" with a savepoint.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "dce7b69ad15e8756766967c46122736f".
    at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)

StoppableFunction接口如下,属于优雅停止任务。

 /**
 * @Description 需要 stoppabel 的函数必须实现此接口,例如流式任务 source*
 *               stop() 方法在任务收到 stop信号的时候调用
 *               source 在接收到这个信号后,必须停止发送新的数据优雅的停止。
 * @Date 2020/7/9 17:26
 */
 @PublicEvolving
 public interface StoppableFunction {
     /**
     * 停止 source,与 cancel() 不同的是,这是一个让 source优雅停止的请求。
     * 等待中的数据可以继续发送出去,不需要立即停止
    */
    void stop();
}

Cancel 取消任务

如果在conf/flink-conf.yaml里面配置state.savepoints.dir,会保存savepoint,否则不会保存savepoint。(重启)

state.savepoints.dir: file:///tmp/savepoint

执行 Cancel命令 取消任务

[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s e8ce0d111262c52bf8228d5722742d47
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e8ce0d111262c52bf8228d5722742d47 with savepoint to default savepoint directory.
Cancelled job e8ce0d111262c52bf8228d5722742d47. Savepoint stored in file:/tmp/savepoint/savepoint-e8ce0d-f7fa96a085d8.

也可以在停止的时候显示指定savepoint目录

1 [root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint f58bb4c49ee5580ab5f27fdb24083353
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job f58bb4c49ee5580ab5f27fdb24083353 with savepoint to /tmp/savepoint.
Cancelled job f58bb4c49ee5580ab5f27fdb24083353. Savepoint stored in file:/tmp/savepoint/savepoint-f58bb4-127b7e84910e.

取消和停止(流作业)的区别如下:
cancel()调用, 立即调用作业算子的cancel()方法,以尽快取消它们。如果算子在接到cancel()调用后没有停止,Flink将开始定期中断算子线程的执行,直到所有算子停止为止。
stop()调用 ,是更优雅的停止正在运行流作业的方式。stop()仅适用于source实现了StoppableFunction接口的作业。当用户请求停止作业时,作业的所有source都将接收stop()方法调用。直到所有source正常关闭时,作业才会正常结束。这种方式,使 作业正常处理完所有作业。

触发 savepoint

当需要生成savepoint文件时,需要手动触发savepoint。如下,需要指定正在运行的 JobID 和生成文件的存放目录。同时,我们也可以看到它会返回给用户存放的savepoint的文件名称等信息。

 [root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
 Executing TopSpeedWindowing example with default input data set.
 Use --input to specify file input.
 Printing result to stdout. Use --output to specify output path.
 Job has been submitted with JobID 216c427d63e3754eb757d2cc268a448d
 [root@hadoop1 flink-1.10.1]# bin/flink savepoint -m 127.0.0.1:8081 216c427d63e3754eb757d2cc268a448d /tmp/savepoint/
 Triggering savepoint for job 216c427d63e3754eb757d2cc268a448d.
 Waiting for response...
 Savepoint completed. Path: file:/tmp/savepoint/savepoint-216c42-154a34cf6bfd
 You can resume your program from this savepoint with the run command.

savepointcheckpoint的区别:
checkpoint是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;savepoint是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。
checkpoint是作业failover的时候自动使用,不需要用户指定。savepoint一般用于程序的版本更新,bug修复,A/B Test等场景,需要用户指定。

从指定 savepoint 中启动

[root@hadoop1 flink-1.10.1]# bin/flink run -d -s /tmp/savepoint/savepoint-f58bb4-127b7e84910e/ examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1a5c5ce279e0e4bd8609f541b37652e2

查看JobManager的日志能够看到Reset the checkpoint ID为我们指定的savepoint文件中的ID
[点击并拖拽以移动] ​

modify 修改任务并行度

这里修改masterconf/flink-conf.yamltask slot数修改为4。并通过xsync分发到 两个slave节点上。

taskmanager.numberOfTaskSlots: 4

修改参数后需要重启集群生效:关闭/启动集群

[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh && bin/start-cluster.sh 
Stopping taskexecutor daemon (pid: 8236) on host hadoop2.
Stopping taskexecutor daemon (pid: 8141) on host hadoop3.
Stopping standalonesession daemon (pid: 22633) on host hadoop1.
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.

启动任务

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 2e833a438da7d8052f14d5433910515a

从页面上能看到Task Slots总计变为了8,运行的Slot1,剩余Slot数量为7
[点击并拖拽以移动] ​

这时候默认的并行度是1
[点击并拖拽以移动] ​

Flink1.0版本命令行flink modify已经没有这个行为了,被移除了。。。Flink1.7上是可以运行的。

[root@hadoop1 flink-1.10.1]# bin/flink modify -p 4 cc22cc3d09f5d65651d637be6fb0a1c3
"modify" is not a valid action.

Info 显示程序的执行计划

[root@hadoop1 flink-1.10.1]# bin/flink info examples/streaming/TopSpeedWindowing.jar 
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

拷贝输出的json内容,粘贴到这个网站:http://flink.apache.org/visualizer/可以生成类似如下的执行图。

[点击并拖拽以移动] ​

可以与实际运行的物理执行计划进行对比。
[点击并拖拽以移动] ​

SQL Client Beta

进入 Flink SQL

[root@hadoop1 flink-1.10.1]# bin/sql-client.sh embedded

Select查询,按Q退出如下界面;

Flink SQL> select 'hello word';
                                                                                                        SQL Query Result (Table)
 Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:37:04.649

                    EXPR$0
                hello word




Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

打开http://hadoop1:8081能看到这条select语句产生的查询任务已经结束了。这个查询采用的是读取固定数据集的Custom Source,输出用的是Stream Collect Sink,且只输出一条结果。
[点击并拖拽以移动] ​

[点击并拖拽以移动] ​

explain 查看 SQL 的执行计划。

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
== Abstract Syntax Tree ==         //抽象语法树
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Optimized Logical Plan ==      //优化后的逻辑执行计划
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])
   +- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Physical Execution Plan ==    //物理执行计划
Stage 13 : Data Source
    content : Source: Values(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

    Stage 15 : Operator
        content : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
        ship_strategy : HASH

结果展示

SQL Client支持两种模式来维护并展示查询结果:

table mode

在内存中物化查询结果,并以分页table形式展示。用户可以通过以下命令启用table mode:例如如下案例;

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.

Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
                                                                                                          SQL Query Result (Table)
 Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:55:08.589

                      name                       cnt
                     Alice                         1
                      Greg                         1
                       Bob                         2



Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

​ [点击并拖拽以移动] ​​

​ [点击并拖拽以移动] ​​

changelog mode

不会物化查询结果,而是直接对continuous query产生的添加和撤回retractions结果进行展示:如下案例中的-表示撤回消息

Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.

Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
                                                                                                        SQL Query Result (Changelog)
 Table program finished.                                                                                                                                                                                               Updated: 16:58:05.777

 +/-                      name                       cnt
   +                       Bob                         1
   +                     Alice                         1
   +                      Greg                         1
   -                       Bob                         1
   +                       Bob                         2



Q Quit                                                                        + Inc Refresh                                                                 O Open Row
R Refresh                                                                     - Dec Refresh

​ [点击并拖拽以移动] ​​

​ [点击并拖拽以移动] ​​

Environment Files

CREATE TABLE 创建表DDL语句:

Flink SQL> CREATE TABLE pvuv_sink (
>     dt VARCHAR,
>     pv BIGINT,
>     uv BIGINT
> ) ;
[INFO] Table has been created.

SHOW TABLES 查看所有表名

Flink SQL>  show tables;
pvuv_sink

DESCRIBE 表名 查看表的详细信息;

Flink SQL>  describe pvuv_sink;
root
 |-- dt: STRING
 |-- pv: BIGINT
 |-- uv: BIGINT

插入等操作均与关系型数据库操作语句一样,省略N个操作

Restful API

接下来我们演示如何通过rest api来提交jar包和执行任务。
[点击并拖拽以移动] ​

通过Show Plan可以看到执行图
[点击并拖拽以移动] ​

提交之后的操作,取消的话点击页面的Cancel Job

​ [点击并拖拽以移动] ​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1333200.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C->Cpp】深度解析#由C迈向Cpp(2)

目录 &#xff08;一&#xff09;缺省参数 全缺省参数 半缺省参数 缺省参数只能在函数的声明中出现&#xff1a; 小结&#xff1a; &#xff08;二&#xff09;函数重载 函数重载的定义 三种重载 在上一篇中&#xff0c;我们从第一个Cpp程序为切入&#xff0c;讲解了Cpp的…

纯HTML代码实现给图片增加水印并下载保存到本地

<!DOCTYPE html> <html> <head><meta charset"utf-8"><meta name"viewport" content"widthdevice-width, initial-scale1, maximum-scale1, user-scalableno"/><title>图片水印打码工具-宋佳乐博客</tit…

智能优化算法应用:基于白鲸算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于白鲸算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于白鲸算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.白鲸算法4.实验参数设定5.算法结果6.参考文献7.MA…

vue3 新项目 - 搭建路由router

创建router/index 文件 main.ts 安装 router 然后 在 app下面 去 设置 路由出口

P6 音频格式—— AAC

目录 前言 01 AAC是什么&#xff1f; 02 为什么需要进行AAC进行音频压缩处理&#xff1f; 03 AAC的特点以及优势 04 AAC格式详解&#xff1a; 4.1. ADIF的数据结构&#xff1a; 4.1.1 ADIF Header具体的表格: 4.2. ADTS的结构&#xff08;重点&#xff09;&#xff1a; …

项目管理4321方法论

文章目录 一、项目立项准备&#xff08;4步&#xff09;case1、识别价值---解决背后痛点的才是价值&#xff0c;价值是做任何事情的出发点case2、明确目标---支撑价值实现的&#xff0c;目标是 具体/可衡量/可达到/相关性/有时限的case3、识别干系人---找对人才能做对事&#x…

MYSQL函数\约束\多表查询\事务

函数 字符串函数 数值函数 mod就是取余 日期函数 流程函数 约束 外键约束 删除更新\外键 多表查询 多表关系 一对多 多对多 一对一 多表查询 内连接 select e.name d.name from emp e join dept d on e.id d.id; 外连接 select emp.*, d.name from emp left join tm…

堆与二叉树(下)

接着上次的&#xff0c;这里主要介绍的是堆排序&#xff0c;二叉树的遍历&#xff0c;以及之前讲题时答应过的简单二叉树问题求解 堆排序 给一组数据&#xff0c;升序&#xff08;降序&#xff09;排列 思路 思考&#xff1a;如果排列升序&#xff0c;我们应该建什么堆&#x…

【贪心】买卖股票的最佳时机含手续费

/** 贪心&#xff1a;每次选取更低的价格买入&#xff0c;遇到高于买入的价格就出售(此时不一定是最大收益)。* 使用buy表示买入股票的价格和手续费的和。遍历数组&#xff0c;如果后面的股票价格加上手续费* 小于buy&#xff0c;说明有更低的买入价格更新buy。如…

面向船舶结构健康监测的数据采集与处理系统(一)系统架构

世界贸易快速发展起始于航海时代&#xff0c;而船舶作为重要的水上交通工具&#xff0c;有 其装载量大&#xff0c;运费低廉等优势。但船舶在运营过程中出现的某些结构处应力值 过大问题往往会给运营部门造成重大的损失&#xff0c;甚至造成大量的人员伤亡和严重 的环境污染…

Fastjson 常用语法

一.Json数据格式回顾 1.1 什么是json JSON:(JavaScript Object Notation, JS 对象简谱) 是一种轻量级的数据交换格式。它基于 ECMAScript(欧洲计算机协会制定的js规范)的一个子集&#xff0c;采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得 JSO…

现代控制理论-李雅普诺夫

现代控制理论-李雅普诺夫 单输入单输出系统&#xff08;BIBO&#xff09;的系统函数如下&#xff1a; 则&#xff0c;该系统的能控标准型&#xff08;能空性&#xff09;为&#xff1a; 能观性&#xff1a; 李雅普诺夫下的稳定性&#xff1a; 李雅普诺夫下的渐进稳定性&a…

AIGC:大语言模型LLM的幻觉问题

引言 在使用ChatGPT或者其他大模型时&#xff0c;我们经常会遇到模型答非所问、知识错误、甚至自相矛盾的问题。 虽然大语言模型&#xff08;LLMs&#xff09;在各种下游任务中展示出了卓越的能力&#xff0c;在多个领域有广泛应用&#xff0c;但存在着幻觉的问题&#xff1a…

Unity动画系统学习笔记(二)根运动、动画事件与状态机行为

一、根运动 在学习根运动前需要了解两个名词&#xff1a; 身体变换&#xff1a;身体变换是角色的质心。它用于 Mecanim 的重定向引擎&#xff0c;并提供最稳定的移位模型。身体方向是相对于 Avatar T 形姿势的下身和上身方向的平均值。身体变换和方向存储在动画剪辑中&#x…

使用VisualStutio2022开发第一个C++程序

使用VisualStudio2022创建C项目 第一步&#xff1a;新建C的控制台应用 第二步&#xff1a;填写项目名称和代码存放位置&#xff0c;代码的存放目录不要有中文名 第三步:点击创建&#xff0c;VisualStudio会自动开始帮我们创建项目 第四步&#xff1a;项目创建好以后&…

【PostGIS】PostgreSQL15+对应PostGIS安装教程及空间数据可视化

一、PostgreSQL15与对应PostGIS安装 PostgreSQL15安装&#xff1a;下载地址PostGIS安装&#xff1a;下载地址&#xff08;选择倒数第二个&#xff09; 1、PostgreSQL安装 下载安装包&#xff1b;开始安装&#xff0c;这里使用默认安装&#xff0c;一直next直到安装完成&…

接口测试的持续集成的工具(git代码管理工具,jenkins持续集成)

持续集成的概念&#xff1a;大白话就是持续的做一件事情&#xff0c;使其使用起来更加流畅&#xff1b;结合测试来讲就是说用工具管理好代码的同时&#xff0c;使代码运行的更加自动以及智能&#xff1b;提升测试效率。 ⽹址&#xff1a;https://git-scm.com/downloads 长这个…

SpringSecurity6 | 失败后的跳转

✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: Java从入门到精通 ✨特色专栏: MySQL学习 🥭本文内容: SpringSecurity6 | 失败后的跳转 📚个人知识库: Leo知识库,欢迎大家访问 学习…

【网络安全/CTF】unseping 江苏工匠杯

该题考察序列化反序列化及Linux命令执行相关知识。 题目 <?php highlight_file(__FILE__);class ease{private $method;private $args;function __construct($method, $args) {$this->method $method;$this->args $args;}function __destruct(){if (in_array($thi…