目录
- Presto 介绍
- Presto 架构
- Coordinator协调器
- 节点发现服务(discovery service)
- 工作节点
- 基于连接器的架构
- 查询执行模型
- 查询优化
- JVM 配置
- Presto Web UI
- Presto 内存管理
- Presto 任务并发性
- Presto 工作节点调度
- Presto 网络数据交换
- Presto 资源组
Presto 介绍
Presto 架构
Coordinator协调器:用于接受用户查询并管理工作节点以执行查询工作。
Worker工作节点:负责执行任务和处理数据。
协调器上通常会运行一个节点发现服务(discovery service),工作节点通过注册到此服务以加入集群。
客户端、协调器和工作节点之间的通信和数据传输完全通过基于 HTTP/HTTPS 的 RESTful API 调用。
Coordinator协调器
Presto 协调器负责接收用户 SQL 查询、解析查询语句、规划查询执行并管理工作节点。
协调器会跟踪每个工作节点的动态,并协调查询任务的执行。对于一条查询,协调器将创建一个包含多个 Stage(阶段)的逻辑模型。
一旦接收到一条 SQL 语句,协调器负责解析、分析、优化和调度查询任务在 Presto 工作节点上的执行。查询语句被翻译成一系列相连的任务(Task),这些任务被分发到各个工作节点上执行。在工作节点处理数据的同时,协调器会将结果抽取出来放到输出缓冲区中,并将缓冲区的内容暴露给客户端。一旦客户端读完输出缓冲区的内容,协调器就会代表客户端向工作节点请求更多的数据。另外,工作节点也在不断地与数据源交互并从中读取数据。最终,客户端不断地请求数据,工作节点则不断地从数据源读取数据并提供给客户端,直到查询执行结束。
节点发现服务(discovery service)
Presto 使用节点发现服务来发现集群中的所有节点。每个 Presto 实例在启动时都会注册到发现服务并定期发送心跳信号。
工作节点通常将节点发现服务的配置指向协调器的主机名和端口。
工作节点
工作节点负责执行协调器分配的任务并处理数据。工作节点使用连接器从数据源获取数据,它们相互之间也会交换中间数据。它们将最终的结果数据发送给协调器,由协调器负责收集来自各个工作节点的结果并发送给客户端。
在安装时,工作节点需要知道集群的节点发现服务所在的主机名或 IP 地址。工作节点会在启动时将自己注册到节点发现服务上,以便协调器向其分配任务。
基于连接器的架构
Presto 存储与计算分离的核心是基于连接器的架构,连接器为 Presto 提供了连接任意数据源的接口(服务提供者接口(service provider interface,SPI)。
协调器使用的是元数据、数据统计和数据定位接口,工作节点使用的是数据流接口。
查询执行模型
SQL 语句首先以文本形式提交到协调器,协调器解析和分析这条语句,之后创建一个由 Presto 内部数据结构表示的执行计划,叫作查询计划。
查询计划生成过程利用了元数据 SPI 和数据统计 SPI 来创建查询计划。也就是说,协调器会使用 SPI 直接连接到数据源,以收集有关表和其他元数据的信息。
协调器通过元数据 SPI 获取表、列和数据类型的信息。这些信息用于对查询进行语义校验、类型检查和安全检查。
统计 SPI 用于获取行数和表大小的信息,从而在计划期间进行基于代价的查询优化。
在创建分布式查询计划时会利用数据位置 SPI 来生成表内容的逻辑切片。切片是任务分配和并行的最小单位。
协调器将查询计划切分成 Stage,从而分配给集群中的多个工作节点进行并行处理,从而加快整体查询的执行速度。多个 Stage 会被组织成一棵依赖树。Stage 的数量依赖于查询的复杂度。例如,查询的表、返回的列、JOIN 语句、WHERE 条件、GROUP BY 操作和其他 SQL 语句都可能影响 Stage 的数量。
分布式执行计划定义了 Stage 和查询在 Presto 集群上执行的方式。协调器使用它在工作节点上进一步计划和调度任务。一个 Stage 通常包含一个或多个任务,每个任务则负责处理一小部分数据。
一个任务处理数据的单位是切片(split)。切片代表一个工作节点可以抽取并处理的一段底层数据,它是并行和任务分配的单位。连接器所执行的特定数据操作取决于底层的数据源。例如,Hive 连接器用文件的路径、读取偏移量和长度来描述切片信息,这些信息指明了所要处理文件的区域。
源 Stage 的任务以 page1的形式生产数据,每个 page 都是以列式存储格式表示的一系列行。这些 page 传输到下游的中间 Stage。Exchange 算子从上游 Stage 中读取数据,从而在不同 Stage 之间传输 page。
处理一条查询,协调器首先根据来自连接器的元数据创建切片列表。使用该切片列表,协调器开始在工作节点上调度任务,以获取其中的数据。在查询执行期间,协调器跟踪所有可用于处理的切片和任务在工作节点上执行的位置。一些任务完成了处理,并产生了很多供下游处理的切片,协调器就会继续调度更多的任务来处理它们。 一旦工作节点处理完所有切片,全部数据就可用。此时协调器会将结果返回给客户端。
查询优化
hive.collect-column-statistics-on-write=true “执行写操作的同时收集统计信息”
etc/config.properties
discovery-server.enabled=true 通常此属性在协调器上设置为 true。在所有的工作节点上都必须删除此属性,从而禁用它。
JVM 配置
etc/jvm.config
-server
-XX:+UseG1GC
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+UseGCOverheadLimit
-XX:+HeapDumpOnOutOfMemoryError
-XX:-UseBiasedLocking
-Djdk.attach.allowAttachSelf=true
-Xms16G
-Xmx16G
-XX:G1HeapRegionSize=32M
-XX:ReservedCodeCacheSize=512M
-Djdk.nio.maxCachedBufferSize=2000000
设定 Xmx 属性的值来扩大 JVM 的最大内存分配池,使用 Xms 参数可以设定初始的最小内存分配池大小。通常建议将 Xmx 和 Xms 设定为相同值。通常推荐将 Xmx 和 Xms 都设置为系统总内存的 80%。
Presto 和 Apache Spark 不应该运行在同一组硬件上。
Presto Web UI
运行中的查询
Presto 集群中当前正在运行的查询总数,所有用户的查询都会计入.
队列中的查询
Presto 集群中所有用户在队列中等待的查询总数。基于资源组的配置,协调器调度在队列中等待的查询。
阻塞的查询
集群中阻塞的查询总数。由于缺少必要的切片或其他资源,阻塞的查询无法继续执行。
活动的工作节点
集群中活动的工作节点的数量。通过手动或自动扩展添加或删除任何工作节点时,这些节点都会注册到发现服务上,因此显示的数字会据此更新。
可运行的驱动
集群中可运行的驱动的平均数。
保留内存
Presto 中所有保留内存的量,以字节为单位。
行每秒
集群上运行着的所有查询每秒处理的总行数。
字节每秒
集群上运行着的所有查询每秒处理的总数据量,以字节为单位。
工作节点并行度
工作节点的并行度总量,也就是集群中所有查询运行时在所有工作节点上花费的线程 CPU 时间总量。
查询数量由 Presto 集群的配置决定。
Presto 内存管理
Presto 管理的内存被分为以下两种类型。
用户内存
用户查询中的聚合和排序等操作会影响用户内存的分配。
系统内存
系统内存的分配基于查询引擎自己的执行实现。缓冲区的读写和 shuffle、表扫描等操作会影响系统内存的分配。
query.max-memory-per-node
一个查询在单个工作节点上可以使用的用户内存的最大值。这些内存可用于处理聚合和输入数据分配等。
query.max-total-memory-per-node
一个查询在单个节点上可用的内存总量(包括用户内存和系统内存)的最大值,这个值必须大于 query.max-memory-per-node。当一个查询消耗的用户内存和系统内存总量超过了这个限制,它就会被终止。
query.max-memory
一个查询在全集群所有工作节点上可以使用的用户内存总量的最大值。
query.max-total-memory
一个查询可以在全集群所有节点上使用的内存总量(包括用户内存和系统内存)的最大值,因此这个值必须大于 query.max-memory。
如果一个查询因超过限制而被终止,系统将返回下列错误代码以说明原因。
EXCEEDED_LOCAL_MEMORY_LIMIT 表示内存使用超过了 query.max-memory-per-node 或 query.max-total-memory-per-node 的限制。
EXCEEDED_GLOBAL_MEMORY_LIMIT 表示内存使用超过了 query.max-memory 或 query.max-total-memory 的限制。
配置案例说明
1 个协调器;
10 个工作节点,通常每个工作节点都具有相同的系统规格;
每个工作节点拥有的物理内存:50GB;
jvm.config 在 -Xmx 中设置的 JVM 堆内存上限是 38GB;
query.max-memory-per-node:13GB;
query.max-total-memory-per-node:16GB;
memory.heap-headroom-per-node:9GB;
query.max-memory:50GB;
query.max-total-memory:60GB;
每个工作节点上的可用内存总量约 50GB,为操作系统、代理/后台进程以及系统上 JVM 外的组件保留了约 12GB 的内存,因此将 JVM 堆内存上限设为 38GB。
一个查询在集群整体可用的用户内存 query.max-memory 被设为 50GB。在讨论 max-memory 的同时,我们还需要考虑原始散列分区数(initial-hash-partitions),这个值在理想情况下应该小于等于工作节点的数量。
如果我们将原始散列分区数设为 8,max-memory 设为 50GB,则每个节点得到的用户内存配额就大约为 50GB/8 = 6.25GB。将节点本地的限制 max-memory-per-node 设为 13GB,在允许有数据倾斜的情况下,一个节点最多可以消耗平均内存的两倍。
为了防止死锁,你可以设置 query.low-memory-killer.policy 属性。可选的值是 total-reservation 或 total-reservation-on-blocked-nodes。选用 total-reservation 时,Presto 会终结集群中占用内存最多的查询,从而释放资源。另外,选用 total-reservation-on-blocked-nodes 时,系统会终止在阻塞的节点上使用最多内存的查询。
Presto 任务并发性
任务工作线程数
默认值是机器 CPU 数的 2 倍。例如,一台具有 2 个 6 核 CPU 的机器使用 2 × 6 × 2 = 24 个工作线程。如果你观察到所有的线程都在使用,但 CPU 使用率仍很低,则可以尝试借助 task.max-worker-threads 属性来增加线程数,以此提高 CPU 利用率和性能。建议尝试慢慢增加这个值,因为将其设置得过高会带来更高的内存使用和额外的上下文切换成本,反而会带来负面效果。
任务的算子并发数
Join 和 Aggregation 之类的算子通过本地数据分区和并行执行算子来实现并行处理。例如,数据先在本地根据 GROUP BY 的列进行分区,之后多个 Aggregation 算子会并行地进行处理。这些并行操作的默认并发数是 16,你可以通过设定 task.concurrency 属性来调整它。如果你运行很多并发的查询,上下文切换的成本可能会导致性能下降。对于只运行少量并发查询的集群,更高的并发数可以提高并行度并因此提高性能。
Presto 工作节点调度
通过调整某些调度相关属性的默认值来提高 Presto 集群的性能。通常有 3 个常见的配置可以调整:
- 每个任务的切片数;
- 每个节点的切片数;
- 本地调度策略
根据任务或节点调度切片
每个工作节点能处理的切片数量有上限,默认情况下其最大值是 100。你可以通过 node-scheduler.max-splits-per-node 属性调整这个值。当发现工作节点上的切片数已经达到此限制但仍没有充分利用资源时,你可以考虑调整这个值。提高该值通常会提高性能,特别是存在很多切片的情况下。此外,你还可以提高 node-scheduler.max-pending-splits-per-task 属性的值,这个属性的值不应该超过 node-scheduler.max-splits-per-node 属性的值。它确保在工作节点处理任务的同时,等待的任务可以被放入队列中。
本地调度策略
使用 node-scheduler.network-topology 属性设定切片调度策略,此属性的默认值 legacy 在调度切片时不会考虑数据位置。你可以将其修改为 flat,从而使用考虑数据位置的改进策略。Presto 调度器会使用队列 50% 的空间来调度本地切片。
当 Presto 安装并放置在 HDFS 数据节点上,或使用分布式缓存时,适合使用 flat 策略。将分布式存储中的数据缓存在工作节点上,因此,调度本地切片会提供更好的性能。
Presto 网络数据交换
并发性
Presto 的数据交换客户端负责请求上游任务产生的数据,它使用的默认线程数是 25。可以通过设定 exchange.client-threads 属性来修改这个值。使用更多的线程可以提高大规模和高并发配置集群的性能。这是因为在这些集群中,产生数据的速度更快,提高用于消费数据的并发性可以降低时延。更多的线程也会需要更多的内存。
缓冲区大小
在数据交换时,发送方和接收方会将收发的数据放在缓冲区中。发送方和接收方的缓冲区大小可以分别配置。
在发送方,任务将产生的数据写入缓冲区并等待下游的数据交换客户端进行请求。默认的缓冲区大小是 32MB,可以通过 sink.max-buffer-size 属性调整。提高这个值可能有助于提高大规模集群的吞吐量。
在接收方,在下游任务处理数据前,数据先放置在缓冲区中。这个缓冲区的默认大小也是 32MB 且可以通过 exchange.max-buffer-size 属性进行调整。设定较大的接收缓冲区可以从网络上获取更多数据来延缓背压(back pressure)的发生,从而提高查询性能,特别是大规模集群的性能。
Presto 资源组
资源组(resource group)是 Presto 中用于限制系统资源使用。资源组的配置包含两个方面:资源组属性和选择器规则。
资源组由 CPU 和内存限制、并发数限制、队列优先级和队列中查询的优先级权重等定义。
选择器规则允许 Presto 将接收到的查询请求分配到特定的资源组。
vim etc/resource-groups.properties
resource-groups.configuration-manager=file
resource-groups.config-file=etc/resource-groups.json
vim etc/resource-groups.json