Kafka系列整体栏目
内容 | 链接地址 |
---|---|
【一】afka安装和基本核心概念 | https://zhenghuisheng.blog.csdn.net/article/details/142213307 |
【二】kafka集群搭建 | https://zhenghuisheng.blog.csdn.net/article/details/142253288 |
【三】springboot整合kafka以及核心参数详解 | https://zhenghuisheng.blog.csdn.net/article/details/142346016 |
【四】kafka线上问题以及高效原理 | https://zhenghuisheng.blog.csdn.net/article/details/142371986 |
kafka线上问题以及高效原理
- 一,kafka线上问题以及高效原理
- 1,如何保证kafka消息不丢失问题
- 1.1,生产者方面
- 1.2,broker方面
- 1.3,消费者方面
- 2,如何保证不重复消费
- 3,如何解决消息积压问题
- 4,kafka如何设计一个延时队列
- 5,kafka事务使用
- 6,kafka高性能原因
- 6.1,零拷贝
- 6.2,顺序写
- 6.3,批量操作
一,kafka线上问题以及高效原理
在前面几篇中,讲解了kakfa的安装部署,以及springboot如何整合kafka的使用,然而在实际的开发中,往往会遇到很多生产中的问题中需要解决,往往需要提前配置好参数,以及根据不同场景解决不同的业务
1,如何保证kafka消息不丢失问题
虽然说kafka是更加趋向于大数据方面,用于数据分发和流失计算等,允许丢失部分数据,但是在某些场合中,也是需要保证消息是不能丢失的,如金融行业,订单数据等。为了保证数据不丢失,不管是任何一种消息队列,都需要考虑三个方面,生产者方面、broker方面和消费者三个方面进行考虑。
1.1,生产者方面
在生产者方面,当消息往broker中投递时,需要通过设置ack机制来保证消息确认机制,通过设置不同的参数来决定消息是否会丢失。为了保证消息不丢失,可以在配置文件中奖ack的值设置成-1
- ack=0时, 性能最高,消息直接异步给完broker就行,不需要broker任何答复,缺点就是容易丢消息
- ack=1时, 性能其次,需要leader结点将数据成功写入到本地日志,但是不需要等待集群中的follower写入,如果出现leader挂掉,但是follower未及时同步,那么在follower变成leader之后,就会丢失这部分消息
- ack=-1时,性能最低,但是安全性最高, 生产者端需要等待broker集群中的leader和副本都成功写入日志
1.2,broker方面
在架构层面,可以部署一套多个结点的broker集群模式,最好是每台机器对应一个broker,然后每个broker配置一个副本,从而保证broker的高可用性,即使某个节点甚至是leader主节点挂了,也能通过选举出其他结点进行故障转移,从而保证消息不丢失
1.3,消费者方面
在kafka中,消费者默认会使用自动提交的方式提交消费的偏移量到broker中,比如5s提交一次,那么如果消费者消费完了某段偏移量的消息之后,在提交到topic中时结点突然挂了,导致topic那边没有成功的记录该消费者的偏移量,导致服务重启之后,新的消费者重新消费这5s的消息,也可能造成消息丢失。
因此最好设置成自动提交偏移量到broker中,需要关闭默认的自动提交,然后设置这个 ack-mode的value值为manual , 指定手动提交确认模式,使用 Acknowledgment 对象来手动确认消费。
enable-auto-commit: false
ack-mode: manual
2,如何保证不重复消费
消息重复消费,主要分为两个方面:一个是生产者重复投递,一个是消费者重复消费同一条
生产者因为网络抖动的原因,导致broker没有及时的给生产者反馈,导致生产者触发了重试策略,又往broker中投递了同一条数据,这就导致了重复投递,因此可以设置以下参数保证生产者的幂等性
enable.idempotence=true
消费者这边,首先还是这个自动提交的问题,需要改成手动提交偏移量到topic,需要将这个自动提交
enable-auto-commit: false
另一个就是可以通过幂等性来保证消息不重复消费,比如给每条消息设置一个唯一id,将消息的id加入的缓存中,或者加入到数据库设置唯一索引等等方式
3,如何解决消息积压问题
在解决积压问题之前,首先需要明白kafka内部消费的几个原理,首先是关于消费者组的问题,一个partition分区中数据只能被消费者组中的一个消费者消费,也就是说最好的情况就是如有10个分区,那么最好就有10个消费者,如果消费者组中增加消费者也没用,因为所有已有的分区都已有消费者进行消费。因此假设说如果消费者小于partition分区的个数,那么就可能出现一个消费者得消费两个分区或者以上,因此这种情况就可以增加消费者的个数来解决积压问题。
第二个问题就是业务场景是否支持rebanlance机制,举个例子,假设已有10个分区,10个消费者,但是突然消费者被移除了一个,那么kakfa就会触发这种机制,就是说此时有一个分区没有消费者消费了,那么kafka就会将这个分区通过这种rebanlance机制分配给其他消费者消费,那么此时就是存在一个消费者得消费两个分区的消息,那么就会降低整个系统的吞吐量
如果不支持这种rebanlance机制,就是不影响其他的消费者消费以及不影响整体系统的吞吐量。比如这个order的topic主题,已有4个分区和4个消费者,此时都在正常消费,c1对应partition1,依次对应,假如此时c3这个消费者突然消费不过来,然后消息一直是挤压在partition中,此时加消费者也没用,但是又不能影响其他消费者的支持消费,那么需要如何设计和解决这种积压问题
其解决方案如下,就是新加一个order2的主题,然后c3这个消费者不做具体的信息消费,而是只作一个转发,将数据转发到新的order2的这个主题中,将原先一个partition和消费者拆分成多个partition和多个消费者。从而在不进行rebanlance的情况下,既不影响整体的吞吐量,也解决了这个消息积压的问题。
除此之外,消费者这边也可以采用多线程,优化硬件,如cpu、内存和磁盘等。
4,kafka如何设计一个延时队列
在kafka中,内部并没有设计这个延时队列,因此如果想通过kakfa实现一个延时队列,那么需要内部自定义设计。比如存在一个支付场景,假设某个订单30分钟没有支付,那么需要主动的关闭这个订单,需要如何设计和实现。
针对上面这个场景,首先需要知道kafka的存储原理,在kakfa中,内部采用的是顺序写的原理,也就是说partition前面写入的内容的时间一定是比partition后面写入的内容的时间是更早的,明白这个原理之后,那么多就可以采用定时任务的形式来进行消费
如创建一个order_30的topic主题,专门存储用户下单后未立马支付的订单。假设在第一个分区中,已经有1000条这种信息,那么就可以开启一个定时任务,每隔30s检测一次,从头开始轮询,如第一条数据到期,那么就对这条消息进行消费,并且修改partition的偏移量,依次下去。
@Service
public class RetryTask {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Scheduled(fixedDelay = 5000)
public void retryMessages() {
xxx
}
}
5,kafka事务使用
在kakfa中,内部也存在事务概念,但是和mysql不同,kafka的事务只支持批量消息整体,也就是说批量消息要么同时成功,要么同时失败。 Kafka事务主要用于保证消息的一致性和避免消息丢失或重复消费。
核心概念
- 事务:一组生产操作的集合,要么全部成功,要么全部失败。
- 事务ID:每个事务都有一个唯一的事务ID,Kafka用来标识和跟踪事务。
- 事务日志:Kafka维护事务日志以跟踪事务的状态和相关的偏移量。
- 原子性:事务中的消息要么全部提交,要么全部回滚,保证数据的一致性。
在配置文件中,需要配置以下参数
spring:
kafka:
producer:
acks: all
enable-idempotence: true
transaction-id-prefix: my-transactional-id
事务的实现方式如下,也是需要在方法上面加上 Transactional 注解
@Transactional
public void sendTransactionalMessages(String topic, String message) {
kafkaTemplate.executeInTransaction(kafkaTemplate -> {
kafkaTemplate.send(topic, message);
// 可以发送多条消息
return true; // 返回true表示事务成功提交
});
}
Kafka的事务机制适用于需要保证消息处理一致性的场景,主要适用于金融交易系统和订单处理系统
6,kafka高性能原因
6.1,零拷贝
首先内部采用了零拷贝:https://blog.csdn.net/zhenghuishengq/article/details/140721001
kafka采用的是sendfile的零拷贝方式, 只需要两次上下文切换,两次DMA拷贝,0次或者1次的CPU拷贝 。解决了最初的4次上下文,4次DMA,2次CPU拷贝,详细参考上面那篇文章。
6.2,顺序写
内部采用了顺序写:https://zhenghuisheng.blog.csdn.net/article/details/129080088
通过顺序写和顺序读,减少 寻找磁道和盘面时间 ,从而提高整体的性能,详情参考上面这篇文章
6.3,批量操作
kafka内部采用的都是批量的操作,如生产者批量将数据写入缓冲区中,然后批量的缓冲区数据投递到broker中,消费者批量的上报偏移量等。都是通过大量的批量操作来实现数据的传输