ClickHouse Kafka 引擎教程

news2025/1/9 16:35:35

如果您刚开始并且第一次设置 Kafka 和 ClickHouse 需要帮助怎么办?这篇文章也许会提供下帮助。

我们将通过一个端到端示例,使用 Kafka 引擎将数据从 Kafka 主题加载到 ClickHouse 表中。我们还将展示如何重置偏移量和重新加载数据,以及如何更改表架构。最后,我们将演示如何将数据从 ClickHouse 写回 Kafka 主题。

先决条件

下面的练习假设你已经安装并运行了 Kafka 和 ClickHouse。为了方便起见,我们使用了 Kubernetes。Kafka 版本是 Confluent 5.4.0,使用带有三个 Kafka 代理的 Kafka helm chart 安装。ClickHouse版本为20.4.2,使用ClickHouse Kubernetes Operator安装在单个节点上。

这些练习应该适用于任何类型的安装,但您需要相应地更改主机名。如果 Kafka 代理较少,则可能还需要更改复制因子。

Kafka-ClickHouse 集成概述

Kafka 是一种极具可扩展性的消息总线。它的核心是由运行在不同主机上的代理管理的分布式日志。以下是应用程序模型的简短说明。

生产者将消息写入主题,主题是一组消息。使用者从主题中读取消息,该主题分布在分区上。消费者被安排在消费者组中,这允许应用程序从 Kafka 并行读取消息,而不会丢失或重复。

下图说明了上述主要部分。

ClickHouse 可以使用 Kafka 表引擎和物化视图直接从 Kafka 主题读取消息,该视图获取消息并将其推送到 ClickHouse 目标表。目标表通常使用 MergeTree 引擎或 ReplicatedMergeTree 等变体来实现。消息流如下图所示。

也可以从 ClickHouse 写回 Kafka。消息流更简单 - 只需插入到 Kafka 表中即可。下面是流程图。

在 Kafka 上创建主题

现在让我们在 Kafka 上设置一个主题,我们可以使用它来加载消息。登录到 Kafka 服务器,然后使用以下示例中的命令创建主题。在此示例中,“kafka”是服务器的 DNS 名称。如果您有其他 DNS 名称,请改用该名称。您还可以调整分区数以及复制因子。

kafka-topics \
--bootstrap-server kafka:9092 \
--topic readings \
--create --partitions 6 \
--replication-factor 2

检查主题是否已成功创建。

kafka-topics --bootstrap-server kafka:9092 --describe readings
你将看到如下所示的输出,其中显示了其分区的主题和当前状态。
Topic: readings    PartitionCount: 6    ReplicationFactor: 2    Configs:
    Topic: readings    Partition: 0    Leader: 0    Replicas: 0,2    Isr: 0,2
    Topic: readings    Partition: 1    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: readings    Partition: 2    Leader: 1    Replicas: 1,0    Isr: 1,0
    Topic: readings    Partition: 3    Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: readings    Partition: 4    Leader: 2    Replicas: 2,0    Isr: 2,0
    Topic: readings    Partition: 5    Leader: 1    Replicas: 1,2    Isr: 1,2

现在 Kafak 准备工作已完成。让我们转向ClickHouse。

ClickHouse Kafka 引擎设置

要将数据从 Kafka 主题读取到 ClickHouse 表,我们需要做三件事:

  • 一个目标 MergeTree 表,用于为引入的数据提供主目录

  • 一个 Kafka 引擎表,使主题看起来像一个 ClickHouse 表

  • 用于自动将数据从 Kafka 移动到目标表的具体化视图

首先,我们将定义目标 MergeTree 表。登录到 ClickHouse 执行以下 SQL

CREATE TABLE readings (
    readings_id Int32 Codec(DoubleDelta, LZ4),
    time DateTime Codec(DoubleDelta, LZ4),
    date ALIAS toDate(time),
    temperature Decimal(5,2) Codec(T64, LZ4)
) Engine = MergeTree
PARTITION BY toYYYYMM(time)
ORDER BY (readings_id, time);

接下来,我们需要使用 Kafka 引擎创建一个表来连接主题并读取数据。引擎将使用主题“readings”和消费者组名称“readings consumer_group1”从主机 kafka 的代理读取数据。输入格式为 CSV。

请注意,我们省略了“date”列。它是目标表中的别名,将从“time”列自动填充。


CREATE TABLE readings_queue (
    readings_id Int32,
    time DateTime,
    temperature Decimal(5,2)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-headless.kafka:9092',
         kafka_topic_list = 'readings',
         kafka_group_name = 'readings_consumer_group1',
         kafka_format = 'CSV',
         kafka_max_block_size = 1048576;

前面的设置处理最简单的情况:单个代理、单个主题且没有专用配置。

最后,我们创建一个物化视图,用于在 Kafka 和合并树表之间传输数据。

CREATE MATERIALIZED VIEW readings_queue_mv TO readings AS
SELECT readings_id, time, temperature
FROM readings_queue;

这就是 Kafka 到 ClickHouse 的集成。让我们来测试一下。

加载数据

现在是时候使用 kafka-console-producer 命令加载一些输入数据了。以下示例使用 CSV 格式添加三条记录。


kafka-console-producer --broker-list kafka:9092 --topic readings <<END
1,"2020-05-16 23:55:44",14.2
2,"2020-05-16 23:55:45",20.1
3,"2020-05-16 23:55:51",12.9
END
传输到 readings  表需要几秒钟。如果我们从表中进行查询,我们会得到以下输出。
SELECT *
FROM readings

┌─readings_id─┬────────────────time─┬─temperature─┐
│           1 │ 2020-05-16 23:55:44 │       14.20 │
│           2 │ 2020-05-16 23:55:45 │       20.10 │
│           3 │ 2020-05-16 23:55:51 │       12.90 │
└─────────────┴─────────────────────┴─────────────┘

Kafka 和 ClickHouse 现已连接。

从 Kafka 重读消息

前面的示例从 Kafka 主题中的起始位置开始,并在消息到达时读取消息。这是正常方式,但有时再次阅读消息很有用。例如,您可能希望在修复架构中的 bug 或重新加载备份后重新读取消息。幸运的是,这很容易做到。我们只是重置使用者组中的偏移量。

假设我们丢失了 readings 表中的所有消息,并希望从 Kafka 重新加载它们。首先,让我们使用 TRUNCATE 命令“丢失”消息。

TRUNCATE TABLE readings;

在重置分区上的偏移量之前,我们需要关闭消息消费。为此,请在 ClickHouse 中分离 readings_queue 表,如下所示。

DETACH TABLE readings_queue

接下来,使用以下 Kafka 命令重置用于 readings_queue 表的使用者组中的分区偏移量 (kafka 节点执行)。

kafka-consumer-groups --bootstrap-server kafka:9092 \
 --topic readings --group readings_consumer_group1 \
 --reset-offsets --to-earliest --execute

现在重新连接readings_queue表。

ATTACH TABLE readings_queue

等待几秒钟,丢失的记录将被恢复。您可以运行 SELECT 来确认它们已恢复。

添加虚拟列

使用显示原始 Kafka 消息坐标的信息标记行通常很有用。为此,Kafka 表引擎自动定义了虚拟列。下面介绍如何更改 readings 表以显示源主题、分区和偏移量。

首先,让我们通过分离 Kafka 表来禁用消息使用。消息可能会堆积在主题上,但我们不会丢失它们。

DETACH TABLE readings_queue

接下来,我们通过连续执行以下 SQL 命令来更改目标表和物化视图。请注意,我们只是删除并重新创建具体化视图,而更改目标表,从而保留现有数据。

ALTER TABLE readings
  ADD COLUMN _topic String,
  ADD COLUMN _offset UInt64,
  ADD COLUMN _partition UInt64
  
DROP TABLE readings_queue_mv

CREATE MATERIALIZED VIEW readings_queue_mv TO readings AS
  SELECT readings_id, time, temperature, _topic, _offset, _partition
  FROM readings_queue;

最后,我们通过重新附加 readings_queue 表来再次启用消息使用。


ATTACH TABLE readings_queue

您可以通过截断表并重新加载消息来确认新架构,就像我们在上一节中所做的那样。如果查询数据,它将如下所示。

SELECT
    readings_id AS id, time, temperature AS temp,
    _topic, _offset, _partition
FROM readings

┌─id─┬────────────────time─┬──temp─┬─_topic───┬─_offset─┬─_partition─┐
│  1 │ 2020-05-16 23:55:44 │ 14.20 │ readings │       0 │          5 │
│  2 │ 2020-05-16 23:55:45 │ 20.10 │ readings │       1 │          5 │
│  3 │ 2020-05-16 23:55:51 │ 12.90 │ readings │       2 │          5 │
└────┴─────────────────────┴───────┴──────────┴─────────┴────────────┘

顺便说一句,上述过程与在消息格式更改时升级架构的方式相同。此外,物化视图提供了一种非常通用的方法,可以使 Kafka 消息适应目标表行。您甚至可以定义多个具体化视图,以将消息流拆分到不同的目标表中。

从 ClickHouse 写入 Kafka

在本教程的最后,我们将展示如何将消息从 ClickHouse 写回 Kafka。这是一个相对较新的功能,在当前的 Altinity 稳定版本 19.16.18.85 中可用。

让我们首先在 Kafka 中创建一个新主题来包含消息。我们称其为“readings_high”

kafka-topics \
--bootstrap-server kafka:9092 \
--topic readings_high \
--create --partitions 6 \
--replication-factor 2

接下来,我们需要使用 Kafka 表引擎定义一个指向新主题的表。事实证明,此表可以读取和写入消息,但在此示例中,我们将仅使用它进行写入。

CREATE TABLE readings_high_queue (
    readings_id Int32,
    time DateTime,
    temperature Decimal(5,2)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'readings_high',
         kafka_group_name = 'readings_high_consumer_group1',
         kafka_format = 'CSV',
         kafka_max_block_size = 1048576;

最后,让我们添加一个实例化视图,将温度大于 20.0 的所有行传输到 readings_high_queue 表。此示例说明了 ClickHouse 物化视图的另一个用例,即在特定条件下生成事件。

CREATE MATERIALIZED VIEW readings_high_queue_mv TO readings_high_queue AS
SELECT readings_id, time, temperature FROM readings
WHERE toFloat32(temperature) >= 20.0

在单独的终端窗口中启动消费者,以从 Kafka 上的 readings_high 主题打印消息,如下所示。这将允许您在 ClickHouse 将行写入 Kafka 时查看行。

kafka-console-consumer --bootstrap-server kafka:9092 --topic readings_high

最后,加载一些数据,这些数据将演示如何写回 Kafka。让我们在原始主题中添加一个新批量。在另一个窗口中运行以下命令。

kafka-console-producer --broker-list kafka:9092 --topic readings <<END
4,"2020-05-16 23:55:52",9.7
5,"2020-05-16 23:55:56",25.3
6,"2020-05-16 23:55:58",14.1
END

几秒钟后,您将在运行 kafka-console-consumer 命令的窗口中看到第二行弹出。它应如下所示:

5,"2020-05-16 23:55:56",25.3

故障处理

如果您在使用任何示例时遇到问题,请查看 ClickHouse 日志。如果尚未启用跟踪日志记录,请启用跟踪日志记录。您可以看到如下消息,这些消息表示 Kafka 表引擎中的活动。

2020.05.17 07:24:20.609147 [ 64 ] {} <Debug> StorageKafka (readings_queue): Started streaming to 1 attached views

所有错误将保存在在clickhouse-server.err.log中。

结论

正如这篇博客文章所展示的,Kafka 表引擎提供了一种简单而强大的方法来集成 Kafka 主题和 ClickHouse 表。显然,管理集成还有很多工作要做,尤其是在生产系统中。我们希望本文能帮助您入门,并使您能够自己探索其他可能性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1314422.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

「构」向云端 - 我与 2023 亚马逊云科技 re:Invent 大会

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技开发者社区, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 2023年亚马逊AWS re:Invent大会宣布一项Amazon Q的创新项目&#x…

微信小程序---使用npm包安装Vant组件库

在小程序项目中&#xff0c;安装Vant 组件库主要分为如下3步: 注意&#xff1a;如果你的文件中不存在pakage.json&#xff0c;请初始化一下包管理器 npm init -y 1.通过 npm 安装(建议指定版本为1.3.3&#xff09; 通过npm npm i vant/weapp1.3.3 -S --production 通过y…

十六、YARN和MapReduce配置

1、部署前提 &#xff08;1&#xff09;配置前提 已经配置好Hadoop集群。 配置内容&#xff1a; &#xff08;2&#xff09;部署说明 &#xff08;3&#xff09;集群规划 2、修改配置文件 MapReduce &#xff08;1&#xff09;修改mapred-env.sh配置文件 export JAVA_HOM…

基于BWA,Bowtie2,samtools、checkm等工具计算宏基因组学序列分析中Contigs与Genes在样品中的丰度,多种计算方式和脚本对比

计算contigs和genes相对丰度可以提供有关微生物群落结构和功能的信息。以下是计算这两个指标的意义&#xff1a; 1. Contigs的相对丰度&#xff1a;contigs是利用基因组测序技术获得的碎片序列&#xff0c;通过计算contigs的相对丰度可以了解微生物群落中不同菌种的相对丰度。…

2023.12.14每日一题

2023.12.14 题目来源我的题解二维前缀和二维差分 题目来源 力扣每日一题&#xff1b;题序&#xff1a;2132 我的题解 哈哈哈哈&#xff01;&#xff01;&#xff01;我不会&#xff0c;借鉴一下官方题解 二维前缀和二维差分 求二维前缀和&#xff0c;用于判断快速判断右下角…

GPT-4V被超越?SEED-Bench多模态大模型测评基准更新

&#x1f4d6; 技术报告 SEED-Bench-1&#xff1a;https://arxiv.org/abs/2307.16125 SEED-Bench-2&#xff1a;https://arxiv.org/abs/2311.17092 &#x1f917; 测评数据 SEED-Bench-1&#xff1a;https://huggingface.co/datasets/AILab-CVC/SEED-Bench SEED-Bench-2&…

Tomcat-安装部署(源码包安装)

一、简介 Tomcat 是由 Apache 开发的一个 Servlet 容器&#xff0c;实现了对 Servlet 和 JSP 的支持&#xff0c;并提供了作为Web服务器的一些特有功能&#xff0c;如Tomcat管理和控制平台、安全域管理和Tomcat阀等。 简单来说&#xff0c;Tomcat是一个WEB应用程序的托管平台…

【期末复习向】长江后浪推前浪之ChatGPT概述

参考文章&#xff1a;GPT系列模型技术路径演进-CSDN博客 这篇文章讲了之前称霸NLP领域的预训练模型bert&#xff0c;它是基于预训练理念&#xff0c;采用完形填空和下一句预测任务2个预训练任务完成特征的提取。当时很多的特定领域的NLP任务&#xff08;如情感分类&#xff0c…

jenkins-Generic Webhook Trigger指定分支构建

文章目录 1 需求分析1.1 关键词 : 2、webhooks 是什么&#xff1f;3、配置步骤3.1 github 里需要的仓库配置&#xff1a;3.2 jenkins 的主要配置3.3 option filter配置用于匹配目标分支 实现指定分支构建 1 需求分析 一个项目一般会开多个分支进行开发&#xff0c;测试&#x…

Redis设计与实现之跳跃表

目录 一、跳跃表 1、跳跃表的实现 2、跳跃表的应用 3、跳跃表的时间复杂度是什么&#xff1f; 二、跳跃表有哪些应用场景&#xff1f; 三、跳跃表和其他数据结构&#xff08;如数组、链表等&#xff09;相比有什么优点和缺点&#xff1f; 四、Redis的跳跃表支持并发操作吗…

使用React实现随机颜色选择器,JS如何生成随机颜色

背景 在标签功能中&#xff0c;由于有「背景色」属性&#xff0c;每次新增标签时都为选择哪种颜色犯难。因此&#xff0c;我们思考如何通过JS代码生成随机颜色&#xff0c;提取一个通用的随机颜色生成工具&#xff0c;并基于React框架封装随机颜色选择器组件。 实际效果 原理…

智能插座是什么

智能插座 电工电气百科 文章目录 智能插座前言一、智能插座是什么二、智能插座的类别三、智能插座的原理总结 前言 智能插座的应用广泛&#xff0c;可以用于智能家居系统中的电器控制&#xff0c;也可以应用在办公室、商业场所和工业控制中&#xff0c;方便快捷地实现电器的远…

Python:如何将MCD12Q1\MOD11A2\MOD13A2原始数据集批量输出为TIFF文件(镶嵌/重投影/)?

博客已同步微信公众号&#xff1a;GIS茄子&#xff1b;若博客出现纰漏或有更多问题交流欢迎关注GIS茄子&#xff0c;或者邮箱联系(推荐-见主页). 00 前言 之前一段时间一直使用ENVI IDL处理遥感数据&#xff0c;但是确实对于一些比较新鲜的东西IDL并没有python那么好的及时性&…

【STM32独立看门狗(IWDG) 】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、看门狗是什么&#xff1f;1.简介2. 主要功能3.独立看门狗如何工作4.寄存器写保护5.看门狗 看门时间 二、使用步骤1.开启时钟2.初始化看门狗3.开启看门狗4.喂…

Knife4j 接口文档如何设置 Authorization 鉴权参数?

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

用23种设计模式打造一个cocos creator的游戏框架----(十一)桥接模式

1、模式标准 模式名称&#xff1a;桥接模式 模式分类&#xff1a;结构型 模式意图&#xff1a;将抽象部分与其实现部分分离&#xff0c;使它们都可以独立地变化。 结构图&#xff1a; 适用于&#xff1a; 1、不希望在抽象和它的实现部分之间有一个固定的绑定关系。例如&am…

高并发如何实现单用户信息查询接口

高并发如何实现单用户信息查询接口 故事情节 产品&#xff1a;小李&#xff0c;有个单用户信息查询的功能&#xff0c;需要你实现一下小李&#xff1a;这还不简单&#xff0c;两分钟我给你实现两分钟过去…小李&#xff1a;欧克了&#xff0c;部署上线了运维&#xff1a;哪个…

Nessus漏洞扫描报错:42873 - SSL Medium Strength Cipher Suites Supported (SWEET32)

个人搭建的windows server 2019服务器,被Nessus工具扫描出现三个漏洞,修复比较过程比较坎坷,特记录下 首先:报错信息: 42873 - SSL Medium Strength Cipher Suites Supported (SWEET32) 104743 - TLS Version 1.0 Protocol Detection 157288 - TLS Version 1.1 Protocol …

网络互通--三层交换机配置

目录 一、三层交换机的原理 1、概念 2、PC A与不同网段的PC B第一次数据转发过程 3、一次路由&#xff0c;多次转发的概念 4、 三层交换机和路由器的比较 二、利用实验理解交换机 1、建立以下拓扑图​编辑 2、分别配置主机的IP地址&#xff0c;子网掩码、网关等信息 3、…

自然语言处理阅读第一弹

Transformer架构 encoder和decoder区别 Embeddings from Language Model (ELMO) 一种基于上下文的预训练模型,用于生成具有语境的词向量。原理讲解ELMO中的几个问题 Bidirectional Encoder Representations from Transformers (BERT) BERT就是原生transformer中的Encoder两…