ClickHouse10-ClickHouse中Kafka表引擎

news2024/11/18 8:10:35

Kafka表引擎也是一种常见的表引擎,在很多大数据量的场景下,会从源通过Kafka将数据输送到ClickHouse,Kafka作为输送的方式,ClickHouse作为存储引擎与查询引擎,大数据量的数据可以得到快速的、高压缩的存储。
在这里插入图片描述

Kafka大家肯定不陌生:

  • 它可以用于发布和订阅数据流,是常见的队列使用方式
  • 它可以组织容错存储,是常见的容错存储的使用方式
  • 它可以在流可用时对其进行处理,是常见的大数据处理的使用方式

全文概览:

  • 基本语法
  • 从 Kafka 写入到 ClickHouse
  • 从 ClickHouse 写入到 Kafka
    • 测试1:queue->ck->queue
    • 测试2:ck->queue

基本语法

分为定义表结构和定义Kafka的接入参数,Kafka的接入参数都是常见的字段

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [ALIAS expr1],
    name2 [type2] [ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_max_rows_per_message = 1];

示例:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(
    `sys_time` Datetime COMMENT '',
    `num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'

从 Kafka 写入到 ClickHouse

创建topic:

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test_ck_sync1

创建同步表:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(
    `sys_time` Datetime COMMENT '',
    `num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'

CREATE TABLE IF NOT EXISTS test_ck_sync1_res
(
    `sys_time` Datetime COMMENT '',
    `num` UInt32 COMMENT ''
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(sys_time)
ORDER BY tuple()

创建物化视图,进行数据样式的转换:

CREATE MATERIALIZED VIEW test_ck_sync1_mv TO test_ck_sync1_res AS
SELECT
    sys_time,
    num
FROM test_ck_sync1

通过console写入数据:

[$ kafka_2.13-3.6.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_ck_sync1
>2024-01-01 00:00:01|89  

验证数据:

$ :) select * from test_ck_sync1_res;

SELECT *
FROM test_ck_sync1_res

Query id: a666f893-5be9-4022-9327-3a1507aa5485

┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:01 │  89 │
└─────────────────────┴─────┘
┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:00 │  88 │
└─────────────────────┴─────┘

2 rows in set. Elapsed: 0.049 sec.

从 ClickHouse 写入到 Kafka

kafka_writers_reader --(view)--> kafka_writers_queue ---> 

创建一个队列:

bin/kafka-topics.sh --topic kafka_writers --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

创建同步表:

CREATE TABLE kafka_writers_reader (     `id` Int,     `platForm` String,     `appname` String,     `time` DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'kafka_writers_reader', kafka_group_name = 'kafka_writers_reader_group', kafka_format = 'CSV';

CREATE TABLE kafka_writers_queue (     id Int,     platForm String,     appname String,     time DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092',        kafka_topic_list = 'kafka_writers',        kafka_group_name = 'kafka_writers_group',        kafka_format = 'CSV',       kafka_max_block_size = 1048576;

测试1:queue->ck->queue

通过写入队列kafka_writers_reader,借助ClickHouse写入队列kafka_writers

bin/kafka-topics.sh --topic kafka_writers_reader --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafka_writers_reader

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers

测试2:ck->queue

通过写入表kafka_writers_reader,写入队列kafka_writers

$ :) INSERT INTO kafka_writers_reader (id, platForm, appname, time) 
VALUES (8,'Data','Test','2020-12-23 14:45:31'), 
(9,'Plan','Test1','2020-12-23 14:47:32'), 
(10,'Plan','Test2','2020-12-23 14:52:15'), 
(11,'Data','Test3','2020-12-23 14:54:39');

INSERT INTO kafka_writers_reader (id, platForm, appname, time) FORMAT Values

Query id: 223a63ab-97fa-488d-8ea7-c2e194155d26

Ok.

4 rows in set. Elapsed: 1.054 sec. 

[$ kafka_2.13-3.6.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers
8,"Data","Test","1970-01-01 08:00:00"

9,"Plan","Test1","1970-01-01 08:00:00"

10,"Plan","Test2","1970-01-01 08:00:00"

11,"Data","Test3","1970-01-01 08:00:00"

如果喜欢我的文章的话,可以去GitHub上给一个免费的关注吗?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1548178.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

碳课堂|什么是碳资产?企业如何进行碳资产管理?

碳资产是绿色资产的重要类别,在全球气候变化日益严峻的背景下备受关注。在“双碳”目标下,碳资产管理是企业层面实现碳减排目标和低碳转型的关键。 一、什么是碳资产? 碳资产是以碳减排为基础的资产,是企业为了积极应对气候变化&…

华为昇腾asend

昇腾Ascend C编程语言 Ascend C原生支持C/C编程规范,通过多层接口抽象、并行编程范式、孪生调试等技术,极大提高了算子的开发效率,帮助AI 参考文章 手把手教你在昇腾平台上搭建PyTorch训练环境 - 哔哩哔哩 (bilibili.com)https://www.bilibi…

【Java面试题】计算机网络

文章目录 1.计算机网络基础1.1网络分层模型/OSI七层模型是什么?1.2TCP/IP四层模型是什么?每一层的作用?1.2.1TCP四层模型?1.2.2为什么网络要分层? 1.2常见网络协议1.2.1应用层常见的协议1.2.2网络层常见的协议 2.HTTP2…

基于springboot实现大学生入学审核系统项目【项目源码+论文说明】

基于springboot实现大学生入学审核系统演示 摘要 随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了大学生入学审核系统的开发全过程。通过分析大学生入学审核系统管理的不足,创建了一个计算机管理大学生入…

大数据开发(日志离线分析项目)

大数据开发(日志离线分析项目) 一、项目需求1、使用jqueryecharts的方式调用程序后台提供的rest api接口,获取json数据,然后通过jquerycss的方式进行数据展示。工作流程如下:2、七大角度1、用户基本信息分析模块2、浏览…

微信怎么恢复聊天记录?效果惊人的3个方法

微信作为我们日常生活中最常用的即时通讯工具之一,承载着我们与亲友之间的重要沟通记录。然而,不可避免地会遇到误删聊天记录的情况,可能是因为手误、设备问题或其他原因。 当我们发现重要的聊天记录不见了,往往会感到焦虑和困扰…

ValueError: Cannot load file containing pickled data when allow_pickle=False

问题描述 遇到报错:ValueError: Cannot load file containing pickled data when allow_pickleFalse 解决方案 经过查阅有人说是与numpy的版本有关,但是还是不要轻易改变环境中的版本,不一定哪个地方就会报错。这里放个解决方案:…

量化交易入门(二十二)MACD指标实现,能挣到钱吗?

学习了MACD指标以后,我们通过量化回测来验证一下,这个指标能否帮我们挣到钱。我们还是使用Backtrader框架,实现MACD的交易策略,然后用苹果股票历史数据完成这个示例。 示例代码如下: import backtrader as bt import…

server端

一、创建项目 expess server 1.1 安装nodemon npm i nodemon 1.2 设置连接数据库mongodb 安装mongoose npm i mongoose 在根目录新建config文件夹/db.config.js // 引入mongodb数据库操作模块 const mongoose require("mongoose") // 连接数据库mongoose.con…

【Git】日志功能

1. git日志显示 # 显示前3条日志 git log -3# 单行显示 git log --oneline# 图表日志 git log --graph# 显示更改摘要 git log --stat# 显示更改位置 git log --patch 或 git log -p# 查看指定文件的提交历史记录 git log {filename}例子1:单行显示 例子2&#xff…

自学编程的六种方法,你必须知道

随着互联网日趋迅猛,编程已经在我们生活当中无处不在了。众所周知,程序员的工资都很不错,于是越来越多的人,都想加入到编程的行业中来。那么如何加入到程序员的行业当中? PHP从入门到放弃,C语言从入门到放…

3款免费甘特图制作工具的比较和选择指南

GanntProject GanttProject https://www.ganttproject.biz/ 是一款项目管理和调度应用,适用于 Windows、macOS 和 Linux。它易于使用,无需任何设置,适用于个人用户和小型团队。该应用提供任务层次结构和依存关系、里程碑、基准行、Gantt 图表…

【深度学习】最强算法模型之:潜在狄利克雷分配(LDA)

潜在狄利克雷分配 1、引言2、潜在狄利克雷分配2.1 定义2.2 原理2.3 算法公式2.4 代码示例 3、总结 1、引言 小屌丝:鱼哥, 给我讲一讲LDA 小鱼:LDA? 你指的是? 小屌丝:就是算法模型的LDA啊, 你…

基于XGBoost和数据预处理的电动汽车车型预测

基于XGBoost和数据预处理的电动汽车车型预测 文章目录 基于XGBoost和数据预处理的电动汽车车型预测1、前言2、导入数据3、各县电动汽车采用情况条形图4、电动车类型饼图5、前5最欢迎的电动车制造商6、XGBoost模型6.1 字符串列的标识6.2 删除不相关的列6.3 编码分类变量6.4 电动…

【机器学习之---数学】统计学基础概念

every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 统计学基础 1. 频率派 频率学派(传统学派)认为样本信息来自总体,通过对样本信息的研究可以合理地推断和估计总体信息…

探究 HTTPS 的工作过程

目录 1. HTTPS 协议原理 1.1. 为什么要有HTTPS协议 1.2. 如何理解安全 1.3. HTTPS 协议是什么 2. HTTPS 的前置概念 2.1. 什么是加密 && 解密 2.2. 为什么要加密 2.3. 常见的加密方式 2.3.1. 对称加密 2.3.2. 非对称加密 2.4. 数据摘要 && 数据指纹…

Linux系统------------MySQL备份与恢复

目录 一、数据备份的重要性 二、数据库备份的分类 2.1从物理与逻辑的角度,备份可分为物理备份与逻辑备份 2.2从数据库的备份策略角度:备份可分为完全备份、差异备份、 增量备份 2.2.1完全备份 2.2.2差异备份 2.2.3增量备份 2.2.4备份方式比较 三…

白酒:生产过程的能耗分析与节能减排措施

在当今的绿色环保时代,节能减排已经成为各行各业关注的焦点。作为传统的白酒生产企业,云仓酒庄深知环境保护的重要性,并积极采取措施降低生产过程中的能耗,为可持续发展贡献力量。 在豪迈白酒的生产过程中,能耗主要来自…

【剑指offr--C/C++】JZ23 链表中环的入口结点 与哈希表

一、哈希表(unordered_set)知识点 unordered_set是一种无序的数据集合容器,元素和键同时存在,元素没有按任何特定的顺序排序,而是根据它们的散列(hash)值组织成桶,以允许直接通过值…

不花一分钱,10分钟搭建自己的网站

不花一分钱,10分钟搭建自己的网站 文章目录 不花一分钱,10分钟搭建自己的网站效果展示第1步 账号注册与登录第2步 新建仓库第3步 新建文件夹及文件第4步 网站发布部署大功告成 效果展示 课程效果展示 进阶效果展示 第1步 账号注册与登录 点击这里 https://gitee.com/&…