Kafka的基本介绍以及扩展

news2025/1/10 22:31:17

文章目录

    • 基本操作
      • 新增Topic
      • 查询Topic
      • 修改Topic
      • 删除Topic
    • 生产者和消费者
      • 创建生产者
      • 创建消费者
    • Broker扩展
    • Producer扩展
    • Topic、Partition、Message扩展
    • 存储策略
    • 容错机制

基本操作

新增Topic

指定两个分区,两个副本,replication不能大于集群中的broker数

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --partitions 2 --replication-factor 2 --topic hello
Created topic hello.

查询Topic

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --list --zookeeper hadoop01:2181
hello

# 查看详细信息
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181
Topic: hello	PartitionCount: 2	ReplicationFactor: 2	Configs: 
	Topic: hello	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: hello	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0

image-20240312094710653

修改Topic

修改partition的数量,只能增加

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --alter --zookeeper hadoop01:2181 --partitions 5 --topic hello
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!


[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181
Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs: 
	Topic: hello	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: hello	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0
	Topic: hello	Partition: 2	Leader: 0	Replicas: 0,2	Isr: 0,2
	Topic: hello	Partition: 3	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: hello	Partition: 4	Leader: 2	Replicas: 2,0	Isr: 2,0

删除Topic

删除topic,删除操作是不可逆的,从1.0开始默认开启删除功能,之前的版本只会标记为删除状态,需要设置delete.topic.enable为true才可以真正删除

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --delete --zookeeper hadoop01:2181 --topic helloTopic hello is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

生产者和消费者

创建生产者

bin/kafka-console-prodecer.sh

创建消费者

bin/kafka-console-consumer.sh
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --partitions 5 --replication-factor 2 --topic hello
Created topic hello.

# producer
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic hello

# consumer 这个只消费最新的消息
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh  --bootstrap-server hadoop01:9092 --topic hello

# 消费之前的消息
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh  --bootstrap-server hadoop01:9092 --topic hello --from-beginning

Broker扩展

配置文件server.properties

# The number of messages to accept before forcing a flush of data to disk
# 根据条数选择刷新磁盘的时机
log.flush.interval.messages=10000

# 根据消息的间隔时间刷新
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000



# The minimum age of a log file to be eligible for deletion due to age  日志保存时间
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies  每隔5分钟检查文件是否满足删除的条件
log.retention.check.interval.ms=300000


Producer扩展

  • Partitioner:根据用户设置的算法(比如根据消息的key来设计算法到底分发到哪个分区里面)来计算发送到哪个分区-Partition,默认是随机
  • 数据通信方式:同步发送和异步发送,同步是指生产者发送数据后,要等待接收方发回响应后再发送下一个数据的通讯方式;异步指发送生产者发送消息后不等接收方响应就立即发送下一条数据的方式,通信方式通过acks的配置来控制。
    • acks:默认为1.表示需要Leader节点回复收到消息
    • acks:all,表示需要所有的Leader节点以及所有的副本节点回复收到消息(acks=-1)
    • acks:0,不需要回复

Topic、Partition、Message扩展

  • 每个Partition在存储层面是Append Log文件,新消息都会被直接追加到log文件的尾部,每条消息在log文件中的位置称为offset(偏移量)
  • 越多的Partition可以容纳更多的Consumer,有效提升并发消费的能力
  • 业务类型增加了可以增加Topic,数据量大需要增加Partition
  • Message:offset,类型是long表示此消息在一个Partition中的起始位置,可以认为offset是Partition中的messageId,自增;MessageSize,类似为int32,表示此消息的字节大小;data,类型为bytes,表示message的具体内容

image-20240312125048098

存储策略

  • 在kafka中每个topic包含1到多个partition,每个partition存储一部分Message,每条Message包含三个属性,其中有一个是Offset
  • Offset相当于这个partition中的message的唯一ID,可以通过分段+索引的方式去找到这个message;分段就是segment文件,每个partition由多个segment文件组成;索引就是index,每个index里面都会记录每个segment文件中的第一条数据的偏移量,然后根据这个偏移量就可以去segment文件中找到对应的消息
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 这个配置就表示每个segment文件的大小,超过这个大小就会再创建一个新的文件
log.segment.bytes=1073741824

kafka消息的存储流程:producer生产的消息会被发送到Topic的多个Partition上面,Topic收到消息之后会往partition的最后一个segment文件中添加这条消息,文件达到一定大小后会创建新的文件

image-20240312130302894

image-20240312125947693

容错机制

  • 一个Broker宕机后对集群的影响不大

    # 模拟节点宕机
    [root@hadoop01 config]# jps
    41728 NameNode
    53523 Kafka
    42246 ResourceManager
    59789 Jps
    41998 SecondaryNameNode
    52655 QuorumPeerMain
    [root@hadoop01 config]# kill 53523
    [root@hadoop01 config]# jps
    41728 NameNode
    59809 Jps
    42246 ResourceManager
    41998 SecondaryNameNode
    52655 QuorumPeerMain
    
    # 连接到kafka
    [root@hadoop01 zookeeper3.8.4]# bin/zkCli.sh
    [zk: localhost:2181(CONNECTED) 0] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
    [zk: localhost:2181(CONNECTED) 1] ls /brokers 
    [ids, seqid, topics]
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids 
    [1, 2]
    [zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://hadoop02:9092"],"jmx_port":-1,"host":"hadoop02","timestamp":"1710206078306","port":9092,"version":4}
    [zk: localhost:2181(CONNECTED) 5] 
    
    
    
    # zookeeper会重新选举leader
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic hello
    Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs: 
    	Topic: hello	Partition: 0	Leader: 2	Replicas: 0,2	Isr: 2
    	Topic: hello	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1
    	Topic: hello	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1
    	Topic: hello	Partition: 3	Leader: 1	Replicas: 0,1	Isr: 1
    	Topic: hello	Partition: 4	Leader: 1	Replicas: 1,2	Isr: 1,2
    You have new mail in /var/spool/mail/root
    
    
    • 当kafka集群中新增一个Broker节点,zookeeper会自动识别并在适当的时机选择此节点提供Leader服务
    # 重新启动
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-server-start.sh -daemon config/server.properties 
    You have new mail in /var/spool/mail/root
    [root@hadoop01 kafka_2.12-2.4.0]# jps
    41728 NameNode
    60640 Kafka
    60707 Jps
    42246 ResourceManager
    41998 SecondaryNameNode
    52655 QuorumPeerMain
    
    # 进入zookeeper观察
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
    [0, 1, 2]
    [zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://hadoop01:9092"],"jmx_port":-1,"host":"hadoop01","timestamp":"1710221534732","port":9092,"version":4}
    
    # 查询kafka topic信息
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic hello
    Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs: 
    	Topic: hello	Partition: 0	Leader: 2	Replicas: 0,2	Isr: 2,0
    	Topic: hello	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0
    	Topic: hello	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1
    	Topic: hello	Partition: 3	Leader: 1	Replicas: 0,1	Isr: 1,0
    	Topic: hello	Partition: 4	Leader: 1	Replicas: 1,2	Isr: 1,2
    You have new mail in /var/spool/mail/root
    
    
  • 新启动的几点不会是任何分区的leader,所以要重新均匀分配,其实不分配也可以,在kafka中有对应的配置

     [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-leader-election.sh --bootstrap-server hadoop01:9092 --election-type preferred --all-topic-partitions
    You have new mail in /var/spool/mail/root
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic hello
    Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs: 
    	Topic: hello	Partition: 0	Leader: 0	Replicas: 0,2	Isr: 2,0
    	Topic: hello	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0
    	Topic: hello	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1
    	Topic: hello	Partition: 3	Leader: 0	Replicas: 0,1	Isr: 1,0
    	Topic: hello	Partition: 4	Leader: 1	Replicas: 1,2	Isr: 1,2
    
    

在kafka中的Broker是无状态的,本身是不保存任何信息的,Broker的所有信息都放在zookeeper里面了,所以,Broker进程挂掉或者启动,对集群的影响不大!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1511973.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HarmonyOS预览功能报错:[webpack-cli] SyntaxError: Unexpected end of JSON input

harmonyos预览功能报错 在使用DevEco Studio写页面&#xff0c;进行预览的时候报错&#xff1a; [Compile Result] [webpack-cli] SyntaxError: Unexpected end of JSON input [Compile Result] at JSON.parse (<anonymous>) [Compile Result] at updateCached…

Fair Data Exchange:区块链实现的原子式公平数据交换

1. 引言 2024年斯坦福大学和a16z crypto research团队 论文 Atomic and Fair Data Exchange via Blockchain 中&#xff0c;概述了一种构建&#xff08;包含过期EIP-4844 blobs的&#xff09;fair data-markets的协议。该论文源自a16z crypto的暑期实习计划&#xff0c;与四名…

第四弹:Flutter图形渲染性能

目标&#xff1a; 1&#xff09;Flutter图形渲染性能能够媲美原生&#xff1f; 2&#xff09;Flutter性能优于React Native? 一、Flutter图形渲染原理 1.1 Flutter图形渲染原理 Flutter直接调用Skia 1&#xff09;Flutter将一帧录制成SkPicture&#xff08;skp&#xff…

2023 收入最高的十大编程语言

本期共享的是 —— 地球上目前已知超过 200 种可用的编程语言&#xff0c;了解哪些语言在 2023 为开发者提供更高的薪水至关重要。 过去一年里&#xff0c;我分析了来自地球各地超过 1000 万个开发职位空缺&#xff0c;辅助我们了解市场&#xff0c;以及人气最高和收入最高的语…

判断对象是否可以被回收:引用计数法,可达性分析,finalize()判定

引用计数法 对象每次被赋值给变量时&#xff0c;该对象的计数1&#xff0c; 若将该变量置为null,则该对象的计数-1 若该对象的计数器为0 &#xff0c;则该对象就会判定为垃圾对象 可达性分析 遍历内存中的所有变量&#xff0c;静态变量&#xff0c;然后将该变量当作GCroot根…

安装配置HBase

HBase集群需要整个集群所有节点安装的HBase版本保持一致&#xff0c;并且拥有相同的配置&#xff0c;具体配置步骤如下&#xff1a; 1. 解压缩HBase的压缩包 2. 配置HBase的环境变量 3. 修改HBase的配置文件&#xff0c;HBase的配置文件存放在HBase安装目录下的conf中 4. 首…

在没有推出硬盘的情况下,重启mac电脑,外接移动硬盘无法加载显示?

一、mac磁盘工具显示未装载 1.打开终端&#xff0c;输入 diskutil list查看当前硬盘列表&#xff0c;大多数时候&#xff0c;可以解决。 二、使用命令行装载硬盘 执行上面命令后&#xff0c;仍不起作用&#xff0c;则手动挂载&#xff0c;在命令行输入如下内容&#xff1a; …

数学建模理论与实践国防科大版

目录 1.数学建模概论 2.生活中的数学建模 2.1.行走步长问题 2.2.雨中行走问题 2.3.抽奖策略 2.4.《非诚勿扰》女生的“最优选择” 3.集体决策模型 3.1.简单多数规则 3.2.Borda数规则 3.3.群体决策模型公理和阿罗定理 1.数学建模概论 1.数学模型的概念 2.数学建模的概…

WAServiceMainContext.js:2 ReferenceError: result is not defined

WAServiceMainContext.js:2 ReferenceError: result is not defined at success (index.js? [sm]:280) at Function.forEach.u.<computed> (WASubContext.js?twechat&s1710205354985&v2.16.1:2) at :22955/appservice/<api request success callback fun…

最好用的流程编辑器bpmn-js系列之基本使用

BPMN&#xff08;Business Process Modeling Notation&#xff09;是由业务流程管理倡议组织BPMI&#xff08;The Business Process Management Initiative&#xff09;开发的一套标准的业务流程建模符号规范。其目的是为用户提供一套容易理解的标准符号&#xff0c;这些符号作…

Android audiotrack尾帧无声

前言 产品一直有用户反馈音频截断问题。在机遇巧合下现学现卖音频知识处理相关问题。 问题描述 我们查看以下简化播放器代码&#xff1a; class AACPlayer(private val filePath: String) {private val TAG "AACPlayer"private var extractor: MediaExtractor? …

nRF52832——串口 UART 和 UARTE 外设应用

nRF52832——串口 UART 和 UARTE 外设应用 UART 和 UARTE 原理UART 功能描述UARTE 功能介绍 应用实例串口打印实例串口输入与回环UART 模式串口中断 UART 和 UARTE 原理 UART 功能描述 串口 UART 也称为通用异步收发器。是各种处理器中常用的通信接口&#xff0c;在 nRF52 芯…

微信小程序实现上下手势滑动切换

效果图 思路 实现一个微信小程序的复合滚动页面&#xff0c;主要通过Swiper组件实现垂直方向的轮播功能&#xff0c;每个轮播项内部使用Scroll-View组件来展示可垂直滚动的长内容&#xff0c;如图片和文本。 代码 <!-- wxml --> <view class"swiper-container…

Spring Boot+Vue前后端分离项目如何部署到服务器

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

搜索引擎SEO策略介绍

baidu搜索&#xff1a;如何联系八爪鱼SEO baidu搜索&#xff1a;如何联系八爪鱼SEO baidu搜索&#xff1a;如何联系八爪鱼SEO 第一、 关键词的选择策略&#xff1a; 1、门户类的网站关键词选择策略&#xff1a; 网站每个页面本身基本都包含有关键词&#xff1a;网站拥有上百…

STM32-PWR电源控制

PWR(Power Control)电源控制 管理STM32内部的电源供电部分&#xff0c;可以实现可编程电压检测器和低功耗模式的功能。 电源管理器 上电复位&#xff08;POR&#xff09;和掉电复位&#xff08;PDR&#xff09; STM32内部有一个完整的上电复位(POR)和掉电复位(PDR)电路&…

免费搭建导航网站教程带免费空间域名源码

使用免费空间和免费域名免费搭建一个导航网站 手把手视频教程 https://pan.xunlei.com/s/VNsoMehs7RCjz3IClV6h2vNMA1?pwdq596#

【阿里云系列】-部署ACK集群的POD应用日志如何集成到日志服务(SLS)中

介绍 我们在实际部署应用到阿里云的ACK集群后&#xff0c;由于后期应用服务的持续维护诉求可能需要跟踪排查问题&#xff0c;此时就要具备将应用的历史日志存档便于后期排查问题 处理方式 为了解决以上的普遍需求&#xff0c;需要将ACK中的应用日志采集到SLS的Logstore中,然…

照片怎么调到100kb以下?图片压缩可以这样做

在需要通过网络传输或共享图片的场景中&#xff0c;限制文件大小对于提高传输速度和节省带宽非常重要&#xff0c;将图片压缩到100k的文件大小可以确保更快地上传、下载和共享图片&#xff0c;适用于电子邮件、社交媒体、在线相册等网络传输场景&#xff0c;那么如何快速的将图…

云原生之容器编排实践-ruoyi-cloud项目部署到K8S:Nginx1.25.3

背景 前面搭建好了 Kubernetes 集群与私有镜像仓库&#xff0c;终于要进入服务编排的实践环节了。本系列拿 ruoyi-cloud 项目进行练手&#xff0c;按照 MySQL &#xff0c; Nacos &#xff0c; Redis &#xff0c; Nginx &#xff0c; Gateway &#xff0c; Auth &#xff0c;…