kafka入门学习

news2024/11/25 18:30:16

kafka官网:

Apache Kafka

Index of /34

kafka学习视频:

05_尚硅谷_Kafka_概述_基础架构_哔哩哔哩_bilibili

学习资料:

(1)【万字长文】浅谈Apache Kafka --- 入门须知

https://km.woa.com/articles/show/516284?kmref=search&from_page=1&no=4

(2)Kafka再均衡原理及源码分析

https://km.woa.com/articles/show/517539?kmref=search&from_page=1&no=5

(3)kafka核心技术与实战 笔记

https://km.woa.com/articles/show/541903?kmref=search&from_page=1&no=6

(4)《kafka源码解析与实战》

1、kafka集群架构

1.0、kafka简介 

kafka是一个分布式的基于发布订阅的消息队列;当然官方的期望比较大将其定位为"流平台"。
kafka的broker是由 scala语言编写; producer 和 consumer 是由java编写。

所谓分布式:简单理解,很多台机器一起去做一件事。

1.1、kafka集群架构

1、一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群,如下图所示。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。 

整个 Kafka 体系结构中引入了以下3个术语:

        (1)Producer: 生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到 Kafka 中。

        (2)Consumer: 消费者,也就是接收消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。

        (3)Broker: 服务代理节点(就看做一台机器)。对于 Kafka 而言,Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了一个 Kafka 集群。一般而言,我们更习惯使用首字母小写的 broker 来表示服务代理节点。

        (4)zookeeper: kafka集群另外一个非常重要的组成部分就是zk。记录集群中服务器的状态(哪些服务器上线、哪些服务器下线)、记录谁是leader。kafka2.8.0之前必须要有zk配合使用才行,之后就是可选的了。就大方向来说应该是去zk化,因为zk已经称为kafka的瓶颈。但是如果要配合hadoop等使用的话可能还是要配一个大家都能依赖的外部zk,而不是一个框架内部的。

1.2、kafka几个重要概念

AR&ISR&&OSR:

(1)AR(Assigned Replicas),分区内的所有副本统称为AR。
(2)ISR(In-Sync Replicas),意为和leader保持同步的follower+leader集合(leader:0, isr:0,1,2)。
如果follower长时间没有同步数据(没有任何同步数据请求)则该follower将别踢出ISR。这个时间阈值由 replica.lag.time.max.ms 参数设置,其默认值为30s。 例如副本2超时了,此时ISR就变成了 (leader:0, isr:0,1)。
(3)OSR(Out-Of-Sync Replicas),与leader副本同步之后过多的副本组成OSR。
显然可以知道 AR=ISR+_OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR集合应该为空。

leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 follower 副本“追上”了 leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。默认情况下,当 leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。

LEO&HW:

LEO(Log End Offset)它标识当前日志文件中下一条待写入消息的 offset,下图中 offset 为9的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加1。分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW(High Watermark),对消费者而言只能消费 HW 之前的消息

ISR 与 HW 和 LEO 也有紧密的关系。HW 是 High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。

如上图所示,它代表一个日志文件,这个日志文件中有9条消息,第一条消息的 offset(LogStartOffset)为0,最后一条消息的 offset 为8,offset 为9的消息用虚线框表示,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取到 offset 在0至5之间的消息,而 offset 为6的消息对消费者而言是不可见的。

1.3、kafka分区机制

1.3.1、kafka分区介绍

        在 Kafka 中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题。

        kafka就是通过分区来解决伸缩性(Scalability)问题。试想如果一个topic只对应一个分区,随着topic中数据的积累,很容易就超出了单台 Broker 机器的最大承受范围,此时应该怎么办呢?一个很自然的想法就是把一个topic的数据分割成多份保存在不同的 Broker 上?这种机制就是所谓的分区(Partitioning)。其他分布式系统里,你可能听说过分片、分区域等提法,比如 MongoDB 和 Elasticsearch 中的 Sharding、HBase 中的 Region,其实它们都是相同的原理。Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中。

        同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。

        如上图所示,主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。Kafka 中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个 broker,以此来提供比单个 broker 更强大的性能。

        每一条消息被发送到 broker 之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器I/O将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。

        不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)。下图描绘了主题、分区、副本、Log 以及 LogSegment 之间的关系。

1.3.2、数据的分区分配规则 

Kafka生产者的分区策略是决定生产者将消息发送到哪个分区的算法。Kafka提供默认的分区策略,同时它也支持自定义分区策略。常见的分区策略如下:

1. 轮询策略

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

2. 随机策略

也称 Randomness 策略,所谓随机就是我们随意地将消息放置到任意一个分区上。本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。

3. 按消息键保序策略

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。Kafka的主题会有多个分区,分区作为并行任务的最小单位,为消息选择分区要根据消息是否含有键来判断。

1.4、多副本机制(保证高可用)

        Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力(是分区维度的)。备份的思想,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)。

        同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样)。副本之间是“一主多从”的关系,其中仅 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。

kafka默认的副本数是1个,生产环境一般配置为2个副本居多。

选主规则:在ISR中,但是又在AR中顺序靠前的作为failover之后的leader。

当然了,我们知道在很多其他系统中follower副本是可以对外提供服务的,比如 MySQL 的从库是可以处理读操作的,但是在 Kafka 中追随者副本不会对外提供服务。

 综合起来后的效果如下:

关于这个架构图,还有如下解释:

(1)、为了方便扩展并提高吞吐量,一个topic可以分为多个partition
(2)、配合分区的设计,提出消费者组的概念,组内每个消费者并行消费。
(3)、一个分区的数据只能由一个消费者消费,不会被多个消费者占有。
(4)、为提高可用性,为每个partition会有若干副本冗余存储。即kafka的副本分为leader和follower,其中只有leader才处理生产消费,follower不处理。当leader挂了后follower可以成为leader。
(5)、kafka集群另外一个非常重要的组成部分就是zk。记录集群中服务器的状态(哪些服务器上线、哪些服务器下线)、记录谁是leader。
注意:kafka2.8.0之前必须要有zk配合使用才行,之后就是可选的了。就大方向来说应该是去zk化,因为zk已经称为kafka的瓶颈。
 

1.4.1、副本故障处理流程

1)Follower故障:

(1)Follower发生故障后会被临时剔除ISR
(2)这个期间Leader和其他Follower继续接收数据
(3)待故障follower恢复后,他会读取本地磁盘记录的上次HW,将log文件高于HW的部分数据截取掉(认为都不可信),从HW开始向Leader进行同步。
(4)等该follower的LEO大于等于该Partition的HW后就认为follwer追上leader,然后就会将此follower重新加入ISR。

2)Leader故障:

(1)Leader发生故障后也同样会从ISR中剔除,同时会选一个一个新的Leader。
(2)为了保证多副本之间数据的一致性,其余的follwer会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这种方式重点在于保证副本间的数据的一致性,并不能保证数据不丢失或者不重复。

1.4.2、分区副本分配(不同的副本落在哪些broker)

(1)默认分配。 尽量均匀分布,同时保证数据可靠。

(2)kafka-reassign-partitions.sh 手动分布kafka分区

具体来说可以指定每个分区的数据都分别落在哪些broker上(例如,不同broker的机器配置差异很大,可能就需要手动调整)。操作步骤如下:

1、创建一个主题,4个分区,2个副本
./kafka-topics.sh --bootstrap-server IP:9092 --create --topic test_topic --partitions 4 --replication-factor 2

2、查看当前topic的信息
./kafka-topics.sh --bootstrap-server IP:9092 --describe --topic test_topic
从副本的分布情况来看,副本目前尽可能的分布在不同的机器上,而我们的需求是期望所有的数据尽可能存储在broker1和broker2上面

3、创建副本存储计划
假设现在有四个broker(1~4)由于broker3和broker4配置较差所以希望副本尽可能的分配到broker1和broker2上。
在bin目录下创建一个increase-replication-factor.json文件,将如下内容粘贴到increase-replication-factor.json并执行。
注:关于如下json非常直观,使用时结合实际改改就好了。
{
    "version":1,
    "partitions":[{"topic":"test_topic","partition":0,"replicas":[1,2]},
    {"topic":"test_topic","partition":1,"replicas":[1,2]},
    {"topic":"test_topic","partition":2,"replicas":[2,1]},
    {"topic":"test_topic","partition":3,"replicas":[2,1]}]
}

4、执行存储计划
./kafka-reassign-partitions.sh --bootstrap-server IP:9092 --reassignment-json-file increase-replication-factor.json --execute

看到下面的内容,说明计划执行完毕,

5、验证第四步中的执行计划
./kafka-reassign-partitions.sh --bootstrap-server IP:9092 --reassignment-json-file increase-replication-factor.json --verify

6、再次查看副本存储情况
./kafka-topics.sh --bootstrap-server IP:9092 --describe --topic test_topic
与第一次的对比,重新分配之后,副本都转移到了broker1和broker2上面了
1.4.3、leader partition的自动平衡机制(避免leader副本过分集中与某些broker上)

        正常情况下,Kafka会自动把leader partition均匀分散在各个机器上来保证没太机器的读写吞吐量都是相对均匀的。但是如果某些broker发生过宕机会可能会导致Leader partition过于集中在其他少部分几台broker上(宕机的broker重启后都默认作为follwer partition)。我们知道kafka的读写操作只会作用Leader副本(不作用follwer副本),显然这就会导致少数几台broker的读写请求压力非常高,造成机器负载不均衡。

为此kafka提供了自动平衡的机制,受控于如下参数。

(1)auto.leader.rebalance.enable: 用于控制是否进行自动平衡,某人为true。
(2)leader.imbalance.per.broker.percentage: 重平衡的阈值。表示允许的不平衡leader的比率,默认是10%。即超过10%就会触发自动平衡,否则不触发。
(3)leader.imbalance.check.interval.seconds: 检查数leader负载是否均衡的间隔时间,默认是300秒。

关于不平衡率的计算:
如果分区2的AR的优先副本是broker0,但实际上broker0却不是leader节点,那么不平衡数就加1;由于AR的副本总数是4,所以broker0的不平衡率为1/4=25%。显然默认配置情况下就需要再平衡。

建议:均衡需要等待(leader好了后才能重新继续生产和消费)对性能肯定也是会有短暂影响的。一般情况我们不希望频繁的自动均衡,所以如果非要保留自动均衡特性的话也要把阈值调高些。
 

1.4.4、增加副本数


在生产环境中某些topic的重要等级提升可以考虑增加副本。副本数的增加不能通过命令行实现,需要制定定计划然后根据计划执行(其实就是前面演示的 kafka-reassign-partitions.sh)。

为什么Kafka不像MySQL和Redis那样允许follwer副本对外提供读服务呢?

首先,Redis和MySQL都支持主从读写分离,这和它们的使用场景有关。对于那种读操作很多而写操作相对不频繁的负载类型而言,采用读写分离是非常不错的方案——我们可以添加很多follower横向扩展,提升读操作性能。反观Kafka,它的主要场景还是在消息引擎而不是以数据存储的方式对外提供读服务,通常涉及频繁地生产消息和消费消息,这不属于典型的读多写少场景,因此读写分离方案在这个场景下并不太适合。

第二,Kafka副本机制使用的是异步消息拉取,因此存在leader和follower之间的不一致性。如果要采用读写分离,必然要处理副本lag引入的一致性问题,比如如何实现read-your-writes、如何保证单调读(monotonic reads)以及处理消息因果顺序颠倒的问题。相反地,如果不采用读写分离,所有客户端读写请求都只在Leader上处理也就没有这些问题了——当然最后全局消息顺序颠倒的问题在Kafka中依然存在,常见的解决办法是使用单分区,其他的方案还有version vector,但是目前Kafka没有提供。

第三,主写从读无非就是为了减轻leader节点的压力,将读请求的负载均衡到follower节点,如果Kafka的分区相对均匀地分散到各个broker上,同样可以达到负载均衡的效果,没必要刻意实现主写从读增加代码实现的复杂程度。

下图是Kafka分区和副本的架构图

如上图所示,Kafka 集群中有4个 broker,某个主题中有3个分区,且副本因子(即副本个数)也为3,如此每个分区便有1个 leader 副本和2个 follower 副本。生产者和消费者只与 leader 副本进行交互,而 follower 副本只负责消息的同步,很多时候 follower 副本中的消息相对 leader 副本而言会有一定的滞后。

Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。

2、kafka的安装 

1、首先安装jdk环境。
2、官网下载。kafka本身解压后可以直接运行。不过感觉依赖要搞搞好,例如环境、zk之类的。
3、配置修改
1)broker.id: 必须全局唯一
2)log.dirs: 修改数据存储的目录,默认在tmp下坚决不允许。
3)zk/kafka: 在zk下创建一个kafka专用目录,连接zk。
4、停kafka的时候要先停kafka然后在停zk,否则就只能kill -9来关kafka了。

3、感觉可以学习很多东西。
 (1)文件分发:
 (2)xcall: 在集群管理和分布式系统中非常有用。 

3、kafka常用命令 


关于kafka常用命令参见  这里。

4、kafka生产者发送原理

4.1、kafka生产者原理示意图。

1、整体过程见上图.
(1)主线程: main线程会读消息进行拦截器、序列化器、分区器等环节的处理;

(2)内存池:之后把消息放到和分区对应的内存队列中(这个内存队列的大小是32M);
(3)sender线程批量发送:之后会有一个sender线程从内存队列中一批批的取数据然后发送到broker,发送条件主要受如下两个参数控制(或的关系):
如果应答成功就会从内存队列中请求对应消息,如果一直没有应答成功就一直重试。
①batch.size: 数据达到batch.size之后,sender就会发送数据。默认16k
②linger.ms: 等待时间达到 linger.ms(可配置)也会发送。linger: 徘徊

4.2、生产者应答 acks 机制:   官网说明

为了保证producer发送的数据可靠地发送到指定topic,topic的每个partition收到数据后都要向producer发送ack(acknowledgement确认收到),有ack后发送者才可以做控制消息发送的逻辑(实际上类似于滑动窗口:例如最多发送5条消息,直到最前面以后有ack了才会继续发送)。
根据对数据可靠性的不同要求,ack有多重应答机制:

①acks=0,生产者发送过来就不管了,可靠性差,效率高。

—— 生产者发送过来的数据到达broker就立即返回ack,不需要等待落盘;现在数据可能会丢失。
②acks=1,生产者发送过来数据Leader落盘成功,可靠性中等,效率中等。

—— partition的leader落盘成功后就返回ack,不要求follower也成功落盘;现在如果leader在follower同步成功之前故障了就会丢数据。
③acks=2,生产者发送过来数据Leader和ISR队列里面所有follower都落盘成功才应答,可靠性搞,效率低。

—— 需要partition的leader和follower全部落盘成功后才进行ack;但是如果在follower同步完成后,broker发送ack之前,leader发生故障就会造成数据重复。默认。

         生产环境中,acks=0很少使用。acks=1一般用于传输普通日志(允许丢个别数据)。acks=all一般用于传输重要的对可靠性要求比较高的数据的场景,但是可能存在数据重复性的问题。

        对于acks=-1(all)时:生产者发送过来的数据follower同步完成了,但是在leader进行ack前的一瞬间leader挂了。根据kafka的自动恢复机制备份follower会被选举成为新的leader,显然它是有这条数据的。但是由于发送者并没有收到ack所以发送者认为broker没有收到数据就会进行数据重发,显然这个时候就重复了。

3、同步发送与异步发送(带回调的异步发送)
这里说的同步发送主要是指消息被放到内存队列后还需要真正的被sender发送到broker(内存队列的消息被处理完了),之后才能继续的向内存队列中塞数据。
异步则是直接将消息放到内存队列,至于内存队列里面的消息是否被发送到broker并不关注。
 

4.4、生产者吞吐量调优(kafka高性能 —— 发送消息高性能)

问题: kafka发送消息性能为什么这么高?

(0)多分区机制。

(1)批量发送消息。一次网络传输只发送一条消息,显然效率很低。

(2)消息压缩。目的是进一步减少传输带宽。

(3)内存池复用。就是上图的 recordAccumulator(消息收集器),kafka专门设计的一个不涉及垃圾回收可以服用的内存池。

注:如果是普通的内存当数据发送完毕后数据就还会停留在Producer端的JVM中,由于不在被引用所以会被回收掉。而JVM的GC一定会有一个Stop The World的过程,即便是最先进的垃圾回收器也会有短暂停顿这对kafka高并发场景的性能肯定会代码负面影响。 

①batch.size:  批次大小,默认16k。
②linger.ms:  等待时间,修改为5~100ms。
ps:显然批次更大、间隔更长传输的整体的效率会更高;与之对应的实时性就会降低;③compression.type: 压缩 snappy 等压缩方式可选;
④recordaccumulator: 缓冲区大小,例如修改为 64m;
ps:尤其是当分区数特别多的时候提高缓冲区大小还是比较有用的;

4.5、数据的重复、丢失问题

4.5.1、关于数据传输有如下标准。

至少一次(At Least Once): ack级别设置为-1 + 分区副本≥2 + ISR里应答的最小副本数量≥2. 保证数据不丢,但是可能重复。
最多一次(At Most Once): ack级别设置为0。 博阿正数据不重复,但是可能会丢。
精确一次(Exactly Once): 即不重复也不丢失。

kakfa的解决办法: 0.11版本后kafka引入了 幂等性和事务。

4.5.2、生产者幂等性

(1)幂等性原理。producer不论向broker发送多少次重复数据,broker端都只会持久化一条数据,保证不重复。
重复数据的判断标准: <PID,Partition,SeqNumber>相同的消息提交时,broker只会持久化一条。PID是kafka每次重启都会分配;partition表示分区号;seq number是单调递增的。所以我们知道kafka的幂等保证的是单分区单会话内的不重复。ps: 一次启动kafka就是一次会话。

(2)开启幂等性(默认为true): enable.idempotence    参见官网: https://kafka.apache.org/documentation/#producerconfigs_enable.idempotence

4.5.3、生产者事务

(1)为什么需要事务。单纯的幂等只能保证单分区单次会话内的不重复,如果发生重启就没法保证了。

(2)事务的实现原理。
①每一个broker都有一个事务协调器(Transaction Coordinator).
②事务本身是要持久化存储的,它实际上持久化在一个kafka预留的 __transaction_state 的特殊主题中。这个主题默认有50个分区,每个分区负责一部分事务。事务的划分是根据 transaction.id的hashcode值%50,计算出该事务属于哪个分区。改分区Leader副本所在的broker节点即为这个transaction.id对应的 Transaction Coordinator 节点。
③Producer在使用事务功能前,必须先自定义一个唯一的 transactional.id。 有个这个id,即使客户端挂掉了它重启后也能继续处理未完成的事务。 参见 官网

ps:开启事务,必须开启幂等性。

(3)整体过程如下。

注:关于事务原理大致了解下就行。

4.6、kafka如何保证数据有序(高频)

kafka只能保证单分区特定条件内的有序; 对于多分区,分区间的数据是无序的。对于多分区如果一定要有序的话就要将多分区的数据都拉下来后然后再排序,与其这样还不如直接单分区。

注:这里所说的有序其实就是消费到的数据的顺序是否和生产顺序一致。 

4.6.1、kafka分区

(1)kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection = 1(不需要考虑是否开启幂等性)

(2)kafka在1.x及以后版本保证数据单分区有序,条件如下:
1)未开启幂等性。
max.in.flight.requests.per.connection 需要设置为1.

2)开启幂等性。
max.in.flight.requests.per.connection 需要设置小于等于5.
原因说明: 因为在kafka1.x以后,启用幂等后kafka服务端会缓存producer发来的最近5个request的元数据。故无论如何,都可以保证5个reqeust的数据都是有序的。

注: 表示 单个链接客户端在没有收到ack的时候能发送的最大的消息条数,如果没有ack超过这个数后就会阻塞不发送。默认值为5.
max.in.flight.requests.per.connection  参数含义参见 官网

5、kafka中的zookeeper

1、整体过程见上图
 

5.1、kafka生产者原理示意图。

1、整体过程见上图.
(1)主线

6、kafka的服务端(broker)

概述:kafka集群自身是不处理数据的,它只用来存储数据。数据处理过长(序列化/反序列化/拦截器)都在生产端和消费端。其中拦截器可以用于数据的处理监控。

1、整体过程见上图

6.1、 Kafka的协调者(Coordinator)

协调者(Coordinator)专门为 Consumer Group 服务,作用有:

①辅助实现消费者组的初始化和分区分配(组内的哪个消费者占用哪些个分区);

②为消费者组执行 Rebalance 以及提供位移管理和组成员管理等。

关于消费者组初始化的流程参见后续 消费者组部分

        具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets 身上。

目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。

第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。

第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

6.2、kafka的控制器(依赖zookeeper)

控制器组件(Controller),是Apache Kafka的核心组件。它的主要作用是在Apache Zookeeper的帮助下管理和协调整个Kafka集群。集群中任意一台Broker都能充当控制器的角色,但在运行过程中,只能有一个Broker成为控制器,行使其管理和协调的职责。

1、依赖Zookeeper

控制器是重度依赖Zookeeper的,我们需要首先简单了解一下Apache Zookeeper框架。Apache ZooKeeper 是一个提供高可靠性的分布式协调服务框架。它使用的数据模型类似于文件系统的树形结构,根目录也是以“/”开始。该结构上的每个节点被称为 znode,用来保存一些元数据协调信息。如果以 znode 持久性来划分,znode 可分为持久性 znode 和临时 znode(Ephemeral node,又称瞬时节点)。持久性 znode 不会因为 ZooKeeper 集群重启而消失,而临时 znode 则与创建该 znode 的 ZooKeeper 会话绑定,一旦会话结束,该节点会被自动删除(读写zk的客户端会维护和zk集群的连接,当该连接断开的时候通过此连接创建的瞬时节点就会消失)。ZooKeeper 赋予客户端监控 znode 变更的能力,即所谓的 Watch 通知功能。一旦 znode 节点被创建、删除,子节点数量发生变化,抑或是 znode 所存的数据本身变更,ZooKeeper 会通过节点变更监听器 (ChangeHandler) 的方式显式通知客户端。依托于这些功能,ZooKeeper 常被用来实现集群成员管理、分布式锁、领导者选举等功能。Kafka 控制器大量使用 Watch 功能实现对集群的协调管理。我们一起来看一张图片,它展示的是 Kafka 在 ZooKeeper 中创建的 znode 分布。你不用了解每个 znode 的作用,但你可以大致体会下 Kafka 对 ZooKeeper 的依赖。

我们主要知道一下几个就好了。 

1)/kafka/brokers/ids  [0,1,2]  #记录当前kafka集群主要有哪些服务器(注:一个broker就当成一台服务器)。每个broker启动的时候都会向zk中进行注册,然后zk这个路径就是相应brokerid的列表。
2)/kafka/brokers/topics/{topic_name}/partitions/0/state {"leader":1,"isr":[1,0,2]} #记录每个topic的每个分区谁是leader、有哪些副本可用。
3)/kafka/controller {"brokerid":0}  #辅助选举leader

4)/kafka/consumer #记录offset。0.9版本之前在zk中保存offset信息,0.9版本之后offset就存在kafka的预留topic中了(__consumer_offsets);原因:通讯压力太大(甚至跨节点)。

注:zookeeper的图形化客户端 prettyzoo,可以用用。

2、控制器是如何选出来的

那么控制器是如何被选出来的呢?

—— Broker在启动时,会尝试去Zookeeper中创建/controller节点。Kafka当前选举控制器的规则是:第一个成功创建/controller节点的Broker会被指定为控制器。

这个是不是就是最初的leader选举??

3、控制器的作用

1. 主题管理(创建,删除,增加分区)
这里的主题管理,就是指控制器帮助我们完成对 Kafka 主题的创建、删除以及分区增加的操作。
2. 分区重分配
Kafka-reassign-partitions脚本提供的对已有主题分区进行细粒度的分配功能。
3. Preferred领导者选举
Preferred领导者选举主要是Kafka为了避免部分Broker负载过重而提供的一种换Leader的方案。
4. 集群成员管理(新增Broker,Broker主动关闭,Broker宕机)

自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。
控制器组件会利用watch机制检查Zookeeper的/brokers/ids节点下的子节点数量变更。当有新Broker启动后,它会在/brokers下创建专属的znode节点。一旦创建完毕,Zookeeper会通过Watch机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化。进而开启后续新增Broker作业。
侦测Broker存活性则是依赖于刚刚提到的另一个机制:临时节点。每个Broker启动后,会在/brokers/ids下创建一个临时的znode。当Broker宕机或主机关闭后,该Broker与Zookeeper的会话结束,这个znode会被自动删除。同理,Zookeeper的Watch机制将这一变更推送给控制器,这样控制器就能知道有Broker关闭或宕机了,从而进行善后。
5. 数据服务
控制器上保存了最全的集群元数据信息,其他所有Broker会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

        控制器中保存的这些数据在Zookeeper中也保存了一份。每当控制器初始化时,它都会从Zookeeper上读取对应的元数据并填充到自己的缓存中。这里面比较重要的数据有:

  • 所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
  • 所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等。
  • 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。

4、控制器故障转移(Failover)

在 Kafka 集群运行过程中,只能有一台 Broker 充当控制器的角色,那么这就存在单点失效(Single Point of Failure)的风险,Kafka 是如何应对单点失效的呢?答案就是,为控制器提供故障转移功能,也就是说所谓的 Failover。故障转移是指:当运行中的控制器突然宕机或意外终止时,Kafka能够快速地感知到,并立即启用备用控制器来替代之前失败的控制器。

最开始时,Broker 0 是控制器。当 Broker 0 宕机后,ZooKeeper 通过 Watch 机制感知到并删除了 /controller 临时节点。之后,所有存活的 Broker 开始竞选新的控制器身份。Broker 3 最终赢得了选举,成功地在 ZooKeeper 上重建了 /controller 节点。之后,Broker 3 会从 ZooKeeper 中读取集群元数据信息,并初始化到自己的缓存中。至此,控制器的 Failover 完成,可以行使正常的工作职责了。

5、控制器内部设计

在 Kafka 0.11 版本之前,控制器的内部设计相当复杂。控制器是多线程的设计,会在内部创建很多线程。如:
(1)为每个Broker创建一个对应的Socket连接,然后在创建一个专属的线程,用于向这些Broker发送特定的请求。
(2)控制连接zookeeper,也会创建单独的线程来处理Watch机制通知回调。
(3)控制器还会为主题删除创建额外的I/O线程。
这些线程还会访问共享的控制器缓存数据,为了维护数据安全性,控制在代码中大量使用ReetrantLock同步机制,进一步拖慢了整个控制器的处理速度。

在0.11版对控制器的底层设计进了重构,最大的改进是:把多线程的方案改成了单线程加事件队列的方案。



1)单线程+队列的实现方式:社区引入了一个事件处理线程,统一处理各种控制器事件,然后控制器将原来执行的操作全部建模成一个个独立的事件,发送到专属的事件队列中,供此线程消费。
2) 单线程不代表之前提到的所有线程都被干掉了,控制器只是把缓存状态变更方面的工作委托给了这个线程而已。

第二个改进:将之前同步操作Zookeeper全部改为异步操作。 Zookeeper本身的API提供了同步写和异步写两种方式。同步操作zk,在有大量主题分区发生变更时,Zookeeper容易成为系统的瓶颈。

6.3、broker端如何处理请求

无论是 Kafka 客户端还是 Broker 端,它们之间的交互都是通过“请求 / 响应”的方式完成的。比如,客户端会通过网络发送消息生产请求给 Broker,而 Broker 处理完成后,会发送对应的响应给到客户端。

这里我们详细讨论一下 Kafka Broker 端处理请求的全流程。关于如何处理请求,我们很容易想到的方案有两个。

1.顺序处理请求

这个方法实现简单,但是有个致命的缺陷,那就是吞吐量太差。由于只能顺序处理每个请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统。

2. 每个请求使用单独线程处理

完全采用异步的方式。系统会为每个入站请求都创建单独的线程来处理。这个方法的好处是,它是完全异步的,每个请求的处理都不会阻塞下一个请求。但缺陷也同样明显。为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。还是那句话,这个方法只适用于请求发送频率很低的业务场景。

3. kafka采用的方案。既然这两种方案都不好,那么,Kafka 是如何处理请求的呢?用一句话概括就是,Kafka 使用的是 Reactor 模式(不熟悉的可以参考一下Scalable IO in Java)。Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景。Reactor模式的架构如下图:

从这张图中,我们可以发现,多个客户端会发送请求给到 Reactor。Reactor 有个请求分发线程 Dispatcher,也就是图中的 Acceptor,它会将不同的请求下发到多个工作线程中处理。在这个架构中,Acceptor 线程只是用于请求分发,不涉及具体的逻辑处理,非常得轻量级,因此有很高的吞吐量表现。而这些工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力。 

如果我们来为 Kafka 画一张类似的图的话,那它应该是这个样子的:

Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher。它包括两个部分:一个是Acceptor 线程和一个工作线程池。对于后者在kafka中有个专属的名字,叫网络线程池。Kafka 提供了 Broker 端参数 num.network.threads,用于调整该网络线程池的线程数。其默认值是 3,表示每台 Broker 启动时会创建 3 个网络线程,专门处理客户端发送的请求。

Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。现在我们了解了客户端发来的请求会被 Broker 端的 Acceptor 线程分发到任意一个网络线程中,由它们来进行处理。那么,当网络线程接收到请求后,它是怎么处理的呢?你可能会认为,它顺序处理不就好了吗?实际上,Kafka 在这个环节又做了一层异步线程池的处理,我们一起来看一看下面这张图。

当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列(RequestChannel)中。Broker 端还有个 IO 线程池(KafkaRequestHandlerPool),负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。IO 线程池处中的线程才是执行请求逻辑的线程。Broker 端参数 num.io.threads 控制了这个线程池中的线程数。目前该参数默认值是 8,表示每台 Broker 启动后自动创建 8 个 IO 线程处理请求。你可以根据实际硬件条件设置此线程池的个数。

请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。

我们再来看看刚刚的那张图,图中有一个叫 Purgatory 的组件,它是用来缓存延时请求(Delayed Request)的所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。

      总结来讲,broker的数据处理流程如下。

注意:Acceptor线程的作用只是中转,不用于接受请求数据、更不实际处理请求(写磁盘/访问磁盘);网络线程池中的线程用于从网络中接受请求数据也不用于实际处理请求。请求数据ready后实际的活由IO线程池来干。

socketServer(Acceptor):

                ↓

SocketServer(网络线程池):

                ↓

共享请求队列(网络线程池间共享)

                ↓

IO线程池处理具体请求(数据落磁盘/从磁盘缓存取数据)

                ↓

网络线程池请求对应的相应(一个线程池就有一个) + Purgatory组件(缓存延迟请求)

控制类请求和数据类请求

到目前为止,提及的请求处理流程对于所有请求都是适用的,也就是说,Kafka Broker 对所有请求是一视同仁的。但是,在 Kafka 内部,除了客户端发送的 PRODUCE 请求和 FETCH 请求之外,还有很多执行其他操作的请求类型,比如负责更新 Leader 副本、Follower 副本以及 ISR 集合的 LeaderAndIsr 请求,负责勒令副本下线的 StopReplica 请求等。与 PRODUCE 和 FETCH 请求相比,这些请求有个明显的不同:它们不是数据类的请求,而是控制类的请求。也就是说,它们并不是操作消息数据的,而是用来执行特定的 Kafka 内部动作的。Kafka 社区把 PRODUCE 和 FETCH 这类请求称为数据类请求,把 LeaderAndIsr、StopReplica 这类请求称为控制类请求。当前这种一视同仁的处理方式对控制类请求是不合理的。为什么呢?因为控制类请求有这样一种能力:它可以直接令数据类请求失效!所以控制类请求应该有更高的优先级。举个简单的例子,假设我们删除了某个主题,那么控制器就会给该主题所有副本所在的 Broker 发送一个名为 StopReplica 的请求。如果此时 Broker 上存有大量积压的 Produce 请求,那么这个 StopReplica 请求只能排队等。如果这些 Produce 请求就是要向该主题发送消息的话,这就显得很讽刺了:主题都要被删除了,处理这些 Produce 请求还有意义吗?此时最合理的处理顺序应该是,赋予 StopReplica 请求更高的优先级,使它能够得到抢占式的处理。基于这些问题,社区于 2.3 版本正式实现了数据类请求和控制类请求的分离。那么,社区是如何解决的呢?Kafka Broker 启动后,会在后台分别创建两套网络线程池和 IO 线程池的组合,它们分别处理数据类请求和控制类请求。至于所用的 Socket 端口,自然是使用不同的端口了,你需要提供不同的 listeners 配置,显式地指定哪套端口用于处理哪类请求。

6.4、kafka的存储设计

6.4.1、kafka的日志结构
       

        kafka采用log的形式进行存储,更具体地讲是以 segment 的形式(追加)存储。一个segment的大小是1G,kafka就是以这种方式将数据分成一个个的segment日志文件(*.log)。为了更快速地从segment取出数据又整了一个位移索引文件(*.index)。另外还有时间戳索引文件(*.timeindex)、使用事务功能时对应的终止事务的索引文件(*.txnindex)。文件的命令就是以当前segment的第一条消息的offset进行命令。

注:采用segment的形式还有一个好处就是便于数据淘汰。

        具体组织形式如下。一般一个 Kafka 主题有很多分区,每个分区就对应一个 Log 对象(服务器端的的Log对象)具体对应到物理磁盘就是一个子目录({topic_name}-{partition}命令)。比如你创建了一个双分区的主题 test-topic,那么,Kafka 在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1。而在服务器端,这就是两个 Log 对象。每个子目录下存在多组日志段,也就是多组.log、.index、.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位移不同。

kafka中"log"对应到具体存储就理解为消息存储时对应的子目录或文件,不要死板的理解为就是“日志”;另外“log对象”就理解为服务器端上的log对应的结构对象。

如何查看*.log、*.index文件

*.log和*.index存的是序列化后的结果直接看看不懂。为此kafka为我们提供了查看 *.log *.index 文件的方法。

kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index

关于索引*.index

1.index为稀疏索引,大约每向.log文件写入4kb数据才会想index文件写入一条索引。间隔由 log.index.interval.bytes 参数控制。 
2.index文件中保存的offset为相对offset,尽量优化offset值所占的空间。

3.具体过程也很简单。①首先根据目标offset快速定位到目标segment文件 ②找到小于等于目标offset的最大offset对应的索引项。③定位到log文件索引项对应的位置,向下遍历找到目标记录。

补充:稀疏索引 VS 稠密索引

索引类型的一个划分维度就是 稀疏索引 vs 稠密索引

稠密索引:即每一条记录都对应一个索引字段;传统意义上理解的B+树构建的索引就是这种索引,每条数据在索引中都有对应的索引项。

稀疏索引:不要求每条数据都有对应的索引项,而是把数据分成若干个块,每个块对应一个索引项。

稀疏索引占用的索引存储空间比较小,但是查找时间较长;

稠密索引查找时间较短,索引存储空间较大。

6.4.2、kafka文件清除

kafka默认的日志保存时间是7天,可以通过调整如下参数修改保存时间。

log.retention.hours: 默认7天,优先级最低。
log.retention.minutes: 分钟级别,设了后小时级别作废。
log.retention.ms: ms级别,优先级最高;设了后minutes/hours都作废。
log.retention.check.interval.ms: 检查周期,默认5分钟。显然过期时间如果小于5min的话这里也要配合调小才行,否则最快也要5min才能淘汰一次。

kakfa提供的日志清理策略有 delete 和 compact 两种。

通过 log.cleanup.policy 进行配置,默认为 delete。  
关于kafka的清理器:   https://kafka.apache.org/documentation/#design_compactionconfig

(1)delete
log.cleanup.policy = delete 所有数据启用删除策略。
1)基于时间:基于时间的删除,segment最大消息时间戳作为文件时间戳,超过过期时间就segment维度删除。默认打开。
2)基于大小:当日志总大小超过设置上限,就删除最早的segment(就相当于mongodb的固定集合)。
总大小通过 log.retention.bytes 设置,默认等于-1,表示无穷大。

(2)compact 日志压缩
注意:并不是我们理解的 snappy 之类的压缩。其实际逻辑为:对相同key的不同value值只保留最后一个版本。
log.cleanup.policy = compact

这种策略只适合特殊场景。比如用户消息的key是用户ID,value是用户的资料,通过这种要锁策略消息集就只保存了所有用户最新的资料。用的不多,知道就可以了。

6.4.3、kafka如何高效持久化日志文件(消息)

更常见的文法是kafka的性能为什么这么高?

(1)顺序IO(Sequence IO)
        对于磁盘来说有一个基本常识:顺序读写很快,随机读写很慢。因为磁盘是典型的IO设备,每次读写都会重新寻址。kafka在将数据持久化到磁盘的时候采用只能追加(Append-only)的顺序写,这也是实现 Kafka 高吞吐量特性的一个重要手段。随着不停地向一个日志写入消息最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?没错,就是前面说的日志段(Log Segment)机制。在 Kafka 底层,一个日志又进一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

(2)利用页缓存(page cache)

        kafka重度依赖底层操作系统提供的PageCache功能!!!! 

       现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读(read-ahead)会提前将一个比较大的磁盘块读入内存。后写(write-behind)会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存(disk cache/page cache),所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接I/O会绕过磁盘缓存)。
        普通应用程序写入数据到文件系统的一般做法是:在内存中保存尽可能多的数据,并在需要时将这些数据刷新到文件系统。kafka则充分的利用的了操作系统的PageCache: 当一个消息被写入kafka后他首先被写入内存中的缓存。然后kafka在一定的时间间隔内将缓存中的消息批量写入磁盘。通过利用缓存就可以实现快速的访问以实现低延迟的读写操作。缓存的大小和时间间隔可以通过kafka参数"log.flush.interval.messages"和"log.flush.interval.ms"来配置。 不过kafka官网并不建议通过Broker端的这两个参数强制写盘,因为kafka认为数据的可靠性应该通过Replica来保证,强制Flush数据到磁盘会对整体性能产生影响。

注: 向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。
        除此之外,页缓存(page cache)还有一个巨大的优势。用过 Java 的人都知道:如果不用页缓存,而是用 JVM 进程中的缓存,对象的内存开销非常大(通常是真实数据大小的几倍甚至更多),此外还需要进行垃圾回收,GC 所带来的 Stop The World 问题也会带来性能问题。可见,页缓存确实优势明显,而且极大地简化了 Kafka 的代码实现。

        将生产者和消费者串起来过程如下。producer将消息发送到broker后,数据并不是直接落入磁盘而是先进入PageCache。PageCache中的数据在达到特定条件时会被内核刷盘落存储。consumer在消费的时候也会先从PageCache中获取消息,获取不到才去磁盘读取并且读取的时候回预读出一些相邻的块放入PageCache以便下一次读取。如果kafka producer 的生产效率和consumer的消费效率基本一致的话,那么整个过程对于磁盘的访问是非常少,基本都是走缓存层进行数据处理,所以生产和消费的性能会非常高。
 

page cache 扩展:

注:这里说的的 page cache 就是之前说的 filesystem cache(fs cache)。用户(kafka)可以主动触发刷盘,或者到达特定条件也操作系统也会触发刷盘(唤醒pdflush线程,将内核缓冲区的数据刷入磁盘)。

https://zhuanlan.zhihu.com/p/436313908?utm_id=0
https://zhuanlan.zhihu.com/p/544939363?utm_id=0
https://zhuanlan.zhihu.com/p/554122550

(1)page cache 的本质是由Linux内核关系的内存区域。我们通过mmap以及buffered I/O将文件读取到内存空间实际上都是读取到 Page Cache 中。
(2)page 与 page cahce。并不是所有的 page 都被组织为 page cache。 page是内存管理分配的基本单位,page cache由多个page构成。page在操作系统中通常为4kb,显然 page cache的大小为4KB的证书倍。
(3)Linux系统上供用户访问的内存分为两个类型:
1)File-backed pages:文件备份页 也就是 page cache 中的page。它对应于磁盘上的若干数据块,对于这些页最重要的一个问题就是脏页回盘。
2)Anonymous pages:匿名也不对应磁盘上的任何磁盘数据块,他们是进程运行的内存空间(栈、局部变量表等)。

如果kafka producer的生产效率和consumer的消费效率基本一致的话,那么整个过程对于磁盘的访问是非常少,基本都是走缓存层进行数据处理,所以生产和消费的性能会非常高。

#另外“文件系统”这个词在不同的场景下是有很多含义的。

1、对磁盘的数据做索引(磁盘→磁面→磁道→扇面)的文件系统。nfs/fat32/ext3 …… 
2、分布式文件系统,对文件做索引(有N多的文件,我们对其中的每个文件做索引),是操作系统上的应用程序。  tfs/gfs/fastdfs
3、用户文件系统(fuse)。就是业务服务平时打印日志,日志这个模块也可以称为文件系统。
4、另外就是VFS(Virtual File System)了。
 

(3)零拷贝技术 

彻底搞懂零拷贝技术( DMA、PageCache) - 知乎

常规的跨服务器数据传输要走网络通讯。如果在应用层传输数据的话不可避免的要先将数据取到应用层,即: 应用层 → socket → 网卡 → 网络传输。这个过程涉及多次数据拷贝。

而kafka的数据传输并没有走应用层,于是最终效果就是PageCache的数据直接到网卡(pagecache没有数据就先从磁盘加载到PageCache)。

本质原因:kafka broker应用层不关心存储数据,所以也就可以不走应用层。
kafka broker做的事情只是把数据读取后存下去然后等待消费者来消费;所有对数据的操作都放在了生产者和消费者端(无论是拦截器/序列化器/反序列化),broker(kafka应用层)并没有对数据做任何处理。

kafka用的是零拷贝技术中的sendfile系统调用。简单来说就是吧数据从内核去copy到socket,然后发送到网卡,避免了内核buffer与用户buffer来说拷贝的弊端。

6.6、kafka的定时器(层级时间轮)

Kafka 中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka 并没有使用 JDK 自带的 Timer 或 DelayQueue 来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)。JDK 中 Timer 和 DelayQueue 的插入和删除操作的平均时间复杂度为 O(nlogn) 并不能满足 Kafka 的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为 O(1)。

延时请求(Delayed Operation),也称延迟请求,是指因未满足条件而暂时无法被处理的 Kafka 请求。举个例子,配置了 acks=all 的生产者发送的请求可能一时无法完成,因为 Kafka 必须确保 ISR 中的所有副本都要成功响应这次写入。因此,通常情况下,这些请求没法被立即处理。只有满足了条件或发生了超时,Kafka 才会把该请求标记为完成状态。这就是所谓的延时请求。

Kafka中使用的请求被延时处理的机制是分层时间轮算法。想想我们生活中的手表。手表由时针、分针和秒针组成,它们各自有独立的刻度,但又彼此相关:秒针转动一圈,分针会向前推进一格;分针转动一圈,时针会向前推进一格。这就是典型的分层时间轮。和手表不太一样的是,Kafka 自己有专门的术语。在 Kafka 中,手表中的“一格”叫“一个桶(Bucket)”,而“推进”对应于 Kafka 中的“滴答”,也就是 tick。除此之外,每个 Bucket 下也不是白板一块,它实际上是一个双向循环链表(Doubly Linked Cyclic List),里面保存了一组延时请求。由于是双向链表结构,能够利用 next 和 prev 两个指针快速地定位元素,因此,在 Bucket 下插入和删除一个元素的时间复杂度是 O(1)。当然,双向链表要求同时保存两个指针数据,在节省时间的同时消耗了更多的空间。在算法领域,这是典型的用空间去换时间的优化思想。

在 Kafka 中,具体是怎么应用分层时间轮实现请求队列的呢?

如上图所示,Kafka 中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize 计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的 TimerTaskList 中的所有任务。

若时间轮的 tickMs 为 1ms 且 wheelSize 等于20,那么可以计算得出总体时间跨度 interval 为20ms。初始情况下表盘指针 currentTime 指向时间格0,此时有一个定时为2ms的任务插进来会存放到时间格为2的 TimerTaskList 中。随着时间的不断推移,指针 currentTime 不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2对应的 TimeTaskList 中的任务进行相应的到期操作。此时若又有一个定时为8ms的任务插进来,则会存放到时间格10中,currentTime 再过8ms后会指向时间格10。

如果此时有一个定时为 350ms 的任务该如何处理?直接扩充 wheelSize 的大小?Kafka 中不乏几万甚至几十万毫秒的定时任务,这个 wheelSize 的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个 wheelSize 为100万毫秒的时间轮不仅占用很大的内存空间,而且也会拉低效率。Kafka 为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

如上图所示,复用之前的案例,第一层的时间轮 tickMs=1ms、wheelSize=20、interval=20ms。第二层的时间轮的 tickMs 为第一层时间轮的 interval,即20ms。每一层时间轮的 wheelSize 是固定的,都是20,那么第二层的时间轮的总体时间跨度 interval 为400ms。以此类推,这个400ms也是第三层的 tickMs 的大小,第三层的时间轮的总体时间跨度为8000ms。

对于之前所说的 350ms 的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入第二层时间轮中时间格17所对应的 TimerTaskList。如果此时又有一个定时为 450ms 的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入第三层时间轮中时间格1的 TimerTaskList。注意到在到期时间为 [400ms,8000ms) 区间内的多个任务(比如 446ms、455ms 和 473ms 的定时任务)都会被放入第三层时间轮的时间格1,时间格1对应的 TimerTaskList 的超时时间为 400ms。

随着时间的流逝,当此 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为 [40ms,60ms) 的时间格中。再经历 40ms 之后,此时这个任务又被“察觉”,不过还剩余 10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为 [10ms,11ms) 的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作。

设计源于生活。我们常见的钟表就是一种具有三层结构的时间轮,第一层时间轮 tickMs=1s、wheelSize=60、interval=1min,此为秒钟;第二层 tickMs=1min、wheelSize=60、interval=1hour,此为分钟;第三层 tickMs=1hour、wheelSize=12、interval=12hours,此为时钟。

那么Kafka中又是如何实现“时间的推移/流逝”这个场景呢?

Kafka 中的定时器借了 JDK 中的 DelayQueue 来协助推进时间轮。具体做法是对于每个使用到的 TimerTaskList 都加入 DelayQueue,DelayQueue 会根据 TimerTaskList 对应的超时时间 expiration 来排序,最短 expiration 的 TimerTaskList 会被排在 DelayQueue 的队头。

Kafka 中会有一个线程来获取 DelayQueue 中到期的任务列表,这个线程所对应的名称叫作“ExpiredOperationReaper”,可以直译为“过期操作收割机”。当“收割机”线程获取 DelayQueue 中超时的任务列表 TimerTaskList 之后,既可以根据 TimerTaskList 的 expiration 来推进时间轮的时间,也可以就获取的 TimerTaskList 执行相应的操作,对里面的 TimerTaskEntry 该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。

我们开头明确指明的 DelayQueue 不适合 Kafka 这种高性能要求的定时任务,为何这里还要引入 DelayQueue 呢?注意对定时任务项 TimerTaskEntry 的插入和删除操作而言,TimingWheel时间复杂度为 O(1),性能高出 DelayQueue 很多,如果直接将 TimerTaskEntry 插入 DelayQueue,那么性能显然难以支撑。

分析到这里可以发现,Kafka 中的 TimingWheel 专门用来执行插入和删除 TimerTaskEntry 的操作,而 DelayQueue 专门负责时间推进的任务。试想一下,DelayQueue 中的第一个超时任务列表的 expiration 为 200ms,第二个超时任务为 840ms,这里获取 DelayQueue 的队头只需要 O(1) 的时间复杂度(获取之后 DelayQueue 内部才会再次切换出新的队头)。如果采用每秒定时推进,那么获取第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取第二个超时任务时又需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用 DelayQueue 来辅助以少量空间换时间,从而做到了“精准推进”。Kafka 中的定时器真可谓“知人善用”,用 TimingWheel 做最擅长的任务添加和删除操作,而用 DelayQueue 做最擅长的时间推进工作,两者相辅相成。

7、kafka中的消费者

  1. 消费者采用拉取模型带来的优点有哪些?
  2. 为什么要约定“同一个分区只可被一个消费者处理”?
  3. 消费者如何拉取数据
  4. 消费者如何消费消息
  5. 消费者提交分区偏移量
  6. 消费者组再平衡操作
  7. 消费者组是什么
  8. 消费者组的协调者

7.1、kafka消费模型

对于MQ而言,常见的消息队列消费模式有推和拉两种。
1)pull(拉)模式:kafka采用此种方式,由消费者从broker中主动拉取数据。
2)push(推)模式:kafka没有采用这种方式,因为由broker决定消息发送速率很难适应不同消费者的性能。

       基于推送模型的消息系统,由broker记录消费者的消费状态。broker在将消息推送到消费者后,标记这条消息为已消费,这种方式无法很好地保证消息的处理语义。比如,broker把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为己消费了,但实际上这条消息并没有被实际处理) 。如果要保证消息的处理语义,broker发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要在消息代理中记录所有消息的消费状态,这种方式需要在客户端和服务端做一些复杂的状态一致性保证,比较复杂。另外,由broker决定消息发送速率很难适应性能参差不齐的消费者。

        因此,kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。这种由消费者控制偏移量的优点是消费者可以按照任意的顺序消费消息,比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前时刻开始消费。broker是无状态的,它不需要标记哪些消息被消费者处理过,也不需要保证一条消息只会被一个消费者处理。而且,不同的消费者可以按照自己最大的处理能力来拉取数据,即使有时候某个消费者的处理速度稍微落后,它也不会影响其他的消费者,并且在这个消费者恢复处理速度后,仍然可以追赶之前落后的数据。当然 pull模式也有不足之处。例如,如果kafka没有数据,消费者就会陷入循环之中,一直放回空数据。

        kafka的消费者模型如下:

7.2、消费者组

7.2.1、消费组

(1)数据层面topic只有partition维度的划分。

①具体来说一个topic分成3个partition之后就真的只有3个partition,并不会因为有两个消费组消费就变成6个partition。
②不同消费组间都是从这三个partition消费的数据,同一个消费组内部每个消费者只能独立占有一个partition。

(2)不同消费者组之间互不影响,你可以消费分区1~3 我也可以消费分区1~3;

(3)消费者组就视为逻辑上的订阅者,虽然对内分为不同消费者但对外就是一个整体;

(4)同一个消费组内不同消费者只能唯一独立占有一个分区。就把消费者组的各个消费者当成一个整体,有人消费partition1了,其他的就消费partitin2、partition3而不是一起消费分区1。

(5)对于任意一个消费者都必须属于某个消费者组,创建消费者的时候就必须指定groupid;

(6)对于一个具体的消费者而言它可以只消费一个分区的数据也可以消费多个分区的数据;

        Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

        传统的消息引擎模型是①点对点模型和②分布/订阅模型。点对点模型的伸缩性很差,因为下游的多个consumer都要抢这个共享消息队列的消息。发布 / 订阅模型倒是允许消息被多个 Consumer 消费,但它的问题也是伸缩性不高,因为每个订阅者都必须要订阅主题的所有分区。这种全量订阅的方式既不灵活,也会影响消息的真实投递效果。

        如果有这么一种机制,既可以避开这两种模型的缺陷,又兼具它们的优点,那就太好了。幸运的是,Kafka 的 Consumer Group 就是这样的机制。当 Consumer Group 订阅了多个主题后,组内的每个实例不要求一定要订阅主题的所有分区,它只会消费部分分区中的消息。Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。再加上 Broker 端的消息留存机制,Kafka 的 Consumer Group 完美地规避了上面提到的伸缩性差的问题。可以这么说,Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。

分区是以消费者级别被消费的,但分区的消费进度要保存成消费者组级别的

一个分区只能属于一个消费者线程,将分区分配给消费者有以下几种场景。

  • 线程数量多于分区的数量 —— 存在线程闲置
  • 线程数量少于分区的数量 —— 部分线程占据多个分区
  • 线程数量等于分区的数量 —— 一个线程占据一个分区,最理想

那么一个Group下该有多少个Consumer实例呢?理想情况下,Consumer实例的数量应该等Group订阅主题的分区总数。

7.2.2、消费者组初始化

(0)依赖前面介绍的协调器(coordinator);

(1)每个broker都有上都有coordinator组件,一个消费组只会依赖一个broker的协调器;

(2)coordinator节点 = hashcode(groupid)%50(注: 50是__consumer_offsets默认的分区数)

例如: groupid的hashcode结果为1,那么__consumer_offsets主题的1号分区所在的broker上的协调器(coordinator)即为改消费者组依赖的协调器。

消费者组的初始化过程如下:

7.2.3、消费者组消费过程

7.2.4、消费组占用分区的分配规则

        一个consumer group有多个consumer组成,一个topic有多个partition组成。那么每个分区应该由哪个消费者消费呢?
        kafka有四种主流的分区分配策略: Range, RoundRobin, Sticky, CooperativeSticky 。可以通过配置参数 partition.assignment.strategy  修改分区的分配策略。 默认的策略是 Range + CooperativeSticky。kafka可以同时使用多个分区分配策略进行叠加。

注: https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy


1、Range
range是针对每个topic而言的。首先对topic的分区进行排序编号,例如7个分区编号为{0,1,2,3,4,5,6};同样消费者也进行排序编号,例如三个消费者编号为{C0,C1,C2}。
然后通过 partition数/consumer数 决定每个消费者应该消费几个分区。如果除不尽,则前面的几个消费者就多消费1个分区。

问题: 容易产生数据倾斜。如果消费组只是消费一个topic c0多消费1个分区问题不大。但是如果有N多个topic对于每个主题c0都要去多消费一个分区就意味着c0比其他分区多消费了N个分区。

在Range方式下。如果某个消费者挂了(假设c0),在 session.timeout.ms 超时时间过了后其对应的分区就会作为整体分配给其他消费者。注意: 如果这个消费者占了3个分区那么这3个分区会整体给到某一个消费者。
再过一定时间(默认15s)故障消费者就会从消费组中移除,待重平衡完成后分区就按照Range最初的规则进行重新分配。


2、RoundRobin 
RoundRobin 则是针对消费者组所订阅的所有topic而言的,其实就是为了避免 range 策略中消费组订阅多个topic时的倾斜问题。其思路也很简单,就是将消费者组订阅的多个topic的消费组进行统一规划然后按照range中同样的方式进行分配;这样均衡性会好很多。

所以 RoundRobin 策略有两个前提条件必须满足:
①同一个消费者组里面的所有消费者的 num.streams(消费者线程数) 必须相等;
②每个消费者订阅的主题必须相同; 注:大家都订阅了这些主题,这样才可以把这些主题的所有分区进行统一规划。
 

3、Sticky
尽量均匀且随机的分配。 粘性分区是kafka从 0.11.x版本之后引入的分配策略。
 

7.2.5、kafka消费者异常引发再平衡(rebalance) 

本质上就是让组内的消费者如何消费这些分区达成共识,不要出现某些分区没人消费。

①心跳超时引发再平衡。每个消费者都会定期的和coordinator进行通讯即心跳(默认3秒),一旦超过 session.timeout.ms=45s没有通讯coordinator就会认为这个消费者挂了将其从消费者组中移除,触发再平衡。

②消费者处理消息时间过长引发再平衡。消费者两次拉取的时间间隔超过max.poll.interval.ms=5min就认为消费者卡住了,此时也会将其从消费者组中移除,触发再平衡。

7.3、重平衡(rebalance)

        Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件(Coordinator)的帮助下,完成订阅主题分区的分配。那么 Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件有 3 个。

  • 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
  • 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile("t.*c")) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  • 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要被“踢出”组呢?

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。Rebalance 发生时,Group 下所有的 Consumer 实例都会协调在一起共同参与。你可能会问,每个 Consumer 实例怎么知道应该消费订阅主题的哪些分区呢?这就需要上面提到的分配策略的协助了。

另外,Rebalance有些“缺点”需要我们特别关注,思考更好的设计应该是什么样子的。

首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称 STW。在 STW 期间,所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。

其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

最后,Rebalance 实在是太慢了。

所以,我们尽量避免一些非必要的Rebalance。第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,需要仔细地设置 session.timeout.ms(决定了 Consumer 存活性的时间间隔)和 heartbeat.interval.ms(控制发送心跳请求频率的参数) 的值。第二类非必要 Rebalance 是 Consumer 消费时间过长导致的,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

7.4、消费者组重平衡过程

        消费者组的重平衡流程,它的作用是让组内所有的消费者实例就消费哪些主题分区达成一致。重平衡需要借助 Kafka Broker 端的 Coordinator 组件,在 Coordinator 的帮助下完成整个消费者组的分区重分配。注意:这里是消费者组维度的行为。

1. 触发与通知

a)重平衡过程通过消息者端的心跳线程(Heartbeat Thread)通知到其他消费者实例。
b)Kafka Java消费者需要定期地发送心跳请求到Broker端的协调者,以表明它还存活着。
在kafka 0.10.1.0版本之前,发送心跳请求是在消费者主线程完成的,也就是代码中调用KafkaConsumer.poll方法的那个线程。这样做,消息处理逻辑也是在这个线程中完成的 ,因此,一旦消息处理消耗了过长的时间,心跳请求将无法及时发到协调者那里,导致协调者错判消费者已死。在此版本后,kafka社区引入了单独的心跳线程来专门执行心跳请求发送,避免这个问题。
c) 消费者端的参数heartbeat.interval.ms,从字面上看,它就是设置了心跳的间隔时间,但这个参数的真正作用是控制重平衡通知的频率。

2. 消费者组状态机
重平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。Kafka设计了一套消费者组状态机(State Machine),帮助协调者完成整个重平衡流程。
a) Kafka消费者组状态
(1)Empty:组内没有任何成员,但消费者组可能存在已提交的位移数据,而且这些位移尚未过期。
(2)Dead:组内没有任何成员,但组的元数据信息已经在协调者端被移除。协调者保存着当前向它注册过的所有组信息,所谓元数据就是类似于这些注册信息。
(3)PreparingRebalance:消费者组准备开启重平衡,此时所有成员都要重新请求加消费者组
(4)CompletingRebalance:消费者组下所有成员已经加入,各个成员正在等待分配方案。
(5)stable:消费者组的稳定状态。该状态表明重平衡已经完成,组内成员能够正常消费数据了。

b) 状态机的各个状态流转图如下:

一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka定期自动删除过期位移的条件就是,组要处于Empty状态。如果消费者组停了很长时间(超过7天),那么Kafka很可能就把该组的位移数据删除了。

3. 消费者端重平衡流程

重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。在消费者端,重平衡分为以下两个步骤:
1) 加入组:JoinGroup请求
2) 等待领导者消费者分配方案:SyncGroup请求
当组内成员加入组时,他会向协调者发送JoinGroup请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的JoinGroup请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。通常情况下,第一个发送JoinGroup 请求的成员自动成为领导者。注意区分这里的领导者和之前介绍的领导者副本,不是一个概念。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
选出领导者之后,协调者会把消费者组订阅信息封装进JoinGroup请求的响应中,然后发给领导者,由领导者统一做出分配方案后,进入下一步:发送SyncGroup请求。在这一步中,领导者向协调者发送SyncGroup请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送SyncGroup请求,只是请求体中并没有实际内容。这一步的目的是让协调者接收分配方案,然后统一以SyncGroup 响应的方式发给所有成员,这样组内成员就都知道自己该消费哪些分区了。

以上是JoinGroup 请求的处理过程。就像前面说的,JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。下面这张图是SyncGroup请求的处理流程。

SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。

4. Broker端(协调者端)重平衡场景剖析

分以下几个场景来讨论,这几个场景分别是新成员加入组、组成员主动离组、组成员崩溃离组、组成员提交位移。

Case 1: 新成员入组

新成员入组是指组处于 Stable 状态后,有新成员加入。当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的。
Case 2: 组成员主动离组

主动离组就是指消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。

Case 3: 组成员崩溃离组
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数session.timeout.ms控制的。也就是说,Kafka 一般不会超过session.timeout.ms就能感知到这个崩溃。当然,后面处理崩溃离组的流程与之前是一样的。

Case 4: 重平衡时协调者对组内成员提交位移的处理
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后在开启正常JoinGroup/SyncGroup请求发送。

Case 5: 流量上涨导致消费限流,消费限流延时回包触发消费者rebalance
实际业务使用的时候出现过这种重平衡,据说是这个原因。

7.5、位移主题(__consumer_offsets)

        针对 Consumer Group,Kafka 是怎么管理位移的呢?

__consumer_offsets 是Kafka专门为记录位移预留的内部主题,也被称为位移主题,即 Offsets Topic。和你创建的其他主题一样,位移主题就是普通的 Kafka 主题。你可以手动地创建它、修改它,甚至是删除它。为什么要引入位移主题这个概念呢?

        我们知道,0.9版本之前 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。但是,ZooKeeper 其实并不适用于这种高频的写操作。

        0.9及之后版本 Consumer 的位移管理机制发生了变化,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka 的主题设计天然就满足这两个条件,因此,使用 Kafka 主题来保存位移这件事情,实际上就是一个水到渠成的想法了。

        虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。事实上,Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。那么位移主题的消息格式是什么样子的呢?所谓的消息格式,可以简单地理解为是一个KV对。Key和Value分别表示消息的键值和消息体。

首先从 Key 说起。一个 Kafka 集群中的 Consumer 数量会有很多,既然这个主题保存的是 Consumer 的位移数据,那么消息格式中必须要有字段来标识这个位移数据是哪个 Consumer 的。这种数据放在哪个字段比较合适呢?显然放在 Key 中比较合适。现在我们知道该主题消息的 Key 中应该保存标识 Consumer 的字段,那么,当前 Kafka 中什么字段能够标识 Consumer 呢?还记得之前我们说 Consumer Group 时提到的 Group ID 吗?没错,就是这个字段,它能够标识唯一的 Consumer Group。我们现在知道 Key 中保存了 Group ID,但是只保存 Group ID 就可以了吗?别忘了,Consumer 提交位移是在分区层面上进行的,即它提交的是某个或某些分区的位移,那么很显然,Key 中还应该保存 Consumer 要提交位移的分区。总结一下,位移主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号>

接下来看看消息体的设计。也许你会觉得消息体应该很简单,保存一个位移值就可以了。实际上,社区的方案要复杂得多,比如消息体还保存了位移提交的一些其他元数据,诸如时间戳和用户自定义的数据等。保存这些元数据是为了帮助 Kafka 执行各种各样后续的操作,比如删除过期位移消息等。但总体来说,我们还是可以简单地认为消息体就是保存了位移值。当然了,位移主题的消息格式可不是只有这一种。事实上,它有 3 种消息格式。除了刚刚我们说的这种格式,还有 2 种格式:

  • 用于保存 Consumer Group 信息的消息。
  • 用于删除 Group 过期位移甚至是删除 Group 的消息。

第 1 种格式非常神秘,以至于你几乎无法在搜索引擎中搜到它的身影。不过,你只需要记住它是用来注册 Consumer Group 的就可以了。第 2 种格式相对更加有名一些。它有个专属的名字:tombstone 消息,即墓碑消息,也称 delete mark。这些消息只出现在源码中而不暴露给你。它的主要特点是它的消息体是 null,即空消息体。那么,何时会写入这类消息呢?一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。

好了,消息格式就说这么多,下面我们来说说位移主题是怎么被创建的。通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。什么地方会用到位移主题呢?我们前面一直在说 Kafka Consumer 提交位移时会写入该主题,那 Consumer 是怎么提交位移的呢?目前 Kafka Consumer 提交位移的方式有两种:自动提交位移和手动提交位移。Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控 Consumer 端的位移管理。如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。

我们来举个极端一点的例子。假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 10,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 10。由于是自动提交位移,位移主题中会不停地写入位移 =10 的消息。显然 Kafka 只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求 Kafka 必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。Kafka 是怎么删除位移主题中的过期消息的呢?答案就是 Compaction。

Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。我在这里贴一张来自官网的图片,来说明 Compact 过程。

7.6、位移提交

        消费者提交偏移量是为了保存分区的消费进度。Kafka保证同一个分区只会分配给消费者组中的唯一消费者,即使发生再平衡后,分区和消费者的所有权关系发生变化,新消费者也可以接着上一个消费者记录的偏移量位置继续消费消息。

        每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset)。注意,这和上面所说的位移完全不是一个概念。上面的“位移”表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器。Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
 

1、从用户是否感知的角度来说,位移提交分为自动提交手动提交

2、从 Consumer 端的角度来说,位移提交分为同步提交异步提交

7.6.1、自动提交与手动提交

        所谓自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移,作为用户的你完全不必操心这些事。自动提交有两个相关参数:

(1) enable.auto.commit: 是否开启自动提交offset的功能,默认是true。

(2) auto.commit.interval.ms: 自动提交offset的时间间隔,默认是5s。  

        而手动提交,则是指你要自己提交位移 Kafka Consumer 压根不管。虽然自动提交offset简单便捷,但是这种基于时间的提交方式开发人员难以把握offset提交的时机(用户希望消费一批数据提交一次offset)。

        将 enable.auto.commit 设为 false即为手动提交。但是仅仅设置它为 false 还不够,因为你只是告诉 Kafka Consumer 不要自动提交位移而已,你还需要调用相应的 API 手动提交位移。最简单的 API 就是 consumer.commitSync()。该方法会提交 consumer.poll() 返回的最新位移。它是一个同步操作,该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。调用 consumer.commitSync() 方法的时机,是在你处理完了 poll() 方法返回的所有消息之后。对于自动提交位移,一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。反观手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。

7.6.2、同步提交与异步提交

        鉴于这个问题,Kafka 社区为手动提交位移提供了另一个 API 方法:KafkaConsumer#commitAsync()。从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能达到最理想的效果,原因有两个:我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。我们不希望程序总处于阻塞状态,影响 TPS。

7.6.3、auto.offset.reset

       上面说的都是offset存在的场景(对于同一个消费者组若已有提交的offset,则从提交的offset开始接着消费)。当kafka中没有初始偏移量(消费组第一次消费)或服务器上不在存在当前偏移量时(例如该数据已被删除),此时行为应该怎样? 

#此时行为由如下参数控制:

auto.offset.reset = earliest | latest | none    #默认是latest
(1)earliest: 自动将偏移量重置为最早的偏移量, --from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none: 如果未找到消费者组的先前偏移量,则向消费者抛出异常。

参见 官网

7.7、Kafka Java Consumer设计原理

从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

首先,我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。鉴于 KafkaConsumer 不是线程安全的事实,有两套多线程方案。

1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。

2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。

方案 2 将任务切分成了消息获取和消息处理两个部分,分别由不同的线程处理它们。比起方案 1,方案 2 的最大优势就在于它的高伸缩性,就是说我们可以独立地调节消息获取的线程数,以及消息处理的线程数,而不必考虑两者之间是否相互影响。如果你的消费获取速度慢,那么增加消费获取的线程数即可;如果是消息的处理速度慢,那么增加 Worker 线程池线程数即可。但是实现难度比较大,而且该方案将消息获取和消息处理分开了,也就是说获取某条消息的线程不是处理该消息的线程,因此无法保证分区内的消费顺序。

两种方案的优缺点如下:

9、常见问题

问题一:kakfa为什么这么快?

总共有如下这些点:

  • 本身就是分布式集群,采用了分区技术并行度高;
  • 采用稀疏索引(有索引),方便快速定位查询;
  • 批量处理
  • 顺序写盘
  • 页缓存
  • 零拷贝
  • 日志格式
  • 一致性

问题二:kafka生产性能调优?

参见 4.4 

问题三:kafka消费性能调优?

(1)增加分区,也要同步增加消费者数量。
(2)还可以考虑提高每批次拉取的数据量。每批次拉取的数据过少(拉取数据/处理时间<生产速度),也会造成数据积压。
例如从一次最多拉取500条,调整为一次最多拉取1000条。当然,要有个调优的过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1426434.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

yolov8数据标注、模型训练到模型部署全过程

文章目录 一、数据标注&#xff08;x-anylabeling&#xff09;1. 安装方式1.1 直接通过Releases安装1.2 clone源码后采用终端运行 2. 如何使用 二、模型训练三、模型部署3.1 onnx转engine3.2 c调用engine模型3.2.1 main_tensorRT.cpp3.2.2 segmentationModel.cpp 一、数据标注&…

谷粒商城【成神路】-【3】——三级分类

目录 &#x1f37f;1.查询三级分类 &#x1f9c2;2.前端页面搭建 &#x1f35f;3.添加网关 &#x1f373;4.解决跨域 &#x1f9c7;5.显示分类 &#x1f95e;6.显示复选框 1.查询三级分类 1.controller 直接调用service层的接口 RequestMapping("/list/tree&qu…

02.PostgreSQL运算符

1. 算术运算符 算术运算符 描述 示例 + 加法运算符 SELECT A+B - 减法运算符 SELECT A-B * 乘法运算符 SELECT A*B / 除法运算符 SELECT A/B % 取余运算符 SELECT A%B 1.1 加法与减法操作符 SELECT 100,100+11,100-11,100+23.0,100-23.0 运算结果 由此得出结论: 一个整数加上…

C语言·贪吃蛇游戏(下)

上节我们将要完成贪吃蛇游戏所需的前置知识都学完了&#xff0c;那么这节我们就开始动手写代码了 1. 程序规划 首先我们应该规划好我们的代码文件&#xff0c;设置3个文件&#xff1a;snack.h 用来声明游戏中实现各种功能的函数&#xff0c;snack.c 用来实现函数&#xff0c;t…

javaScript的序列化与反序列化

render函数的基本实现 javaScript的序列化与反序列化 一&#xff0c;js中的序列化二&#xff0c;序列化三&#xff0c;反序列化四&#xff0c;总结 一&#xff0c;js中的序列化 js中序列化就是对象转换成json格式的字符串&#xff0c;使用JSON对象的stringify方法&#xff0c;…

R-YOLO

Abstract 提出了一个框架&#xff0c;名为R-YOLO&#xff0c;不需要在恶劣天气下进行注释。考虑到正常天气图像和不利天气图像之间的分布差距&#xff0c;我们的框架由图像翻译网络&#xff08;QTNet&#xff09;和特征校准网络&#xff08;FCNet&#xff09;组成&#xff0c;…

vue3-深入组件-依赖注入

Prop 逐级透传问题 通常情况下&#xff0c;当我们需要从父组件向子组件传递数据时&#xff0c;会使用 props。 如果是多层级嵌套的组件&#xff0c;如何从一级传递到 3 级甚至更远呢。 若使用 props 则必须将其沿着组件链逐级传递下去&#xff0c;这会非常麻烦&#xff0c;所…

canvas路径剪裁clip(图文示例)

查看专栏目录 canvas实例应用100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…

jupyter notebook显示的扩展很少,只有四五个--解决方案

如下&#xff1a;安装好只有四五个扩展 可以先删除 conda remove jupyter_nbextensions_configurator 然后使用pip安装 pip install jupyter_contrib_nbextensions jupyter contrib nbextensions install --user pip install jupyter_nbextensions_configurator jupyter nbex…

MySQL-运维-主从复制

一、概述 二、原理 三、搭建 1、服务器准备 2、主库配置 &#xff08;1&#xff09;、修改配置文件/etc/my.cnf &#xff08;2&#xff09;、重启MySQL服务器 &#xff08;3&#xff09;、登录mysql&#xff0c;创建远程链接的账号&#xff0c;并授予主从复制权限 &#xff0…

Kafka-服务端-GroupCoordinator

在每一个Broker上都会实例化一个GroupCoordinator对象&#xff0c;Kafka按照Consumer Group的名称将其分配给对应的GroupCoordinator进行管理&#xff1b; 每个GroupCoordinator只负责管理Consumer Group的一个子集&#xff0c;而非集群中全部的Consumer Group。 请注意与Kaf…

华媒舍:10个与汽车媒体国外传播有关的方向

随着近年来汽车销售市场的不断增加与发展&#xff0c;汽车媒体国外传播已经成为汽车行业里至关重要的一环。下面我们就详细介绍10个与汽车媒体国外传播有关的发展方向&#xff0c;并讨论这些趋势对全世界汽车行业的影响。 1.智能化媒体的兴起伴随着互联网的发展与发展&#xff…

2024年第4届IEEE软件工程与人工智能国际会议(SEAI 2024)

2024年第4届IEEE软件工程与人工智能国际会议(SEAI 2024)将于2024年6月21-23日在中国厦门举办。 SEAI旨在为软件工程与人工智能领域搭建高端前沿的交流平台&#xff0c;推动产业发展。本次会议将汇聚海内外的知名专家、学者和产业界优秀人才&#xff0c;共同围绕国际热点话题、核…

地理坐标系、空间坐标系、epsg查询网站

坐标系可用范围和详细信息的查询网站 简介 epsg.ruiduobao.com是一个可以查询gdal中所有坐标系信息的网站&#xff0c;可查询到坐标系的基准面、椭球体、中央子午线等相关信息&#xff0c;并对每个坐标系的可用范围在地图中进行了显示。详细信息可以看操作视频&#xff1a; e…

bank conflict

前置知识&#xff1a; shared memory 被分成 32 个 bank一个 warp 32 个线程每个 bank 4 byte如果同一 warp 中不同线程访问同一 bank 的不同地址则发生 bank conflict 请注意需要是一个 warp 中的不同线程&#xff01;如果一个线程访问 shared memory 的两个元素&#xff0c;…

【ArcGIS Pro】从0开始

1.导入excel&#xff0c;需要安装驱动程序 安装用于 Microsoft Excel 文件的驱动程序 https://pro.arcgis.com/zh-cn/pro-app/latest/help/data/excel/prepare-to-work-with-excel-in-arcgis-pro.htm 2.修改投影坐标系 点到地图图标上&#xff0c;右键才能设置坐标系。 3.…

MSVC++远程调试

1. 介绍 MSVC的调试功能非常强大&#xff0c;可以下断点&#xff0c;单步调试&#xff0c;查看堆栈变量信息等。实际用于生产的电脑环境复杂&#xff0c;更容易发生Bug。生产电脑&#xff0c;由于各种原因有些可能无法安装MSVC用来现场调试。基于打印日志&#xff0c;查看日志…

Elasticsearch:将文档级安全性 (DLS) 添加到你的内部知识搜索

作者&#xff1a;来自 Elastic Sean Story 你的企业很可能淹没在内部数据中。 你拥有问题跟踪、笔记记录、会议记录、维基页面、视频录制、聊天以及即时消息和私信。 并且不要忘记电子邮件&#xff01; 难怪如此多的企业都在尝试创造工作场所搜索体验 - 为员工提供集中、一站…

如何部署Docker Registry并实现无公网ip远程连接本地镜像仓库

文章目录 1. 部署Docker Registry2. 本地测试推送镜像3. Linux 安装cpolar4. 配置Docker Registry公网访问地址5. 公网远程推送Docker Registry6. 固定Docker Registry公网地址 Docker Registry 本地镜像仓库,简单几步结合cpolar内网穿透工具实现远程pull or push (拉取和推送)…

java程序读取并控制串口设备

监听串口&#xff0c;接收它们发过来的数据&#xff0c;进行处理。 一、概况 前不久做的一个项目&#xff0c;需要读取水下传感器的数据。这些传感器通过串口与外界交互。我们写了一个java程序&#xff0c;接收传感器传送的数据&#xff0c;同时也下发命令&#xff0c;控制部…