在k8s中部署Kafka高可用集群超详细讲解

news2024/12/25 10:04:40

🐇明明跟你说过:个人主页

🏅个人专栏:《数据流专家:Kafka探索》🏅

🔖行路有良友,便是天堂🔖

目录

一、引言

1、Kafka简介

2、为什么在Kubernetes中部署Kafka

二、Kubernetes基础 

1、Kubernetes概述

2、Pods、Services、Deployments等基本概念

三、Kafka集群架构

1、Kafka集群的组成与工作原理

2、Kafka集群的高可用性设计

四、准备部署环境 

1、准备k8s集群

2、准备StorageClass 

3、准备部署Kafka集群所需的镜像 

五、部署Kafka集群

1、部署Zookeeper集群

2、部署Kafka集群


一、引言

1、Kafka简介

阿帕奇Kafka(Apache Kafka)是一个开源的分布式流处理平台,由LinkedIn开发并于2011年开源,现在是Apache软件基金会的一部分。Kafka的设计初衷是为了处理实时数据流,具备高吞吐量、低延迟、高容错性和可扩展性的特点。

Kafka的核心概念


1. Producer(生产者):

  • 生产者是向Kafka主题(topic)发送消息的应用程序。它们负责将数据推送到Kafka集群。

2. Consumer(消费者):

  • 消费者是从Kafka主题中读取消息的应用程序。它们可以是单个消费者或消费者组(consumer group),每个消费者组中的消费者可以并行地读取数据。

3. Topic(主题):

  • 主题是Kafka中的一个逻辑分类,用于将消息进行分组。每个主题可以细分为多个分区(partition),每个分区内的消息是有序的,但不同分区之间没有顺序保证。

4. Partition(分区):

  • 分区是主题的一个子单元,每个分区可以存储大量消息,并且每个分区可以被多个消费者读取。分区使得Kafka能够水平扩展。

5. Broker(代理):

  • Broker是Kafka集群中的一个服务器节点,负责接收、存储和发送消息。一个Kafka集群可以包含多个broker,以提供高可用性和容错能力。

6. Zookeeper:

  • Kafka使用Zookeeper来进行分布式协调和管理集群状态。Zookeeper负责维护配置信息、分区的leader选举以及消费组的offset管理等。

Kafka的主要功能


1. 消息持久化:

  • Kafka将消息持久化到磁盘,并且允许设置保留策略,确保数据的可靠性。

2. 高吞吐量:

  • Kafka能够处理高吞吐量的数据流,适合处理大规模的数据流应用。

3. 低延迟:

  • Kafka的设计优化了数据传输路径,使得消息传输具有低延迟。

4. 扩展性:

  • Kafka的分区机制允许在集群中增加更多的broker和分区,从而实现水平扩展。

5. 容错性:

  • 通过复制机制,Kafka确保了数据的高可用性,即使个别节点故障,数据仍然能够恢复。

Kafka的应用场景


1. 日志收集和聚合:

  • 作为一种高效的日志收集系统,Kafka可以收集和聚合分布式系统的日志数据。

2. 实时数据流处理:

  • Kafka与流处理框架(如Apache Flink、Apache Storm等)结合,可以进行实时数据分析和处理。

3. 消息队列:

  • Kafka可以充当高吞吐量的消息队列系统,用于不同应用程序之间的解耦和异步通信。

4. 事件源:

  • Kafka可以用作事件源系统,捕获和存储所有变化事件,以便后续处理和回放。

2、为什么在Kubernetes中部署Kafka

  1. 弹性和可伸缩性:Kubernetes提供了强大的自动伸缩和调度功能,可以根据负载情况自动扩展Kafka集群的节点数量,以满足不同工作负载的需求。这种弹性能力使得Kafka能够更好地应对数据量的变化和突发性负载。
  2. 易于管理:Kubernetes提供了统一的管理接口和工具,简化了Kafka集群的部署、扩展、更新和监控。通过Kubernetes的控制面板,管理员可以轻松地管理Kafka集群的生命周期,而无需深入了解底层的部署细节。
  3. 高可用性:Kubernetes具有自动故障检测和恢复机制,可以在节点故障时自动重新调度Kafka的副本,确保数据的高可用性和持久性。
  4. 资源隔离:通过Kubernetes的命名空间和资源限制功能,可以实现Kafka集群与其他应用程序之间的资源隔离,避免因资源竞争导致的性能问题。

 
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==编辑

二、Kubernetes基础 

1、Kubernetes概述

Kubernetes(K8s)是一个开源的容器编排平台,最初由Google开发并于2014年发布,现已成为Cloud Native Computing Foundation(CNCF)的一部分。Kubernetes旨在简化容器化应用程序的部署、管理和扩展,提供了丰富的功能和工具,使得用户可以更轻松地构建和运行分布式系统。

Kubernetes的主要功能


1. 自动化部署和扩展:

  • Kubernetes提供了丰富的调度和自动伸缩功能,可以根据负载情况自动调度和扩展应用程序的副本数量,以适应不同的工作负载需求。

2. 服务发现和负载均衡:

  • Kubernetes通过Service对象提供了统一的服务发现和负载均衡功能,使得应用程序可以通过简单的域名访问其他服务,并自动实现负载均衡。

3. 存储管理:

  • Kubernetes支持多种存储后端和存储卷类型,可以为应用程序提供持久化存储和共享存储功能,如PersistentVolume、PersistentVolumeClaim等。

4. 自我修复:

  • Kubernetes具有自我修复和健康检查机制,可以监控容器和节点的健康状态,并在出现故障时自动进行修复和恢复。

   

2、Pods、Services、Deployments等基本概念

1. Pods(容器组)


Pod是Kubernetes中最小的可部署单元,它可以包含一个或多个紧密相关的容器。这些容器共享相同的网络命名空间和存储卷,并且通常在同一节点上运行。Pod提供了一种逻辑主机的概念,使得容器之间可以共享资源和通信。Pod通常用于部署一个应用程序的一组相关容器。

2. Services(服务)


服务是Kubernetes中一种抽象,用于定义一组Pod的逻辑集合,并为它们提供统一的访问入口。服务通过标签选择器(Selector)将请求路由到对应的Pod,并实现负载均衡和服务发现功能。Kubernetes支持多种类型的服务,如ClusterIP、NodePort、LoadBalancer和ExternalName。

3. Deployments(部署)


部署是Kubernetes中用于管理Pod副本数量和应用程序版本更新的对象。部署定义了应用程序的期望状态,包括所需的副本数量、容器镜像和其他配置信息。部署控制器负责根据部署的定义创建、更新和删除Pod实例,同时提供滚动更新和回滚的功能。

三、Kafka集群架构

1、Kafka集群的组成与工作原理

Kafka集群由多个服务器节点(Broker)组成,每个Broker负责存储和处理一部分数据。Kafka的工作原理基于发布/订阅模式,其中生产者(Producer)将消息发布到一个或多个主题(Topic),而消费者(Consumer)订阅这些主题并从Broker拉取消息进行处理。

工作原理


1. 消息存储:

  • 生产者将消息发布到主题,并根据主题的分区策略选择将消息存储到对应的分区中。每个分区的消息按顺序存储,并根据消息的偏移量(Offset)进行标识。

2. 消息复制:

  • Kafka通过副本集机制实现数据的高可用性和持久性。每个分区都有一个主副本(Leader Replica)和多个副本(Follower Replica),主副本负责处理读写请求,而副本用于备份数据。当主副本故障时,Kafka会自动选举一个新的主副本来接管工作。

3. 消息传输:

  • 生产者将消息发送到Broker的主副本,Broker负责将消息复制到副本并进行持久化存储。消费者从Broker的主副本拉取消息进行处理,可以选择从不同的副本读取消息以实现负载均衡和故障恢复。

4. 消息保留策略:

  • Kafka支持设置消息的保留策略,包括基于时间、基于大小或基于日志段的策略。一旦消息达到保留期限或超过指定的大小限制,Kafka将自动删除过期消息,以释放存储空间。

   

2、Kafka集群的高可用性设计

1. 多副本复制

  • Kafka通过在多个Broker之间复制分区的副本来实现数据的冗余和高可用性。每个分区通常有多个副本,其中一个是主副本(Leader Replica),负责处理读写请求;其他副本是从副本(Follower Replica),用于备份数据。当主副本发生故障时,Kafka会自动选举一个新的主副本来接管工作,确保数据的可用性和一致性。

2. 自动故障检测与恢复

  • Kafka集群内置了自动故障检测和恢复机制,可以监控Broker和分区的健康状态,并在发现故障时自动进行故障转移和数据恢复。当Broker或分区发生故障时,Kafka会触发重新选举过程,选举出新的主副本并将数据复制到新的副本中,以实现快速的故障恢复。

3. 副本分布和均衡

  • Kafka会将分区的副本分布在不同的Broker上,以确保数据的冗余和均衡。通过将副本分布在不同的机架、数据中心或云区域中,可以提高系统的容错能力,即使整个机架或数据中心发生故障,系统仍然能够继续运行。

4. 水平扩展和动态伸缩

  • Kafka支持水平扩展和动态伸缩,可以根据负载情况和性能需求来增加或减少Broker的数量和容量。通过添加更多的Broker和分区,可以提高系统的吞吐量和容量,同时降低单点故障的风险,实现更高的可用性和可伸缩性。

   

四、准备部署环境 

1、准备k8s集群

这里我们使用的k8s集群版本为 1.23,也可以使用其他的版本,只是镜像导入命令不通,如果还未搭建k8s集群,请参考《在Centos中搭建 K8s 1.23 集群超详细讲解》这篇文章

2、准备StorageClass 

因为我们要对Kafka中的数据进行持久化,避免Pod漂移后数据丢失,保证数据的完整性与可用性

如果还未创建存储类,请参考《k8s 存储类(StorageClass)创建与动态生成PV解析,(附带镜像)》这篇文件。

3、准备部署Kafka集群所需的镜像 

 在k8s Node节点上执行以下命令

[root@node1 ~]# docker pull zookeeper:3.8
[root@node1 ~]# docker pull kafka:3.1.0
[root@node2 ~]# docker pull zookeeper:3.8
[root@node2 ~]# docker pull kafka:3.1.0
[root@node3 ~]# docker pull zookeeper:3.8
[root@node3 ~]# docker pull kafka:3.1.0

五、部署Kafka集群

1、部署Zookeeper集群

为什么部署kafka前要部署zookeeper?

Kafka依赖Zookeeper来实现分布式协调和配置管理。在Kafka集群中,Zookeeper扮演着多种角色,包括:

  1. 配置管理:Kafka集群的配置信息和元数据存储在Zookeeper中,包括主题(topics)、分区(partitions)、副本(replicas)等配置信息。
  2. Leader选举:Kafka的分区(partitions)被分布式存储在集群中的多个Broker上,每个分区都有一个Leader和多个Follower。Zookeeper负责Leader选举,确保每个分区都有一个活跃的Leader。
  3. Broker注册:Kafka Broker在启动时会向Zookeeper注册自己的信息,包括地址、ID等,以便其他Broker和客户端发现和连接。
  4. 健康监测:Zookeeper监控Kafka集群中各个节点的健康状态,并在节点出现故障或宕机时触发相应的处理操作。

因此,在部署Kafka之前,需要先部署Zookeeper,确保Kafka集群正常运行所需的分布式协调和配置管理功能可用。没有Zookeeper,Kafka无法正常运行,并且无法实现高可用性、数据一致性和故障容错等特性。

编写部署Zookeeper集群的YAML文件

[root@master ~]# vim zook.yaml
# 添加如下内容
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-cluster  #无头服务的名称,需要通过这个获取ip,与主机的对应关系
  namespace: kafka
  labels:
    app: zookeeper
spec:
  ports:
    - port: 2181
      name: zookeeper
    - port: 2188
      name: cluster1
    - port: 3888
      name: cluster2
  clusterIP: None
  selector:
    app: zookeeper
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-nodeport-service-0
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: zookeeper-0
  ports:
    - protocol: TCP
      port: 80        # Service 暴露的端口
      targetPort: 2181   # Pod 中容器的端口
      nodePort: 32181   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-nodeport-service-1
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: zookeeper-1
  ports:
    - protocol: TCP
      port: 80        # Service 暴露的端口
      targetPort: 2181   # Pod 中容器的端口
      nodePort: 32182   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-nodeport-service-2
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: zookeeper-2
  ports:
    - protocol: TCP
      port: 80        # Service 暴露的端口
      targetPort: 2181   # Pod 中容器的端口
      nodePort: 32183   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: zookeeper-config
  namespace: kafka
  labels:
    app: zookeeper
data:             #具体挂载的配置文件
  zoo.cfg: |+
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data
    dataLogDir=/datalog
    clientPort=2181
    server.1=zookeeper-0.zookeeper-cluster.kafka:2188:3888
    server.2=zookeeper-1.zookeeper-cluster.kafka:2188:3888
    server.3=zookeeper-2.zookeeper-cluster.kafka:2188:3888
    4lw.commands.whitelist=*
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: kafka
spec:
  serviceName: "zookeeper-cluster"   #填写无头服务的名称
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      initContainers:
      - name: set-zk-id
        image: busybox:latest
        command: ['sh', '-c', "hostname | cut -d '-' -f 2 | awk '{print $0 + 1}' > /data/myid"]
        volumeMounts:
        - name: data
          mountPath: /data
      containers:
      - name: zookeeper
        image: zookeeper:3.8
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "1000m"
        ports:
        - containerPort: 2181
          name: zookeeper
        - containerPort: 2188
          name: cluster1
        - containerPort: 3888
          name: cluster2
        volumeMounts:
          - name: zook-config            #挂载配置
            mountPath: /conf/zoo.cfg
            subPath: zoo.cfg
          - name: data
            mountPath: /data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
      volumes:
      - name: zook-config
        configMap:                                #configMap挂载
          name: zookeeper-config
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi

创建Zookeeper集群

[root@master ~]# kubectl apply -f  zook.yaml

查看Pod状态  

2、部署Kafka集群

编写部署Kafka集群的YAML文件

[root@master ~]# vim kafka.yaml
# 输入如下内容 
apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster  #无头服务的名称,需要通过这个获取ip,与主机的对应关系
  namespace: kafka
  labels:
    app: kafka
spec:
  ports:
    - port: 9092
      name: kafka
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-nodeport-service-0
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: kafka0-0
  ports:
    - protocol: TCP
      port: 9092        # Service 暴露的端口
      targetPort: 9092   # Pod 中容器的端口
      nodePort: 30092   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
      name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-nodeport-service-1
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: kafka1-0
  ports:
    - protocol: TCP
      port: 9092        # Service 暴露的端口
      targetPort: 9092   # Pod 中容器的端口
      nodePort: 30093   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
      name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-nodeport-service-2
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: kafka2-0
  ports:
    - protocol: TCP
      port: 9092        # Service 暴露的端口
      targetPort: 9092   # Pod 中容器的端口
      nodePort: 30094   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
      name: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka0
  namespace: kafka
spec:
  serviceName: "kafka-cluster"   #填写无头服务的名称
  replicas: 1
  selector:
    matchLabels:
      app: kafka0
  template:
    metadata:
      labels:
        app: kafka0
    spec:
      containers:
      - name: kafka
        image: kafka:3.1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "2000m"
        ports:
        - containerPort: 9092
          name: kafka
        command:
        - sh 
        - -c 
        - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=0 \
          --override listeners=PLAINTEXT://:9092 \
          --override advertised.listeners=PLAINTEXT://192.168.40.181:30092 \
          --override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        volumeMounts:
          - name: data0
            mountPath: /var/lib/kafka/data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value : "-Xms1g -Xmx1g"
        - name: JMX_PORT
          value: "5555"
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data0
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka1
  namespace: kafka
spec:
  serviceName: "kafka-cluster"   #填写无头服务的名称
  replicas: 1
  selector:
    matchLabels:
      app: kafka1
  template:
    metadata:
      labels:
        app: kafka1
    spec:
      containers:
      - name: kafka
        image: kafka:3.1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "2000m"
        ports:
        - containerPort: 9092
          name: kafka
        command:
        - sh 
        - -c 
        - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=1 \
          --override listeners=PLAINTEXT://:9092 \
          --override advertised.listeners=PLAINTEXT://192.168.40.181:30093 \
          --override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        volumeMounts:
          - name: data1
            mountPath: /var/lib/kafka/data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value : "-Xms1g -Xmx1g"
        - name: JMX_PORT
          value: "5555"
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data1
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka2
  namespace: kafka
spec:
  serviceName: "kafka-cluster"   #填写无头服务的名称
  replicas: 1
  selector:
    matchLabels:
      app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: kafka
        image: kafka:3.1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "2000m"
        ports:
        - containerPort: 9092
          name: kafka
        command:
        - sh 
        - -c 
        - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=2 \
          --override listeners=PLAINTEXT://:9092 \
          --override advertised.listeners=PLAINTEXT://192.168.40.181:30094 \
          --override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        volumeMounts:
          - name: data2
            mountPath: /var/lib/kafka/data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value : "-Xms1g -Xmx1g"
        - name: JMX_PORT
          value: "5555"
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data2
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi

创建Kafka集群

[root@master ~]# kubectl apply -f  kafka.yaml 

 查看Pod状态

  

至此,Kafka集群部署完成

💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于Kafka的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺

🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1792677.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Facebook的隐私保护挑战:用户数据安全的新时代

在全球范围内,Facebook已经成为了不可忽视的社交媒体巨头,它连接着超过20亿的活跃用户。然而,随着其影响力的不断扩大,关于用户隐私和数据安全的问题也愈加引人关注。本文将深入探讨Facebook面临的隐私保护挑战,以及它…

列存在 OceanBase 数据库架构中的应用与演进

OceanBase 4.3 版本上线了列存功能,以满足实时分析的需求。 本文作为《特性解读:列存技术》的后续,将详细阐述列存技术在OceanBase数据库架构中的应用、发展历程,以及未来的趋势。 一、前言 1970 年,关系模型之父 Co…

275 基于matlab的脉搏信号处理GUI界面编程

基于matlab的脉搏信号处理GUI界面编程,并实现滤波、去噪、实时回放、小波分析 计算脉率。采用低通滤波器,计算巴特沃斯数字滤波器的阶数N和截止频率Wn、使用coif4小波基计算信号的平稳小波分解完成降噪。程序已调通,可直接运行。 275 脉搏信号…

Ubuntu22.04之安装有道词典(二百三十六)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…

SQL158 每类视频近一个月的转发量/率

描述 用户-视频互动表tb_user_video_log iduidvideo_idstart_timeend_timeif_followif_likeif_retweetcomment_id110120012021-10-01 10:00:002021-10-01 10:00:20011NULL210220012021-10-01 10:00:002021-10-01 10:00:15001NULL310320012021-10-01 11:00:502021-10-01 11:01…

【golang学习之旅】Go中的cron定时任务

系列文章 【golang学习之旅】报错:a declared but not used 【golang学习之旅】Go 的基本数据类型 【golang学习之旅】深入理解字符串string数据类型 【golang学习之旅】go mod tidy 【golang学习之旅】记录一次 panic case : reflect: reflect.Value.SetInt using…

微处理器体系结构

1.冯诺依曼结构 传统计算机采用冯●诺依曼(Von Neumann)结构,也称普林斯顿结构,是一种将程序指令存储器和数据存储器合并在一起的存储器结构。 特征: 冯●诺依曼结构的计算机程序和数据共用一个存储空间,程序指令存储地址和数据…

SAP IQ03 Error IW351

今天在系统内碰到一个情况是IQ03 无法显示设备对象 报错为:IW 351 Equipment object status errors BS001 IW351 查阅资料后,note 1835087 提供了解决办法 1.SE38-->RISTEQ03 2.输入设备号,测试执行,确认无误后&#xff0…

8990890

作者主页:作者主页 数据结构专栏:数据结构 创作时间 :2024年5月18日

基于百度接口的实时流式语音识别系统

目录 基于百度接口的实时流式语音识别系统 1. 简介 2. 需求分析 3. 系统架构 4. 模块设计 4.1 音频输入模块 4.2 WebSocket通信模块 4.3 音频处理模块 4.4 结果处理模块 5. 接口设计 5.1 WebSocket接口 5.2 音频输入接口 6. 流程图 程序说明文档 1. 安装依赖 2.…

重邮803计网概述

目录 一.计算机网络向用户提供的最重要的功能 二.互联网概述 1.网络的网络 2.计算机网络的概念 3. 互联网发展的三个阶段 4.制订互联网的正式标准要经过以下的四个阶段 5.互联网的组成(功能) 6.互联网功能 7.互联网的组成(物理&#…

13_前端工程化_ES6

1.前端工程化概念 前端工程化是使用软件工程的方法来单独解决前端的开发流程中模块化、组件化、规范化、自动化的问题,其主要目的为了提高效率和降低成本。 前后端分离(前端代码工程化独立出来形成一个单独的app) 1.开发分离 2.部署分离 3.服务器分离…

图片和PDF展示预览、并支持下载

需求 展示图片和PDF类型&#xff0c;并且点击图片或者PDF可以预览 第一步&#xff1a;遍历所有的图片和PDF列表 <div v-for"(data,index) in parerFont(item.fileInfo)" :key"index" class"data-list-item"><downloadCard :file-inf…

STM32HAL-最简单的长、短、多击按键框架

目录 概述 一、开发环境 二、STM32CubeMx配置 三、编码 四、运行结果 五、总结 概述 本文章使用最简单的写法实现长、短、多击按键框架&#xff0c;非常适合移植各类型单片机&#xff0c;特别是资源少的芯片上。接下来将在stm32单片机上实现&#xff0c;只需占用1个定时…

spring 启动顺序

BeanFactoryAware 可在Bean 中获取 BeanFactory 实例 ApplicationContextAware 可在Bean 中获取 ApplicationContext 实例 BeanNameAware 可以在Bean中得到它在IOC容器中的Bean的实例的名字。 ApplicationListener 可监听 ContextRefreshedEvent等。 CommandLineRunner 整…

MySQL学习——选项文件的使用

MySQL 的许多程序都可以从选项文件&#xff08;有时也被称为配置文件&#xff09;中读取启动选项。选项文件提供了一种方便的方式来指定常用的选项&#xff0c;这样你就不必每次运行程序时都在命令行上输入这些选项。 要确定一个程序是否读取选项文件&#xff0c;你可以使用 -…

【工具箱】嵌入式系统存储芯片——CS创世 SD NAND

大家都知道MCU是一种"麻雀"虽小&#xff0c;却"五脏俱全"的主控。它的应用领域非常广泛&#xff0c;小到手机手表&#xff0c;大到航空航天的设备上都会用到MCU.市面上目前几个主流厂商有意法半导体&#xff08;其中最经典的一款就是STM32系列&#xff09;…

SSL证书国产化对提升我国网络安全的重要性

在互联网时代&#xff0c;数据安全和隐私保护成为全球关注的焦点。SSL证书作为保障网络通信安全的关键技术&#xff0c;其重要性不言而喻。然而&#xff0c;长期以来&#xff0c;我国在SSL证书领域过度依赖国外产品和技术&#xff0c;不仅存在安全隐患&#xff0c;也制约了我国…

LNMP网络架构的搭建

操作准备&#xff1a;准备三台虚拟机 安装 MySQL 服务 &#xff08;1&#xff09;准备好mysql目录上传软件压缩包并解压 cd /opt mkdir mysql tar xf mysql-boost-5.7.44.tar.gz &#xff08;2&#xff09;安装mysql环境依赖包 yum -y install ncurses ncurses-devel bison…

计算机网络之曼彻斯特编码和差分曼彻斯特编码

目录 前言 曼彻斯特编码 定义 策略 思路 差分曼彻斯特编码 定义 策略 思路 结束语 前言 今天是坚持写博客的第十九天&#xff0c;很高兴自己又坚持了一天&#xff0c;今天想送给自己一句李白《行路难》当中的诗词&#xff0c;希望我自己和大家都可以铭记于心&#x…