探索ClickHouse——连接Kafka和Clickhouse

news2025/1/11 4:09:51

安装Kafka

新增用户

sudo adduser kafka
sudo adduser kafka sudo
su -l kafka

安装JDK

sudo apt-get install openjdk-8-jre

下载解压kafka

可以从https://downloads.apache.org/kafka/下找到希望安装的版本。需要注意的是,不要下载路径包含src的包,否则会报“Classpath is empty”之类的错误。

mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1

配置

配置kafka

vim ~/kafka/config/server.properties

将下面这行加入文件的末尾

# ~/kafka/config/server.properties
delete.topic.enable=true

同时修改log的路径

# ~/kafka/config/server.properties
log.dirs=/home/kafka/logs

创建zookeeper service

sudo vim /etc/systemd/system/zookeeper.service

将下面内容填入上述文件中

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

创建kafka service

sudo vim /etc/systemd/system/kafka.service

将下面内容填入上述文件中

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

启动kafka#

启动服务

sudo systemctl start kafka

查看状态

sudo systemctl status kafka

● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2023-09-28 03:09:39 UTC; 4s ago
Main PID: 3561758 (sh)
Tasks: 42 (limit: 2143)
Memory: 292.4M
CPU: 2.768s
CGroup: /system.slice/kafka.service
├─3561758 /bin/sh -c “/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1”
└─3561760 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/>
Sep 28 03:09:39 ubuntua systemd[1]: Started kafka.service.

可以看到kafka已经处于running状态。

测试

创建Topic

~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

发送消息

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

订阅Topic

新启动一个界面,执行下面命令

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

它会收到上面发的消息

Hello, World

连接

创建表

使用kafka engine将kafka中的流映射到一个表中。我们以《探索ClickHouse——使用Projection加速查询》中的数据为例。

clickhouse-client --stream_like_engine_allow_direct_select 1
CREATE TABLE uk_price_paid_from_kafka (uuid_string String, price_string String, time String, postcode String, a String, b String, c String, addr1 String, addr2 String, street String, locality String, town String, district String, county String, d String, e String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list='TutorialTopic', kafka_group_name='clickhouse', kafka_format='CSV', kafka_skip_broken_messages=1, kafka_num_consumers=1;

CREATE TABLE uk_price_paid_from_kafka
(
uuid_string String,
price_string String,
time String,
postcode String,
a String,
b String,
c String,
addr1 String,
addr2 String,
street String,
locality String,
town String,
district String,
county String,
d String,
e String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = ‘localhost:9092’, kafka_topic_list = ‘TutorialTopic’, kafka_group_name = ‘clickhouse’, kafka_format = ‘CSV’, kafka_skip_broken_messages = 1, kafka_num_consumers = 1
Query id: 07a063e9-6a61-42c0-8fec-1fe2f119ee28
Ok.
0 rows in set. Elapsed: 0.008 sec.

给kafka发送消息

~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic

进入消息输入模式,发送下面两个消息

"{F887F88E-7D15-4415-804E-52EAC2F10958}","70000","1995-07-07 00:00","MK15 9HP","D","N","F","31","","ALDRICH DRIVE","WILLEN","MILTON KEYNES","MILTON KEYNES","MILTON KEYNES","A","A"
"{40FD4DF2-5362-407C-92BC-566E2CCE89E9}","44500","1995-02-03 00:00","SR6 0AQ","T","N","F","50","","HOWICK PARK","SUNDERLAND","SUNDERLAND","SUNDERLAND","TYNE AND WEAR","A","A"

在这里插入图片描述

Clickhouse收到消息

在clickhouse-client交互终端中执行下面指令:

select * from uk_price_paid_from_kafka;

在这里插入图片描述
可以看到之前发送给kafka Topic的内容在Clickhouse中被收到了。

问题

后面我再在clickhouse-client交互终端中查询不到数据了。即使我们给kafka该主题发消息,也查询不到。后面我们再将《探索ClickHouse——使用MaterializedView存储kafka传递的数据》中讲解使用MaterializedView清洗和固化kafka的数据。

参考资料

  • https://openjdk.org/install/
  • https://kafka.apache.org/quickstart
  • https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04#step-2-mdash-downloading-and-extracting-the-kafka-binaries
  • https://cloud.tencent.com/developer/article/1892086
  • https://sineyuan.github.io/post/clickhouse-kafka/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1048550.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QQ聊天记录文件怎么恢复?这3个方法亲测有效

QQ为用户提供了聊天、语音、视频、在线游戏、社交分享等丰富的功能,满足了用户的各种通讯以及娱乐需求。无论是现在还是过去,QQ仍然在我们的生活中扮演着重要的角色。 如果在使用QQ的过程中,发现文件过期或者被删除了该怎么办?qq…

480439-15-4,一种具有荧光单体的pH敏感性染料Fluorescein O-methacrylate

产品简介:荧光素O-甲基丙烯酸酯是一种具有荧光单体的pH敏感性染料。它可以通过490 nm的激发光谱和520 nm的发射光谱进行表征。它具有荧光素,其是一种负电荷最少的指示剂。它的特性包括生物相容性、无毒性,以及在水溶液中的良好分散性。 CAS号…

Vue3最佳实践 第六章 Pinia,Vuex与axios,VueUse 1(Pinia)

Pinia状态管理 在 Vue3 中项目中组件之间传递数据时,可以使用 Props 和 Emit,还可以使用 Provide/Inject 来代替 Props 和 Emit。Props 和 Emit 只能在具有父子关系的组件之间传递数据,所以层级越深,过程就越复杂。为了解决此类问…

brew 安装MySQL 5.7

写在前面:博主是一只经过实战开发历练后投身培训事业的“小山猪”,昵称取自动画片《狮子王》中的“彭彭”,总是以乐观、积极的心态对待周边的事物。本人的技术路线从Java全栈工程师一路奔向大数据开发、数据挖掘领域,如今终有小成…

【云原生】k8s集群调度

目录 一、调度约束 1.1List-Watch工作机制 1.2调度过程 二、指定调度节点 2.1修改成 nodeSelector 调度方式 三、亲和性 (1)节点亲和性 (2)Pod 亲和性 3.1 键值运算关系 四、污点(Taint) 和 容忍(Tolerations) 4.1污点(…

搭建 Prometheus 对服务进行监控

前言: 服务平时没啥问题,一到过节我放假,它也想放假,所以只能找个监工看着了。当前市面上主流的监工方案是 zabbix 和 prometheus,没有优劣之分,只是适用场景略有区别。我这边的需求就主要是监控服务的端口…

温度与振动监测技术在电机故障智能诊断中的应用

在现代工业生产中,电机是许多设备和机械的核心驱动力。然而,电机故障可能会导致生产中断、设备损坏以及生产成本的增加。为了避免这些问题,工业设备状态监测技术应运而生。本文将探讨如何利用先进的设备状态监测技术,尤其是温度和…

各种业务场景调用API代理的API接口教程

API代理的API接口在各种业务场景中具有广泛的应用,本文将介绍哪些业务场景可以使用API代理的API接口,并提供详细的调用教程和代码演示,同时,我们还将讨论在不同场景下使用API代理的API接口所带来的好处。 哪些业务场景可以使用AP…

C# 字符串和正则表达式

C# 字符串和正则表达式 System.String 类StringBuilder 成员格式化字符串正则表达式 System.String 类 StringBuilder 成员 格式化字符串 正则表达式

央国企信创改造难在何处?先行建设国产身份域管可少走弯路

据统计,全球超 91% 的具规模企业将 Microsoft Active Directory (微软AD)作为数字化身份的基础底座。通常企业达到 300 人以上规模开始建设 AD,而高科技企业早在 50-60 人左右规模时就开始搭建。 AD身份域管是企业身份的事实标准 …

货物寄到英国选择什么物流比较划算?

随着全球化的发展,越来越多的企业开始将产品销售到海外市场,其中英国作为一个重要的贸易伙伴,吸引了大量的中国企业的关注。然而,如何将货物安全、快速地运送到英国,成为了众多企业面临的一个问题。那么,货…

拥抱数字正义时代,看AIGC如何驱动法律变革

人工智能,作为科技领域的代表,目前正在逐步渗透并应用于各个领域。大到政府的社会治理,小到提问的识别延伸,AI已悄然走进了生活中的各个领域。单说人民法院的司法工作中,随机分案的程序设计、音字转换的功能实现、裁判…

04、EL和JSTL核心技术

目录 1 EL表达式(熟悉) 1.1 基本概念 1.2 主要功能 1.3 访问内置对象的数据 1.3.1访问方式 1.3.2 执行流程 1.4 访问请求参数的数据 1.5 访问Bean对象的属性 1.5.1 访问方式 1.5.2 主要区别 1.6 访问集合中的数据 1.7 常用的内置对象 …

二叉树题目:二叉树剪枝

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法思路和算法代码复杂度分析 题目 标题和出处 标题:二叉树剪枝 出处:814. 二叉树剪枝 难度 4 级 题目描述 要求 给定二叉树的根结点 root \texttt{root} root,返回移除了所有…

LEO天线,全球市场总体规模,头部前八大厂商排名及市场份额

LEO天线全球市场总体规模 据QYResearch调研团队最新报告“全球LEO天线市场报告2023-2029”显示,预计2029年全球LEO天线市场规模将达到3545.3百万美元,未来几年年复合增长率CAGR为29.6%。 主要驱动因素: 近年来,全球航天工业的投资激增&#…

React antd Table点击下一页后selectedRows丢失之前页选择内容的问题

一、问题 使用了React antd 的<Table>标签&#xff0c;是这样记录选中的行id与行内容的&#xff1a; <TabledataSource{data.list}rowSelection{{selectedRowKeys: selectedIdsInSearchTab,onChange: this.onSelectChange,}} // 表格是否可复选&#xff0c;加 type: …

NFT Insider#109:The Sandbox推出了首个足球小将 NFT 作品集,YGG Web3 游戏峰会即将开启!

引言&#xff1a;NFT Insider由NFT收藏组织WHALE Members、BeepCrypto联合出品&#xff0c;浓缩每周NFT新闻&#xff0c;为大家带来关于NFT最全面、最新鲜、最有价值的讯息。每期周报将从NFT市场数据&#xff0c;艺术新闻类&#xff0c;游戏新闻类&#xff0c;虚拟世界类&#…

情满中秋᛫欢度国庆 | 联诚发与你共度佳节!

转眼九月份又走到尽头 国庆和中秋正好撞了个满怀 随风飘扬的国旗与满街飘香的月饼 国泰民安与阖家团圆 这是大家与小家最美好的祈愿 当中秋遇上国庆&#xff0c;当团圆遇上国诞 双节来临之际 为庆祝传统佳节与祖国生日 也为感谢联诚发每位员工的辛勤付出 9月28日下午 …

国庆旅游带什么东西?国庆数码必备数码清单请收好

出游度假少不了的是啥&#xff1f;当然是拍美美的照片记录快乐啦&#xff01;So&#xff0c;今天就特意给大家准备了一期数码产品&#xff0c;出游必备&#xff0c;而且还可以作为出片道具。 【umelody 轻律 U1头戴式耳机】——269元 产品外观定位时下流行之‘Retro Futurism’…

一朵华为云,如何做好百模千态?

点击关注 文丨刘雨琦、郝鑫 2005年华为提出网络时代的“All IP”&#xff0c;2011年提出数字化时代的“All Cloud”&#xff0c;2023年提出智能时代的“All Intelligence”。 截至目前&#xff0c;华为的战略升级经历了三个阶段。 步入智能化&#xff0c;需要迎接的困难依然…