Clickhouse分布式表引擎(Distributed)写入核心原理解析
- Clickhouse分布式表引擎(Distributed)写入核心原理解析
- Clickhouse分布式表引擎(Distributed)查询核心原理解析
Distributed表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以Distributed表引擎需要和其他数据表引擎一起协同工作。
从实体表层面来看,一张分片表由两部分组成:
- 本地表:通常以_local为后缀进行命名。本地表是承载数据的载体,可以使用非Distributed的任意表引擎,一张本地表对应了一个数据分片。
- 分布式表:通常以_dist为后缀进行命名。分布式表只能使用Distributed表引擎,它与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表。
对于分布式表与本地表之间表结构的一致性检查,Distributed表引擎采用了读时检查的机制,这意味着如果它们的表结构不兼容,只有在查询时才会抛出错误,而在创建表时并不会进行检查。
定义形式
Distributed表引擎的定义形式如下所示:
ENGINE = Distributed(cluster, database, table, [,sharding_key])
其中,各个参数的含义分别如下:
- cluster:集群名称,与集群配置中的自定义名称相对应。在对分布式表执行写入和查询的过程中,它会使用集群的配置信息来找到相应的host节点。
- database和table:分别对应数据库和表的名称,分布式表使用这组配置映射到本地表。
- sharding_key:分片键,选填参数。在数据写入的过程中,分布式表会依据分片键的规则,将数据分布到各个host节点的本地表。
比如如下分布式表:
CREATE TABLE test_shard_dist on cluster ch_cluster(
`id` Int8
) ENGINE=Distributed('ch_cluster', 'test', 'test_shard_local', rand());
上述建表语句将分片键指定为rand()函数,此时在数据写入时会根据随机函数的取值决定数据写入哪个分片。
值得注意的是,此时对应的本地表还未创建,所以从这里也可以看出来,Distributed表运用的是读时检查的机制,对创建分布式表和本地表的顺序并没有强制要求。
接着,创建本地表,一张本地表代表着一个数据分片。
CREATE TABLE test_shard_local on cluster ch_cluster(
`id` Int8
)ENGINE=MergeTree()
order by id
partition by id
在本次案例中,我们的ch_cluster由三个节点组成,因此会在三个节点上都创建出对应的本地表和分布式表。
分布式写入的核心流程
在向集群内的分片写入数据时,通常有两种思路:一种是借助外部计算系统,事先将数据均匀分片,再借由计算系统直接将数据写入Clickhouse集群的各个本地表。如下图所示:
在这个流程中,继续使用集群ch_cluster的示例,该集群由2个分片和0个副本组成。整个流程从上至下按照时间顺序进行,其大致分为5个步骤:
(1)在第一个分片节点写入本地分片数据
首先在Linux121节点,对分布式表test_shard_dist执行INSERT查询,尝试写入10、30、200、55四行数据。
执行后分布式表主要会做两件事情:
- 根据分片规则划分数据,将不同的数据标记划分给不同的节点插入。
- 将属于当前分片的数据直接写入本地表test_shard_local
(2)第一个分片建立远端连接,准备发送远端分片数据
将归至远端分片的数据以分区为单位,分别写入test_shard_all存储目录下的临时bin文件,如下图所示:
这里的1.bin是一个[increase_num].bin,有几个shard分片,则依次递增。
(3)第一个分片向远端分片发送数据
此时,Clickhouse会有另一组监听任务负责监听/test_shard_dist目录下的文件变化,这些任务负责将目录数据发送至远端分片。
<Debug> test.test_shard_dist.DirectoryMonitor: Started processing `/var/lib/clickhouse/data/test/test_shard_dist/shard2_replica1/1.bin` (2.00 rows, 2.00 B bytes)
<Debug> test.test_shard_dist.DirectoryMonitor: Started processing `/var/lib/clickhouse/data/test/test_shard_dist/shard3_all_replicas/2.bin` (2.00 rows, 2.00 B bytes)
从Linux121节点的Clickhouse日志中我们可以看到,其负责将分片数据发送到对应的远端节点上。我这里一共有三个节点,所以Linux121节点将分片数据发往对应的Linux122、Linux123节点,分别对应数据块1.bin和2.bin。
其中需要注意的是,每份目录将会由独立的线程负责发送,数据在传输之前会被压缩。
(4)第二个分片接收数据并写入本地
Linux122和Linux123节点确认建立与Linux121的连接
<Trace> Connection (linux121:9000): Connected to ClickHouse server version 22.4.6.
在接收到来自Linux121节点发送的数据后,将它们写入本地表中
Linux122节点服务日志:
<Debug> executeQuery: (from [::ffff:192.168.80.121]:56626, initial_query_id: 8579dc90-d839-4e63-8442-b31a8c7fe6cf) INSERT INTO test.test_shard_local (id) VALUES
<Trace> ContextAccess (default): Access granted: INSERT(id) ON test.test_shard_local
<Trace> test.test_shard_local (e43a2707-58a8-4bfb-8615-d9b175debdfe): Renaming temporary part tmp_insert_10_1_1_0 to 10_1_1_0
Linux123节点服务日志:
<Debug> executeQuery: (from [::ffff:192.168.80.121]:44996, initial_query_id: 8579dc90-d839-4e63-8442-b31a8c7fe6cf) INSERT INTO test.test_shard_local (id) VALUES
<Trace> ContextAccess (default): Access granted: INSERT(id) ON test.test_shard_local
<Trace> test.test_shard_local (e43a2707-58a8-4bfb-8615-d9b175debdfe): Renaming temporary part tmp_insert_-56_1_1_0 to -56_1_1_0.
接收数据–>执行写入命令->重命名临时分区->实际分区(这里原表是按id作为分区的,所以是-56_1_1_0)
(5)由第一个分片确认完成写入
最后,还是由Linux121节点确认所有的数据发送完毕:
<Trace> test.test_shard_dist.DirectoryMonitor: Finished processing `/var/lib/clickhouse/data/test/test_shard_dist/shard2_replica1/1.bin` (took 5 ms)
<Trace> test.test_shard_dist.DirectoryMonitor: Finished processing `/var/lib/clickhouse/data/test/test_shard_dist/shard3_all_replicas/2.bin` (took 5 ms)
至此,整个流程结束。
可以看到,在整个流程中,Distributed表负责所有分片的写入工作。本着谁执行谁负责的原则,在这个示例中,由Linux121节点的分布式表负责切分数据,并向所有其他分片节点发送数据。
在由Distributed表负责向远端分片发送数据时,有异步写入和同步写入两种模式:如果是异步写,则在Distributed表写完本地分片之后,INSERT查询就会返回成功写入的信息。如果是同步写,则在执行INSERT查询之后,会等待所有分片完成写入。
使用何种模式由参数insert_distributed_sync
参数控制,默认为false,即为异步写。如果将其设置为true,则可以进一步通过insert_distributed_timeout
参数控制同步等待的超时时间。
分布式写入时副本复制数据的核心流程
如果在集群的配置中包含了副本,那么除了刚才的分片写入流程之外,还会触发副本数据的复制流程。数据在多个副本之间,有两种复制实现方式:一种是继续借助Distributed表引擎,由它将数据写入副本。另一种则是借助ReplicatedMergeTree表引擎实现副本数据的分发。
(1)通过Distributed复制数据
在这种实现方式下,即使本地表不使用ReplicatedMergeTree表引擎,也能实现数据副本的功能。Distributed会同时负责分片和副本的数据写入工作,而副本数据的写入流程与分片逻辑相同。
比如对于下面的配置
<!-- 1个分片,1个副本 -->
<ch_cluster>
<shard>
<replica>
<host>linux121</host>
<port>9000</port>
</replica>
<replica>
<host>linuxxxx</host>
<port>9000</port>
</replica>
</shard>
</ch_cluster>
Linux121和linuxxxx互为副本,此时,按照上面介绍分布式数据写入的逻辑,Linux121的分布式表不仅负责将数据写入本地,还需要负责将数据发往副本所在的节点。
总结,用这种方式时,向Distributed表写入数据,它会负责将数据写入集群内的每个replica
细心的朋友应该能发现,在这种实现方案下,Distributed节点需要同时负责分片和副本的数据写入工作,它很可能会成为写入的单点瓶颈, 所有就有了接下来将要说明的第二种方案。
(2)通过ReplicatedMergeTree复制数据
如果在集群的shard配置中增加internal_replication
参数并将其设置为true(默认为false),那么Distributed表在该shard中只会选择一个合适的replica并对其写入数据。此时,如果使用ReplicatedMergeTree作为本地表的引擎,则在该shard内,多个replica副本之间的数据复制会交由ReplicatedMergeTree自己处理,不再由Distributed负责,从而为其减负。
在shard中选择replica的算法大致如下:首选,在Clickhouse的服务节点中,拥有一个全局及数据器errors_count。当服务出现任何异常时,该计数器累加1。接着,当一个shard内拥有多个replica时,选择errors_count错误最少的那个。
至此,我们介绍了关于Clickhouse使用分布式表写入的核心流程原理和副本复制原理。
但是在此还是需要注意,通过上面我们介绍可以知道,在使用Distributed表引擎写入时,由分布式表写入的节点负责将数据分片,并发送到集群中的其他节点中,这种情况在数据量比较大时,很有可能造成写入的单点瓶颈。同时加重网络中传输的数据量,容易造成网络拥塞。因此,在实际生产中,更建议直接写本地表。