1、副本
1.1、概述
集群是副本和分片的基础,它将ClickHouse的服务拓扑由单节点延伸到多个节点,但它并不像Hadoop生态的某些系统那样,要求所有节点组成一个单一的大集群。ClickHouse的集群配置非常灵活,用户既可以将所有节点组成一个单一集群,也可以按照业务的诉求,把节点划分为多个小的集群。在每个小的集群区域之间,它们的节点、分区和副本数量可以各不相同
从作用来看,ClickHouse集群的工作更多是针对逻辑层面的。集群定义了多个节点的拓扑关系,这些节点在后续服务过程中可能会协同工作,而执行层面的具体工作则交给了副本和分片来执行。副本和分片这对双胞胎兄弟,有时候看起来泾渭分明,有时候又让人分辨不清。这里有两种区分的方法。一种是从数据层面区分,假设ClickHouse的N个节点组成了一个集群,在集群的各个节点上,都有一张结构相同的数据表Y。如果N1的Y和N2的Y中的数据完全不同,则N1和N2互为分片;如果它们的数据完全相同,则它们互为副本。换言之,分片之间的数据是不同的,而副本之间的数据是完全相同的。所以抛开表引擎的不同,单纯从数据层面来看,副本和分片有时候只有一线之隔。
另一种是从功能作用层面区分,使用副本的主要目的是防止数据丢失,增加数据存储的冗余;而使用分片的主要目的是实现数据的水平切分
本章接下来会按照由易到难的方式介绍副本、分片和集群的使用方法。从数据表的初始形态1分片、0副本开始介绍;接着介绍如何为它添加副本,从而形成1分片、1副本的状态;再介绍如何引入分片,将其转换为多分片、1副本的形态(多副本的形态以此类推)
这种形态的变化过程像极了企业内的业务发展过程。在业务初期,我们从单张数据表开始;在业务上线之后,可能会为它增加副本,以保证数据的安全,或者希望进行读写分离;随着业务量的发展,单张数据表可能会遇到瓶颈,此时会进一步为它增加分片,从而实现数据的水平切分。在接下来的示例中,也会遵循这样的演示路径进行说明。
1.2、数据副本
不知大家是否还记得,在介绍MergeTree的时候,曾经讲过它的命名规则。如果在*MergeTree的前面增加Replicated的前缀,则能够组合成一个新的变种引擎,即Replicated-MergeTree复制表。
换言之,只有使用了ReplicatedMergeTree复制表系列引擎,才能应用副本的能力(后面会介绍另一种副本的实现方式)。或者用一种更为直接的方式理解,即使用ReplicatedMergeTree的数据表就是副本。
ReplicatedMergeTree是MergeTree的派生引擎,它在MergeTree的基础上加入了分布式协同的能力。
在MergeTree中,一个数据分区由开始创建到全部完成,会历经两类存储区域。
(1)内存:数据首先会被写入内存缓冲区。
(2)本地磁盘:数据接着会被写入tmp临时目录分区,待全部完成后再将临时目录重命名为正式分区。ReplicatedMergeTree在上述基础之上增加了ZooKeeper的部分,它会进一步在ZooKeeper内创建一系列的监听节点,并以此实现多个实例之间的通信。在整个通信过程中,ZooKeeper并不会涉及表数据的传输。
1.3、副本的特点
作为数据副本的主要实现载体,ReplicatedMergeTree在设计上有一些显著特点。
依赖ZooKeeper:在执行INSERT和ALTER查询的时候,ReplicatedMergeTree需要借助ZooKeeper的分布式协同能力,以实现多个副本之间的同步。但是在查询副本的时候,并不需要使用ZooKeeper。关于这方面的更多信息,会在稍后详细介绍。
表级别的副本:副本是在表级别定义的,所以每张表的副本配置都可以按照它的实际需求进行个性化定义,包括副本的数量,以及副本在集群内的分布位置等。
多主架构(Multi Master):可以在任意一个副本上执行INSERT和ALTER查询,它们的效果是相同的。这些操作会借助ZooKeeper的协同能力被分发至每个副本以本地形式执行。
Block数据块:在执行INSERT命令写入数据时,会依据max_insert_block_size的大小(默认1048576行)将数据切分成若干个Block数据块。所以Block数据块是数据写入的基本单元,并且具有写入的原子性和唯一性。
原子性:在数据写入时,一个Block块内的数据要么全部写入成功,要么全部失败。
唯一性:在写入一个Block数据块的时候,会按照当前Block数据块的数据顺序、数据行和数据大小等指标,计算Hash信息摘要并记录在案。在此之后,如果某个待写入的Block数据块与先前已被写入的Block数据块拥有相同的Hash摘要(Block数据块内数据顺序、数据大小和数据行均相同),则该Block数据块会被忽略。这项设计可以预防由异常原因引起的Block数据块重复写入的问题。如果只是单纯地看这些特点的说明,可能不够直观。没关系,接下来会逐步展开,并附带一系列具体的示例。
1.4、ZooKeeper的配置方式
ClickHouse使用一组zookeeper标签定义相关配置,默认情况下,在全局配置config. xml中定义即可。但是各个副本所使用的Zookeeper配置通常是相同的,为了便于在多个节点之间复制配置文件,更常见的做法是将这一部分配置抽离出来,独立使用一个文件保存。
首先,在服务器的/etc/clickhouse-server/config.d目录下创建一个名为metrika.xml的配置文件:
接着,在全局配置config.xml中使用<include_from>标签导入刚才定义的配置:
并引用ZooKeeper配置的定义
其中,incl与metrika.xml配置文件内的节点名称要彼此对应。至此,整个配置过程就完成了。
ClickHouse在它的系统表中,颇为贴心地提供了一张名为zookeeper的代理表。通过这张表,可以使用SQL查询的方式读取远端ZooKeeper内的数据。有一点需要注意,在用于查询的SQL语句中,必须指定path条件,例如查询根路径:
SELECT * FROM system.zookeeper where path = '/';
SELECT name,value,czxid,mzxid FROM
system.zookeeper where path = '/clickhouse';
1.5、副本的定义形式
正如前文所言,使用副本的好处甚多。首先,由于增加了数据的冗余存储,所以降低了数据丢失的风险;其次,由于副本采用了多主架构,所以每个副本实例都可以作为数据读、写的入口,这无疑分摊了节点的负载。
在使用副本时,不需要依赖任何集群配置, ReplicatedMergeTree结合ZooKeeper就能完成全部工作。
ReplicatedMergeTree的定义方式如下:
ENGINE =ReplicatedMergeTree('zk_path','replica_name')
zk_path用于指定在ZooKeeper中创建的数据表的路径,路径名称是自定义的,并没有固定规则,用户可以设置成自己希望的任何路径。即便如此,ClickHouse还是提供了一些约定俗成的配置模板以供参考,例如:
/clickhouse/tables/{shard}/table_name
其中:
/clickhouse/tables/是约定俗成的路径固定前缀,表示存放数据表的根路径。
{shard}表示分片编号,通常用数值替代,例如01、02、03。一张数据表可以有多个分片,而每个分片都拥有自己的副本。
table_name表示数据表的名称,为了方便维护,通常与物理表的名字相同(虽然ClickHouse并不强制要求路径中的表名称和物理表名相同);而replica_name的作用是定义在ZooKeeper中创建的副本名称,该名称是区分不同副本实例的唯一标识。一种约定成俗的命名方式是使用所在服务器的域名称。
对于zk_path而言,同一张数据表的同一个分片的不同副本,应该定义相同的路径;而对于replica_name而言,同一张数据表的同一个分片的不同副本,应该定义不同的名称。
1.6、ReplicatedMergeTree原理解析
ReplicatedMergeTree作为复制表系列的基础表引擎,涵盖了数据副本最为核心的逻辑,将它拿来作为副本的研究标本是最合适不过了。因为只要剖析了ReplicatedMergeTree的核心原理,就能掌握整个ReplicatedMergeTree系列表引擎的使用方法。
1.7、数据结构
在ReplicatedMergeTree的核心逻辑中,大量运用了ZooKeeper的能力,以实现多个ReplicatedMergeTree副本实例之间的协同,包括主副本选举、副本状态感知、操作日志分发、任务队列和BlockID去重判断等。在执行INSERT数据写入、MERGE分区和MUTATION操作的时候,都会涉及与ZooKeeper的通信。但是在通信的过程中,并不会涉及任何表数据的传输,在查询数据的时候也不会访问ZooKeeper,所以不必过于担心ZooKeeper的承载压力。
因为ZooKeeper对ReplicatedMergeTree非常重要,所以下面首先从它的数据结构开始介绍。
1.8、ZooKeeper内的节点结构
ReplicatedMergeTree需要依靠ZooKeeper的事件监听机制以实现各个副本之间的协同。所以,在每张ReplicatedMergeTree表的创建过程中,它会以zk_path为根路径,在Zoo-Keeper中为这张表创建一组监听节点。按照作用的不同,监听节点可以大致分成如下几类:
1)元数据:
/metadata:保存元数据信息,包括主键、分区键、采样表达式等。
/columns:保存列字段信息,包括列名称和数据类型。
/replicas:保存副本名称,对应设置参数中的replica_name。
2)判断标识:
/leader_election:用于主副本的选举工作,主副本会主导MERGE和MUTATION操作(ALTER DELETE和ALTER UPDATE)。这些任务在主副本完成之后再借助ZooKeeper将消息事件分发至其他副本。
/blocks:记录Block数据块的Hash信息摘要,以及对应的partition_id。通过Hash摘要能够判断Block数据块是否重复;通过partition_id,则能够找到需要同步的数据分区。
/block_numbers:按照分区的写入顺序,以相同的顺序记录partition_id。各个副本在本地进行MERGE时,都会依照相同的block_numbers顺序进行。
/quorum:记录quorum的数量,当至少有quorum数量的副本写入成功后,整个写操作才算成功。quorum的数量由insert_quorum参数控制,默认值为0。
3)操作日志:
/log:常规操作日志节点(INSERT、MERGE和DROP PARTITION),它是整个工作机制中最为重要的一环,保存了副本需要执行的任务指令。log使用了ZooKeeper的持久顺序型节点,每条指令的名称以log-为前缀递增,例如log-0000000000、log-0000000001等。每一个副本实例都会监听/log节点,当有新的指令加入时,它们会把指令加入副本各自的任务队列,并执行任务。关于这方面的执行逻辑,稍后会进一步展开。
/mutations:MUTATION操作日志节点,作用与log日志类似,当执行ALERTDELETE和ALERT UPDATE查询时,操作指令会被添加到这个节点。mutations同样使用了ZooKeeper的持久顺序型节点,但是它的命名没有前缀,每条指令直接以递增数字的形式保存,例如0000000000、0000000001等。关于这方面的执行逻辑,同样稍后展开。
/replicas/{replica_name}/*:每个副本各自的节点下的一组监听节点,用于指导副本在本地执行具体的任务指令,其中较为重要的节点有如下几个:
/queue:任务队列节点,用于执行具体的操作任务。当副本从/log或/mutations节点监听到操作指令时,会将执行任务添加至该节点下,并基于队列执行。
/log_pointer:log日志指针节点,记录了最后一次执行的log日志下标信息,例如log_pointer:4对应了/log/log-0000000003(从0开始计数)。
/mutation_pointer:mutations日志指针节点,记录了最后一次执行的mutations日志名称,例如mutation_pointer:0000000000对应了/mutations/000000000。
Entry日志对象的数据结构
ReplicatedMergeTree在ZooKeeper中有两组非常重要的父节点,那就是/log和/mutations。它们的作用犹如一座通信塔,是分发操作指令的信息通道,而发送指令的方式,则是为这些父节点添加子节点。所有的副本实例,都会监听父节点的变化,当有子节点被添加时,它们能实时感知。
这些被添加的子节点在ClickHouse中被统一抽象为Entry对象,而具体实现则由Log-Entry和MutationEntry对象承载,分别对应/log和/mutations节点。
1)LogEntryLogEntry用于封装/log的子节点信息,它拥有如下几个核心属性:
source replica:发送这条Log指令的副本来源,对应replica_name。
type:操作指令类型,主要有get、merge和mutate三种,分别对应从远程副本下载分区、合并分区和MUTATION操作。
block_id:当前分区的BlockID,对应/blocks路径下子节点的名称。partition_name:当前分区目录的名称。
2)MutationEntryMutationEntry用于封装/mutations的子节点信息,它同样拥有如下几个核心属性:
source replica:发送这条MUTATION指令的副本来源,对应replica_name。
commands:操作指令,主要有ALTER DELETE和ALTER UPDATE。
mutation_id:MUTATION操作的版本号。
partition_id:当前分区目录的ID。以上就是Entry日志对象的数据结构信息,在接下来将要介绍的核心流程中,将会看到它们的身影。
以上就是Entry日志对象的数据结构信息,在接下来将要介绍的核心流程中,将会看到它们的身影。
1.9、副本协同的核心流程
副本协同的核心流程主要有INSERT、MERGE、MUTATION和ALTER四种,分别对应了数据写入、分区合并、数据修改和元数据修改。INSERT和ALTER查询是分布式执行的。借助ZooKeeper的事件通知机制,多个副本之间会自动进行有效协同,但是它们不会使用ZooKeeper存储任何分区数据。而其他查询并不支持分布式执行,包括SELECT、CREATE、DROP、RENAME和ATTACH。例如,为了创建多个副本,我们需要分别登录每个ClickHouse节点,在它们本地执行各自的CREATE语句(后面将会介绍如何利用集群配置简化这一操作)。接下来,会依次介绍上述流程的工作机理。为了便于理解,我先来整体认识一下各个流程的介绍方法。
首先,拟定一个演示场景,即使用ReplicatedMergeTree实现一张拥有1分片、1副本的数据表,并以此来贯穿整个讲解过程(对于大于1个副本的场景,流程以此类推)。
接着,通过对ReplicatedMergeTree分别执行INSERT、MERGE、MUTATION和ALTER操作,以此来讲解相应的工作原理。与此同时,通过实际案例,论证工作原理。
当需要在ReplicatedMergeTree中执行INSERT查询以写入数据时,即会进入INSERT核心流程
创建第一个副本实例
假设首先从CH5节点开始,对CH5节点执行下面的语句后,会创建第一个副本实例:
CREATE TABLE replicated_sales_1 (
id String,
price Float64,
create_time DateTime
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1','ch5.nauu.com')
partition by toYYYYMM(create_time)
ORDER BY id ;
在创建的过程中,ReplicatedMergeTree会进行一些初始化操作,例如:根据zk_path初始化所有的ZooKeeper节点。
在/replicas/节点下注册自己的副本实例ch5.nauu.com。
启动监听任务,监听/log日志节点。
参与副本选举,选举出主副本,选举的方式是向/leader_election/插入子节点,第一个插入成功的副本就是主副本。
接着,在CH6节点执行下面的语句,创建第二个副本实例。表结构和zk_path需要与第一个副本相同,而replica_name则需要设置成CH6的域名:
CREATE TABLE replicated_sales_1 (
id String,
price Float64,
create_time DateTime
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1','ch6.nauu.com')
partition by toYYYYMM(create_time)
ORDER BY id ;
在创建过程中,第二个ReplicatedMergeTree同样会进行一些初始化操作,例如:
在/replicas/节点下注册自己的副本实例ch6.nauu.com。
启动监听任务,监听/log日志节点。
参与副本选举,选举出主副本。在这个例子中,CH5副本成为主副本。
现在尝试向第一个副本CH5写入数据。执行如下命令:
上述命令执行之后,首先会在本地完成分区目录的写入:
接着向/blocks节点写入该数据分区的block_id:
该block_id将作为后续去重操作的判断依据。如果此时再次执行刚才的INSERT语句,试图写入重复数据,则会出现如下提示:
即副本会自动忽略block_id重复的待写入数据。
此外,如果设置了insert_quorum参数(默认为0),并且insert_quorum>=2,则CH5会进一步监控已完成写入操作的副本个数,只有当写入副本个数大于或等于insert_quorum时,整个写入操作才算成功。
由第一个副本实例推送Log日志
在3步骤完成之后,会继续由执行了INSERT的副本向/log节点推送操作日志。在这个例子中,会由第一个副本CH5担此重任。日志的编号是/log/log-0000000000,而LogEntry的核心属性如下
从日志内容中可以看出,操作类型为get下载,而需要下载的分区是201905_0_0_0。其余所有副本都会基于Log日志以相同的顺序执行命令。
至此,整个写入流程结束。
可以看到,在INSERT的写入过程中,ZooKeeper不会进行任何实质性的数据传输。本着谁执行谁负责的原则,在这个案例中由CH5首先在本地写入了分区数据。之后,也由这个副本负责发送Log日志,通知其他副本下载数据。如果设置了insert_quorum并且insert_quorum>=2,则还会由该副本监控完成写入的副本数量。其他副本在接收到Log日志之后,会选择一个最合适的远端副本,点对点地下载分区数据。
1.10、MERGE的核心执行流程
当ReplicatedMergeTree触发分区合并动作时,即会进入这个部分的流程,它的核心流程如图所示。
无论MERGE操作从哪个副本发起,其合并计划都会交由主副本来制定。在INSERT的例子中,CH5节点已经成功竞选为主副本,所以为了方便论证,这个案例就从CH6节点开始。整个流程从上至下按照时间顺序进行,其大致分成5个步骤。现在,根据图1中所示编号讲解整个过程。
创建远程连接,尝试与主副本通信
首先在CH6节点执行OPTIMIZE,强制触发MERGE合并。这个时候,CH6通过/replicas找到主副本CH5,并尝试建立与它的远程连接。
主副本接收通信
主副本CH5接收并建立来自远端副本CH6的连接
由主副本制定MERGE计划并推送Log日志
由主副本CH5制定MERGE计划,并判断哪些分区需要被合并。在选定之后,CH5将合并计划转换为Log日志对象并推送Log日志,以通知所有副本开始合并。日志的核心信息如下:
从日志内容中可以看出,操作类型为Merge合并,而这次需要合并的分区目录是201905_0_0_0和201905_1_1_0。
与此同时,主副本还会锁住执行线程,对日志的接收情况进行监听:
其监听行为由replication_alter_partitions_sync参数控制,默认值为1。当此参数为0时,不做任何等待;为1时,只等待主副本自身完成;为2时,会等待所有副本拉取完成。
各个副本分别拉取Log日志
CH5和CH6两个副本实例将分别监听/log/log-0000000002日志的推送,它们也会分别拉取日志到本地,并推送到各自的/queue任务队列:
各个副本分别在本地执行MERGE
CH5和CH6基于各自的/queue队列开始执行任务:
各个副本开始在本地执行MERGE:
至此,整个合并流程结束。
可以看到,在MERGE的合并过程中,ZooKeeper也不会进行任何实质性的数据传输,所有的合并操作,最终都是由各个副本在本地完成的。而无论合并动作在哪个副本被触发,都会首先被转交至主副本,再由主副本负责合并计划的制定、消息日志的推送以及对日志接收情况的监控。
1.11、MUTATION的核心执行流程
当对ReplicatedMergeTree执行ALTER DELETE或者ALTER UPDATE操作的时候,即会进入MUTATION部分的逻辑,它的核心流程如图所示。
与MERGE类似,无论MUTATION操作从哪个副本发起,首先都会由主副本进行响应。所以为了方便论证,这个案例还是继续从CH6节点开始(因为CH6不是主副本)。整个流程从上至下按照时间顺序进行,其大致分成5个步骤。现在根据图中所示编号讲解整个过程。
推送MUTATION日志
在CH6节点尝试通过DELETE来删除数据(执行UPDATE的效果与此相同),执行如下命令:
执行之后,该副本会接着进行两个重要事项:
创建MUTATION ID:
将MUTATION操作转换为MutationEntry日志,并推送到/mutations/0000000000。MutationEntry的核心属性如下:
由此也能知晓,MUTATION的操作日志是经由/mutations节点分发至各个副本的。
所有副本实例各自监听MUTATION日志
CH5和CH6都会监听/mutations节点,所以一旦有新的日志子节点加入,它们都能实时感知:
当监听到有新的MUTATION日志加入时,并不是所有副本都会直接做出响应,它们首先会判断自己是否为主副本。
由主副本实例响应MUTATION日志并推送Log日志
只有主副本才会响应MUTATION日志,在这个例子中主副本为CH5,所以CH5将MUTATION日志转换为LogEntry日志并推送至/log节点,以通知各个副本执行具体的操作。日志的核心信息如下:
从日志内容中可以看出,上述操作的类型为mutate,而这次需要将201905_0_1_1分区修改为201905_0_1_1_2(201905_0_1_1 +”_” + mutation_id)。
各个副本实例分别拉取Log日志
CH5和CH6两个副本分别监听/log/log-0000000003日志的推送,它们也会分别拉取日志到本地,并推送到各自的/queue任务队列:
各个副本实例分别在本地执行MUTATION
CH5和CH6基于各自的/queue队列开始执行任务:
各个副本,开始在本地执行MUTATION:
至此,整个MUTATION流程结束。
可以看到,在MUTATION的整个执行过程中,ZooKeeper同样不会进行任何实质性的数据传输。所有的MUTATION操作,最终都是由各个副本在本地完成的。而MUTATION操作是经过/mutations节点实现分发的。本着谁执行谁负责的原则,在这个案例中由CH6负责了消息的推送。但是无论MUTATION动作从哪个副本被触发,之后都会被转交至主副本,再由主副本负责推送Log日志,以通知各个副本执行最终的MUTATION逻辑。同时也由主副本对日志接收的情况实行监控。
1.12、ALTER的核心执行流程
当对ReplicatedMergeTree执行ALTER操作进行元数据修改的时候,即会进入ALTER部分的逻辑,例如增加、删除表字段等。而ALTER的核心流程如图所示。
与之前的几个流程相比,ALTET的流程会简单很多,其执行过程中并不会涉及/log日志的分发。整个流程从上至下按照时间顺序进行,其大致分成3个步骤。现在根据图所示编号讲解整个过程。
修改共享元数据
在CH6节点尝试增加一个列字段,执行如下语句:
执行之后,CH6会修改ZooKeeper内的共享元数据节点:
数据修改后,节点的版本号也会同时提升:
与此同时,CH6还会负责监听所有副本的修改完成情况:
监听共享元数据变更并各自执行本地修改
CH5和CH6两个副本分别监听共享元数据的变更。之后,它们会分别对本地的元数据版本号与共享版本号进行对比。在这个案例中,它们会发现本地版本号低于共享版本号,于是它们开始在各自的本地执行更新操作:
确认所有副本完成修改
CH6确认所有副本均已完成修改:
至此,整个ALTER流程结束。
可以看到,在ALTER整个的执行过程中,ZooKeeper不会进行任何实质性的数据传输。所有的ALTER操作,最终都是由各个副本在本地完成的。本着谁执行谁负责的原则,在这个案例中由CH6负责对共享元数据的修改以及对各个副本修改进度的监控。
2、数据分片
2.1、前序
通过引入数据副本,虽然能够有效降低数据的丢失风险(多份存储),并提升查询的性能(分摊查询、读写分离),但是仍然有一个问题没有解决,那就是数据表的容量问题。到目前为止,每个副本自身,仍然保存了数据表的全量数据。所以在业务量十分庞大的场景中,依靠副本并不能解决单表的性能瓶颈。想要从根本上解决这类问题,需要借助另外一种手段,即进一步将数据水平切分,也就是我们将要介绍的数据分片。
ClickHouse中的每个服务节点都可称为一个shard(分片)。从理论上来讲,假设有N(N >= 1)张数据表A,分布在N个ClickHouse服务节点,而这些数据表彼此之间没有重复数据,那么就可以说数据表A拥有N个分片。然而在工程实践中,如果只有这些分片表,那么整个Sharding(分片)方案基本是不可用的。对于一个完整的方案来说,还需要考虑数据在写入时,如何被均匀地写至各个shard,以及数据在查询时,如何路由到每个shard,并组合成结果集。所以,ClickHouse的数据分片需要结合Distributed表引擎一同使用,如图所示。
Distributed表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。
2.2、1分片、0副本
在ClickHouse中,集群配置用shard代表分片、用replica代表副本。那么在逻辑层面,表示1分片、0副本语义的配置如下所示:
可以看到,这样的配置似乎有些反直觉,shard更像是逻辑层面的分组,而无论是副本还是分片,它们的载体都是replica,所以从某种角度来看,副本也是分片。
关于这方面的详细介绍会在后续展开,现在先回到之前的话题。由于Distributed表引擎需要读取集群的信息,所以首先必须为ClickHouse添加集群的配置。找到前面在介绍ZooKeeper配置时增加的metrika.xml配置文件,将其加入集群的配置信息。
集群有两种配置形式,下面分别介绍。
2.3、不包含副本的分片
如果直接使用node标签定义分片节点,那么该集群将只包含分片,不包含副本。以下面的配置为例:
该配置定义了一个名为shard_2的集群,其包含了2个分片节点,它们分别指向了是CH5和CH6服务器。现在分别对配置项进行说明:
shard_2表示自定义的集群名称,全局唯一,是后续引用集群配置的唯一标识。在一个配置文件内,可以定义任意组集群。
node用于定义分片节点,不包含副本。
host指定部署了ClickHouse节点的服务器地址。
port指定ClickHouse服务的TCP端口。接下来介绍选填参数:
weight分片权重默认为1,在后续小节中会对其详细介绍。
user为ClickHouse用户,默认为default。
password为ClickHouse的用户密码,默认为空字符串。
secure为SSL连接的端口,默认为9440。
compression表示是否开启数据压缩功能,默认为true。
2.4、3分片、0副本
2.5、自定义分片与副本
集群配置支持自定义分片和副本的数量,这种形式需要使用shard标签代替先前的node,除此之外的配置完全相同。在这种自定义配置的方式下,分片和副本的数量完全交由配置者掌控。其中,shard表示逻辑上的数据分片,而物理上的分片则用replica表示。如果在1个shard标签下定义N(N >= 1)组replica,则该shard的语义表示1个分片和N -1个副本。接下来用几组配置示例进行说明。
1)不包含副本的分片
下面所示的这组集群配置的效果与先前介绍的shard_2集群相同:
sharding_simple集群的语义为2分片、0副本(1分片、0副本,再加上1分片、0副本)。
2.6、N个分片和N个副本
这种形式可以按照实际需求自由组合,例如下面的这组配置,集群sharding_simple_1拥有1个分片和1个副本:
下面所示集群sharding_ha拥有2个分片,而每个分片拥有1个副本:
从上面的配置信息中能够得出结论,集群中replica数量的上限是由ClickHouse节点的数量决定的,例如为了部署集群sharding_ha,需要4个ClickHouse服务节点作为支撑。
在完成上述配置之后,可以查询系统表验证集群配置是否已被加载:
SELECT cluster,host_name FROM system.clusters;
不用重启集群,自动加载
2.7、基于集群实现分布式DDL
前言
不知道大家是否还记得,在前面介绍数据副本时为了创建多张副本表,我们需要分别登录到每个ClickHouse节点,在它们本地执行各自的CREATE语句。这是因为在默认的情况下,CREATE、DROP、RENAME和ALTER等DDL语句并不支持分布式执行。而在加入集群配置后,就可以使用新的语法实现分布式DDL执行了,其语法形式如下:
其中,cluster_name对应了配置文件中的集群名称,ClickHouse会根据集群的配置信息顺藤摸瓜,分别去各个节点执行DDL语句。
在执行了上述语句之后,ClickHouse会根据集群shard_2的配置信息,分别在CH5和CH6节点本地创建test_1_local。
如果要删除test_1_local,则执行下面的分布式DROP:
值得注意的是,在改写的CREATE语句中,用{shard}和{replica}两个动态宏变量代替了先前的硬编码方式。执行下面的语句查询系统表,能够看到当前ClickHouse节点中已存在的宏变量:
这些宏变量是通过配置文件的形式预先定义在各个节点的配置文件中的,配置文件如下所示。
在CH5节点的config.xml配置中预先定义了分区01的宏变量:
在CH6节点的config.xml配置中预先定义了分区02的宏变量:
2.8、数据结构
与ReplicatedMergeTree类似,分布式DDL语句在执行的过程中也需要借助ZooKeeper的协同能力,以实现日志分发。
ZooKeeper内的节点结构
在默认情况下,分布式DDL在ZooKeeper内使用的根路径为:
该路径由config.xml内的distributed_ddl配置指定:
在此根路径之下,还有一些其他的监听节点,其中包括/query-[seq],其是DDL操作日志,每执行一次分布式DDL查询,在该节点下就会新增一条操作日志,以记录相应的操作指令。当各个节点监听到有新日志加入的时候,便会响应执行。DDL操作日志使用ZooKeeper的持久顺序型节点,每条指令的名称以query-为前缀,后面的序号递增,例如query-0000000000、query-0000000001等。在每条query-[seq]操作日志之下,还有两个状态节点:
(1)/query-[seq]/active:用于状态监控等用途,在任务的执行过程中,在该节点下会临时保存当前集群内状态为active的节点。
(2)/query-[seq]/finished:用于检查任务完成情况,在任务的执行过程中,每当集群内的某个host节点执行完毕之后,便会在该节点下写入记录。例如下面的语句。上述语句表示集群内的CH5和CH6两个节点已完成任务。
2.9 、DDLLogEntry日志对象的数据结构
在/query-[seq]下记录的日志信息由DDLLogEntry承载,它拥有如下几个核心属性:
query记录了DDL查询的执行语句,例如:
hosts记录了指定集群的hosts主机列表,集群由分布式DDL语句中的ONCLUSTER指定,例如:
在分布式DDL的执行过程中,会根据hosts列表逐个判断它们的执行状态。initiator记录初始化host主机的名称,hosts主机列表的取值来自于初始化host节点上的集群,例如:
hosts主机列表的取值来源等同于下面的查询:
2.10 、分布式DDL的核心执行流程
与副本协同的核心流程类似,接下来,创建test_1_local的过程为例,解释分布式DDL的核心执行流程。整个流程如图所示。
整个流程从上至下按照时间顺序进行,其大致分成3个步骤。现在,根据图所示编号讲解整个过程。
(1)推送DDL日志:首先在CH5节点执行CREATE TABLE ON CLUSTER,本着谁执行谁负责的原则,在这个案例中将会由CH5节点负责创建DDLLogEntry日志并将日志推送到ZooKeeper,同时也会由这个节点负责监控任务的执行进度。
(2)拉取日志并执行:CH5和CH6两个节点分别监听/ddl/query-0000000064日志的推送,于是它们分别拉取日志到本地。首先,它们会判断各自的host是否被包含在DDLLog-Entry的hosts列表中。如果包含在内,则进入执行流程,执行完毕后将状态写入finished节点;如果不包含,则忽略这次日志的推送。
(3)确认执行进度:在步骤1执行DDL语句之后,客户端会阻塞等待180秒,以期望所有host执行完毕。如果等待时间大于180秒,则会转入后台线程继续等待(等待时间由distributed_ddl_task_timeout参数指定,默认为180秒)。
2.11、Distributed原理解析
Distributed表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以Distributed表引擎需要和其他数据表引擎一起协同工作。
从实体表层面来看,一张分片表由两部分组成:
本地表:通常以_local为后缀进行命名。本地表是承接数据的载体,可以使用非Distributed的任意表引擎,一张本地表对应了一个数据分片。
分布式表:通常以_all为后缀进行命名。分布式表只能使用Distributed表引擎,它与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表。
对于分布式表与本地表之间表结构的一致性检查,Distributed表引擎采用了读时检查的机制,这意味着如果它们的表结构不兼容,只有在查询时才会抛出错误,而在创建表时并不会进行检查。不同ClickHouse节点上的本地表之间,使用不同的表引擎也是可行的,但是通常不建议这么做,保持它们的结构一致,有利于后期的维护并避免造成不可预计的错误。
定义形式
Distributed表引擎的定义形式如下所示:
ENGINE = Distributed(cluster,database,table,[,sharding_key])
其中,各个参数的含义分别如下:
cluster:集群名称,与集群配置中的自定义名称相对应。在对分布式表执行写入和查询的过程中,它会使用集群的配置信息来找到相应的host节点。
database和table:分别对应数据库和表的名称,分布式表使用这组配置映射到本地表。
sharding_key:分片键,选填参数。在数据写入的过程中,分布式表会依据分片键的规则,将数据分布到各个host节点的本地表。
现在用示例说明Distributed表的声明方式,建表语句如下所示:
CREATE TABLE test_shard_2_all ON CLUSTER shard_2_0 (
id UInt64
) ENGINE = Distributed(shard_2_0,default,test_shard_2_local,intHash64(id));
上述表引擎参数的语义可以理解为,代理的本地表为default.test_shard_2_local,它们分布在集群sharding_simple的各个shard,在数据写入时会根据rand()随机函数的取值决定数据写入哪个分片。值得注意的是,此时此刻本地表还未创建,所以从这里也能看出,Distributed表运用的是读时检查的机制,对创建分布式表和本地表的顺序并没有强制要求。同样值得注意的是,在上面的语句中使用了ONCLUSTER分布式DDL,这意味着在集群的每个分片节点上,都会创建一张Distributed表,如此一来便可以从其中任意一端发起对所有分片的读、写请求。
接着需要创建本地表,一张本地表代表着一个数据分片。这里同样可以利用先前已经配置好的集群配置,使用分布式DDL语句迅速的在各个节点创建相应的本地表:
CREATE TABLE test_shard_2_local ON CLUSTER shard_2_0(
id UInt64
) ENGINE = MergeTree()
ORDER BY id
PARTITION BY id ;
至此,拥有两个数据分片的分布式表test_shard_2就建好了。
2.12、查询的分类
Distributed表的查询操作可以分为如下几类:
会作用于本地表的查询:对于INSERT和SELECT查询,Distributed将会以分布式的方式作用于local本地表。而对于这些查询的具体执行逻辑,将会在后续小节介绍。
只会影响Distributed自身,不会作用于本地表的查询:Distributed支持部分元数据操作,包括CREATE、DROP、RENAME和ALTER,其中ALTER并不包括分区的操作(ATTACH PARTITION、REPLACE PARTITION等)。这些查询只会修改Distributed表自身,并不会修改local本地表。例如要彻底删除一张分布式表,则需要分别删除分布式表和本地表,示例如下。
DROP TABLE test_shard_2_all ON CLUSTER shard_2_0;
DROP TABLE test_shard_2_local ON CLUSTER shard_2_0;
不支持的查询:Distributed表不支持任何MUTATION类型的操作,包括ALTER DELETE和ALTER UPDATE。
2.13、分片规则
关于分片的规则这里将做进一步的展开说明。分片键要求返回一个整型类型的取值,包括Int系列和UInt系列。例如分片键可以是一个具体的整型列字段:
Distributed(cluster,database,table,userid)
也可以是一个返回整型的表达式:
--按照随机数划分
Distributed(cluster,database,table,rand());
--按照用户id的散列值划分
Distributed(cluster,database,table,intHash64(userid));
如果不声明分片键,那么分布式表只能包含一个分片,这意味着只能映射一张本地表,否则,在写入数据时将会得到如下异常:
如果一张分布式表只包含一个分片,那就意味着其失去了使用的意义了。所以虽然分片键是选填参数,但是通常都会按照业务规则进行设置。那么数据具体是如何被划分的呢?想要讲清楚这部分逻辑,首先需要明确几个概念。
2.14、分片权重(weight)
在集群的配置中,有一项weight(分片权重)的设置:
weight默认为1,虽然可以将它设置成任意整数,但官方建议应该尽可能设置成较小的值。分片权重会影响数据在分片中的倾斜程度,一个分片权重值越大,那么它被写入的数据就会越多。
2.15、slot(槽)
slot可以理解成许多小的水槽,如果把数据比作是水的话,那么数据之水会顺着这些水槽流进每个数据分片。slot的数量等于所有分片的权重之和,假设集群sharding_simple有两个Shard分片,第一个分片的weight为10,第二个分片的weight为20,那么slot的数量则等于30。slot按照权重元素的取值区间,与对应的分片形成映射关系。在这个示例中,如果slot值落在[0, 10)区间,则对应第一个分片;如果slot值落在[10, 20] 备注:正无穷大,如果没有第三个分片区间,则对应第二个分片。