Kafka 安装 快速入门
Apache Kafka是流行的用于大规模收集、处理、存储和分析数据的开源流处理系统。它以其卓越的性能、低延迟、容错和高吞吐量而闻名,能够每秒处理数千条消息。常用来构建数据管道、利用实时数据流、实现系统监控、数据集成。
如上图,在kafka系统中,一个完整的事件消息处理流程,由以下3个部分组成:
- Producer - 消息生产者,作用是发布、写入事件消息,包括从其他系统导入、导出事件流消息
- Topic - 消息主题,kafka中的消息按照topic的维度进行数据存储(持久化到磁盘中),每一个Message必须归属topic
- Consumer - 消息消费者 对消息进行业务处理
整个事件处理流程是以异步、分布式、高度可扩展、弹性、容错和安全的方式提供的。Kafka提供多种部署方式,满足不同的部署场景,它可以部署在物理机、虚拟机、容器上,也可以部署在本地和云中。
工作机制
kafka是CS架构的分布式系统,由客户端、服务器端组成,基于高性能的TCP网络协议进行数据通信。通常情况下,我们将服务器端称之为Broker,上面提到的生产者、消费者都属于客户端。
-
服务器端
kafka可以由多个运行实例组成集群, 集群中节点可以跨越多个数据中心。服务器端负责对消息数据进行持久化操作,Kafka集群具有高度的可扩展性和容错性:如果它的任何服务器发生故障,其他服务器将接管它们的工作,以确保连续运行而不会丢失任何数据。
-
客户端
kafka客户端允许编写分布式应用程序和微服务,即即使在网络问题、机器故障的情况下,也可以并行、大规模、容错的读取、写入、处理事件流消息。kafka官方提供多种语言的客户端集成工具,如Java、Go、Python、C/C++等其他编程工具
事件消息
事件消息记录真实世界发生的业务信息,也成为消息或者事件记录。向kafka读写数据时,数据会以事件消息的形式进行操作,从概念上讲,事件消息至少包含键、值、时间戳等相关信息。如
- 键名称: “Alice”
- 值信息: “Made a payment of $200 to Bob” (向bob支付200美元)
- 时间戳: “Jun. 25, 2020 at 2:06 p.m.”
应用场景
kafka典型的应用场景如下
-
消息中间件
-
网站行为追踪
-
系统监控
-
日志聚合
-
流处理
-
事件源
-
提交日志
快速入门
下载
wget https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1
启动
启动kafka必须安装JDK8及以上的版本,本机安装JDK17;同时使用第三方注册中心才能启动,目前Kafka支持使用ZooKeeper、KRaft启动。
-
Zookeeper
# 启动Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # 启动kafka bin/kafka-server-start.sh config/server.properties
-
Kraft
# 创建集群ID KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# 格式化日志目录 $ ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties Formatting /tmp/kraft-combined-logs with metadata.version 3.3-IV3.
# 启动服务 bin/kafka-server-start.sh config/kraft/server.properties
创建topic
Kafka是一个分布式事件流平台,它允许您在多台机器上读、写、存储和处理事件(文档中也称为记录或消息)。
事件流消息包括支付交易、移动电话的地理位置更新、发货订单、物联网设备或医疗设备的传感器测量等。这些事件按TOPIC(主题)组织和存储。非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。因此,在发送消息之前,必须创建一个主题:
# 启动另外一个终端 在kafka的安装目录执行
$ ./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
验证下之前创建的 quickstart-events主题
$ ./bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: A73vhZBfTZiSMhnIoZCm4Q PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1
如上信息显示主题相关的分区信息,暂时忽略,后面再详细介绍。
生产消息
使用Kafka自带的脚本,作为生产者客户端,将事件消息写入之前创建的主题 - quickstart-events中,默认情况下,输入的每一行都将生成单独的事件消息写入主题。
$ ./bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
按住 Ctrl-C 停止生产者写入消息。
消费消息
启动另一个终端会话,运行控制台使用消费者客户端 读取刚刚创建的事件消息:
$ ./bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
hello world
this is kafka
# --from-beginning 表示每次都是从头开始消费数据;不指定的话默认从上一次消费的位置开始
现在可以一边在生产者生产消息,一边观察到消费者消费到最新的数据。
导入导出
存在的业务场景之一:需要与现有的系统(如关系数据库、文件、其他消息中间件)中的数据进行集成(导入、导出操作)。Kafka Connect允许开发者将外部系统中的数据连续导入到kafka,反之亦然。
接下来看如何使用简单的连接器运行kafka connect,将外部的数据导入到kafka主题中。
-
添加插件
# 编辑配置文件 vim config/connect-standalone.properties # 添加配置 保存并退出 plugin.path=libs/connect-file-3.3.1.jar
-
添加模拟数据
echo -e "foo\nbar" > test.txt
-
模拟导入
# 指定运行的数据来源、数据存储 # connect-file-source.properties 文件指明了数据来源 # connect-file-sink.properties 文件指明了数据存储的终点 ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
# connect-file-source.properties 指定工作线程数、来源文件、topic ... tasks.max=1 file=test.txt topic=connect-test
# connect-file-sink.properties 指定工作线程数、来源存储文件名、topic tasks.max=1 file=test.sink.txt topics=connect-test
-
查看结果
$ more test.sink.txt foo bar
性能测试脚本
Kafka除了之前提到过的生产者、消费者脚本用于简单的消息发送、消费测试之外。在bin目录下,还提供了用于性能吞吐量测试脚本,分别为
- kafka-producer-perf-test.sh
- kafka-consumer-perf-test.sh
生产环境中,Kafka性能需要硬件的支持,包括cpu、带宽、内存、磁盘IO,同时也需要对系统进行调优,如Linux文件句柄大小、JVM参数等等。此处仅介绍相关脚本的使用。
生产者脚本
kafka-producer-perf-test.sh 脚本是kafka提供用于测试消息生产者性能的工具,该脚本可以计算出指定时间内系统的吞吐、和平均延迟。
$ bin/kafka-producer-perf-test.sh --topic perf-test --num-records 100000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=127.0.0.1:9092 acks=-1 compression.type=lz4
100000 records sent, 82850.041425 records/sec (7.90 MB/sec), 108.13 ms avg latency, 519.00 ms max latency, 110 ms 50th, 133 ms 95th, 135 ms 99th, 142 ms 99.9th.
成功发送10万条数据,吞吐量为 82850条数据/s; 7.90 MB/sec.平均时延为 108.13 ms,最大时延为 519.00 ms,50 % 的消息延时在 110ms 内,95 % 的消息延时在 133 ms 内,99 % 的消息延时在 142 毫秒内。
消费者脚本
跟生产者性能测试脚本类似,kafka-consumer-perf-test.sh用于测试消费端的性能,可以便捷的计算出消费者的性能信息。
./bin/kafka-consumer-perf-test.sh --bootstrap-server 127.0.0.1:9092 --topic quickstart-events --messages 100000 --threads 8 --reporting-interval 1000