kafka入门到实战一(认识kafka)
本次kafka的学习打算从三方面开始
- 认识kafka,掌握基础知识
- 搭建kafka,为使用kafka创建条件
- 使用kafka,基本的使用和解决顺序消费等问题
kafka消费流程
kafka大体消费流程:生产者生产消息—>topic—>分配到制定的分区/自动分配到对应分区(均匀的把消息分配到每个partition上,相当于做了一个轮询)—>consumer消费消息,然后提交一个offset到内部自带的Broker里去,以供下次相同消息快速使用。
Kfaka基础
Kafka是一个分布式的基于发布/订阅模式的消息引擎系统,类似产品Rabbit mq,其特点:
-
削峰填谷:对于突然到来的大量请求,可以配置流控规则,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。
-
应用解耦:使用消息中间件后,主服务模块可以将其他模块需求的消息先发送到消息队列,当其他消费者要使用时,再取出即可。
-
异步处理:当模块结算后,可以先让用户继续使用其他模块,不用等待模块完毕。等到消息处理完毕后,再将最终结果返回给用户,减少用户等待时间。
-
高可用:Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。
5、高性能:Kafka 每秒可以生产约 25 万消息(50 MB),每秒处理 55 万消息(110 MB),即,高qps。
不同于Rabbit mq的推和拉,kafka只有拉。原因:
- kafka性能特别高,qps可以达到十几二十万,如果消费者达不到这样的能力,还去推,这时候很容易把consumer打挂。
- 很容易导致消息的堆积
Kafka消息队列的两种模式
点对点模式
点对点模式下包括三个角色:
- Queue:消息队列
- Sender:发送者 (生产者)
- Receiver:接收者(消费者)
点对点模式的特点:
- 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);
- 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
- 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
发布-订阅模式
点对点模式下包括三个角色:
- Topic:主题
- Sender:发送者 (生产者)
- Receiver:接收者(消费者)
发布/订阅模式的特点:
- 每个消息可以有多个订阅者;
- 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
- 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;
kafka两种消息模型:
- 点对点模型(Peer to Peer,P2P)
- 发布/订阅模型
这里面的点对点指的是同一条消息只能被下游的一个消费者消费,其他消费者则不能消费
在Kafka中实现这种P2P模型的方法就是引入了消费者组(Consumer Group),所谓的消费者组,指的是多个消费者实例共同组成一个组来消费一组主题,这组主题中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它为什么要引入消费
者组呢?主要是为了提升消费者端的吞吐量,多个消费者实例同时消费,加速整个消费端的吞吐量(TPS)
另外这里的消费者实例可以是运行消费者应用的进程,也可以是一个线程,它们都称为一个消费者实例(Consumer Instance)
消费者组里面的所有消费者实例不仅“瓜分”订阅主题的数据,而且它们还能彼此协助,假设组内某个实例挂掉了,Kafka能够自动检测到,然后把这个Failed实例之前负责的分区转移给其他活着的消费者
这个过程就是Kafka中大名鼎鼎的**“重平衡”(Rebalance)**,其实既是大名鼎鼎,也是臭名昭著,因为由重平衡引发的消费者问题比比皆是,事实上,目前很多重平衡的Bug社区都无力解决
每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset)
注意,这和上面所说的位移完全不是一个概念
上面的“位移”表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了
而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器嘛,另外每个消费者有着自己的消费者位移,因此一定要区分这两类位移的区别
我个人把消息在分区中的位移称为分区位移,而把消费者端的位移称为消费者位移。
Kafka术语
Record:
Kafka处理的主要对象
Topic :
在Kafka中,发布订阅的对象是主题(Topic),你可以为每个业务、每个应用甚至是每类数据都创建专属的主题
**Producer **:
向主题发布消息的客户端应用程序称为生产者(Producer),生产者程序通常持续不断地向一个或多个主题发送消息
Consumer :
而订阅这些主题消息的客户端应用程序就被称为消费者(Consumer)
和生产者类似,消费者也能够同时订阅多个主题的消息
Clients :
将把生产者和消费者统称为客户端(Clients)
可以同时运行多个生产者和消费者实例,这些实例会不断地向Kafka集群中的多个主题生产和消费消息
有客户端自然也就有服务器端
**Broker **:
Kafka的服务器端由被称为Broker的服务进程构成,即一个Kafka集群由多个Broker组成
Broker负责接收和处理客户端发送过来的请求,以及对消息进行持久化
虽然多个Broker进程能够运行在同一台机器上,但更常见的做法是将不同的Broker分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有Broker进程都挂掉了,其他机器上的Broker也依然能够对外提供
服务,这其实就是Kafka提供高可用的手段之一
Replication:
实现高可用的另一个手段就是备份机制(Replication)
备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在Kafka中被称为副本(Replica)
副本的数量是可以配置的,这些副本保存着相同的数据,但却有不同的角色和作用
Kafka定义了两类副本:
- 领导者副本(Leader Replica):对外提供服务,即与客户端程序进行交互(生产者总是向领导者副本写消息)
- 追随者副本(Follower Replica):只是被动地追随领导者副本而已(向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步),不能与外界进行交互
当然了,你可能知道在很多其他系统中追随者副本是可以对外提供服务的,比如MySQL的从库是可以处理读操作的,但是在Kafka中追随者副本不会对外提供服务
对了,一个有意思的事情是现在已经不提倡使用Master-Slave来指代这种主从关系了,毕竟Slave有奴隶的意思,在美国这种严禁种族歧视的国度,这种表述有点政治不正确了,所以目前大部分的系统都改成Leader-Follower了
Partitioning
虽然有了副本机制可以保证数据的持久化或消息不丢失,但没有解决伸缩性的问题
伸缩性即所谓的Scalability,是分布式系统中非常重要且必须要谨慎对待的问题
那么,什么是伸缩性呢?
我们拿副本来说,虽然现在有了领导者副本和追随者副本,但倘若领导者副本积累了太多的数据以至于单台Broker机器都无法容纳了,此时应该怎么办呢?
一个很自然的想法就是,能否把数据分割成多份保存在不同的Broker上?这个想法听起来很简单,但kafka就是这么做的,这种机制就是所谓的分区(Partitioning)
如果你了解其他分布式系统,你可能听说过分片、分区域等提法,比如MongoDB和Elasticsearch中的Sharding、HBase中的Region,其实它们都是相同的原理,只是Partitioning是最标准的名称
分区机制:
Kafka中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志
生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区0中,要么在分区1中
Kafka的分区编号是从0开始的,如果Topic有100个分区,那么它们的分区号就是从0到99
看到这里,你可能有这样的疑问:刚才提到的副本如何与这里的分区联系在一起呢?
实际上,每个分区下可以配置若干个副本,其中只能有1个领导者副本和N-1个追随者副本
生产者向分区写入消息,每条消息在分区中的位置信息由一个叫**位移(Offset)**的数据来表征,分区位移总是从0开始,假设一个生产者向一个空分区写入了10条消息,那么这10条消息的位移依次是0、1、2、…、9
至此我们能够完整地串联起Kafka的三层消息架构:
- 第一层是主题层,每个主题可以配置M个分区,而每个分区又可以配置N个副本
- 第二层是分区层,每个分区的N个副本中只能有一个充当领导者角色,对外提供服务;其他N-1个副本是追随者副本,只是提供数据冗余之用
- 第三层是消息层,分区中包含若干条消息,每条消息的位移从0开始,依次递增
- 最后,客户端程序只能与分区的领导者副本进行交互。
数据持久化:
Kafka使用**消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)**消息的物理文件
因为只能追加写入,故避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作,这也是实现Kafka高吞吐量特性的一个重要手段
不过,如果不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此Kafka必然要定期地删除消息以回收磁盘,怎么删除呢?
简单来说就是通过**日志段(Log Segment)**机制:在Kafka底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的
每个分区都是有序的,分区存储在磁盘中,分区的数据消费是有序的。如果要保证全局有序,那就只挂一个分区,单分区是全局有序的。
重平衡:Rebalance
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance是Kafka消费者端实现高可用的重要手段。
特别注意
- kafka消息默认存储七天,并不是消费完了就没了
- 分区副本作为其中一个决定了kafka的高可用的元素
- 分区在一定的量下,分区越多,kafka性能越高。因为10条消息放在一个分区里面假设要10s,那10个分区则只需要1s就能消费。
- offset的好处在于,如果我的consumer1在消费消息,从1到10,如果到5,offset会记录下这个值。此后就算consumer1挂掉了变成了consumer2,也可以从记录值开始读取,不用重新再来一次。
- 一个topic设置多个分区(partition)就不能保证顺序消费
总结
总结一下名词:
**消息:**Record。Kafka是消息引擎嘛,这里的消息就是指Kafka处理的主要对象
**主题:**Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务
**分区:**Partition。一个有序不变的消息序列。每个主题下可以有多个分区。(一个partition只能对应一个consumer)
**消息位移:**Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值
**副本:**Replica。Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本,副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用
**生产者:**Producer。向主题发布新消息的应用程序
**消费者:**Consumer。从主题订阅新消息的应用程序(consumer可以消费一个topic下的多个partitioning)
**消费者位移:**Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移
**消费者组:**Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐
**重平衡:**Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程,Rebalance是Kafka消费者端实现高可用的重要手段
Zookeeper在kafka中的作用
在讲架构图前一定要介绍一下zookeeper,在kafka的consumer和broker中都有用到:
Broker注册
Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:
/brokers/ids
每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0…N]。
Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。
Topic注册
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:
/borkers/topics
Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
生产者负载均衡
由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
(1) 四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。
(2) 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。
消费者负载均衡
与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
分区 与 消费者 的关系
消费组 (Consumer Group):
consumer group 下有多个 Consumer(消费者)。
对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。
同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。
在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。
消息消费进度Offset 记录
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
特别注意
- offset的好处在于,如果我的consumer1在消费消息,从1到10,如果到5,offset会记录下这个值。此后就算consumer1挂掉了变成了consumer2,也可以从记录值开始读取,不用重新再来一次。
- 如果需要重复消费消息的话,kafka有一个内置topic,这个内置的topic的作用就是建立存储offset的partition分区。当需要重复消费3一个消息的时候,就可以拿到该分区里对应的offset,从而节省消息查找的时间。
核心参数
- Broker参数(暂无)
- 存储类
- 日志文件
log.dirs=/home/kafka1,/home/kafka2,/home/kafka3
- Zookeeper相关
- zookeeper集群端口号
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1
- 连接类
- 连接管理和安全策略
# 连接管理
listeners=CONTROLLER: //localhost:9092
#安全策略
listener.security.protocol.map=CONTROLLER:PLAINTEX
- Topic管理
#不能自立为王:是否能自己创建topic
auto.create.topics.enable=true
#当leadcer挂了,从哪里去选择新的副本
unclean.leader.election.enable=true
#能否进行重平衡,当消费者挂了,是否选举一个新的consumer
auto.leader.rebalance.enable=true
- 数据留存
# 数据存储多长时间
log.retention.{hours | minutes | ms}:数据寿命hours=168
# 数据存储多大
log.rentention.bytes:祖宅大小-1表示没限制
# 数据拉取的最大限制
message.max.bytes:祖宅大门宽度,默认1000012=976KB
生产消费
#为1时,代表生产者发送消息到broker,只要能确保一个leader收到即可
#存在问题:在leader1收到后,如果leader同步过程中就挂了, 那么其他的partition将接收不到副本
requet.requil.ack=1
#为0时,代表生产者只管发消息,不管topic下的partition是否接受的到
#存在问题:保证了效率,但是舍弃了高可用这个特点
requet.requil.ack=0
#为-1时,代表生产者发送给第一个partition,随后一直等待到所有其他的partition都同步了这个副本才算完成
# 存在问题:高可用得到了保证,但是效率太低
requet.requil.ack=-1