kafka
- 概述
- kafka版本
- 流程
- 启动zk
- 配置zk
- 启动命令
- 启动kafka
- 修改server.properties
- 启动命令
- kafka脚本-命令行操作
- 命令行
- 创建主题脚本
- 查看主题
- 主题详情
- 修改主题
- 删除主题
- 大量日志
- 解决方案
- 控制台生产者消费者
- 代码 生产者 消费者
- kafka-tool
- kafka数据文件
- 扩展
- 横向扩展
- 纵向扩展
- 分区
- 消费者组
- 备份
- 分区
- Controller选举
- 图解kafka架构
- Windows集群
- cluster目录
- zk
- broker_1
- broker_2
- broker_3
- kafka & zk启动脚本
- cluster.bat cluster-clear-data.bat
- znode节点
- 临时节点 持久化节点
- watch 节点监听
- kafka在zookeeper中的组成
- zk选举kafka
- 副本分配
- 副本分配策略
- 发送数据流程
- 生产数据
- 消费数据
- 拦截器
- 添加拦截器
- 实现自定义拦截器
- 分区器 分区计算策略
- 自定义分区器
- 分区计算策略
- 数据收集器
- 数据发送者
- 异步数据发送回调
- 同步发送数据回调
- 应答处理级别
- ack == 0
- ack == 1
- ack == -1(all)
- kafka幂等性
- 初始化事务
- 存储文件类型
- 刷写数据条件
- 存储数据
概述
课程地址为 https://www.bilibili.com/video/BV1Gp421m7UN/
kafka版本
2.12-3.6.1
流程
启动zk
配置zk
需要启动zk,作为注册中心
kafka内置了zk,直接命令启动即可
在 kafka 的 config目录下,修改zookeeper.properties 配置文件
# 这里面配置的是有关数据存放的目录
dataDir=E:/kafka/kafka2.xxx/data/zk
启动命令
在 kafka/bin/windows 下,找到 zookeeper-server-start.bat,运行如下命令…/…/config/zookeeper.properties 文件
zookeeper-server-start.bat ../../config/zookeeper.properties
这么一来,可以直接启动zk.出现如下图: 启动成功
启动kafka
修改server.properties
修改 log.dirs 的值
启动命令
kafka-server-start.bat config.properties
kafka脚本-命令行操作
提供了一定的脚本,通过脚本操作kafka
可以进行创建主题 发送消息等操作
命令行
创建主题脚本
相关脚本
kafka-topics.bat
kafka提供的主题脚本
创建 test 主题
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create
查看主题
kafka-topics.bat --bootstrap-server localhost:9092 --list
主题详情
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --describe
修改主题
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions
将红圈部分改为2
效果如下
删除主题
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete
windows环境下会导致kafka停止运行的错误
由于权限或进程锁定,会导致kafka被关闭
大量日志
由于JDK版本问题导致
解决方案
- 切换JDK17
- 在kafka-run-class.bat 中设置 java_home
控制台生产者消费者
通过控制台控制生产
启动生产者脚本
消费者启动脚本
生产者发送消息,消费者自动获取消息
代码 生产者 消费者
只引入 kafka 的依赖.进行代码
生产者 创建 topic 以及 消息
消费者 订阅 topic 以及消费消息
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
</dependencies>
具体代码请百度.只引入 kafka-client即可.
kafka-tool
这里百度去学习
kafka数据文件
.log结尾的就是数据文件
因为 kafka 以前是做日志传输的.
扩展
横向扩展
增加机器
纵向扩展
增加内存 硬盘 宽带等…
分区
运行多个kafka实例. 对 kafkaBroker 的相同topic进行编号.
这叫做分区
生产者向不同分区传送数据
消费者组
消费者消费所有的分区topic
备份
可以对数据文件进行备份.
分区备份
同一 topic 的是数据文件备份到其他分区的broker上面
但是多个副本,只有一个副本能够提供数据的读写.其他是从节点,负责备份
分区
Leader Follower
Partition 分区
Broker 服务节点集群
Broker Controller: Broker 服务节点集群管理者
Broker Controller Standby : 服务集群管理者备份
Controller选举
BrokerController
down之后,Standby启动.
BrokerStandbyController
启动.
BrokerStandbyController
出现问题,可以继续选择其他主体 Broker ,选举出来一个BrokerController
图解kafka架构
KogManager: 数据管理器
Windows集群
cluster目录
创建cluster目录在硬盘根目录.解压 kafka,复制到该目录.如图
zk
修改kafka自带的zkdata存放目录.然后启动
broker_1
broker_2
和 broker_1 一样
broker_3
和 broker_2 一样
kafka & zk启动脚本
略...
cluster.bat cluster-clear-data.bat
略...
znode节点
临时节点 持久化节点
watch 节点监听
监听kafka的客户端.
kafka在zookeeper中的组成
略…
zk选举kafka
假设一共三个kafka节点
brokerController1 down了.
剩下的两个节点,监听zk,发现broker1掉了之后,会发起请求,谁的请求先到.谁就是 新的 brokerController
副本分配
对副本进行分配,放到不同的broker中
副本分配策略
…
发送数据流程
总的就是这张图
生产数据
消费数据
拦截器
添加拦截器
实现自定义拦截器
public class ValueInterceptorTest implements ProducerInterceptor<String>{
}
分区器 分区计算策略
发送消息.
自定义分区器
实现接口重写方法
public class myKafkaParationer implements Partitioner{}
分区计算策略
略...
数据收集器
数据发送者
异步数据发送回调
producer.send(msg,callBackMethod);
同步发送数据回调
这就同步操作了
应答处理级别
ack 的值
ack == 0
ack == 1
ack == all(-1)
ack == 0
ack == 1
ack == -1(all)
kafka幂等性
初始化事务
prodicer.initTransaction(); // 开启事务
prodicer.commit(); // 提交事务
prodicer.abortTranscation(); // 终止事务
存储文件类型
刷写数据条件
一条数据就从内存刷到硬盘上