中间件(三)- Kafka(一)

news2024/12/23 5:37:11

Kafka

    • 1. Kafka简介
      • 1.1 名字由来
      • 1.2 主要特性
      • 1.3 相关术语
      • 1.4 架构图
      • 1.5 消息队列
      • 1.6 Kafka消费模式
        • 1. 一对一消费模式
        • 2. 一对多消费模式
      • 1.7 消息中间件
    • 2. Kafka安装及使用
      • 2.1 下载kafka
      • 2.2 修改配置文件
      • 2.3 启动
      • 2.4 docker启动
    • 3. 简单指令
      • 3.1 topic相关
      • 3.2 Kafka 生产/消费
    • 4. kafka进阶
      • 4.1 工作流程
      • 4.2 文件存储
      • 4.3 生产者分区策略
      • 4.4 生产者ISR< In-Sync Replicas (同步副本集) >
      • 4.5 生产者ack机制
      • 4.6 数据一致性问题
      • 4.7 Exactly Once
    • 5. 消费者分区分配策略
      • 5.1 分区分配策略
      • 5.2 消费者offset的存储
      • 5.3 消费者组案例

在这里插入图片描述

1. Kafka简介

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

1.1 名字由来

kafka的架构师jay kreps对于kafka的名称由来是这样讲的,由于jay kreps非常喜欢franz kafka,并且觉得kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称kafka,该名字并没有特别的含义。

kafka的诞生,是为了解决linkedin的数据管道问题,起初linkedin采用了ActiveMQ来进行数据交换,大约是在2010年前后,那时的ActiveMQ还远远无法满足linkedin对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,linkedin决定研发自己的消息传递系统,当时linkedin的首席架构师jay kreps便开始组织团队进行消息传递系统的研发;

1.2 主要特性

Kafka 是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

  • 通过O的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
  • 支持通过Kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

1.3 相关术语

  • Broker: 经纪人,Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic:主题,每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  • Partition: Partition是物理上的概念,每个Topic包含一个或多个Partition.
  • Producer: 负责发布消息到Kafka broker
  • Consumer: 消息消费者,向Kafka broker读取消息的客户端。
  • Consumer Group: 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。某一个分区中的消息只能够一个消费者组中的一个消费者所消费
  • Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常的工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower
  • Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
  • Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。

一个Topic会产生多个分区Partition,分区中分为Leader和Follower,消息一般发送到Leader,Follower通过数据的同步与Leader保持同步,消费的话也是在Leader中发生消费,如果多个消费者,则分别消费Leader和各个Follower中的消息,当Leader发生故障的时候,某个Follower会成为主节点,此时会对齐消息的偏移量。

1.4 架构图

在这里插入图片描述
在这里插入图片描述

1.5 消息队列

Kafka是一种消息队列,主要用来处理大量数据状态下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka也就拥有消息队列的相应的特性了

  • 解耦合
    耦合的状态表示当你实现某个功能的时候,是直接接入当前接口,而利用消息队列,可以将相应的消息发送到消息队列,这样的话,如果接口出了问题,将不会影响到当前的功能。
    在这里插入图片描述

  • 异步处理
    异步处理替代了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下让其他业务处理接口从消息队列中拉取消费处理即可。
    在这里插入图片描述

  • 流量削峰
    高流量的时候,使用消息队列作为中间件可以将流量的高峰保存在消息队列中,从而防止了系统的高请求,减轻服务器的请求处理压力。

1.6 Kafka消费模式

Kafka的消费模式主要有两种:

  • 一对一的消费,也即点对点的通信,即一个发送一个接收。
  • 一对多的消费,即一个消息发送到消息队列,消费者根据消息队列的订阅拉取消息消费。

1. 一对一消费模式

消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费。消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。
在这里插入图片描述

2. 一对多消费模式

这种模式也称为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据之后,数据不会被清除,Kafka会默认保留一段时间,然后再删除。
在这里插入图片描述

1.7 消息中间件

消息中间件对比

特性ActiveMQRabbitMQRocketMQKafka
开发语言javaerlangjavascala
单机吞吐量万级万级10万级100万级
时效性msusmsms级以内
可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
功能特性成熟的产品、较全的文档、各种协议支持好并发能力强、性能好、延迟低MQ功能比较完善,扩展性佳只支持主要的MQ功能,主要应用于大数据领域

选择建议

消息中间件建议
Kafka追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务
RocketMQ可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验
RabbitMQ性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ

2. Kafka安装及使用

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

Zookeeper安装可参考Zookeeper

开始的时候,笔者搞错了,以为需要提前安装zookeeper,实际上kafka自带了zookeeper的程序包,无需额外安装。

2.1 下载kafka

  1. 手动下载
    官网:https://kafka.apache.org/downloads
  2. 服务器下载
    [root@node-251 bin]# mkdir -p /opt/monitor/kafka
    [root@node-251 bin]# cd /opt/monitor/kafka
    [root@node-251 kafka]# wget http://mirrors.hust.edu.cn/apache/kafka/3.4.0/kafka_2.12-3.4.0.tgz
    
    当前最新版本为3.4.0
  3. 解压
    [root@node-251 monitor]# tar -zxvf kafka_2.12-3.4.0.tgz
    [root@node-251 monitor]# mv kafka_2.12-3.4.0 kafka
    

2.2 修改配置文件

修改server.properties

[root@node-251 monitor]# cd kafka/config/
[root@node-251 config]# grep -v ^# server.properties
...
broker.id=0

listeners=PLAINTEXT://192.168.71.251:9092
...
log.dirs=/opt/monitor/kafka/kafka-logs
...
zookeeper.connect=192.168.71.251:2181
...
[root@node-251 config]# mkdir -p /opt/monitor/kafka/kafka-logs

修改zookeeper.properties

[root@node-251 config]# grep -v ^# zookeeper.properties
dataDir=/tmp/zookeeper					#配置zookeeper的时候使用默认路径,笔者未修改
clientPort=2181
maxClientCnxns=100
tickTimes=2000
initLimit=10
syncLimit=5
admin.enableServer=false

2.3 启动

  1. 启动zookeeper
    使用指定的kafka下的config/zookeeper.properties配置文件启动
	bin/zookeeper-server-start.sh config/zookeeper.properties
​	#或
	bin/zookeeper-server-start.sh -daemon  config/zookeeper.properties          #建议使用这种方式,不需要启动多个窗口
[root@node-251 kafka]# bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[root@node-251 kafka]# ps -ef |grep zookeeper
root      73322      1  0 May19 pts/0    00:02:58 java -Dzookeeper.log.dir=/usr/share/zookeeper/bin/../logs -Dzookeeper.log.file=zookeeper-root-server-node-251.log -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=kill -9 %p ...
[root@node-251 kafka]# ss -nltp|grep 2181
LISTEN     0      50        [::]:2181                  [::]:*                   users:(("java",pid=73322,fd=61))
  1. 启动Kafka
bin/kafka-server-start.sh config/server.properties
​#或
bin/kafka-server-start.sh -daemon  config/server.properties                 #建议使用这种方式,不需要启动多个窗口
[root@node-251 kafka]# ss -nlpt|grep 9092
LISTEN     0      50        [::]:9092                  [::]:*                   users:(("java",pid=81895,fd=135))

脚本启动

#!/bin/bash
/opt/monitor/kafka/kafka_2.12-2.7.0/bin/zookeeper-server-start.sh /opt/monitor/kafka/kafka_2.12-2.7.0/config/zookeeper.properties &

/opt/monitor/kafka/kafka_2.12-2.7.0/bin/kafka-server-start.sh /opt/monitor/kafka/kafka_2.12-2.7.0/config/server.properties &

2.4 docker启动

如果使用docker启动,整体就比较简单了

# docker直接拉取kafka和zookeeper的镜像
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper 
# 首先需要启动zookeeper,如果不先启动,启动kafka没有地方注册消息
docker run -it --name zookeeper -p 12181:2181 -d wurstmeister/zookeeper:latest
# 启动kafka容器,注意需要启动三台,注意端口的映射,都是映射到9092
# 第一台
docker run -it --name kafka01 -p 19092:9092 -d -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.233.129:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.129:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# 第二台
docker run -it --name kafka02 -p 19093:9092 -d -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.233.129:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.129:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# 第三台
docker run -it --name kafka03 -p 19094:9092 -d -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.233.129:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.129:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

上面端口的映射注意都是映射到Kafka的9092端口上!否则将不能够连接!

3. 简单指令

3.1 topic相关

  • 创建topic

这个是0.9之后的打开方式

[root@node-251 kafka]# bin/kafka-topics.sh --create --bootstrap-server 192.168.71.251:9092 --replication-factor 1 --partitions 1 --topic demo
Created topic demo.

这个是0.9之前的打开方式

[root@node-251 kafka]# bin/kafka-topics.sh --create --zookeeper 192.168.71.251:9092 --topic demo

创建了一个名为 demo 的主题,其中包含一个分区和一个副本因子。

[root@node-251 kafka]# ll /opt/monitor/kafka/kafka-logs
total 16
-rw-r--r-- 1 root root   0 May 23 21:44 cleaner-offset-checkpoint
drwxr-xr-x 2 root root 167 May 23 23:06 demo-0
...
[root@node-251 kafka]# ll /opt/monitor/kafka/kafka-logs/demo-0/
total 8
-rw-r--r-- 1 root root 10485760 May 23 23:06 00000000000000000000.index
-rw-r--r-- 1 root root        0 May 23 23:06 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 May 23 23:06 00000000000000000000.timeindex
-rw-r--r-- 1 root root        8 May 23 23:06 leader-epoch-checkpoint
-rw-r--r-- 1 root root       43 May 23 23:06 partition.metadata

  • 查询topic列表
[root@node-251 kafka]# bin/kafka-topics.sh --list --bootstrap-server 192.168.71.251:9092
demo
[root@node-251 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.71.251:9092 --describe --topic demo
Topic: demo     TopicId: 7rCFf5XORAyAkCKaw_zCBA PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: demo     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
  • 删除topic
[root@node-251 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.71.251:9092 --delete --topic demo
[root@node-251 kafka]# bin/kafka-topics.sh --list --bootstrap-server 192.168.71.251:9092

删除的时候只是被标记为删除marked for deletion并没有真正的删除,如果需要真正的删除,需要再config/server.properties中设置delete.topic.enable=true

  • 修改分区数
[root@node-251 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.71.251:9092 --alter --topic demo1 --partitions 2
[root@node-251 kafka]# bin/kafka-topics.sh --bootstrap-server 192.168.71.251:9092 --describe --topic demo1
Topic: demo1    TopicId: N4m4OP-4Qj-gfWlPKdHM1g PartitionCount: 2       ReplicationFactor: 1    Configs:
        Topic: demo1    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: demo1    Partition: 1    Leader: 0       Replicas: 0     Isr: 0

3.2 Kafka 生产/消费

  • 启动生产者

生产者命令行客户端需要两个主要参数:
① 代理列表 - 我们要发送邮件的代理列表。 在这种情况下,我们只有一个代理。 Config/server.properties文件包含代理端口ID,因为我们知道我们的代理正在侦听端口9092,因此您可以直接指定它。
② 主题名称:demo

[root@node-251 kafka]# bin/kafka-topics.sh --create --bootstrap-server 192.168.71.251:9092 --replication-factor 1 --partitions 1 --topic demo1
Created topic demo1.
[root@node-251 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.71.251:9092 --topic demo1
>halo
>my
>girl
>exit
>tui
>^C
  • 启动消费者
    为了方便测试,另启一个窗口 这样效果更明显。需要注意的是旧版本和新版本的命令是不一样的
bin/kafka-console-consumer.sh --bootstrap-server 192.168.71.251:9092 --topic demo1 --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 192.168.xxx.xxx:9092 --topic 名称 --from-beginning
[root@node-251 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.71.251:9092 --topic demo1 --from-beginning
halo
my
girl
exit
tui

开启两个终端可以看到:一个发送消息,一个接受消息。

  • 查看kafka生产最大位置偏移量
[root@node-251 kafka]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.71.251:9092 --topic demo1 --time -1
demo1:0:5
  • 消费消息查看
[root@node-251 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.71.251:9092 --topic demo1 --from-beginning
halo
my
girl
exit
tui

4. kafka进阶

4.1 工作流程

Kafka中消息是以topic进行分类的,Producer生产消息,Consumer消费消息,都是面向topic的。
在这里插入图片描述
Topic是逻辑上的改变,Partition是物理上的概念,每个Partition对应着一个log文件,该log文件中存储的就是producer生产的数据topic=N*partition;partition=log

Producer生产的数据会被不断的追加到该log文件的末端,且每条数据都有自己的offset,consumer组中的每个consumer,都会实时记录自己消费到了哪个offset,以便出错恢复的时候,可以从上次的位置继续消费。流程:
Producer => Topic(Log with offset)=> Consumer

4.2 文件存储

Kafka文件存储也是通过本地落盘的方式存储的,主要是通过相应的log与index等文件保存具体的消息文件。
在这里插入图片描述
生产者不断的向log文件追加消息文件,为了防止log文件过大导致定位效率低下,Kafka的log文件以1G为一个分界点,当.log文件大小超过1G的时候,此时会创建一个新的.log文件,同时为了快速定位大文件中消息位置,Kafka采取了分片和索引的机制来加速定位。

在kafka的存储log的地方,即文件的地方,会存在消费的偏移量以及具体的分区信息,分区信息的话主要包括.index.log文件组成
在这里插入图片描述
分区目的是为了备份,所以同一个分区存储在不同的broker上,即当third-2存在当前机器kafka01上,实际上在kafka03中也有这个分区的文件(副本),分区中包含副本,即一个分区可以设置多个副本,副本中有一个是leader,其余为follower。
在这里插入图片描述
如果.log文件超出大小,则会产生新的.log文件。如下所示

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

此时如何快速定位数据,步骤:

.index文件存储的消息的offset+真实的起始偏移量。.log中存放的是真实的数据。

  • 首先通过二分查找.index文件到查找到当前消息具体的偏移,如上图所示,查找segement为2,发现第二个文件index起始为6,则定位到一个文件中。
  • 然后通过第一个.index文件通过seek定位元素的位置3,定位到之后获取起始偏移量+当前文件大小=总的偏移量
  • 获取到总的偏移量之后,直接定位到.log文件即可快速获得当前消息大小。

4.3 生产者分区策略

分区的原因

  • 方便在集群中扩展:每个partition通过调整以适应它所在的机器,而一个Topic又可以有多个partition组成,因此整个集群可以适应适合的数据
  • 可以提高并发:以Partition为单位进行读写。类似于多路。

分区的原则

  • 指明partition(这里的指明是指第几个分区)的情况下,直接将指明的值作为partition的值
  • 没有指明partition的情况下,但是存在值key,此时将key的hash值与topic的partition总数进行取余得到partition值
  • 值与partition均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与topic可用的partition总数取余得到partition值,即round-robin算法。

4.4 生产者ISR< In-Sync Replicas (同步副本集) >

为保证producer发送的数据能够可靠的发送到指定的topic中,topic的每个partition收到producer发送的数据后,都需要向producer发送ackacknowledgement,如果producer收到ack就会进行下一轮的发送,否则重新发送数据。
在这里插入图片描述
发送ack的时机
确保有follower与leader同步完成,leader在发送ack,这样可以保证在leader挂掉之后,follower中可以选出新的leader(主要是确保follower中数据不丢失)
follower同步完成多少才发送ack

  • 半数以上的follower同步完成,即可发送ack
  • 全部的follower同步完成,才可以发送ack

副本数据同步策略

  1. 半数follower同步完成即发送ack

    优点是延迟低
    缺点是选举新的leader的时候,容忍n台节点的故障,需要2n+1个副本(因为需要半数同意,所以故障的时候,能够选举的前提是剩下的副本超过半数),容错率为1/2

  2. 全部follower同步完成完成发送ack

    优点是容错率高,选举新的leader的时候,容忍n台节点的故障只需要n+1个副本即可,因为只需要剩下的一个人同意即可发送ack了
    缺点是延迟高,因为需要全部副本同步完成才可

kafka选择的是第二种,因为在容错率上面更加有优势,同时对于分区的数据而言,每个分区都有大量的数据,第一种方案会造成大量数据的冗余。虽然第二种网络延迟较高,但是网络延迟对于Kafka的影响较小。

ISR(同步副本集)

采用了第二种方案进行同步ack之后,如果leader收到数据,所有的follower开始同步数据,但有一个follower因为某种故障,迟迟不能够与leader进行同步,那么leader就要一直等待下去,直到它同步完成,才可以发送ack,此时需要如何解决这个问题呢?

解决方法:

leader中维护了一个动态的ISR(in-sync replica set),即与leader保持同步的follower集合,当ISR中的follower完成数据的同步之后,给leader发送ack,如果follower长时间没有向leader同步数据,则该follower将从ISR中被踢出,该之间阈值由replica.lag.time.max.ms参数设定。当leader发生故障之后,会从ISR中选举出新的leader。

4.5 生产者ack机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没有必要等到ISR中所有的follower全部接受成功。

Kafka为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡选择不同的配置。

ack参数配置

  • 0:producer不等待broker的ack,这一操作提供了最低的延迟,broker接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据

  • 1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘)
    在这里插入图片描述

  • 1(all):producer等待broker的ack,partition的leader和ISR的follower全部落盘成功才返回ack,但是如果在follower同步完成后,broker发送ack之前,如果leader发生故障,会造成数据重复。(这里的数据重复是因为没有收到,所以继续重发导致的数据重复)
    在这里插入图片描述

producer返ack:0无落盘直接返,1只leader落盘然后返,-1全部落盘然后返

4.6 数据一致性问题

在这里插入图片描述

  • LEO(Log End Offset):每个副本最后的一个offset
  • HW(High Watermark):高水位,指代消费者能见到的最大的offset,ISR队列中最小的LEO。

follower故障和leader故障

  • follower故障:follower发生故障后会被临时踢出ISR,等待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等待该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。
  • leader故障:leader发生故障之后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据的一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。

这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

4.7 Exactly Once

将服务器的ACK级别设置为-1(all),可以保证producer到Server之间不会丢失数据,即At Least Once(至少一次语义)。
将服务器的ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once(至多一次)。

At Least Once可以保证数据不丢失,但是不能保证数据不重复,而At Most Once可以保证数据不重复,但是不能保证数据不丢失,对于重要的数据,则要求数据不重复也不丢失,即Exactly Once(精确的一次)。

  • 在0.11版本的Kafka之前,只能保证数据不丢失,在下游对数据的重复进行去重操作,多个下游应用的情况,则分别进行全局去重,对性能有很大影响。
  • 0.11版本的kafka,引入了一项重大特性:幂等性,幂等性指代Producer不论向Server发送了多少次重复数据,Server端都只会持久化一条数据。幂等性结合At Least Once语义就构成了Kafka的Exactly Once语义。

启用幂等性,即在Producer的参数中设置enable.idempotence=true即可,Kafka的幂等性实现实际是将之前的去重操作放在了数据上游来做,开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个Partition的消息会附带Sequence Number,而Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息的时候,Broker只会持久化一条。

但PID在重启之后会发生变化,同时不同的Partition也具有不同的主键,所以幂等性无法保证跨分区跨会话的Exactly Once。

producer参数:
在这里插入图片描述

5. 消费者分区分配策略

消费方式

  • consumer采用pull拉的方式来从broker中读取数据。
  • push推的模式很难适应消费速率不同的消费者,因为消息发送率是由broker决定的,它的目标是尽可能以最快的速度传递消息,但是这样容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull方式则可以让consumer根据自己的消费处理能力以适当的速度消费消息。

pull模式不足在于如果Kafka中没有数据,消费者可能会陷入循环之中 (因为消费者类似监听状态获取数据消费的),一直返回空数据,针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,时长为timeout。

5.1 分区分配策略

一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由那个consumer消费的问题。

Kafka的两种分配策略

  • round-robin(轮询)
  • range
  • Round-Robin

    主要采用轮询的方式分配所有的分区,该策略主要实现的步骤:

    假设存在三个topic:t0/t1/t2,分别拥有1/2/3个分区,共有6个分区,分别为t0-0/t1-0/t1-1/t2-0/t2-1/t2-2,这里假设我们有三个Consumer,C0、C1、C2,订阅情况为C0:t0,C1:t0/t1,C2:t0/t1/t2。

    此时round-robin采取的分配方式,则是按照分区的字典对分区和消费者进行排序,然后对分区进行循环遍历,遇到自己订阅的则消费,否则向下轮询下一个消费者。即按照分区轮询消费者,继而消息被消费。
    在这里插入图片描述
    分区在循环遍历消费者,自己被当前消费者订阅,则消息与消费者共同向下(消息被消费),否则消费者向下消息继续遍历(消息没有被消费)。轮询的方式会导致每个Consumer所承载的分区数量不一致,从而导致各个Consumer压力不均。上面的C2因为订阅的比较多,导致承受的压力也相对较大。

  • Range

    Range的重分配策略,首先计算各个Consumer将会承载的分区数量,然后将指定数量的分区分配给该Consumer。假设存在两个Consumer,C0和C1,两个Topic,t0和t1,这两个Topic分别都有三个分区,那么总共的分区有6个,t0-0,t0-1,t0-2,t1-0,t1-1,t1-2。分配方式如下:

    • range按照topic一次进行分配,即消费者遍历topic,t0,含有三个分区,同时有两个订阅了该topic的消费者,将这些分区和消费者按照字典序排列。
    • 按照平均分配的方式计算每个Consumer会得到多少个分区,如果没有除尽,多出来的分区则按照字典序挨个分配给消费者。按照此方式以此分配每一个topic给订阅的消费者,最后完成topic分区的分配。
      在这里插入图片描述
      按照range的方式进行分配,本质上是以此遍历每个topic,然后将这些topic按照其订阅的consumer数进行平均分配,多出来的则按照consumer的字典序挨个分配,这种方式会导致在前面的consumer得到更多的分区,导致各个consumer的压力不均衡。

5.2 消费者offset的存储

由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复以后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了那个offset,以便故障恢复后继续消费。
在这里插入图片描述

Kafka0.9版本之前,consumer默认将offset保存在zookeeper中
从0.9版本之后,consumer默认将offset保存在kafka一个内置的topic中,该topic为__consumer_offsets

# 利用__consumer_offsets读取数据
[root@node-251 kafka]# bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 192.168.71.251:9092  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
[test-consumer-group,__consumer_offsets,22]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1684937341773, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,30]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1684937341773, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,8]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1684937341773, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,21]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1684937341773, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,4]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1684937341773, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,27]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1684937341773, expireTimestamp=None)
[test-consumer-group,__consumer_offsets,7]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1684937341773, expireTimestamp=None)
...

5.3 消费者组案例

测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费。

# 首先需要修改config/consumer.properties文件,可以修改为一个临时文件
group.id=xxxx
# 启动消费者
./kafka-console-consumer.sh --bootstrap-server 192.168.233.129:19093 --topic test --consumer.config ../config/consumer.properties
# 启动生产者
./kafka-console-producer.sh --broker-list 192.168.233.129:19092 --topic test
# 发送消息

可以发现选定了一个组的,一条消息只会被一个组中的一个消费者所消费,只有ctrl+c退出了其中的一个消费者,另一个消费者才有机会进行消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/563935.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

`JOB`的正确打开方式

文章目录 JOB的正确打开方式 简介工作原理使用场景使用方式注意事项启动JOB失败的情况JOB正确打开方式错误方式正确方式进阶方式终极方式 总结 JOB的正确打开方式 最近有一些小伙伴在使用JOB时&#xff0c;由于使用不当&#xff0c;引起一些问题。例如把license占满&#xff0c…

ASEMI代理长电可控硅MCR100-8特征,MCR100-8应用

编辑-Z 长电可控硅MCR100-8参数&#xff1a; 型号&#xff1a;MCR100-8 VDRM/VRRM&#xff1a;600V IT(RMS)&#xff1a;0.8A 结点温度Tj&#xff1a;-40~125℃ 储存温度Tstg&#xff1a;-55 ~ 150℃ 通态电压VTM&#xff1a;1.7V 栅极触发电压VGT&#xff1a;0.8V 正…

泰克MDO4104C(Tektronix) mdo4104c混合域示波器

泰克 MDO4104C混合域示波器&#xff0c;1 GHz&#xff0c;4 通道&#xff0c;2.5 - 5 GS/s&#xff0c;20 M 点 ​泰克 MDO4104C 示波器是一款 6 合 1 集成示波器&#xff0c;可以配置可选的频谱分析仪、任意波形/函数发生器、逻辑分析仪、协议分析仪和 DVM/频率计数器。当配置…

黑盒测试能发现以下几类错误

黑盒测试能发现以下几类错误 黑盒测试是指在不考虑被测试软件的内部结构和工作原理的情况下&#xff0c;通过输入输出的方式对被测试软件进行测试。它主要关注被测试软件的功能是否达到预期的要求。黑盒测试能够发现以下几类错误。 1. 输入错误&#xff1a;黑盒测试可以检查被测…

如何增加网站权重?有效提高网站权重的技巧方法

权重对于网站优化来说非常的重要&#xff0c;那什么是网站权重呢&#xff1f;网站权重是指搜索引擎给网站&#xff08;包括网页&#xff09;赋予一定的权威值&#xff0c;对网站&#xff08;含网页&#xff09;权威的评估评价。一个网站权重越高&#xff0c;在搜索引擎所占的份…

【C++】虚表和虚基表到底有哪些区别?

虚表和虚基表 虚表虚基表虚拟继承和虚函数都存在时的对象模型 虚表 我们知道&#xff0c;如果类中声明了的方法是用virtual进行修饰的&#xff0c;则说明当前这个方法要作为虚函数&#xff0c;而虚函数的存储和普通函数的存储是有区别的 当有虚函数声明时&#xff0c;编译器会…

运营-16.个性化推荐

个性化推荐 个性化推荐&#xff0c;是根据用户的行为来分析用户的喜好&#xff0c;进而做商品精准推荐。 为什么要做个性化推荐&#xff1f; 1. 收集用户信息&#xff0c;精准获取用户需求&#xff1b; 2. 减少用户搜索商品的页面层级&#xff0c;提高转化率&#xff1b; …

聊聊 Milvus GC:从一次数据丢失事件展开

QueryNode 日志中频繁报错&#xff1f;对象存储数据离奇消失[1]&#xff1f; 令人震惊的数据丢失事件就这样发生了&#xff0c;一位来自 BOSS 直聘的 AI 研发工程师无意卷入到此次的风波中&#xff0c;他和 Milvus 社区的伙伴经过层层排查、抽丝剥茧&#xff0c;成功找出了问题…

还在用 JS 做节流吗?CSS 也可以防止按钮重复点击

目录 一、CSS 实现思路分析 二、CSS 动画的精准控制 三、CSS 实现的其他思路 四、总结一下 众所周知&#xff0c;函数节流&#xff08;throttle&#xff09;是 JS 中一个非常常见的优化手段&#xff0c;可以有效的避免函数过于频繁的执行。 举个例子&#xff1a;一个保存按…

opencv_c++学习(二十)

一、形态学应用案例 开、闭运算、形态学梯度等原理&#xff1a; 相关函数: morphologyEx(InputArray src, OutputArray dst, int op, lnputArray kernel, Point anchor Point(-1,-1), int iterations 1, int borderType BORDER_CONSTANT, const Scalar & border…

Android中静态和动态文字的绘制和测量

Android中静态和动态文字的绘制和测量 Android中自定义视图的时候存在两种情况&#xff0c;静态文字和动态文字。 顾名思义&#xff0c;静态文字就是显示内容是固定的&#xff0c;不会产生变化的文字&#xff0c;而动态文字则是内容会不断产生变化的文字信息。 在说明为什么…

Revit技巧 | Revit中图元不可见怎么办?

在revit中&#xff0c;控制图元课件性的设置有很多种&#xff0c;因此图元不可见&#xff0c;也会有各种各样的原因&#xff0c;这也是经常困扰新手的问题&#xff0c;下面我把这些解决办法做一些归纳总结。 图元如果过远偏离当前视图的中心&#xff0c;将导致视图不可见这时&…

MySQL:数据库的查询与连接

目录 1.复合查询 1.1 多表查询&#xff08;联合查询&#xff09; 1.2 join on (inner join) 1.3 自连接 1.4 子查询 1.5 合并查询 2.内外连接 3.关于高内聚、低耦合 1.复合查询 1.1 多表查询&#xff08;联合查询&#xff09; 什么是多表插叙&#xff1f;实际开发中往…

网络安全管理员证书有什么用?2023证书怎么考?证书报考条件?

网络安全管理员是做什么工作的呢&#xff1f;现如今&#xff0c;网络高速发展&#xff0c;带动了很多行业的兴起&#xff0c;比如说电商行业&#xff0c;今天已经步入到足不出户即可购物的时代了&#xff0c;当然网络也是一把“双刃剑”&#xff0c;带来了好处的同时&#xff0…

Sui Move Object讲解

要了解Sui的独特特性&#xff0c;首先要了解Sui中以对象为中心的数据模型。 Sui的设计初衷是重新定义数字资产所有权的可能性。重新设计的一个基本部分 — — Sui是以对象为中心的数据模型&#xff0c;也是Sui和其他Layer 1区块链之间的一个显著区别。 其他L1如何处理资产所有…

day8 - 使用不同的滤波核进行图像降噪

本期主要介绍用于图像平滑处理的滤波&#xff0c;分别是方框滤波、均值滤波、中值滤波、高斯滤波&#xff0c;比较不同滤波的效果&#xff1b;并了解自定义滤波器进行图像处理。 完成本期内容&#xff0c;你可以&#xff1a; 会使用方框滤波、均值滤波、中值滤波、高斯滤波进行…

实时聊天组合功能,你了解吗?

你有兴趣安装实时聊天组合功能吗&#xff1f;如果您选择了SaleSmartly&#xff08;ss客服&#xff09;&#xff0c;您的实时聊天插件可以不仅仅只是聊天通道&#xff0c;还可以有各种各样的功能&#xff0c;你不需要包含每一个功能&#xff0c;正所谓「宁缺勿滥」&#xff0c;功…

Windows主机中构建适用于K8S Operator开发环境

基于 win 10 打造K8S应用开发环境 一、wsl子系统安装 在cmd命令行终端或powershell中操作 1.1 确认windows操作系统版本 1.2 开启wsl功能 1.3 wsl配置 PS C:\Users\cpf> wsl提示&#xff1a;适用于 Linux 的 Windows 子系统没有已安装的分发版。可以通过访问 Microsoft St…

使用canvas给图片添加水印

上接文章“图片处理” canvas元素其实就是一个画布&#xff0c;我们可以很方便地绘制一些文字、线条、图形等&#xff0c;它也可以将一个img标签里渲染的图片画在画布上。 我们在上传文件到后端的时候&#xff0c;使用input标签读取用户本地文件后得到的其实是一个Blob对象&a…

Redis7实战加面试题-基础篇(Redis持久化,Redis事务,Redis管道,Redis发布订阅)

Redis持久化 RDB (Redis DataBase) RDB&#xff08;Redis 数据库&#xff09;&#xff1a;RDB 持久性以指定的时间间隔执行数据集的时间点快照。实现类似照片记录效果的方式&#xff0c;就是把某一时刻的数据和状态以文件的形式写到磁盘上&#xff0c;也就是快照。这样一来即使…