Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
31、Flink的SQL Gateway介绍及示例
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
35、Flink 的 Formats 之CSV 和 JSON Format
36、Flink 的 Formats 之Parquet 和 Orc Format
41、Flink之Hive 方言介绍及详细示例
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版
46、Flink 的table api与sql之配项列表及示例
文章目录
- Flink 系列文章
- 一、Flink 指标体系
- 2、Scope 范围
- 1)、用户范围
- 2)、系统范围System Scope
- 3)、所有变量列表
- 4)、用户变量
- 3、Reporter
- 4、System metrics
- 1)、CPU
- 2)、Memory
- 3)、Threads
- 4)、GarbageCollection
- 5)、ClassLoader
- 6)、Network
- 7)、Default shuffle service
- 8)、Cluster
- 9)、Availability
- 10)、Checkpointing
- 11)、State Access Latency
- 12)、RocksDB
- 13)、State Changelog
- 14)、IO
- 15)、Connectors
- 16)、System resources
- 17)、预测执行
- 5、End-to-End latency tracking 延迟跟踪
- 6、State access latency tracking 延迟跟踪
- 7、REST API integration
- 1)/jobmanager/metrics示例
- 2) taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2示例
- 3)/taskmanagers/metrics?get=metric1,metric2示例
- 4)/taskmanagers/metrics?get=metric1,metric2&agg=min,max示例
- 8、Dashboard integration
本文简单的介绍了Flink 的指标体系的第二部分,即指标的scope、报告、系统指标、跟踪、api和dashboard的集成示例。
本专题分为三部分,即:
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版
本文依赖nc、flink集群能正常使用。
本文分为6个部分,即scope、报告、系统指标、跟踪、api和dashboard集成。
本文的示例是在Flink 1.17版本中运行。
一、Flink 指标体系
Flink暴露了一个度量系统,允许收集度量并将其公开给外部系统。
2、Scope 范围
本部分的示例比较简单,不再提供具体的验证内容。
每个metric 度量都被分配了一个标识符和一组key-value对,在这些key-value对下将报告度量。
标识符基于3个组件:注册度量时的用户定义名称、可选的用户定义范围和系统提供的范围。例如,如果A.B是系统作用域,C.D是用户作用域,E是名称,那么度量的标识符将是A.B.C.D.E。
您可以通过在conf/flink-conf.yaml中设置metrics.scope.delimiter键来配置用于标识符的分隔符(默认值:.)。
1)、用户范围
您可以通过调用MetricGroup#addGroup(String name)、MetricGroup#addGroup(int name) 或MetricGroup#addGroup(String key, String value)来定义用户作用域。这些方法影响MetricGroup#getMetricIdentifier和MetricGroup#getScopeComponents返回的内容。
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
2)、系统范围System Scope
系统范围包含有关度量的上下文信息,例如它在哪个任务中注册,或者该任务属于哪个作业。
应该包括哪些上下文信息可以通过在conf/flink-conf.yaml中设置以下键来配置。这些键中的每一个都需要一个格式字符串,该字符串可能包含常量(例如“taskmanager”)和变量(例如“<task_id>”),这些常量和变量将在运行时被替换。
- metrics.scope.jm
Default: .jobmanager
应用于job manager范围内的所有指标 - metrics.scope.jm-job
Default: .jobmanager.<job_name>
应用于 job manager and job范围内的所有度量 - metrics.scope.tm
Default: .taskmanager.<tm_id>
应用于task manager范围内的所有度量 - metrics.scope.tm-job
Default: .taskmanager.<tm_id>.<job_name>
应用于范围为task manager and job的所有度量 - metrics.scope.task
Default: .taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
应用于task范围内的所有度量 - metrics.scope.operator
Default: .taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
应用于作用域为operator的所有度量
变量的数量或顺序没有限制。变量区分大小写。
操作员度量的默认作用域将产生类似于 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的标识符
如果还希望包含任务名称但省略task manager信息,则可以指定以下格式:
metrics.scope.operator: .<job_name>.<task_name>.<operator_name>.<subtask_index>
这可以创建标识符localhost localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.
对于此格式字符串,如果同一作业同时运行多次,可能会发生标识符冲突,从而导致度量数据不一致。因此,建议使用通过包括id(例如<job_id>)或通过为作业和运算符分配唯一名称来提供一定程度的唯一性的格式字符串。
3)、所有变量列表
- JobManager:
- TaskManager: , <tm_id>
- Job: <job_id>, <job_name>
- Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
- Operator: <operator_id>,<operator_name>, <subtask_index>
对于Batch API, <operator_id> = <task_id>.
4)、用户变量
您可以通过调用MetricGroup#addGroup(String key, String value)来定义用户变量。此方法会影响MetricGroup#getMetricIdentifier、MetricGroup#getScopeComponents和MetricGroup#getAllVariables()返回的内容。
用户变量不能用于范围格式。
3、Reporter
Flink 支持用户将 Flink 的各项运行时指标发送给外部系统。
有关如何设置Flink的度量报告程序的信息,请查看47、Flink 的指标报告介绍及示例。
4、System metrics
默认情况下,Flink会收集几个指标,这些指标可以深入了解当前状态。本节是所有这些指标的参考。
下表通常有5列:
- “Scope”列描述了用于生成系统范围的scope format。例如,如果单元格包含“Operator”,则使用“metrics.scope.Operator”的scope format。如果单元格包含多个值,用斜线分隔,则会多次报告不同实体的metrics ,例如作业管理器和任务管理器。
- (optional)“Infix”列描述了将哪个Infix附加到system scope。
- “Metrics”列列出了为给定 scope and infix注册的所有度量的名称。
- “Description”列提供了有关给定度量的测量信息。
- “Type”列描述了用于测量的度量类型。
请注意, infix/metric名称列中的所有点仍受“metrics.demitter”设置的约束。
因此,为了推断metric identifier:
1、采用基于“Scope”列的scope-format
2、将值附加到“Infix”列中(如果存在),并说明“metrics.demitter”设置
3、附加metric 名称。
1)、CPU
2)、Memory
与内存相关的指标要求Oracle的内存管理(也包含在OpenJDK的Hotspot实现中)到位。在使用其他JVM实现(例如IBM’s J9)时,某些度量可能不会公开。
3)、Threads
4)、GarbageCollection
5)、ClassLoader
6)、Network
不推荐:使用默认的Default shuffle service 中的指标
7)、Default shuffle service
与使用netty网络通信的任务执行器之间的数据交换相关的度量。
8)、Cluster
9)、Availability
此表中的指标可用于以下每个作业状态:INITIALIZING、CREATED、RUNNING、RESTARTING、CANCELLING、FAILING。是否报告这些指标取决于metrics.job.status.enable设置。
这些度量的语义可能会在以后的版本中发生变化。
实验阶段功能
当作业处于RUNNING状态时,此表中的指标提供了有关作业当前正在执行的操作的其他详细信息。是否报告这些指标取决于metrics.job.status.enable设置。
在以下情况下,作业被视为正在部署任务:
- 对于流作业,任何任务都处于“正在部署”状态
- 对于批处理作业,如果至少有一个任务处于展开状态,并且没有INITIALIZING/RUNNING任务
10)、Checkpointing
对于失败的检查点,度量是在尽最大努力的基础上更新的,可能不准确。
11)、State Access Latency
12)、RocksDB
某些RocksDB本机指标可用,但默认情况下已禁用,您可以在此处找到完整的文档
13)、State Changelog
这些指标只能通过报告器获得。
14)、IO
15)、Connectors
- Kafka Connectors
- Kinesis 源
- Kinesis 接收器
- HBase Connectors
16)、System resources
默认情况下,系统资源报告处于禁用状态。启用metrics.system-resource后,以下列出的其他度量将在Job和TaskManager上可用。系统资源度量被定期更新,并且它们呈现配置的间隔(metrics.System resource probing interval)的平均值。
系统资源报告要求类路径上存在一个可选的依赖项(例如,位于Flink的lib目录中):
- com.github.oshi:oshi核心:6.1.5(根据MIT license授权)
包括它的可传递依赖项:
- net.java.dev.jna:jna平台:jar:5.10.0
- net.java.dev.jna:jar:5.10.0
这方面的故障将报告为警告消息,如SystemResourcesMetricInitializer在启动期间记录的NoClassDefFoundError。
- System CPU
- System memory
- System network
17)、预测执行
以下指标可以用来衡量预测执行的有效性。
5、End-to-End latency tracking 延迟跟踪
Flink允许跟踪在系统中传输的记录的延迟。默认情况下,此功能处于禁用状态。要启用延迟跟踪,必须在Flink配置或ExecutionConfig中将latencyTracingInterval设置为正数。
在latencyTracingInterval,源将周期性地发出一个特殊记录,称为LatencyMarker。标记包含从记录在源处发出的时间开始的时间戳。延迟标记无法超过常规用户记录,因此,如果记录在operator面前排队,则会增加标记跟踪的延迟。
延迟标记并没有考虑用户记录在运算符中花费的时间,因为它们正在绕过它们。特别是,标记没有考虑记录在窗口缓冲区中花费的时间。只有当operator无法接受新记录,因此他们正在排队时,使用标记测量的延迟才会反映这一点。
LatencyMarketers用于导出拓扑的源和每个下游操作符之间的延迟分布。这些分布被报告为直方图度量。这些分布的粒度可以在Flink配置中进行控制。对于最高粒度的子任务,Flink将导出每个源子任务和每个下游子任务之间的延迟分布,这将导致直方图的二次(就并行性而言)数量。
目前,Flink假设集群中所有机器的时钟都是同步的。我们建议设置自动时钟同步服务(如NTP),以避免错误的延迟结果。
警告启用延迟度量可能会显著影响集群的性能(尤其是子任务粒度)。强烈建议仅将它们用于调试目的。
6、State access latency tracking 延迟跟踪
Flink还允许跟踪标准Flinkstate-backends或从AbstractStateBackend扩展的自定义state-backends的keyed state访问延迟。默认情况下,此功能处于禁用状态。要启用此功能,必须在Flink配置中将state.backend.latency-track.keyed-state-enabled设置为true。
启用跟踪keyed state访问延迟后,Flink将对每N次访问的状态访问延迟进行采样,其中N由state.backend.latency-track.sample-interval定义。此配置的默认值为100。较小的值将获得更准确的结果,但由于采样频率更高,因此对性能的影响更大。
由于此延迟度量的类型为直方图,state.backend.latency-track.history-size将控制历史记录值的最大数量,默认值为128。此配置的较大值将需要更多的内存,但将提供更准确的结果。
警告启用状态访问延迟度量可能会影响性能。建议仅将它们用于调试目的。
7、REST API integration
可以通过监控REST API查询度量。
下面是可用endpoints的列表,其中包含一个示例JSON响应。
所有endpoints均为示例形式http://hostname:8081/jobmanager/metrics,
下面我们只列出URL的路径部分。
例如,<>中的值是变量http://hostname:8081/jobs//metrics必须被请求,
例如 http://192.168.10.49:8081/jobs/cb4443fd87ed97873b55be1bdefede30/metrics.
特定实体的请求度量:
- /jobmanager/metrics
- /taskmanagers/<taskmanagerid>/metrics
- /jobs/<jobid>/metrics
- /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
- 示例
通过日志或web ui界面可以很容易找到jobid。
下图为示例性图示
本示例下面的参数如:
jobid:bb741e7e46d97541a83a492c948e000d
taskmanagerid:192.168.10.42:42933-a2a682
subtaskindex:0
vertexid:cbc357ccb763df2852fee8c4fc7d55f2
## 1、/jobmanager/metrics
http://192.168.10.41:9081/jobmanager/metrics
[{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Time"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"taskSlotsAvailable"},{"id":"taskSlotsTotal"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Time"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Count"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"numRegisteredTaskManagers"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"numRunningJobs"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"}]
## 2、/taskmanagers/<taskmanagerid>/metrics
http://192.168.10.41:9081/taskmanagers/192.168.10.42:42933-a2a682/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]
## 3、/jobs/<jobid>/metrics
http://192.168.10.41:9081/jobs/bb741e7e46d97541a83a492c948e000d/metrics
[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointExternalPath"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"lastCheckpointProcessedData"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]
## 4、/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
## 在作业详情页面中,找到并点击 "Task Managers" 选项卡。
## 在 "Task Managers" 页面中,您可以查看每个 Task Manager 的详细信息,包括其分配给该 Task Manager 的任务(即 vertex)及其 ID。
## 或者在chrome中右击检查中查看vertexid,由于下面的链接内容太多,仅仅截图展示
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics
在相应类型的所有实体中聚合的请求度量:
- /taskmanagers/metrics
- /jobs/metrics
- /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
- 示例
通过日志或web ui界面可以很容易找到jobid。
本示例下面的参数如:
jobid:bb741e7e46d97541a83a492c948e000d
vertexid:cbc357ccb763df2852fee8c4fc7d55f2
## 1、/taskmanagers/metrics
http://192.168.10.41:9081/taskmanagers/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]
## 2、/jobs/metrics
http://192.168.10.41:9081/jobs/metrics
[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointExternalPath"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"lastCheckpointProcessedData"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]
## 3、/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
## 在作业详情页面中,找到并点击 "Task Managers" 选项卡。
## 在 "Task Managers" 页面中,您可以查看每个 Task Manager 的详细信息,包括其分配给该 Task Manager 的任务(即 vertex)及其 ID。
## 或者在chrome中右击检查中查看vertexid,由于下面的链接内容太多,仅仅截图展示
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics
在相应类型的所有实体的子集上聚合的请求度量:
- /taskmanagers/metrics?taskmanagers=A,B,C
- /jobs/metrics?jobs=D,E,F
- /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
- 示例
本示例下面的参数如:
jobid:bb741e7e46d97541a83a492c948e000d
taskmanagerid1:192.168.10.42:42933-a2a682
taskmanagerid2:192.168.10.43:38542-8d626d
taskmanagerid3:192.168.10.44:43904-9a6f04
vertexid:cbc357ccb763df2852fee8c4fc7d55f2
## 1、/taskmanagers/metrics?taskmanagers=A,B,C
http://192.168.10.41:9081/taskmanagers/metrics?taskmanagers=192.168.10.42:42933-a2a682,192.168.10.43:38542-8d626d,192.168.10.44:43904-9a6f04
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]
## 2、/jobs/metrics?jobs=D,E,F
http://192.168.10.41:9081/jobs/metrics?jobs=bb741e7e46d97541a83a492c948e000d
[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointExternalPath"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"lastCheckpointProcessedData"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]
## 3、/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
## 在作业详情页面中,找到并点击 "Task Managers" 选项卡。
## 在 "Task Managers" 页面中,您可以查看每个 Task Manager 的详细信息,包括其分配给该 Task Manager 的任务(即 vertex)及其 ID。
## 或者在chrome中右击检查中查看vertexid,由于下面的链接内容太多,仅仅截图展示
## 以下按照步骤显示subtask的metrics
### 1、查询所有的subtask,其内容如上图,不再赘述
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics
### 2、
### 上图中有id为 Source__TableSourceScan(table=[[default_catalog__default_database__alanchan_kafk.KafkaConsumer.bytes-consumed-total、Source__TableSourceScan(table=[[default_catalog__default_database__alanchan_kafk.failed-reauthentication-rate等,本示例就以查其2个子任务
http://192.168.10.41:9081/jobs/1b0700f7510a9dd7fd65aee66ad0382a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/subtasks/metrics?subtask= Source__TableSourceScan(table=[[default_catalog__default_database__alanchan_kafk.KafkaConsumer.bytes-consumed-total,Source__TableSourceScan(table=[[default_catalog__default_database__alanchan_kafk.failed-reauthentication-rate
### 其内容太多,见下面截图
警告度量名称可以包含查询度量时需要转义的特殊字符。例如,“a_+b”将转义为“a%2B_b”。
应转义的字符列表:
1)/jobmanager/metrics示例
GET /jobmanager/metrics
http://192.168.10.41:9081/jobmanager/metrics
[{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Time"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"taskSlotsAvailable"},{"id":"taskSlotsTotal"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.GarbageCollector.PS_MarkSweep.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Time"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.GarbageCollector.PS_Scavenge.Count"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"numRegisteredTaskManagers"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"numRunningJobs"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"}]
2) taskmanagers//metrics?get=metric1,metric2示例
请求特定taskmanagers的 Metric 的值(未聚合)
GET taskmanagers//metrics?get=metric1,metric2
本示例下面的参数如:
taskmanagerid1:192.168.10.42:42933-a2a682
taskmanagerid2:192.168.10.43:38542-8d626d
taskmanagerid3:192.168.10.44:43904-9a6f04
## 1、获取taskmananger的指标
http://192.168.10.41:9081/taskmanagers/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]
## 2、获取指定taskmanager的指定指标(Status.JVM.Memory.Mapped.TotalCapacity和Status.JVM.CPU.Load)的值
http://192.168.10.41:9081/taskmanagers/192.168.10.42:42933-a2a682/metrics?get=Status.JVM.Memory.Mapped.TotalCapacity,Status.JVM.CPU.Load
[{
"id": "Status.JVM.Memory.Mapped.TotalCapacity",
"value": "0"
}, {
"id": "Status.JVM.CPU.Load",
"value": "0.001329512482145905"
}]
3)/taskmanagers/metrics?get=metric1,metric2示例
请求特定 Metric 的聚合值
GET /taskmanagers/metrics?get=metric1,metric2
GET /taskmanagers/metrics?get=metric1,metric2
## 1、获取taskmananger的指标
http://192.168.10.41:9081/taskmanagers/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]
## 2、获取taskmanagers的指定指标(Status.JVM.Memory.Mapped.TotalCapacity和Status.JVM.CPU.Load)的值
http://192.168.10.41:9081/taskmanagers/metrics?get=Status.JVM.Memory.Mapped.TotalCapacity,Status.JVM.CPU.Load
[
{
"id": "Status.JVM.Memory.Mapped.TotalCapacity",
"min": 0.0,
"max": 0.0,
"avg": 0.0,
"sum": 0.0
},
{
"id": "Status.JVM.CPU.Load",
"min": 5.440145745299967E-4,
"max": 0.0015120478111207314,
"avg": 9.553803717257513E-4,
"sum": 0.002866141115177254
}
]
4)/taskmanagers/metrics?get=metric1,metric2&agg=min,max示例
请求特定 Metric 的特定值的聚合值
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
## 1、获取taskmananger的指标
http://192.168.10.41:9081/taskmanagers/metrics
[{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.Flink.Memory.Managed.Total"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.Shuffle.Netty.UsedMemory"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.Shuffle.Netty.TotalMemory"},{"id":"Status.JVM.Memory.Metaspace.Committed"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.Shuffle.Netty.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Max"},{"id":"Status.Shuffle.Netty.TotalMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.Memory.Metaspace.Max"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.Shuffle.Netty.UsedMemorySegments"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.Flink.Memory.Managed.Used"},{"id":"Status.JVM.Memory.Metaspace.Used"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.Shuffle.Netty.AvailableMemory"}]
## 2、获取taskmanagers的指定指标(Status.JVM.Memory.Mapped.TotalCapacity和Status.JVM.CPU.Load)的值
http://192.168.10.41:9081/taskmanagers/metrics?get=Status.JVM.Memory.Mapped.TotalCapacity,Status.JVM.CPU.Load&agg=min,max
[
{
"id": "Status.JVM.Memory.Mapped.TotalCapacity",
"min": 0.0,
"max": 0.0
},
{
"id": "Status.JVM.CPU.Load",
"min": 3.784653231147696E-4,
"max": 0.001422205366454916
}
]
8、Dashboard integration
为每个task or operator收集的度量也可以在仪表板中可视化。在作业的主页面上,选择度量选项卡。在顶部图形中选择一个任务后,可以使用添加度量下拉菜单选择要显示的度量。
- task指标列为<subtask_index><metric_name>。
- operator指标列为<subtask_index><operator_name><metric_name>。
每个度量将被可视化为一个单独的图形,x轴表示时间,y轴表示测量值。所有图形每10秒自动更新一次,并在导航到另一个页面时继续更新。
可视化度量的数量没有限制;然而,只有数字度量可以被可视化。
以上,简单的介绍了Flink 的指标体系的第二部分,即指标的scope、报告、系统指标、跟踪、api和dashboard的集成示例。
本专题分为三部分,即:
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版