【基础篇】ClickHouse 表引擎之集成Kafka

news2025/1/9 2:13:08

文章目录

  • 0.前言
  • 1.集成示例
  • 官方教程
    • 示例1:
    • 示例2:
    • 配置
      • Kerberos 支持
    • 虚拟列
  • 参考文档

在这里插入图片描述

0.前言

ClickHouse为了方便与Kafka集成,提供了一个名为Kafka引擎的专用表引擎。Kafka引擎允许你在ClickHouse中创建一个表,这个表的数据源来自于一个或多个Kafka队列。结合使用Kafka引擎和Materialized Views,可以实现将数据从Kafka队列消费,然后将数据存储到其他引擎的表中,从而实现实时数据处理和查询。

1.集成示例

要创建一个Kafka引擎的表,你需要提供以下几个关键参数:

  1. kafka_broker_list:Kafka代理地址列表,用逗号分隔的字符串。
  2. kafka_topic:要订阅的Kafka主题。
  3. kafka_group_name:消费者组名称,用于标识ClickHouse实例所属的消费者组。
  4. kafka_format:消息格式,用于指定如何将Kafka中的消息解析成表的行,例如JSONEachRow等。

创建一个Kafka引擎的表的示例:

CREATE TABLE kafka_table
(
    column1 String,
    column2 UInt64,
    column3 Float64
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka1:9092,kafka2:9092',
    kafka_topic = 'kafka_topic_name',
    kafka_group_name = 'clickhouse_group',
    kafka_format = 'JSONEachRow';

为了将数据从Kafka表消费并存储到其他表引擎(例如MergeTree)的表中,你可以创建一个Materialized View,例如:

CREATE MATERIALIZED VIEW mv_kafka_to_storage
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(column2)
ORDER BY (column1, column2)
AS SELECT
    column1,
    column2,
    column3
FROM kafka_table;

使用Kafka引擎和Materialized View,你可以在ClickHouse中实现实时数据消费、处理和查询,从而大大提高数据处理的效率。


官方教程

此引擎与 Apache Kafka 结合使用。

Kafka 特性:

  • 发布或者订阅数据流。
  • 容错存储机制。
  • 处理流数据。

老版格式:

    Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
          [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])

新版格式:

    Kafka SETTINGS
      kafka_broker_list = 'localhost:9092',
      kafka_topic_list = 'topic1,topic2',
      kafka_group_name = 'group1',
      kafka_format = 'JSONEachRow',
      kafka_row_delimiter = '\n',
      kafka_schema = '',
      kafka_num_consumers = 2

必要参数:

  • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
  • kafka_topic_list – topic 列表 (my_topic)。
  • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
  • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。了解详细信息,请参考 Formats 部分。

可选参数:

  • kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
  • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。例如,普罗托船长 需要 schema 文件路径以及根对象 schema.capnp:Message 的名字。
  • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

示例1:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;

消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。

消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。了解更多信息请访问 http://kafka.apache.org/intro。

SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:

  1. 使用引擎创建一个 Kafka 消费者并作为一条数据流。
  2. 创建一个结构表。
  3. 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。

MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。

示例2:

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;

为了提高性能,接受的消息被分组为 max_insert_block_size 大小的块。如果未在 stream_flush_interval_ms 毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。

停止接收主题数据或更改转换逻辑,请 detach 物化视图:

  DETACH TABLE consumer;
  ATTACH TABLE consumer;

如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。

配置

GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。

  <!-- Global configuration options for all tables of Kafka engine type -->
  <kafka>
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
  </kafka>

  <!-- Configuration specific for topic "logs" -->
  <kafka_logs>
    <retry_backoff_ms>250</retry_backoff_ms>
    <fetch_min_bytes>100000</fetch_min_bytes>
  </kafka_logs>

有关详细配置选项列表,请参阅 librdkafka配置参考。在 ClickHouse 配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 <check_crcs>true</check_crcs>

Kerberos 支持

对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。

示例:

  <!-- Kerberos-aware Kafka -->
  <kafka>
    <security_protocol>SASL_PLAINTEXT</security_protocol>
    <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
    <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
  </kafka>

虚拟列

  • _topic – Kafka 主题。
  • _key – 信息的键。
  • _offset – 消息的偏移量。
  • _timestamp – 消息的时间戳。
  • _timestamp_ms – 消息的时间戳(毫秒)。
  • _partition – Kafka 主题的分区。

参考文档

  • ClickHouse Kafka Engine: https://clickhouse.tech/docs/en/engines/table-engines/integrations/kafka/
  • ClickHouse + Kafka — How to Build Real-Time Data Pipelines: https://medium.com/@coderunner/debugging-with-git-7afbcd3b9f1e

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1017530.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

react的状态管理简单钩子方法

1.recoil useProvider文件: import { atom, useRecoilState } from recoil;const initState atom({key: initState,default: {state: [],}, })// 将业务逻辑拆分到一个单独文件中&#xff0c;方便进行状态管理 export interface StateProps {id: number;text: string;isFini…

学习Node js:raw-body模块源码解析

raw-body是什么 raw-body的主要功能是处理HTTP请求体的原始数据。它提供了以下核心功能&#xff1a; 解析请求体&#xff1a;可以从HTTP请求中提取原始数据&#xff0c;包括文本和二进制数据。配置选项&#xff1a;通过配置项&#xff0c;可以设置请求体的大小限制、编码方式…

主题教育活动知识竞赛小程序界面分享

主题教育活动知识竞赛小程序界面分享

Git - Git 工作流程

文章目录 Git WorkFlow图解小结 Git WorkFlow Git Flow是一种基于Git的工作流程&#xff0c;确实利用了Git作为分布式版本控制系统的优势。 本地代码库 (Local Repository): 每个开发者都维护自己的本地代码库&#xff0c;这是Git分布式性质的体现。本地代码库包含了完整的项目…

清华发力了,EUV光刻机技术取得重大突破,外媒:没想到如此快

日前媒体纷纷传言清华研发成功EUV光刻机&#xff0c;这个其实夸大了事实&#xff0c;不过却也确实是EUV光刻机的重大突破&#xff0c;将绕开ASML等西方垄断的EUV光刻技术路线&#xff0c;开辟一条全新的道路。 据了解清华研发成功的并非是EUV光刻机&#xff0c;而是可用于EUV光…

输入M*N阶矩阵A和B,用函数编程计算并输出A与B之和

#include<stdio.h> #define M 2 #define N 3 void scanfjuzhen(int arr[M][N])//向二维数组输入数据 {int i 0;int j 0;for (i 0; i < M; i){for (j 0; j < N; j)scanf("%d", &arr[i][j]);} } void sumjuzhen(int a[M][N], int b[M][N],int c[M]…

性能测试 —— Jmeter 常用三种定时器

1、同步定时器 位置&#xff1a;HTTP请求->定时器->Synchronizing Timer 当需要进行大量用户的并发测试时&#xff0c;为了让用户能真正的同时执行&#xff0c;添加同步定时器&#xff0c;用户阻塞线程&#xff0c;知道线程数达到预先配置的数值&#xff0c;才开始执行…

Rust踩雷笔记(7)——一个链表题例子初识裸指针

题目在这https://leetcode.cn/problems/palindrome-linked-list/&#xff0c;leetcode 234的回文链表&#xff0c;思路很简单&#xff0c;就是fast和slow两个指针&#xff0c;fast一次移动两个、slow一次一个&#xff0c;最后slow指向的链表反转后和head比较就行了。 很简单一…

趣解设计模式之《新娘到底叫啥名啊?》

〇、小故事 前一段时间&#xff0c;在网上流传了这么一段视频&#xff0c;视频是一对新人的婚礼现场&#xff0c;主持人让新郎当着众多亲戚朋友的面&#xff0c;大声对新娘表达自己的爱意&#xff0c;小伙子自信满满大声的对众人说&#xff1a;“我爱你&#xff0c;周秀楠&…

ddns有什么作用?无公网IP怎么将内网IP端口映射外网访问

DDNS是什么&#xff1f; DDNS英文全称Dynamic Domain Name Server&#xff0c;中文含义是指动态域名服务。很多普通路由器或者智能路由器设置中&#xff0c;都可以找到DDNS&#xff08;动态DNS&#xff09;功能。 上面的解释可能过于专业&#xff0c;其实DDNS通俗点说&#xf…

Alpaca构建方式探秘:低成本构造指令数据增强LLM

原博客地址&#xff1a;Alpaca: A Strong, Replicable Instruction-Following Model github地址&#xff1a;https://github.com/tatsu-lab/stanford_alpaca Alpaca简介 Alpaca是斯坦福大学在Meta开源的大模型LLaMA 7B基础上使用自构建的52K指令数据重新训练得到的增强模型&am…

Android Studioc打印+查看日志

目录 Logcat 写入日志 查看应用日志 Logcat 写入日志 错误&#xff1a;Log.e(String, String)警告&#xff1a;Log.w(String, String)信息&#xff1a;Log.i(String, String)调试&#xff1a;Log.d(String, String)详细程度&#xff1a;Log.v(String, String) 查看应用日志 …

IT老齐的Redis 6实战课

NoSQL与Redis6新特性 什么是Redis REDIS一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选持久性的键值对(Key-Value)存储数据库&#xff0c;并提供多种语言的 APIRedis的读写性能非常好,官方压测数据为 Redis能读的速度是110000次/s,写的…

【MySQL系列】MySQL的用户管理

「前言」文章内容大致是MySQL的用户管理。 「归属专栏」MySQL 「主页链接」个人主页 「笔者」枫叶先生(fy) 目录 一、用户管理1.1 用户信息1.2 创建新用户1.3 删除用户1.4 修改用户密码 二、数据库的权限2.1 给用户授权2.2 回收用户权限 一、用户管理 MySQL与Linux类似&#x…

图形学线性代数

最近学图形学&#xff01;学的是GAMES101-现代计算机图形学入门-闫令琪↓会做点笔记 Lecture 03 Transformation_哔哩哔哩_bilibili 点乘 1&#xff09;主要作用是找到两个方向/向量的夹角 2&#xff09;找一个向量投影到另一个向量 作用 叉乘 此处x,y,z都是坐标轴 作用 1&a…

HTML+CSS画一个卡通中秋月饼

HTMLCSS画一个卡通中秋月饼&#x1f96e;&#x1f96e;&#x1f96e; 中秋活动水个文章 整个divcss实现个月饼&#xff0c;给前端初学者一个练手的demo 效果图 思路 HTMl 先来个轮廓画脸上的东西&#xff1a;眼睛、眉毛、腮红、嘴巴眼睛丰富下瞳孔画20个花瓣 CSS 轮廓是要外…

4G版本云音响设置教程腾讯云平台版本

文章目录 4G本云音响设置教程介绍一、申请设备三元素1.腾讯云物联网平台2.创建产品3.设置产品参数4.添加设备5.获取三元素 二、设置设备三元素1.打开MQTTConfigTools2.计算MQTT参数3.使用USB连接设备4.设置参数 三、腾讯云物联网套件协议使用说明1.推送协议信息2.topic规则说明…

WebGL 根据模型矩阵的逆转置矩阵计算运动物体的光照效果

目录 前言 坐标变换引起法向量变化 变化规律&#xff1a; 魔法矩阵&#xff1a;逆转置矩阵 逆转置矩阵的用法总结 Matrix4对象的 setInverseOf 、transpose 方法规范&#xff08;以完成逆转置矩阵&#xff09; 示例代码&#xff08;LightedTranslatedRotatedCube.js&am…

虚拟机Ubuntu操作系统常用终端命令(2)(详细解释+详细演示)

本篇概要 本篇讲述了Ubuntu操作系统常用的几个功能&#xff0c;即超级用户&#xff0c;虚拟机系统损坏如何修复&#xff0c;用户和组&#xff0c;如何以root登录界面以及文件的权限方面的知识。希望能够得到大家的支持。 文章目录 本篇概要1.超级用户1.1使用超级用户1.2切换到…

【ICer必备基础】MOS电容——电容电压特性详解

【ICer必备基础】MOS电容——电容电压特性详解 1相关定义2MOS电容描述3MOS电容能带分析4可变电容实际应用 1相关定义 MOS电容是集成电路中非常重要的应用&#xff0c;器件电容的定义为&#xff1a; 阈值反型点&#xff1a; 当达到最大耗尽宽度且反型层电荷密度为零时的情形。此…