Clickhouse分布式表引擎(Distributed)查询核心原理解析
- Clickhouse分布式表引擎(Distributed)写入核心原理解析
- Clickhouse分布式表引擎(Distributed)查询核心原理解析
与分布式数据写入时可以选择写分布式表或本地表有所不同,在面向集群查询数据的时候,只能通过Distributed表引擎实现。当Distributed表接收到SELECT查询的时候,它会依次查询每个分片的数据,再合并汇总返回。
多副本的路由规则
在查询数据的时候,如果集群中的一个shard,拥有多个replica,那么Distributed表引擎需要面临副本选择的问题。它会使用负载均衡算法从众多replica中选择一个,而具体使用何种负载均衡算法,则有load_balancing参数控制:
load_balancing = random/nearest_hostname/in_order/first_or_random
有如下四种负载均衡算法:
(1)random
random是默认的负载均衡算法。在Clickhouse的服务节点中,拥有一个全局计数器errors_count,当服务发生任何异常时,该计数累加1。而random算法会选择errors_count错误数量最少的replica。如果有多个replica的errors_count计数相同,则在它们之中随机选择一个。
(2)nearest_hostname
nearest_hostname可以看作random算法的变种,首选它会选择errors_count错误数量最少的replica,如果多个replica的errors_count计数相同,则选择集群配置中host名称与当前host最相似的一个。而相似的规则是以当前host名称为基准按字节逐位比较,找出不同字节数最少的一个。
例如:
CH-1-1
CH-1-2
只有一个字节不同
CH-1-1
CH-2-2
有两个字节不同
(3)in_order
in_order同样可以看作random算法的变种,首先它会选择errors_count错误数量最少的replica,如果多个replica的errors_count计数相同,则按照集群配置中的replica的定义顺序逐个选择。
(4)first_or_random
first_or_random可以看作in_order算法的变种,首先它会选择errors_count错误数量最少的replica,如果多个replica的errors_count计数相同,它首先会选择集群配置中第一个定义的replica,如果该replica不可用,则进一步随机选择一个其他的replica。
多分片查询的核心流程
分布式查询与分布式写入类似,同样本着谁执行谁负责的原则,它会由接收SELECT查询的Distributed表,并负责串联起整个过程。首先它会将针对分布式表的SQL语句,按照分片数量将查询拆分成若干个针对本地表的子查询,然后向各个分片发起查询,最后再汇总各个分片的查询结果。如果对分布式表按如下方式发起查询:
SELECT * FROM distributed_table
那么,它会将其转为如下形式之后,再发送到远端分片节点来执行:
SELECT * FROM local_table
SQL的执行计划
Distributed表引擎会将查询计划转换为多个分片的UNION联合查询,以此合并最终的查询结果。
整个执行计划从上至下大致分成两个步骤:
(1)查询各个分片数据
在上图所示的执行计划中,One和Remote步骤是并行执行的,它们分别负责了本地和远端分片的查询动作。其中,在One步骤会将SQL转换成对本地表的查询。在本例中,我们有一个分布式表test_shard_dist:
SELECT count() FROM test.shard_dist;
在集群中的其他节点的服务日志可以看到:
<Debug> executeQuery: (from [::ffff:192.168.80.121]:57412, initial_query_id: 822da000-3799-4ee0-8824-76cfc2db7537) SELECT count() FROM `test`.`test_shard_local`
其他节点在收到查询请求后,会将查询转换成对本地表的查询。
(2)合并返回结果
多个分片数据均查询返回后,按如下方法在发起分布式查询的节点上汇总返回结果:
<Debug> test.test_shard_local (SelectExecutor): Choose complete Aggregate projection _minmax_count_projection
使用Global优化分布式子查询
如果在分布式查询中使用子查询,可能会面临两难的局面。下面来看一个示例。假设有这样一张分布式表test_query_dist,它拥有两个分片,每个分片内的数据如下所示:
Linux121节点:
SELECT *
FROM test_query_local
Query id: 7c34fedf-64ae-4e68-b146-2931f6693a3a
┌─id─┬─repo─┐
│ 1 │ 100 │
│ 2 │ 100 │
│ 3 │ 100 │
└────┴──────┘
3 rows in set. Elapsed: 0.004 sec.
Linux122节点:
SELECT *
FROM test_query_local
Query id: 15932914-66d6-4e6b-a6f9-adc5b66dcb83
┌─id─┬─repo─┐
│ 3 │ 200 │
│ 4 │ 200 │
└────┴──────┘
2 rows in set. Elapsed: 0.002 sec.
其中,id代表用户的编号,repo代表仓库编号。如果现在有一项查询需求,要求找到同时拥有两个仓库的用户。
对于这种交集查询的需求,可以使用IN子查询,:
select uniq(id) from test_query_dist where repo = 100
and id in (select id from test_query_dist where repo = 200);
执行查询可以发现以下报错:
Received exception from server (version 22.4.6):
Code: 288. DB::Exception: Received from localhost:9000. DB::Exception: Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny'). You may rewrite query to use local tables in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.: While processing test_query_dist: While processing id IN (SELECT id FROM test_query_dist WHERE repo = 200). (DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED)
报错信息中提示了Double-distributed IN/JOIN subqueries is denied
,表示平方次的IN/JSON子查询是不允许的。这是什么问题呢?
这是由于上面的SQL语句在IN查询子句中,同样使用了分布式表查询:
select id from test_query_dist where repo = 200
所以在Linux122节点接收到这条SQL之后,它将再次向其他分片发起远程请求,如同所示:
因此可以得出结论,在IN查询子句使用分布式表的时候,查询请求会被放大N的平方倍,其中N等于集群内分片节点的数量,假如集群内有10个分片节点,则在一次查询的过程中,会最终导致100次的查询请求,这显示是不可接受的。这也是上面报错的原因,Clickhouse不允许使用这种方式,直接报错。
使用GLOBAL优化查询
为了解决查询放大的问题,可以使用GLOBAL IN或JSON进行优化。现在对刚才的SQL进行改造,为其增加GLOBAL修饰符:
select uniq(id) from test_query_dist where repo = 100
and id global in (select id from test_query_dist where repo = 200);
再次分析查询的核心过程,如下图所示:
整个过程由上至下大致分成5个步骤:
- 将IN子句单独踢出,发起了一次分布式查询。
- 将分布式表转local本地表后,分别在本地和远端分片执行查询
- 将IN子句查询的结果进行汇总,并放入一张临时的内存表进行保存
- 将内存表发送到远端分片节点
- 将分布式表转为本地表后,开始执行完整的SQL语句,IN子句直接使用临时内存表的数据
至此,整个核心流程结束。可以看到,在使用GLOBAL修饰符之后,CLickhouseHouse使用内存表临时保存了IN子句查询到的数据,并将其发送到远端分片节点,以此到达了数据共享的目的,从而避免了子查询放大的问题。由于数据会在网络间分发,所以需要特别注意临时表的大小,IN或者JOIN子句返回的数据不宜过大。