kafka架构

2.基础概念
Producer(生产者) : 产生消息的一方。
Consumer(消费者) : 消费消息的一方。
Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。
同时,你一定也注意到每个 Broker 中又包含了 Topic 以及 Partion 这两个重要的概念:
Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
Partion(分区) : Partion 属于 Topic 的一部分。一个 Topic 可以有多个 Partion ,并且同一 Topic 下的 Partion 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。这正如我上面所画的图一样。
3.消息队列模型知道吗?kafka是怎么做到支持这两种模型的?
对于传统的消息队列系统支持两个模型:
点对点:也就是消息只能被一个消费者消费,消费完后消息删除
发布订阅:相当于广播模式,消息可以被所有消费者消费
上面也说到过,kafka其实就是通过Consumer Group同时支持了这两个模型。
如果说所有消费者都属于一个Group,消息只能被同一个Group内的一个消费者消费,那就是点对点模式。
如果每个消费者都是一个单独的Group,那么就是发布订阅模式。
实际上,Kafka通过消费者分组的方式灵活的支持了这两个模型。
4.为什么要引入消息中间件(kafka)
系统解耦,不同系统之间互相不影响。(举个例子,商品信息上带有店铺的名称等信息,某一天修改了店铺名字,微服务之间库是互不影响的,怎么改商品上的店铺名字呢?发kafka消息异步去改商品表的信息。)
控制流量,削峰填谷,根据下游能力控制流量。(秒杀场景下,秒杀的商品信息异步入库,防止流量过大导致数据库崩溃)
5.Kafka 为何如此之快
Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据。
批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费。
总结一下其实就是四个要点
顺序读写
零拷贝(kafka零拷贝实际是减少不必要的复制过程。减少中间商赚差价)
消息压缩
分批发送
不用0拷贝的示意图:

0拷贝示意图:

6.那么发送消息时如何选择分区的?
主要有两种方式:
轮询,按照顺序消息依次发送到不同的分区
随机,随机发送到某个分区
7.为什么需要分区?有什么好处?
这个问题很简单,如果说不分区的话,我们发消息写数据都只能保存到一个节点上,这样的话就算这个服务器节点性能再好最终也支撑不住。
实际上分布式系统都面临这个问题,要么收到消息之后进行数据切分,要么提前切分,kafka正是选择了前者,通过分区可以把数据均匀地分布到不同的节点。
分区带来了负载均衡和横向扩展的能力。
发送消息时可以根据分区的数量落在不同的Kafka服务器节点上,提升了并发写消息的性能,消费消息的时候又和消费者绑定了关系,可以从不同节点的不同分区消费消息,提高了读消息的能力。
另外一个就是分区又引入了副本,冗余的副本保证了Kafka的高可用和高持久性。
8.新版本Kafka为什么抛弃了Zookeeper吗?
我认为可以从两个个方面来回答这个问题:
首先,从运维的复杂度来看,Kafka本身是一个分布式系统,他的运维就已经很复杂了,那除此之外,还需要重度依赖另外一个ZK,这对成本和复杂度来说都是一个很大的工作量。
其次,应该是考虑到性能方面的问题,比如之前的提交位移的操作都是保存在ZK里面的,但是ZK实际上不适合这种高频的读写更新操作,这样的话会严重影响ZK集群的性能,这一方面后来新版本中Kafka也把提交和保存位移用消息的方式来处理了。
另外Kafka严重依赖ZK来实现元数据的管理和集群的协调工作,如果集群规模庞大,主题和分区数量很多,会导致ZK集群的元数据过多,集群压力过大,直接影响到很多Watch的延时或者丢失。
9.引入消息中间件引发的问题
数据一致性(如何保证数据不丢失)
消息积压
10.如何避免数据丢失?
思路:先确定哪些环节可能丢消息,如何知道消息是否有丢失,最后是如何保证消息不丢失。
消息的产生到消费经理三个阶段:

1.消息生产者:
参数 acks
该参数表示有多少个分区副本收到消息,才认为本次发送是成功的。
acks=0,只要发送消息就认为成功,生产端不等待服务器节点的响应
acks=1,表示生产者收到 leader 分区的响应就认为发送成功
acks=-1,只有当 ISR 中的副本全部收到消息时,生产端才会认为是成功的。这种配置是最安全的,但由于同步的节点较多,吞吐量会降低。
参数 retries
表示生产端的重试次数,如果重试次数用完后,还是失败,会将消息临时存储在本地磁盘,待服务恢复后再重新发送。建议值 retries=3
参数 retry.backoff.m
消息发送超时或失败后,间隔的重试时间。一般推荐的设置时间是 300 毫秒。
这里要特别注意一种特殊情况,如果MQ服务没有正常响应,不一定代表消息发送失败,也有可能是响应时正好赶上网络抖动,响应超时。
2.MQ服务端
MQ服务端作为消息的存储介质,也有可能会丢失消息。比如:一个分区突然挂掉,那么怎么保证这个分区的数据不丢失,我们会引入副本概念,通过备份来解决这个问题。
具体可设置哪些参数?
参数 replication.factor
表示分区副本的个数,replication.factor >1 当leader 副本挂了,follower副本会被选举为leader继续提供服务。
参数 min.insync.replicas
表示 ISR 最少的副本数量,通常设置 min.insync.replicas >1,这样才有可用的follower副本执行替换,保证消息不丢失
参数 unclean.leader.election.enable
是否可以把非 ISR 集合中的副本选举为 leader 副本。
如果设置为true,而follower副本的同步消息进度落后较多,此时被选举为leader,会导致消息丢失,慎用。
3.消息消费端:
消费端要做的是把消息完整的消费处理掉。但是这里面有个提交位移的步骤。
有的同学,考虑到业务处理消耗时间较长,会单独启动线程拉取消息存储到本地内存队列,然后再搞个线程池并行处理业务逻辑。这样设计有个风险,本地消息如果没有处理完,服务器宕机了,会造成消息丢失。
正确的做法:拉取消息 --- 业务处理 ---- 提交消费位移
关于提交位移,kafka提供了集中参数配置
参数 enable.auto.commit
表示消费位移是否自动提交。
如果拉取了消息,业务逻辑还没处理完,提交了消费位移但是消费端却挂了,消费端恢复或其他消费端接管该分片再也拉取不到这条消息,会造成消息丢失。所以,我们通常设置 enable.auto.commit=false,手动提交消费位移。
11.怎么解决消息被重复消费的问题?
重复消费也不是问题,只要保证消费一次和消费多次的结果一致即可,即幂等性问题(幂等性,就是一条命令,任意多次执行所产生的影响均与一次执行的影响相同)。一种方式是在消费的时候进行逻辑判断校验,比如我们的业务订单,每个消息体都有一个唯一订单id,以订单id为数据库唯一标识,已有就不再执行入库操作。这种是业务逻辑上来处理。
另一种方式是建立一张消息日志表,每条消息都有一个唯一的id,表中记录消息id和消息执行状态,执行过程中设置为执行中,执行结束设置为结束,每次消费前先检查消息的状态。
12.消息乱序
如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了
所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息发送的有序。
kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。
13.消息积压
如果出现积压,那一定是性能问题,想要解决消息从生产到消费上的性能问题,就首先要知道哪些环节可能出现消息积压,然后在考虑如何解决。因为消息发送之后才会出现积压的问题,所以和消息生产端没有关系,又因为绝大部分的消息队列单节点都能达到每秒钟几万的处理能力,相对于业务逻辑来说,性能不会出现在中间件的消息存储上面。毫无疑问,出问题的肯定是消息消费阶段,消费端出现积压!!
如果是线上突发问题,要临时扩容,增加消费端的数量,与此同时,降级一些非核心的业务。通过扩容和降级承担流量,这是为了表明你对应急问题的处理能力。
其次,才是排查解决异常问题,如通过监控,日志等手段分析是否消费端的业务逻辑代码出现了问题,优化消费端的业务处理逻辑。
最后,如果是消费端的处理能力不足,可以通过水平扩容来提供消费端的并发处理能力,但这里有一个考点需要特别注意, 那就是在扩容消费者的实例数的同时,必须同步扩容主题 Topic 的分区数量,确保消费者的实例数和分区数相等。如果消费者的实例数超过了分区数,由于分区是单线程消费,所以这样的扩容就没有效果。
14.如何进行消息检测?
如何判断消息是否发生了消息丢失?
在发消息之前,通过拦截器将消息唯一id注入消息体中,消费端收到消息后,先判断id的连续性或者消费状态,如果发生丢失则告警。
在生产端,每个消息固定一个全局唯一ID,或者附加一个连续递增的版本号,然后在消费端做对应的版本校验。
如何实现?
在生产端发送消息之前,通过拦截器将消息版本号注入消息中,版本号可采用联系id生成,或者采用分布式生成唯一全局id,在消费端收到消息后,通过拦截器检测版本号的连续性或者消费状态,这样把消息检测与业务代码分离,可以通过单独的任务来定位丢失的消息。