引言
Presto是一款分布式SQL查询引擎,它能够在大规模数据集上实现快速、交互式的查询。本文将介绍Presto的基本概念并结合一些实际的代码示例,能够让的大家快速入门并在实际项目中应用。
官网:Launch Presto: Local download, JDBC, Docker or on AWS Cloud
1. Presto 简介
facebook开源的prestodb是一个分布式的sql引擎,支持多种数据源接入,采用统一的sql语句进行查询。内部实现也类似spark,将这个查询分为分析、优化、阶段划分、执行这些步骤。
- Presto是由Facebook开发的分布式sql查询引擎,用来进行高速、实时的数据查询
- Presto的产生是为了解决Hive的MapReduce模型太慢且不能通过BI等工具展现HDFS的问题
- Presto是一个计算引擎,它不存储数据,通过丰富的connector获取第三方服务的数据,并支持扩展。可以通过连接Hive,来实现快速query hive table
- 可以跨数据源进行联合查询
2019年,prestodb分化为prestodb和prestosql,prestosql有原始团队维护,现改名为trino。分化近两年后,从生态上看,trino势头明显强过prestodb。例如,下面几点只有在trino中才有:
- 聚合下推支持
- join下推支持
- elasticsearch索引支持通配*
后面的研究都基于trino进行。
查询例子
# 联合查询hive的表和mysql的表
select * from hive.testdb.tableA a join mysql.testdb.tableB b
where a.id = b.id
show catalogs
show schemas
2. Presto 数据模型
Presto 是一款分布式 SQL 查询引擎,其数据模型基于表(Table)和架构(Schema)。Presto 不存储数据,而是通过连接各种数据源进行实时查询。以下是 Presto 的核心数据模型元素:
-
Schema(架构):
- Schema 是 Presto 中的顶层命名空间,用于组织和隔离表。每个表都属于一个特定的 Schema。
- 在 Presto 中,Schema 可以看作是一个数据库,不同的是,Presto 的 Schema 通常指向不同的数据源。
-
Table(表):
- Table 是 Presto 中的数据存储单元。每个表都属于一个特定的 Schema。
- Presto 支持从各种数据源(如 Hive、MySQL、PostgreSQL 等)中的表执行查询。
-
Column(列):
- 表中的每一列代表了数据的一个属性。列定义了数据的类型,如整数、字符串、日期等。
- 查询时,可以引用表中的特定列以检索相应的数据。
-
Row(行):
- 表中的每一行代表了一条记录。每行中的数据按列排列,形成一个记录的完整集合。
-
Connector(连接器):
- 连接器是 Presto 中用于连接到不同数据源的插件。每个连接器负责实现 Presto 与特定数据源的交互。
- Presto 可以同时连接到多个数据源,能够跨越多种类型的数据存储执行查询。
-
Catalog(目录):
- 目录是 Presto 中用于组织连接器的逻辑单元。每个连接器都注册到一个或多个目录中。
- 通过目录,Presto 可以了解到底有哪些数据源可以查询。
-
Session(会话):
- 会话是 Presto 中的查询执行环境。每个查询都在一个独立的会话中执行,会话保持了查询的上下文信息。
- 在会话中可以设置各种配置选项,如查询超时时间、内存限制等。
Presto的数据模型相当灵活,用户可以通过 SQL 查询语言访问和操作各种数据源中的数据。通过连接器的引入,Presto 可以与不同类型的存储系统协同工作,提供统一的查询接口,使得数据分析变得更加方便和高效。
3. 聚合下推
聚合下推是我们最关心的特性。我们知道sql引擎本质上是在引擎侧对数据进行计算处理的,在大数据条件下,如果所有的数据都在引擎侧计算处理,性能比较差,稳定性也有问题,主要体现在:
- 大量数据的拉取,对源数据库造成的IO压力和网络开销
- 大量数据留存在引擎侧进行计算,引擎本身有OOM的风险
一般而已,sql引擎都支持一种要pushdown的优化策略。例如如果用户查询中包含对数据源数据的过滤语义,那么过滤操作可以下放给数据源进行,这个优化称为“过滤下推”。绝大多数sql引擎都支持过滤下推。此外还有projection下推(投影下推)。但是却极少有引擎支持聚合下推。
用户对数据的查询需求,其实往往是聚合分析场景。而一般的sql引擎只能将源数据拉取到引擎中进行聚合计算,区别可能仅仅是单机聚合或者分布式聚合。presto或spark,作为分布式sql引擎,利用MR思想,支持对大量数据进行分布式聚合。
然而,随着数据量的变大,即使是分布式聚合,依然要面临大量数据从数据源拉取的尴尬。我们知道绝大多数的数据库都是支持聚合操作的,而且许多列式数据库、时序数据库聚合查询的性能是极其强悍的。那么作为sql引擎是否能支持将聚合查询也下沉给数据库完成呢?
trino于2020/06发布的版本中声称在数据源接口层支持applyAggregation函数,这意味着数据库如果有能力完成聚合查询,可以实现该函数,提高查询性能,减少数据传输。Release 335 (14 Jun 2020) — Trino 436 Documentation
通过详细调研,trino目前仅有jdbc相关的数据源实现了applyAggregation。为了,验证和深入理解applyAggregation,尝试在elasticsearch数据源上实现聚合pushdown。
最终,实现了term aggregation和min/max/sum/avg/count(x)/count(*),下面是测试的基本功能,可以看到对于40000条记录的index,下推聚合的性能明显高于普通聚合:
The following simple test is based on an index of more than 40000 records.
The difference in query efficiency between the two methods can be figured out.
trino:default> select hostname, avg("values") from elasticsearch.default.slmday60 group by hostname;
hostname | _col1
---------------+-------------------
192.168.21.58 | 4992.663530635401
192.168.21.59 | 4989.727731732876
(2 rows)
Query 20210225_091409_00005_rb8ni, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0.53 [2 rows, 0B] [3 rows/s, 0B/s]
trino:default> set session elasticsearch.aggregation_pushdown_enabled=false;
SET SESSION
trino:default> select hostname, avg("values") from elasticsearch.default.slmday60 group by hostname;
hostname | _col1
---------------+-------------------
192.168.21.58 | 4992.663530635401
192.168.21.59 | 4989.727731732876
(2 rows)
Query 20210225_091431_00007_rb8ni, FINISHED, 1 node
Splits: 50 total, 50 done (100.00%)
2.80 [42.1K rows, 1.68MB] [15.1K rows/s, 617KB/s]] ]></ac:plain-text-body></ac:structured-macro><p>对比聚合下推和非聚合下推情况下的执行计划:</p><p>非聚合下推</p><ac:structured-macro ac:name="code" ac:schema-version="1" ac:macro-id="fd0d9b25-0b25-49a0-861b-ec070471aea2"><ac:plain-text-body><![CDATA[Fragment 0 [SINGLE]
Output layout: [hostname, avg]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Output[hostname, _col1]
│ Layout: [hostname:varchar, avg:double]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│ _col1 := avg
└─ RemoteSource[1]
Layout: [hostname:varchar, avg:double]
Fragment 1 [HASH]
Output layout: [hostname, avg]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [hostname:varchar, avg:double]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
└─ Aggregate(FINAL)[hostname][$hashvalue]
│ Layout: [hostname:varchar, $hashvalue:bigint, avg:double]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│ avg := avg("avg_0")
└─ LocalExchange[HASH][$hashvalue] ("hostname")
│ Layout: [hostname:varchar, avg_0:row(double, bigint), $hashvalue:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
└─ RemoteSource[2]
Layout: [hostname:varchar, avg_0:row(double, bigint), $hashvalue_1:bigint]
Fragment 2 [SOURCE]
Output layout: [hostname, avg_0, $hashvalue_2]
Output partitioning: HASH [hostname][$hashvalue_2]
Stage Execution Strategy: UNGROUPED_EXECUTION
Aggregate(PARTIAL)[hostname][$hashvalue_2]
│ Layout: [hostname:varchar, $hashvalue_2:bigint, avg_0:row(double, bigint)]
│ avg_0 := avg("values")
└─ ScanProject[table = elasticsearch:SCAN:slmday60, grouped = false]
Layout: [hostname:varchar, values:bigint, $hashvalue_2:bigint]
聚合下推
Fragment 0 [SINGLE]
Output layout: [hostname, _efgnrtd]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Output[hostname, _col1]
│ Layout: [hostname:varchar, _efgnrtd:double]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
│ _col1 := _efgnrtd
└─ RemoteSource[1]
Layout: [hostname:varchar, _efgnrtd:double]
Fragment 1 [SOURCE]
Output layout: [hostname, _efgnrtd]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
TableScan[elasticsearch:AGG:slmday60, grouped = false]
Layout: [hostname:varchar, _efgnrtd:double]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
hostname := hostname::varchar
_efgnrtd := _efgnrtd_0::double
多个group by,多个聚合函数都没有问题,也可以支持没有groupby情况下的聚合,例如:
select count(*)
from elasticsearch.default.slmday60
where "@timestamp" > TIMESTAMP '2020-04-13' and "@timestamp" < TIMESTAMP '2020-04-13 00:01:00'
count(x):使用value_count(field)聚合
count(*): 使用value_count("_id")聚合
4. 安装与配置
下面是简单安装的步骤,具体安装方式可能有所不同,请参考Presto官方文档Deploying Presto — Presto 0.284 Documentation。
# 下载Presto压缩包
wget https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.267/presto-server-0.267.tar.gz
# 解压
tar -xvf presto-server-0.267.tar.gz
# 进入Presto目录
cd presto-server-0.267
# 配置Presto节点
cp etc/node.properties{.example,}
# 配置连接器(例如Hive)
cp etc/catalog/hive.properties{.example,}
5. 启动 Presto 节点
# 启动Presto服务
bin/launcher start
6. Presto 实战示例
6.1 连接到 Presto
使用Presto CLI连接到Presto服务器:
# 连接到Presto
presto --server localhost:8080 --catalog hive --schema default
6.2 执行 SQL 查询
在Presto CLI中执行简单的SQL查询:
-- 查询Hive中的数据
SELECT * FROM test_db LIMIT 10;
6.3 连接其他数据源
Presto支持多种数据源,如MySQL、PostgreSQL等。首先,需要在etc/catalog
目录下配置相应的属性文件。以下是连接MySQL的示例:
# 复制MySQL配置文件
cp etc/catalog/mysql.properties{.example,}
编辑mysql.properties
,配置MySQL连接信息:
connector.name=mysql
connection-url=jdbc:mysql://192.168.101.32:3306/test
connection-user=root
connection-password=root123
然后,重新启动Presto节点:
bin/launcher restart
之后,就可以写sql查询不同数据源之间的数据了
# 联合查询hive的表和mysql的表
select * from hive.testdb.tableA a join mysql.testdb.tableB b
where a.id = b.id
show catalogs
show schemas
7. 结语
Presto的强大之处在于它能够无缝连接各种数据源,提供快速、交互式的数据分析能力。在实际项目中,结合Presto的灵活性,可以更方便地处理大规模数据集,加速数据分析和决策过程。希望这篇文章对大家了解和使用Presto有所帮助。