Clickhouse的数据副本协同原理详解(借助ZK实现)
文章目录
- Clickhouse的数据副本协同原理详解(借助ZK实现)
- Clickhouse数据副本
- 副本的特点
- ReplicatedMergeTree原理解析
- 数据结构
- Zookeeper内的节点结构
- 元数据
- 判断标识
- 操作日志
- Entry日志对象的数据结构
- LogEntry
- MutationEntry
- 副本协同的核心流程
- INSERT的核心执行流程
- (1)创建第一个副本实例
- (2)创建第二个副本实例
- (3)向第一个副本实例写入数据
- (4)由第一个副本实例推送Log日志
- (5)第二个副本实例拉取Log日志
- (6)第二个副本实例向其他副本发起下载请求
- (7)第一个副本实例响应数据下载
- (8)第二个副本实例下载数据并完成本地写入
- ALTER的核心执行流程
- (1)修改共享元数据
- (2)监听共享元数据变更并各自执行本地修改
- (3)确认所有副本完成修改
Clickhouse数据副本
Clickhouse具有丰富的表引擎,而与副本相关的表引擎则有Replicated+*MergeTree来构成,如下图所示
换言之,只有使用了ReplicatedMergeTree复制表系列引擎,才能应用副本的能力。**ReplicatedMergeTree时MergeTree的派生引擎,它在MergeTree的基础上加入了分布式协同的能力。**如下图所示:
上图展示了两个节点利用Zookeepr进行副本协同复制的原理图。在MergeTree中,一个数据分区由开始创建到全部完成,会经历两类存储区域:
- 内存:数据首先会被写入内存缓冲区
- 本地磁盘:数据接着会被写入tmp临时目录分区,待全部完成后再将临时目录重命名为正式分区
ReplicatedMergeTree在上述基础上增加了Zookeeper的部分,它会进一步在Zookeeper内创建一系列的监听节点,并以此实现多个实例之间的通信。在整个通信中,Zookeeper并不会涉及表数据的传输。
副本的特点
作为数据副本的主要实现载体,ReplicatedMergeTree在设计上有一些显著特点:
- 依赖Zookeeper:在执行Insert和Alter查询的时候,ReplicatedMergeTree需要借助Zookeeper的分布式协同能力,以实现多个副本之间的同步。但是在查询副本的时候,并不需要ZK。
- 表级别的副本:副本是在表级别定义的,所以每张表的副本配置都可以按照它的实际需求进行个性化定义,包括副本的数量,以及副本在集群内的分布位置等。
- 多主架构:可以在任意一个副本上执行Insert和Alter查询,他们的效果是相同的。这些操作会借助Zookeeper的协同能力被分发至每个副本以本地形式执行。
- Block数据块:在执行Insert命令写入数据时,会依据max_insert_block_size的大小(默认1048576行)将数据切分成若干个Block数据块。所以Block数据块是数据写入的基本单元,并且具有写入的原子性和唯一性。
- 原子性:在数据写入时,一个Block快内的数据要么全部写入成功,要么全部失败
- 唯一性:在写入一个Block数据块的时候,会按照当前Block数据块的数据顺序、数据行和数据大小等指标,计算Hash信息摘要并记录在案。在此之后,如果某个待写入的Block数据块与之前已被写入的Block数据块拥有相同的Hash摘要(Block数据块内的数据顺序、数据大小和数据行均相同),则该Block数据块会被忽略。这项设计可以预防由异常原因引起的Block数据块重复写入的问题。
ReplicatedMergeTree原理解析
ReplicatedMergeTree作为复制表系列的基础表引擎,涵盖了数据副本最为核心的逻辑。
数据结构
在ReplicatedMergeTree的核心逻辑中,大量运用了Zookeeper的能力,以实现多个ReplicatedMergeTree副本实例之间的协同,包括主副本选举、副本状态感知、操作日志分发、任务队列和BlockID去重判断等。在执行Insert数据写入、Merge分区和MUTATION操作的时候,都会涉及与Zookeeper的通信。但是在通信的过程中,并不会涉及任何表数据的传输,在查询数据的时候也不会访问Zookeeper。
Zookeeper内的节点结构
ReplicatedMergeTree需要依靠Zookeeper的事件监听机制以实现各个副本之间的协同。所以,**在每张ReplicatedMergeTree表的创建过程中,它会以zk_path为根路径,在Zookeeper中为这张表创建一组监听节点。**按照作用的不同,监听节点可以大致分成如下几类:
元数据
- /metadata:保存元数据信息,包括主键、分区键、采样表达式等。
- /columns: 保存列字段信息,包括列名称和数据类型
- /replicas: 保存副本名称,对应设置参数中的replica_name
判断标识
- /leader_election: 用于主副本的选举工作,主副本会主导MERGE和MUTATION操作(Alter Delete 和 Alter Update)。这些任务在主副本完成之后再借助Zookeeper将消息事件分发至其他副本。
- **/blocks:**记录Block数据块的Hash信息摘要,以及对应的partition_id。通过Hash摘要能够判断Block数据块是否重复;通过partition_id,则能够找到需要同步的数据分区。
- **/block_numbers:**按照分区的写入顺序,以相同的顺序记录partition_id。各个副本在本地进行Merge时,都会依照相同的block_numbers顺序进行
- **/quorum:**记录quorum的数量,当至少有quorum数量的副本写入成功后,整个写操作才算成功。quorum的数据由insert_quorum参数控制,默认值为0。
操作日志
- /log: 常规操作日志节点(INSERT、MERGE和DROP PARTITION),它是整个工作机制中最为重要的一环。保存了副本需要执行的任务指令。log使用了Zookeeper的持久顺序型节点,每条指令的名称以log-为前缀递增,利用log-0000000000、log-0000000001等。每一个副本实例都会监听/log节点,当有新的指令加入时,它们会把指令加入副本各自的任务队列,并执行任务。
- /mutations: MUTATION操作日志节点,作用与log日志类似,当执行ALTER DELETE和ALTER UPDATE查询时,操作指令会被添加到这个节点。mutations同样使用了Zookeeper的持久顺序型节点,但是它的命名没有前缀,每条指令直接以递增数字的形式保存,例如0000000000、0000000001等。
- /replicas/{replica_name}/*: 每个副本各自的节点下的一组监听节点,用于指导副本在本地执行具体的任务指令,其中较为重要的节点有如下几个:
- /queue:任务队列节点,用于执行具体的操作任务。当副本从/log或/mutations节点监听到操作指令时,会将执行任务添加到该节点下,并基于队列执行
- /log_pointer:log日志指针节点,记录了最后一次执行的log日志下标信息,例如log_pointer:4对应log/log-0000000003
- mutation_pointer:mutations日志指针节点,记录了最后一次执行的mutations日志名称,例如mutation_pointer:0000000000对应了/mutations/0000000000。
Entry日志对象的数据结构
从上一小节的介绍中能够得知,ReplicatedMergeTree在Zookeeper中有两组非常重要的父节点,那就是/log和/mutations。它们是分发操作指令的信息通道,而发送指令的方式,则是为这些父节点添加子节点。所有的副本实例,都会监听父节点的变化,当有子节点被添加时,它们能够实时感知。
这些被添加的子节点在Clickhouse中被统一抽样为Entry对象,而具体实现则由LogEntry和MutationEntry对象承载,分别对应/log和/mutations节点。
LogEntry
LogEntry用于封装/log的子节点信息,它拥有如下几个核心属性:
- source replica:发送这条Log指令的副本来源,对应replica_name
- type:操作指令类型,主要有get、merge、和mutation三种,分别对应从远程副本下载分区、合并区分和MUTATION操作。
- block_id:当前分区的BlockID,对应/blocks路径下子节点的名称
- partition_name:当前分区目录的名称
MutationEntry
MutationEntry用于封装/mutations的子节点信息,它同样拥有如下几个核心属性:
- source replica:发送这条MUTATION指令的副本来源,对应replica_name
- commands:操作指令,主要有ALTER DELETE 和 ALTER UPDATE
- mutation_id:MUTATION操作的版本号
- partition_id:当前分区目录的ID
以上就是Entry日志对象的数据结构信息,在接下来将要介绍的核心流程中,将会看到它们的身影。
副本协同的核心流程
副本协同的核心流程主要有INSERT、MERGE、MUTATION和ALTER四种,分别对应了数据写入,分区合并,数据修改和元数据修改。INSERT和ALTER查询是分布式执行的。 借助Zookeeper的事件通知机制,多个副本之间会自动进行有效协同,但是它们不会使用Zookeeper存储任何分区数据。
INSERT的核心执行流程
当需要在ReplicatedMergeTree中执行INSERT查询以写入数据时,即会进入INSERT核心流程。整体流程如下图所示:
(1)创建第一个副本实例
假设首先从Linux121节点开始,对Linux121节点执行下面的语句后,会创建第一个副本实例
CREATE TABLE test.replicated_test_1
(
`id` String,
`price` Float64,
`create_time` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_test_1', 'linux121')
PARTITION BY toYYYYMM(create_time)
ORDER BY id;
在创建的过程中,ReplicatedMergeTree会进行一些初始化操作,例如:
- 根据zk_path初始化所有的Zookeeper节点。
- 在/replicas/节点下注册自己的副本实例Linux121
这里我因为已经在Linux122节点下创建了对应表副本,所以注册的replicas包括linux121和linux122
-
启动监听任务,监听/log日志节点
-
参与副本选举,选举出主副本,选举的方式是向/leader_election/插入子节点,第一个插入成功的副本就是主副本
(2)创建第二个副本实例
接着,在Linux122节点执行下面的语句,创建第二个副本实例。表结构和zk_path需要与第一个副本相同,而replica_name则需要设置成Linux122的域名
CREATE TABLE test.replicated_test_1
(
`id` String,
`price` Float64,
`create_time` DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_test_1', 'linux122')
PARTITION BY toYYYYMM(create_time)
ORDER BY id;
在创建过程中,第二个ReplicatedMergeTree同样会进行一些初始化操作,例如:
- 在/replicas/节点下注册自己的副本实例Linux122,如上面图中所示。
- 启动监听任务,监听/log日志节点
- 参与副本选举,选举出主副本。 在这个例子中,Linux121副本成为主副本
(3)向第一个副本实例写入数据
现在尝试向第一个副本Linux121写入数据。执行如下命令:
insert into replicated_test_1 values ('A001', 100.0, '2023-04-01 12:00:00');
上述命令执行之后,首先会在本地完成分区目录的写入,在clickhouse-server.log日志下可以找到对应记录:
2023.04.02 16:37:54.521226 [ 9683 ] {} <Trace> system.metric_log: Renaming temporary part tmp_insert_202304_3005_3005_0 to 202304_8903_8903_0.
此外,如果设置了insert_quorum参数(默认为0),并且insert_quorum >= 2,则Linux121会进一步监控已完成写入操作的副本个数,只有当写入副本个数大于或等于insert_quorum时,整个写入操作才算成功。
(4)由第一个副本实例推送Log日志
在第(3)步骤完成之后,会继续由执行了INSERT的副本向/log节点推送操作日志。在这个例子中,会由第一个副本Linux121担此重任。日志的编号为:/log/log-0000000000,而LogEntry的核心属性如下:
从日志内容可以看出,操作类型为get下载,而需要下载的分区是202304_0_0_0。其余所有副本都会基于Log日志以相同的顺序执行命令。
(5)第二个副本实例拉取Log日志
Linux122副本会一直监听/log节点变化,当Linux121推送了/log/log-0000000000之后,Linux122便会触发日志的拉取任务并更新log_pointer,将其指向最新日志下标:
/replicas/linux122/log_pointer: 0
在拉取了LogEntry之后,它并不会直接执行,而是将其转为任务对象放至队列:
/replicas/linux122/queue
Pulling 1 entries to queue: log-0000000000 - log-0000000000
这是因为在复杂的情况下,考虑到在同一时段内,会连续收到许多个LogEntry,所以使用队列的形式消化任务是一个更为合理的设计。注意,拉取的LogEntry是一个区间,这同样也是因为可能会连续收到多个LogEntry。
(6)第二个副本实例向其他副本发起下载请求
Linux122基于/queue队列开始执行任务。当看到type类型为get的时候,ReplicatedMergeTree会明白此时在远端的其他副本中已经成功写入了数据分区,而自己需要同步这些数据。
Linux122上的第二个副本实例会开始选择一个远端的其他副本作为数据的下载来源。远端副本的选择算法大致是这样的:
- 从/replicas节点拿到所有的副本节点
- 遍历这些副本,选取其中一个。选取的副本需要拥有最大的log_pointer下标,并且/queue子节点数量最少。log_pointer下标最大,意味着该副本执行的日志最多,数据应该更加完整。而/queue最小,则意味着该副本目前的任务执行负担最小。
在这个例子中,算法选择的远端副本是Linux121。于是Linux122副本向Linux121节点发起了HTTP请求,希望下载分区202304_0_0_0:
2023.04.02 16:10:28.848950 [ 7092 ] {} <Debug> test.replicated_test_1 (b07cd471-2267-4e0b-ba15-eafd33b020f3): Fetching part 202304_0_0_0 from /clickhouse/tables/01/replicated_test_1/replicas/linux121
如果第一次下载请求失败,在默认情况下,Linux122再尝试请求4次,一共会尝试5次(由max_fetch_partition_retries_count参数控制,默认为5)。
(7)第一个副本实例响应数据下载
Linux121的DataPartsExchange端口服务接收到调用请求,在得知对方来意之后,根据参数做出响应,将本地分区202304_0_0_0基于DataPartsExchange的服务响应返回Linux121:
Sending part 202404_0_0_0
(8)第二个副本实例下载数据并完成本地写入
Linux122副本在收到Linux121的分区数据后,首先将其写至临时目录:
tmp_fetch_202304_0_0_0
待全部数据接收完成之后,重命名该目录:
Renaming temporary part tmp_fetch_202304_0_0_0 to 202304_0_0_0
至此,整个写入流程结束。
可以看到,在INSERT的写入过程中,Zookeeper不会进行任何实质性的数据传输。本着谁执行谁负责的原则,在这个案例中由Linux121首先在本地写入了分区数据。之后,也由这个副本负责发送Log日志,通知其他副本下载数据。如果设置了insert_quorum并且insert_quorum>=2 ,则还会由该副本监控完成写入的副本数量。其他副本在接收到Log日志之后,会选择一个最合适的远端副本,点对点地下载分区数据。
ALTER的核心执行流程
当对ReplicatedMergeTree执行ALTER操作进行元数据修改的时候,即会进入ALTER部分的逻辑,例如增加、删除表字段等。而ALTER的核心流程如下图所示:
与INSERT的流程相比,ALTER的流程会简单很多,其执行过程并不会涉及/log日志的分发。整个流程从上至下按照时间顺序进行,其大致分成3个步骤。现在根据上图讲解整个过程:
(1)修改共享元数据
在Linux122节点尝试增加一个列字段,执行如下语句:
ALTER TABLE replicated_test_1 add column place String after id
执行之后,Linux122会修改Zookeeper内的共享元数据节点:
/metadata,/columns
Update shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes
数据修改后,节点的版本号也会同时提升:
Version of metadata nodes in Zookeeper changed. Waiting for structure write lock.
与此同时,linux122还会负责监听所有副本的修改完成情况:
Waiting for linux121 to apply changes
Waiting for linux122 to apply changes
(2)监听共享元数据变更并各自执行本地修改
Linux121和Linux122两个副本分别监听共享元数据的变更。之后,它们会分别对本地的元数据版本号与共享版本号进行对比。在这个案例中,它们会发现本地版本号低于共享版本号,于是它们开始在各自的本地执行更新操作:
Metadata changed in Zookeeper. Applying changes locally
Applied changes to the metadata of the table
(3)确认所有副本完成修改
Linux122确认所有副本均已完成修改。
ALTER finished
Done processing query
至此,整个ALTER流程结束
可以看到,在ALTER整个的执行过程中,Zookeeper不会进行任何实质性的数据传输。所有的ALTER操作,最终都是由各个副本在本地完成的。本着谁执行谁负责的原则,在这个案例中,由linux122负责对共享元数据的修改以及对各个副本修改进度的监控。