ClickHouse创建分布式表
当数据量剧增的时候,clickhouse是采用分片的方式进行数据的存储的,类似于redis集群的实现方式。然后想进行统一的查询的时候,因为涉及到多个本地表,可以通过分布式表的方式来提供统一的入口。由于是涉及到分布式存储,保证高可用就必须有数据冗余—即副本(replica)。Clickhouse依靠ReplicatedMergeTree引擎族与Zookeeper实现了复制表机制,成为其高可用的基础。该引擎和 MergeTree 的不同之处在于它会删除排序键值相同的重复项。
同时,Clickhouse具有数据分片(shard)的概念,这也是分布式存储的特点之一,即通过并行读写提高效率。ClickhouseHouse依靠Distributed引擎实现了分布式表机制,在所有分片(本地表)上建立视图进行分布式查询。
本文使用ReplicatedMergeTree和Distributed引擎来构建Clickhouse的分布式表。分布式表包括了逻辑表和物理表,逻辑表主要用于查询,物理表是实际存储数据的。
官方给出的创建复制表示例
CREATE TABLE table_name
(
EventDate DateTime,
CounterID UInt32,
UserID UInt32
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/table_name', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
贴上本文本地表的创建代码:
-- 建本地表
CREATE TABLE test.countly_device_local on cluster ch_cluster
(
`appKey` String COMMENT 'appKey',
`deviceId` String COMMENT 'deviceId',
`nginxTime` DateTime COMMENT 'nginxTime',
`rooted` String COMMENT 'rooted',
`charging` String COMMENT 'charging',
`idfa` String COMMENT 'idfa'
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/test/countly_device_local', '{replica}')
PARTITION BY toYYYYMMDD(nginxTime)
ORDER BY (nginxTime);
其中,on cluster ch_cluster
语法标识分布式DDL,即执行一次就可以在集群所有实例上创建同样的本地表。
ReplicatedMergeTree
ReplicatedMergeTree引擎族接受两个参数:
zoo_path
—Zookeeper中该表的路径replica_name
—Zookeeper中的该表的副本名称
如上例所示,这些参数可以包含宏替换的占位符,即大括号的部分。它们会被替换为配置文件里macros
那部分配置的值。
示例:
<macros>
<layer>05</layer>
<shard>02</shard>
<replica>example05-02-1.yandex.ru</replica>
</macros>
«ZooKeeper 中该表的路径»对每个可复制表都要是唯一的。不同分片上的表要有不同的路径。
这种情况下,路径应包含下面这些部分:
/clickhouse/tables/
是公共前缀,官方推荐使用这个。
{layer}-{shard}
是分片标识部分。在此示例中,由于 Yandex.Metrica 集群使用了两级分片,所以它是由两部分组成的。但对于大多数情况来说,你只需保留 {shard} 占位符即可,它会替换展开为分片标识。
table_name
是该表在 ZooKeeper 中的名称。使其与 ClickHouse 中的表名相同比较好。 这里它被明确定义,跟 ClickHouse 表名不一样,它并不会被 RENAME 语句修改。
HINT:你可以在前面添加一个数据库名称 table_name
也是 例如。 db_name.table_name
副本名称用于标识同一个表分片的不同副本。你可以使用服务器名称,如上例所示。同个分片中不同副本的副本名称要唯一。
你也可以显式指定这些参数,而不是使用宏替换。对于测试和配置小型集群这可能会很方便。但是,这种情况下,则不能使用分布式 DDL 语句(ON CLUSTER
)。使用大型集群时,官方建议使用宏替换,因为它可以降低出错的可能性。
要删除副本,使用 DROP TABLE
。但它只删除那个 – 位于运行该语句的服务器上的副本。
想要删除集群上的所有副本,可以使用drop table [if exists] table_name on cluster cluster_name
Distributed Table & Distributed Engine
ClickHouse分布式表的本质并不是一张表,而是一些本地物理表(分片)的分布式视图,分布式引擎本身不存储数据, 但可以在多个服务器上进行分布式查询。读是自动并行的。读取时,远程服务器表的索引(如果有的话)会被使用。
支持分布式表的引擎是Distributed,建表DDL语句示例如下
Distributed(logs, default, hits[, sharding_key])
分布式引擎参数:服务器配置文件中的集群名,远程数据库名,远程表名,数据分片键(可选)。
将会从位于«logs»集群中 default.hits 表所有服务器上读取数据。
建立上面的本地表的分布式的语句为:
-- 建分布式表
create TABLE test.countly_device_dist on cluster ch_cluster as test.countly_device_local
ENGINE = Distributed("ch_cluster", "test", "countly_device_local", rand());
远程服务器不仅用于读取数据,还会对尽可能数据做部分处理。 例如,对于使用 GROUP BY 的查询,数据首先在远程服务器聚合,之后返回聚合函数的中间状态给查询请求的服务器。再在请求的服务器上进一步汇总数据。
要查看集群的详细信息,可以查看system.clusters
表:
select * from system.clusters
r-clickhouse-m220-210.hd163.internal :) select * from system.clusters;
SELECT *
FROM system.clusters
Query id: bbf4a9fa-5a94-48b4-b485-e50bc9a2e891
┌─cluster─────────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name──────────────┬─host_address───┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─estimated_recovery_time─┐
│ ch_base_cluster │ 1 │ 10000 │ 1 │ clickhouse_node1_1_1 │ 10.105.220.210 │ 9000 │ 1 │ default │ │ 0 │ 0 │
│ ch_base_cluster │ 1 │ 10000 │ 2 │ clickhouse_node2_1_2 │ 10.105.220.211 │ 9001 │ 0 │ default │ │ 0 │ 0 │
│ ch_base_cluster │ 2 │ 10000 │ 1 │ clickhouse_node2_2_1 │ 10.105.220.211 │ 9000 │ 0 │ default │
...
本文所使用的环境有一个名为ch_cluster
的集群,它由两个分片组成,每个分片包含两个副本。
要注意区分分片和副本:
- 分片是指包含数据不同部分的服务器(要读取所有数据,必须访问所有分片)。
- 副本是存储复制数据的服务器(要读取所有数据,访问任一副本上的数据即可)。
向分布式表中写数据
向集群写数据的方法有两种:
- 自已指定要将哪些数据写入哪些服务器,并直接在每个分片上执行写入。换句话说,在分布式表上«查询»,在数据表上 INSERT。 这是最灵活的解决方案 – 你可以使用任何分片方案,对于复杂业务特性的需求,这可能是非常重要的。 这也是最佳解决方案,因为数据可以完全独立地写入不同的分片。
- 在分布式表上执行 INSERT。在这种情况下,分布式表会跨服务器分发插入数据。 为了写入分布式表,必须要配置分片键(最后一个参数)。当然,如果只有一个分片,则写操作在没有分片键的情况下也能工作,因为这种情况下分片键没有意义。
每个分片都可以在配置文件中定义权重。默认情况下,权重等于1。数据依据分片权重按比例分发到分片上。例如,如果有两个分片,第一个分片的权重是9,而第二个分片的权重是10,则发送 9 / 19 的行到第一个分片, 10 / 19 的行到第二个分片。
选择将一行数据发送到哪个分片的方法是,首先计算分片表达式,然后将这个计算结果除以所有分片的权重总和得到余数。该行会发送到那个包含该余数的从prev_weight
到prev_weights + weight
的半闭半开区间对应的分片上,其中 prev_weights
是该分片前面的所有分片的权重和,weight
是该分片的权重。例如,如果有两个分片,第一个分片权重为9,而第二个分片权重为10,则余数在 [0,9) 中的行发给第一个分片,余数在 [9,19) 中的行发给第二个分片。
分片表达式可以是由常量和表列组成的任何返回整数表达式。例如,您可以使用表达式 ‘rand()’ 来随机分配数据,或者使用 ‘UserID’ 来按用户 ID 的余数分布
总结
Clickhouse采用了特殊的引擎设计结构和各种方案保证其查询和存储的高效。分布式的创建和使用还有很多的细节等待去深挖。