Kafka 实战使用、单机搭建、集群搭建、Kraft集群搭建

news2024/9/22 19:20:21

文章目录

    • 实验环境
    • 单机服务
      • 启动
      • 停止服务
      • 简单收发消息
      • 其他消费模式
      • 理解Kakfa的消息传递机制
    • 集群服务
      • 为什么要使用集群
      • 部署Zookeeper集群
      • 部署Kafka集群
      • 理解服务端的Topic、Partition和Broker
      • 总结
    • Kraft集群
    • 相关概念

实验环境

准备三台虚拟机

三台机器均预装CentOS7 操作系统。分别配置机器名 worker1,worker2,worker3。

vi /etc/hosts

192.168.75.61 worker1
192.168.75.62 worker2
192.168.75.63 worker3


firewall-cmd --state   查看防火墙状态
systemctl stop firewalld.service   关闭防火墙
systemctl disable firewalld 禁止开机自启



都需要安装jdk

[root@localhost ~]# mkdir -p /usr/local/soft/jdk

[root@localhost ~]# tar -zxvf jdk-8u201-linux-x64.tar.gz -C /usr/local/soft/jdk

[root@localhost ~]# vim /etc/profile
export JAVA_HOME=/usr/local/soft/jdk/jdk1.8.0_201
export JRE_HOME=/usr/local/soft/jdk/jdk1.8.0_201/jre
export CLASS_PATH=.:$JAVA_HOME/lib:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

[root@localhost jdk1.8.0_201]# source /etc/profile
[root@localhost jdk1.8.0_201]# java -version



下载kafka3.4.0

前面的2.13是开发kafka的scala语言的版本,后面的3.4.0是kafka应用的版本。

Scala是一种运行于JVM虚拟机之上的语言。在运行时,只需要安装JDK就可以了,选哪个Scala版本没有区别。但是如果要调试源码,就必须选择对应的Scala版本。因为Scala语言的版本并不是向后兼容的。

另外,在选择kafka版本时,建议先去kafka的官网看下发布日志,了解一下各个版本的特性。 https://kafka.apache.org/downloads。




在这里插入图片描述



下载Zookeeper 3.6.2版本。

在这里插入图片描述



kafka的安装程序中自带了Zookeeper,可以在kafka的安装包的libs目录下查看到zookeeper的客户端jar包。但是,通常情况下,为了让应用更好维护,我们会使用单独部署的Zookeeper,而不使用kafka自带的Zookeeper。

下载完成后,将这两个工具包上传到三台服务器上,解压后,分别放到/usr/local/soft/kafka和/usr/local/soft/zookeeper目录下。最好是并将部署目录下的bin目录路径配置到path环境变量中。

单机服务

启动

解压kafka

[root@localhost ~]# mkdir /usr/local/soft/kafka/

[root@localhost ~]# tar -zxvf kafka_2.13-3.4.0.tgz -C /usr/local/soft/kafka/

# 最好是设置一下环境变量
[root@worker1 ~]# vi ./.bash_profile 
export KAFKA_HOME=/usr/local/soft/kafka/kafka_2.13-3.4.0
export PATH=$PATH:$KAFKA_HOME/bin
[root@worker1 ~]# source ./.bash_profile

[root@localhost ~]# cd /usr/local/soft/kafka/kafka_2.13-3.4.0

直接执行

# 启动kafka自带的zookeeper服务
# 从nohup.out中可以看到zookeeper默认会在2181端口启动。通过jps指令看到一个QuorumPeerMain进程,确定服务启动成功。
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

# 启动kafka服务
# 也可以加上-daemon表示后台启动    bin/kafka-server-start.sh -daemon config/server.properties
# 服务会默认在9092端口启动。
nohup bin/kafka-server-start.sh config/server.properties &
[root@localhost kafka_2.13-3.4.0]# jps
2402 Kafka
2850 Jps
2044 QuorumPeerMain



停止服务

[root@worker1 ~]# kafka-server-stop.sh 
[root@worker1 ~]# zookeeper-server-stop.sh 
[root@worker1 ~]# jps
21269 Jps



简单收发消息

Kafka的基础工作机制是消息发送者可以将消息发送到kafka上指定的topic,而消息消费者,可以从指定的topic上消费消息。

在这里插入图片描述



首先,可以使用Kafka提供的客户端脚本创建Topic

#创建Topic
[root@localhost kafka_2.13-3.4.0]# bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Created topic test.

#查看Topic   --describe参数
[root@localhost kafka_2.13-3.4.0]# bin/kafka-topics.sh --topic test --describe --bootstrap-server localhost:9092
Topic: test	TopicId: XSQWNCb3SbGbDEleLlBvtQ	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0



然后,启动一个消息发送者端。往一个名为test的Topic发送消息。

当命令行出现 > 符号后,随意输入一些字符。Ctrl+C 退出命令行。这样就完成了往kafka发消息的操作。

[root@localhost kafka_2.13-3.4.0]# bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>1
>2
>3
>4
>

如果不提前创建Topic,那么在第一次往一个之前不存在的Topic发送消息时,消息也能正常发送,只是会抛出LEADER_NOT_AVAILABLE警告。

[oper@worker1 kafka_2.13-3.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>123
12[2021-03-05 14:00:23,347] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
3[2021-03-05 14:00:23,479] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2021-03-05 14:00:23,589] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>>123

这是因为Broker端在创建完主题后,会显示通知Clients端LEADER_NOT_AVAILABLE异常。Clients端接收到异常后,就会主动去更新元数据,获取新创建的主题信息。



然后启动一个消息消费端,从名为test的Topic上接收消息。

[root@localhost kafka_2.13-3.4.0]# bin/kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092

在这里插入图片描述



其他消费模式

指定消费进度

上方消费者只能接收到我新发送的5和6这两条消息,之前发送的4条消息就没有接收到。

如果想要消费之前发送的消息,可以通过添加--from-beginning参数指定。

[root@worker1 kafka_2.13-3.4.0]# bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
1
2
3
4
5
6



如果需要更精确的消费消息,甚至可以指定从哪一条消息开始消费。

这表示从第0号Partition上的第5条消息开始读起。

bin/kafka-console-consumer.sh --topic test --partition 0 --offset 4 --bootstrap-server localhost:9092
5
6

topic是逻辑上的概念,一个topic可以有多个partition,消息真正是保存在partition中的,可以理解为partition就是真实的queue。

消息生成者就只管指定某个topic,往topic发送消息即可,kafka内部会把消息分发到该topic内的partition中,如果存在多个partition就会有相应的负载均衡策略



分组消费

一个partition中的消息,只会被一个消费者组中的某一个消费者消费。和RocketMQ一样。不同的消费者组可以对消息有不同的处理逻辑,而同一个消费者组的目的是减少消息消费压力,做水平扩容的

kafka-console-consumer.sh脚本中,可以通过--consumer-property group.id=testGroup来指定所属的消费者组

# --consumer-property可以指定多个key-value  其中group.id只是其中一个
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092



我们现在可以启动三个消费者,来验证一下分组消费机制:

# 同一个消费者组
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
# 另一个消费者组
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup2 --bootstrap-server localhost:9092

在这里插入图片描述



查看消费者组的偏移量

接下来,还可以使用kafka-consumer-groups.sh观测消费者组的情况。包括他们的消费进度。

# 添加-group 和 --describe两个参数
[root@worker1 ~]# kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092

Consumer group 'testGroup' has no active members. # 当前消费者组无消费者,这是因为我此时停止了所有的消费者进程

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG         CONSUMER-ID     HOST            CLIENT-ID
testGroup       test            0          8               8               0            -               -               -
  • GROUP表示哪一个消费者组
  • TOPIC是topic名
  • PARTITION 一个topic可以存在多个partition,则标识是第0号partition
  • CURRENT-OFFSET表示当前消费者组消费到了topic为test、第0号partition的消费消息offset偏移量
  • LOG-END-OFFSET 表示生产者往当前partition中生产的消息总数
  • LAG 表示 还未消费的数量

比如我此时使用消息生产者再发送几条消息后再查看

# 生产三条消息
[root@worker1 ~]# kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>1  
>2
>3
>

# 此时 LOG-END-OFFSET就变为了11   而LAG就变为了3
[root@worker1 ~]# kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092

Consumer group 'testGroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG         CONSUMER-ID     HOST            CLIENT-ID
testGroup       test            0          8               11              3               -               -               -

虽然业务上是通过Topic来分发消息的,但是实际上,消息是保存在Partition这样一个数据结构上的。



理解Kakfa的消息传递机制

kafka的消息生产者和消息消费者是通过topic这样一个逻辑概念来进行业务沟通的,单实际上所有的消息是存在Partition这样一个数据结构中的

在这里插入图片描述



概念:

  • 客户端client:消息生产者和消息消费者都是Kafka的Client

  • 消费者组:每个消费者可以指定一个所属的消费者组,每一条消息会被多个感兴趣的消费者组消费,但一条消息只会被一个消费者组中的一个消费者消费。

  • 服务端Broker:一个Kafka服务就是一个Broker

  • 话题Topic:逻辑概念,一个Topic被认为是业务含义相同的一组消息。客户端通过绑定Topic来生产或消费自己感兴趣的话题

  • 分区Partition:Topic是逻辑概念,而Partition是实际存储消息的组件。每一个Partition就是一个queue。



集群服务

为什么要使用集群

  • 创建多个partition,多个partition分布在不同的broker上,减少一个broker的压力
  • 为每个partition准备多个备份,保证数据不丢失。多个partition会尽可能的分布在不同的broker上。多个partition是通过zookeeper来选举出一个Leader 处理Client的读写请求,其他备份partition就是Follower角色,只做数据备份和参与重新选举Leader

集群环境下就不会再使用kafka自带的zookeeper了,会单独部署zookeeper服务。

在这里插入图片描述



部署Zookeeper集群

zookeeper是使用的zab协议,选举需要半数以上的同意,所以部署的节点数需要单数。

[root@worker1 ~]# tar -zxf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local/soft/zookeeper/
[root@worker1 ~]# cd /usr/local/soft/zookeeper/apache-zookeeper-3.6.2-bin

然后进入conf目录,修改配置文件。在conf目录中,提供了一个zoo_sample.cfg文件,这是一个示例文件。我们只需要将这个文件复制一份zoo.cfg,修改下其中的关键配置就可以了。其中比较关键的修改参数如下:

[root@worker1 apache-zookeeper-3.6.2-bin]# cd conf/
[root@worker1 conf]# cp zoo_sample.cfg zoo.cfg
[root@worker1 conf]# vi zoo.cfg 

#Zookeeper的本地数据目录,默认是/tmp/zookeeper。这是Linux的临时目录,随时会被删掉。
dataDir=/app/zookeeper/data
#Zookeeper的服务端口
clientPort=2181
#集群节点配置  2888集群内部数据传输   3888集群内部进行选举
server.1=192.168.75.61:2888:3888
server.2=192.168.75.62:2888:3888
server.3=192.168.75.63:2888:3888

其中,clientPort 2181是对客户端开放的服务端口。

集群配置部分, server.x这个x就是节点在集群中的myid。后面的2888端口是集群内部数据传输使用的端口。3888是集群内部进行选举使用的端口。



启动服务

# --config 指定config目录,它会自己去读取目录下的zoo.cfg文件
[root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh --config conf start 
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... FAILED TO START

发现启动失败了,查看vi ./logs/zookeeper-root-server-worker1.out保存信息,提示是没有myid文件

在这里插入图片描述



因为我们上方的配置文件中dataDir=/app/zookeeper/data,那么我们就在这个目录下创建一个myid文件,并指定一个集群环境下唯一id

# 在这个文件中输入一个1   其他节点就输入2  3  保证各节点不重复即可
vi /app/zookeeper/data/myid
1

再启动服务

[root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh --config conf start 
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED



启动完成后,使用jps指令可以看到一个QuorumPeerMain进程就表示服务启动成功。

[root@worker1 apache-zookeeper-3.6.2-bin]# jps
8438 Jps
8395 QuorumPeerMain



三台机器都启动完成后,可以查看下集群状态。

这其中Mode 为leader就是主节点,follower就是从节点。

[root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /app/zookeeper/zookeeper-3.5.8/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader   



部署Kafka集群

kafka服务并不需要进行选举,因此也没有奇数台服务的建议。每台服务器上都执行下面的命令

解压kafka

[root@localhost ~]# mkdir /usr/local/soft/kafka/

[root@localhost ~]# tar -zxvf kafka_2.13-3.4.0.tgz -C /usr/local/soft/kafka/

# 最好是设置一下环境变量
[root@worker1 ~]# vi ./.bash_profile 
export KAFKA_HOME=/usr/local/soft/kafka/kafka_2.13-3.4.0
export PATH=$PATH:$KAFKA_HOME/bin
[root@worker1 ~]# source ./.bash_profile

[root@localhost ~]# cd /usr/local/soft/kafka/kafka_2.13-3.4.0



然后进入config目录,修改server.properties。这个配置文件里面的配置项非常多,下面列出几个要重点关注的配置。

[root@worker1 kafka_2.13-3.4.0]# mkdir -p /app/kafka/logs
[root@worker1 kafka_2.13-3.4.0]# vi config/server.properties
#broker 的全局唯一编号,不能重复,只能是数字。 所以一个集群下的kafka实例就需要使用不同的id
broker.id=1
#数据文件地址。同样默认是给的/tmp目录。
log.dirs=/app/kafka/logs
# 默认的每个Topic的分区数1,可以修改,我这里就修改为了2
num.partitions=2
#zookeeper的服务地址,因为我配置了域名映射,所以这里就使用使用的worker1 worker2 worker3
zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
#可以选择指定zookeeper上的基础节点。
#zookeeper.connect=worker1:2181,worker2:2181,worker3:2181/kafka

broker.id需要每个服务器上不一样,分发到其他服务器上时,要注意修改一下。

多个Kafka服务注册到同一个zookeeper集群上的节点,会自动组成集群。

一些核心的配置内如下所示

PropertyDefaultDescription
broker.id0broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为一的即可。
log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
listenersPLAINTEXT://127.0.0.1:9092server接受客户端连接的端口,ip配置kafka本机ip即可
zookeeper.connectlocalhost:2181zookeeper连接地址。hostname:port。如果是Zookeeper集群,用逗号连接。
log.retention.hours168每个日志文件删除之前保存的时间。
num.partitions1创建topic的默认分区数
default.replication.factor1自动创建topic的默认副本数量
min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
delete.topic.enablefalse是否允许删除主题



接下来就可以启动kafka服务了。启动服务时需要指定配置文件。

# -daemon表示后台启动
[root@worker1 kafka_2.13-3.4.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@worker1 kafka_2.13-3.4.0]# jps
8819 Kafka
8884 Jps
8395 QuorumPeerMain



理解服务端的Topic、Partition和Broker

创建一个topic,指定该topic的partition数量为4,每组partition的副本集个数为2,

这里的副本是指一个Leader一个Follower,并不是指一个Leader两个Follower

# --partitions 参数表示分区数   --replication-factor 表示每个分区的副本集个数
kafka-topics.sh --create --topic disTopic --partitions 4 --replication-factor 2 --bootstrap-server worker1:9092



列出所有的topic

[root@worker1 ~]# kafka-topics.sh --list --bootstrap-server worker1:9092
disTopic



查看列表情况

# --describe查看Topic信息。
[root@worker1 ~]# kafka-topics.sh --describe --topic disTopic --bootstrap-server worker1:9092
Topic: disTopic	TopicId: rDUdZBO7RH2GNPgdRXk7Tw	PartitionCount: 4	ReplicationFactor: 2	Configs: 
	Topic: disTopic	Partition: 0	Leader: 3	Replicas: 3,1	Isr: 3,1
	Topic: disTopic	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: disTopic	Partition: 2	Leader: 2	Replicas: 2,3	Isr: 2,3
	Topic: disTopic	Partition: 3	Leader: 3	Replicas: 3,2	Isr: 3,2
	
	
# 这里有4个partition   就拿partition0来举例,其中partition0这个副本集中的Leader在BrokerId为3的机器上,其中两个副本分别在brokerId1 BrokerId3上
  • Partiton参数列出了四个partition,后面带有分区编号,用来标识这些分区。

  • Leader表示这一组partition的Leader节点是哪一个,这个后面的数字是启动kafka服务时配置文件中指定的broker.id数

    kafka的每一组partition都会选举一个Leader,Leader会处理Client的读写请求,但实际上Leader可能会让某个副本去响应Client的请求。

  • Replicas表示这个Partition的复制集是分配到哪些Broker上的。但是Replicas列出的只是一个逻辑上的分配情况,并不关心数据实际上是不是按照这个分配,甚至有些节点服务挂了,这一列也还会显示

  • Isr表示Partition的实际分配情况,它是Replicas列的一个子集,只列出当前存活,能正常同步数据Broker节点



我们在配置文件中指定的log.dirs=/app/kafka/logs 配置

在这里插入图片描述



一个Broker上的一个partition就对应着一个目录,而这个Partition上的所有消息,就保存在这个对应的目录当中。

在这里插入图片描述



在Kafka中,Topic是逻辑上的概念,数据实际上是保存在Topic下的Partition中的,Partition就是数据存储的物理单元,而Broker是Partition的物理载体,这些Partition会尽量均匀的分配在不同的Broker上,避免一个Broker宕机整个Partition副本集都不能用的情况发生

而消费者消费消息的offset其实就是每个消息在partition上的偏移量



总结

在这里插入图片描述

  1. Topic是逻辑上的概念,producer和consumer通过Topic进行业务沟通
  2. Topic并不存储数据,数据是保存在Topic下的多组Partition中的,消息会尽量平均的分发在各组Partition中,每组Partition保存了Topic下的一部分消息
  3. 每组Partition包含一个Leader和多个Follower,每组Partition的个数成为备份因子replica factor
  4. producer将消息发送到Partition上,consumer通过partition上的offset记录自己所属组在当前partition上消费消息的消费进度
  5. producer将消息发送到Topic,kafka会推送给所有订阅了该topic的消费者组进行处理。但是每个消费者组内部只会有一个消费者实例处理这一条消息
  6. kafka的Broker通过zookeeper组成集群,然后在这些Broker中,需要选举产生一个担任Controller角色的Broker。这个Controller的主要任务就是负责Topic的分配以及后续管理工作。这也是通过ZooKeeper选举的。



Kraft集群

在Kafka的config目录下,提供了一个kraft的文件夹,在这里面就是Kraft协议的参考配置文件。在这个文件夹中有三个配置文件,broker.properties,controller.properties,server.properties,分别给出了Kraft中三种不同角色的示例配置。

  • broker.properties: 数据节点
  • controller.properties: Controller控制节点
  • server.properties: 即可以是数据节点,又可以是Controller控制节点。

这里同样列出几个比较关键的配置项,按照自己的环境进行定制即可。

[root@worker1 kafka_2.13-3.4.0]# vi config/kraft/server.properties
#配置当前节点的角色。Controller相当于Zookeeper的功能,负责集群管理。Broker提供具体的消息转发服务。
process.roles=broker,controller
#配置当前节点的id。与普通集群一样,要求集群内每个节点的ID不能重复。
node.id=1
#配置集群的投票节点。其中@前面的是node.id的值,后面是节点的地址和端口,这个端口跟客户端访问的端口是不一样的。通常将集群内的所有Controllor节点都配置进去。
controller.quorum.voters=1@192.168.75.61:9093,2@192.168.75.62:9093,3@192.168.75.63:9093
#Broker对客户端暴露的服务地址。基于PLAINTEXT协议。一般写本机ip
advertised.listeners=PLAINTEXT://worker1:9092
#Controller服务协议的别名。默认就是CONTROLLER
controller.listener.names=CONTROLLER
#配置监听服务。不同的服务可以绑定不同的接口。这种配置方式在端口前面是省略了一个主机IP的,主机IP默认是使用的java.net.InetAddress.getCanonicalHostName()
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#数据文件地址。默认配置在/tmp目录下。
log.dirs=/app/kafka/kraft-log
#topic默认的partition分区数。
num.partitions=2



将配置文件分发,并修改每个服务器上的node.idcontroller.quorum.voterslog.dirsnum.partitions属性。

由于Kafka的Kraft集群对数据格式有另外的要求,所以在启动Kraft集群前,还需要对日志目录进行格式化。

# 先使用kafka提供的脚本生成一个uuid
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh random-uuid
NT1Y5KgOTQ63SPppgsfhqA

# -t 集群ID,三个服务器上使用同一个集群ID。 加上上方的uuid  -c 指定配置文件
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh format -t NT1Y5KgOTQ63SPppgsfhqA -c config/kraft/server.properties 




在这里插入图片描述



接下来就可以指定配置文件,启动Kafka的服务了。 例如,在Worker1上,启动Broker和Controller服务。

[root@worker1 kafka_2.13-3.4.0]# bin/kafka-server-start.sh -daemon config/kraft/server.properties 
[root@worker1 kafka_2.13-3.4.0]# jps
12960 Kafka
13028 Jps

等三个服务都启动完成后,就可以像普通集群一样去创建Topic,并维护Topic的信息了。



相关概念

topic是逻辑上的概念,一个topic可以有多个partition,消息真正是保存在partition中的,可以理解为partition就是真实的queue。

消息生成者就只管指定某个topic,往topic发送消息即可,kafka内部会把消息分发到该topic内的partition中,如果存在多个partition就会有相应的负载均衡策略



一个partition中的消息,只会被一个消费者组中的某一个消费者消费,和RocketMQ一样。不同的消费者组可以对消息有不同的处理逻辑,而同一个消费者组的目的是减少消息消费压力,做水平扩容的



消息副本,每个消费者组都有一个消息副本,而一个消费者组下的多个消费者共用一个消息副本,也就是一条消息只能被一个消费者组下的一个消费者消费。



概念:

  • 客户端client:消息生产者和消息消费者都是Kafka的Client

  • 消费者组:每个消费者可以指定一个所属的消费者组,每一条消息会被多个感兴趣的消费者组消费,但一条消息只会被一个消费者组中的一个消费者消费。

  • 服务端Broker:一个Kafka服务就是一个Broker

  • 话题Topic:逻辑概念,一个Topic被认为是业务含义相同的一组消息。客户端通过绑定Topic来生产或消费自己感兴趣的话题

  • 分区Partition:Topic是逻辑概念,而Partition是实际存储消息的组件。每一个Partition就是一个queue。

在Kafka中,Topic是逻辑上的概念,数据实际上是保存在Topic下的Partition中的,Partition就是数据存储的物理单元,而Broker是Partition的物理载体,这些Partition会尽量均匀的分配在不同的Broker上,避免一个Broker宕机整个Partition副本集都不能用的情况发生

而消费者消费消息的offset其实就是每个消息在partition上的偏移量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1990134.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探索Transformer中的多头注意力机制:如何利用GPU并发

什么是多头注意力机制? 首先,什么是多头注意力机制?简单来说,它是Transformer模型的核心组件之一。它通过并行计算多个注意力头(attention heads),使模型能够从不同的表示子空间中捕捉不同的特…

Oracle服务器windows操作系统升级出现计算机名称改变导致数据库无法正常连接

1.数据库莫名奇妙无法正常连接,经排查是主机名称改变,导致oracle无法正常运行 如何查看ORACLE主机名称及路径:需要修改 listener 和 tnsnames的配置的主机名 2.修改tnsnames配置的主机名称,HOST主机名称 3.修改listener中的主机…

【案例36】Apache未指向新的openssl

客户发现apache报openssl相关漏洞,于是升级了操作系统的openssl组件。但再次漏扫发现相关版本依旧显示openssl的版本为:1.0.2k。怀疑升级的有问题。 问题分析 查看libssl.so.10指向的是/lib64.so.10 ldd mod_ssl.so libssl.so.10指向的是openssl1.0.2k…

【实际案例】服务器宕机情况分析及处理建议

了解银河麒麟操作系统更多全新产品,请点击访问麒麟软件产品专区:https://product.kylinos.cn 服务器环境以及配置 物理机/虚拟机/云/容器 物理机 外网/私有网络/无网络 私有网络 处理器: Kunpeng 920 内存: 4 TiB BIOS版…

【JVM基础18】——实践-Java内存泄漏排查思路?

目录 1- 引言:2- ⭐核心:2-1 排查思路2-2 步骤1:获取堆内存快照 dump2-3 步骤2、3:使用 VisualVM 打开 dump文件 3- 小结:3-1 Java内存泄漏排查思路? 1- 引言: 首先得明确哪里会产生内存泄漏的…

Solidworks API利用C# 实现物体的运动与碰撞检测

详情见github连接 SolidWorks-API-Collision-Detection Use SolidWorks API to MovePart and Collision Detection 利用solidworks的API来移动控件物体以及进行碰撞检测 visual studio 2022 利用Nuget 安装这些库 打开solidworks 可以看到有两个控件 部件运动 使用封装的函…

嵌入式初学-C语言-十七

#接嵌入式初学-C语言-十六# 函数的递归调用 含义: 在一个函数中直接或者间接调用了函数本身,称之为函数的递归调用 // 直接调用a()->a(); // 间接调用a()->b()->a();a()->b()->..->a();递归调用的本质: 本是是一种循环…

深入理解Spring的三级缓存机制

个人名片 🎓作者简介:java领域优质创作者 🌐个人主页:码农阿豪 📞工作室:新空间代码工作室(提供各种软件服务) 💌个人邮箱:[2435024119qq.com] &#x1f4f1…

Ubuntu(20.04 LTS)更换镜像源

此换镜像源方法只适用x86_64架构的系统,其他架构的系统参考ubuntu-ports的方法 1、备份文件 sudo mv /etc/apt/sources.list /etc/apt/sources.list.bk2、创建新文件 sudo vi /etc/apt/sources.list根据自己系统版本选择下面对应的镜像源添加到新文件中&#xf…

智能指针--

智能指针简介 头文件&分类 智能指针都在memory中, 有auto_ptr, unique_ptr, shared_ptr, weak_ptr 智能指针发展史 C++98就有智能指针了(auto_ptr) c++11前,智能指针主要靠bo…

FPGA开发——在Quartus中实现对IP核的PLL调用

一、简介 PLL主要由鉴相器(PD)、环路滤波器(LF)和压控振荡器(VCO)三部分组成。鉴相器检测输入信号与VCO输出信号的相位差,并输出一个与相位差成正比的电压信号。该信号经过环路滤波器滤除高频噪…

esp32学习笔记

前言:学习视频链接放在最后,开发方式为esp32Arduino,使用型号为ESP32-WROOM-32,引脚功能分配图如下。 #esp32介绍 GPIO的引脚默认情况下,只能当做普通功能引脚使用,也就是只能输入,输出&#x…

git 常用指令(创建分支、提交分支、解决冲突)

1. 初始化git 将你的代码放入你创建的文件中&#xff0c;执行 git init(前提你电脑安装过git哈)2. 查看当前项目git 状态 git status 3. 将代码添加到暂存区 git add . &#xff08;提交所有修改的代码&#xff0c;如果向指定提交使用&#xff1a;git add <文件名>&am…

SQL语句创建数据库(增删查改)

SQL语句 一.数据库的基础1.1 什么是数据库1.2 基本使用1.2.1 连接服务器1.2.2 使用案例 1.2 SQL分类 二.库的操作2.1 创建数据库2.2 创建数据库示例2.3 字符集和校验规则2.3.1 查看系统默认字符集以及校验规则2.3.2查看数据库支持的字符集2.3.3查看数据库支持的字符集校验规则2…

【RTOS面试题】ISR中可以使用互斥锁和信号量吗?

在中断服务程序&#xff08;ISR, Interrupt Service Routine&#xff09;中直接使用互斥锁&#xff08;mutex&#xff09;和信号量&#xff08;semaphore&#xff09;是有风险的&#xff0c;因为这些同步机制通常不是中断安全的。但是&#xff0c;可以通过一些方法来安全地在 I…

QWT+Qt Creator+MSVC的配置与使用

目录 一、介绍 二、QWT下载 三、QWT编译 3.1 设置构建套件 3.2 修改QWT相关文件 3.3 进行QWT编译 四、QWT配置 4.1 配置QWT的lib文件 4.2 配置QWT的dll文件 4.3 配置QWT的designer的dll文件 五、代码实验 一、介绍 QWT&#xff0c;全称是Qt Widgets for Technical…

Python 异步编程:Asyncio 实现原理

常见的并发模型 多进程/多线程异步ActorPub/Sub Python 异步的基石&#xff1a;协程 协程简介 概念&#xff1a;协作式多任务的子程序&#xff0c;用户态线程或微线程&#xff08;Coroutine&#xff09;。 特点&#xff1a;子程序执行可以中断&#xff0c;恢复后不会丢失之…

uniapp 荣耀手机 没有检测到设备 运行到Android手机 真机运行

背景&#xff1a; 使用uniapp框架搭建的项目&#xff0c;开发的时候在浏览器运行&#xff0c;因为项目要打包成App&#xff0c;所以需要真机联调&#xff0c;需要运行到Android手机&#xff0c;在手机上查看/运行项目。通过真机调试才能确保软件开发的准确性和页面显示的完整性…

mac 2k显示器 配置

前言 今年5月份买了一个2k显示器&#xff0c;刚收到的时候发现只有一个1080 x 720&#xff08;HiDPI&#xff09;分辨率是人眼看起来比较舒服的&#xff0c;于是一直用着。但是直到开始写前端代码的时候&#xff0c;我才发现&#xff0c;网页在2k显示器和内建显示器的布局竟然…

Python 循环引用与内存泄漏:深度解析

Python 循环引用与内存泄漏&#xff1a;深度解析 在Python编程中&#xff0c;循环引用和内存泄漏是两个需要特别注意的问题。本文将深入探讨Python中的循环引用现象、其导致的内存泄漏问题&#xff0c;并提供详细的解决思路与方法。同时&#xff0c;我们还将分析一些常见场景&…