Kafka消费者群组和负载均衡

news2024/12/27 12:30:54
alt

前言

在大数据时代,消息处理成为众多企业关注的焦点。而Kafka作为一种高性能、分布式的消息系统,通过其消费者群组和负载均衡的特性,实现了高效的消息处理和可靠的数据传递。

消费者群组

Kafka的消费者群组是一种灵活而强大的机制,允许多个消费者协同工作以实现高吞吐量的消息处理。消费者群组通过订阅同一个主题的不同分区,实现消息的并行处理。当有新消息到达时,Kafka根据一定的策略将消息均匀地分配给不同的群组成员消费。

alt

消费者群组的好处不仅体现在提高消息处理的吞吐量,还在于实现了负载均衡。消费者群组中的每个消费者将被分配到不同的分区上,这样每个消费者只需处理部分消息,大大减轻了单个消费者的负担,提高了整体的处理效率。

消费模式

发布订阅模式

顾名思义就是生产者发布消息以后,消费者订阅对应的主题分区,然后进行消费,它是一对多的,就像本号,一个订阅号会有很多人关注,当我们发布一篇文章,各位关注的帅哥美女都能收到。

那么kafka的发布订阅模式怎么实现的呢?

很简单,就是不同的消费组就能实现发布订阅模式,在上面我们说了一个分区只能被同一个消费组内的消费者消费,那么我们使用不同的消费组的消费者消费同一个分区就行了,这就实现了发布订阅模式,假如有三个消费组a,b,c的消费者c1,c2,c3消费同一个分区,在kafka中使用groupId来表示消费组,如果所有的消费者的groupId都设置一样,那么他们就属于同一个消费组,具体如下图:

alt

发布订阅模式的应用场景有很多,比如下游有很多服务都需要使用同一份数据,如果通过编码的方式来实现的话,可以通过RPC方式来调用,但是就会造成系统的耦合,使用消息中间件的话,上游只管投递消息,下游服务订阅后,就可以消费到消息,大大降低了耦合。

点对点模式

点对点模式就是一对一模式,如微信、QQ的两个人聊天,在kafka中要使用点对点模式,那么我们还是要回到一个分区只能被同一个消费组内的消费者消费这个问题上,我们创建了一个消费组,每个分区的数据只能被这个消费组内的消费者消息,就实现了点对点模式。 alt

点对点的应用场景也很多,因为它的数据只能被一个消费者使用,比如可以削峰,比如上游服务发送了很多数据过来,如果下游服务的就只有一个消费者实例,那么就可能造成消息的积压,这时候就可以多开几个消费者实例一起消费,就加快了消息的消费速度,不过也得考虑一些因素,比如消息的顺序。

负载均衡

在Kafka中,负载均衡是指将消费者群组的负载均匀地分配给不同的消费者,以实现最大化的利用。Kafka通过内置的负载均衡算法,自动监测和管理消费者的状态,确保消息能够被高效地分发和处理。

下面是几种常见的负载均衡策略的详细解释:

  1. Round-robin(轮询): 这是最简单也是最常见的负载均衡策略。当有新的消费者加入消费者群组或者有分区需要重新分配时,Kafka按照轮询的方式将分区依次分配给消费者实例。这意味着每个消费者实例依次接收到一个分区,然后循环往复。轮询策略适用于消费者实例处理能力相当的情况。
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.Cluster;

public class RoundRobinAssignor implements RangeAssignor {

   @Override
   public Map<String, List<TopicPartition>> assign(Cluster metadata, Map<String, Integer> assignments) {
       // 对metadata中的所有分区按照字典序排序
       List<PartitionInfo> sortedPartitions = metadata.partitions()
           .stream()
           .sorted(Comparator.comparing(PartitionInfo::toString))
           .collect(Collectors.toList());

       // 将所有消费者节点按照字典序排序
       List<String> sortedConsumers = assignments.entrySet()
           .stream()
           .sorted(Comparator.comparing(Map.Entry::getKey))
           .map(Map.Entry::getKey)
           .collect(Collectors.toList());

       // 平均分配所有分区给所有消费者
       Map<String, List<TopicPartition>> result = new HashMap<>();
       for (int i = 0; i < sortedConsumers.size(); i++) {
           String consumerId = sortedConsumers.get(i);
           List<TopicPartition> consumerPartitions = new ArrayList<>();
           for (int j = i; j < sortedPartitions.size(); j += sortedConsumers.size()) {
               PartitionInfo partitionInfo = sortedPartitions.get(j);
               TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
               consumerPartitions.add(partition);
           }
           result.put(consumerId, consumerPartitions);
       }

       return result;
   }

}
  1. Range(范围): 这种策略会将主题分区根据分区ID的范围进行分配。每个消费者实例被分配一定范围的分区。例如,如果一个主题有10个分区,而消费者群组有4个消费者实例,则第一个消费者实例被分配分区0-2,第二个消费者实例被分配分区3-5,依此类推。范围策略适用于消费者实例的处理能力不同的情况,可以更合理地分配负载。
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.Cluster;

public class RangeAssignor implements RangeAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Cluster metadata, Map<String, Integer> assignments) {
        // 对metadata中的所有分区按照分区ID范围排序
        List<PartitionInfo> sortedPartitions = metadata.partitions()
            .stream()
            .sorted(Comparator.comparing(PartitionInfo::partition))
            .collect(Collectors.toList());

        // 计算每个消费者节点应该分配哪些分区
        Map<String, List<TopicPartition>> result = new HashMap<>();
        int consumerCount = assignments.size();
        int maxPartitionId = sortedPartitions.get(sortedPartitions.size() - 1).partition();
        for (Map.Entry<String, Integer> entry : assignments.entrySet()) {
            String consumerId = entry.getKey();
            int consumerIndex = entry.getValue();

            List<TopicPartition> consumerPartitions = new ArrayList<>();
            int startPartition = maxPartitionId;
            for (int i = 0; i < sortedPartitions.size(); i++) {
                PartitionInfo partitionInfo = sortedPartitions.get(i);
                TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                if (i % consumerCount == consumerIndex) {
                    consumerPartitions.add(partition);
                    startPartition = partitionInfo.partition();
                } else if (partitionInfo.partition() <= startPartition) {
                    consumerPartitions.add(partition);
                }
            }

            result.put(consumerId, consumerPartitions);
        }

        return result;
    }

}

  1. Capacity-based(基于处理能力): 这种负载均衡策略根据消费者实例的处理能力来进行分配。在消费者实例加入消费者群组或者分区需要重新分配时,Kafka会根据每个消费者实例的处理能力(例如,每秒处理消息的数量)来动态调整分配。较强的消费者实例会被分配更多的分区,以确保整体上的负载均衡。基于处理能力的策略适用于消费者实例的处理能力差异较大的情况。

    Kafka 并没有内置的 Capacity-based 策略,但你可以根据自己的需求来实现这个策略。Capacity-based 策略是根据每个消费者的处理能力和负载情况来分配分区,以实现负载均衡和最大化处理能力。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class CapacityBasedConsumerExample {

    public static void main(String[] args) {

        // 假设有3个消费者,每个消费者的处理能力为3
        int numConsumers = 3;
        int maxCapacity = 3;

        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092");
        props.put("group.id""my-consumer-group");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // 模拟消费者的处理时间
                    // 这里假设消费者处理一条消息需要1秒
                    Thread.sleep(1000);
                    System.out.println("Partition: " + record.partition() +
                            ", Offset: " + record.offset() +
                            ", Key: " + record.key() +
                            ", Value: " + record.value());
                }
                // 手动提交消费位移
                consumer.commitSync();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

在示例代码中,我们假设有3个消费者,并且每个消费者的处理能力为3个消息/秒(通过模拟消费者处理时间来实现)。然后,我们使用 poll() 方法从 Kafka 服务器获取消息,并在消费每条消息时进行处理操作。在这里,通过 Thread.sleep(1000) 来模拟每条消息的处理时间。

请注意,这里的例子只是一个模拟,实际的 Capacity-based 策略可能更加复杂,涉及更多因素,如消费者的网络状况、处理能力的评估和动态调整等。

  1. Sticky(粘性): 粘性负载均衡策略会尽量将同一分区分配给同一个消费者实例,以避免消息的重新分配。当消费者实例离线或者新的消费者加入时,分区再平衡会尽量将原来分配给该消费者的分区分配给它。这可以确保消费者实例在处理分区时保持状态的连续性,适用于一些需要有序处理消息的场景。 Kafka 没有内置的 Sticky 策略,但你可以根据自己的需求来实现这个策略。

下面是一个简单的示例代码,展示了如何实现 Sticky 策略:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class StickyConsumerExample {

    public static void main(String[] args) {

        // 假设有3个消费者和6个分区
        int numConsumers = 3;
        int numPartitions = 6;

        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092");
        props.put("group.id""my-consumer-group");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // 根据分区和消费者数量计算消费者的索引
                    int consumerIndex = Integer.parseInt(record.partition()) % numConsumers;
                    // 获取消费者的分配信息
                    Set<TopicPartition> partitions = consumer.assignment();
                    TopicPartition currentPartition = new TopicPartition(record.topic(), record.partition());
                    // 如果分区没有被分配给消费者,将其分配给对应的消费者
                    if (!partitions.contains(currentPartition)) {
                        // 暂停消费者的分区分配
                        consumer.pause(partitions);
                        // 分配特定的分区给消费者
                        consumer.assign(Collections.singletonList(currentPartition));
                        // 恢复消费者的分区分配
                        consumer.resume(partitions);
                    }
                    System.out.println("Consumer: " + consumerIndex +
                            ", Partition: " + record.partition() +
                            ", Offset: " + record.offset() +
                            ", Key: " + record.key() +
                            ", Value: " + record.value());
                }
                // 手动提交消费位移
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}

实现高效消息处理

利用Kafka的消费者群组和负载均衡特性,可以实现高效的消息处理。通过创建适当数量的消费者,将其组织成群组,并订阅合适的主题,可以将消息处理的负载分散到多个消费者上。

同时,消费者群组还具备一定的容错能力。当某个消费者发生故障或不可用时,其他消费者仍然可以继续处理消息,确保了系统的可靠性和连续性。

通过合理设置消费者群组和利用负载均衡算法,可以优化消息处理的性能和效率。消费者群组和负载均衡机制使得企业能够更好地应对庞大且高并发的消息流,并提供及时、可靠的数据传递服务。

总结

Kafka的消费者群组和负载均衡机制为企业提供了强大的消息处理能力。通过合理组织消费者群组,利用负载均衡算法,可以提高消息处理的吞吐量,实现高效的数据传递。这种高可扩展性和容错能力使得Kafka成为当今大数据领域中不可或缺的消息系统之一。

好货分享

顶尖架构师栈

关注回复关键字

【C01】超10G后端学习面试资源

【IDEA】最新IDEA激活工具和码及教程

【JetBrains软件名】 最新软件激活工具和码及教程

工具&码&教程

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1068621.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java进阶——Java深入学习的笔记汇总 JVM底层、多线程、类加载 ...

前言 spring作为主流的 Java Web 开发的开源框架&#xff0c;是Java 世界最为成功的框架&#xff0c;持续不断深入认识spring框架是Java程序员不变的追求&#xff1b;而spring的底层其实就是Java&#xff0c;因此&#xff0c;深入学习Spring和深入学习Java是硬币的正反面&…

Win10系统打开组策略编辑器的两种方法

组策略编辑器是Win10电脑中很实用的工具&#xff0c;它可以帮助用户管理和设置计算机的安全性、网络连接、软件安装等各种策略。但是&#xff0c;很多新手用户不知道打开Win10电脑中组策略编辑器的方法步骤&#xff0c;下面小编给大家介绍两种简单的方法&#xff0c;帮助打开快…

uniapp EventChannel 页面跳转参数事件传递navigateBack,navigateTo 成功后通知事件区别

问题&#xff1a;navigateBack&#xff08;&#xff09;emit事件在onload()监听不到 从A页面跳转到B页面&#xff0c;在B点击产生数据后&#xff0c;跳转回到A&#xff0c;并告诉A点击的数据是什么&#xff0c;使用&#xff1a; navigateBack&#xff08;&#xff09; B页面&…

内存溢出和内存泄漏

内存溢出和内存泄漏 内存溢出 内存溢出相对于内存泄漏来说&#xff0c;尽管更容易被理解&#xff0c;但是同样的&#xff0c;内存溢出也是引发程序崩溃的罪魁祸首之一。由于GC一直在发展&#xff0c;所以一般情况下&#xff0c;除非应用程序占用的内存增长速度非常快&#xf…

李沐深度学习记录5:13.Dropout

Dropout从零开始实现 import torch from torch import nn from d2l import torch as d2l# 定义Dropout函数 def dropout_layer(X, dropout):assert 0 < dropout < 1# 在本情况中&#xff0c;所有元素都被丢弃if dropout 1:return torch.zeros_like(X)# 在本情况中&…

模拟器运行在AndroidStudio内部,设置其独立窗口显示

在窗口内部运行 设置成独立窗口 Android Studio->Settings或Preferences->Tools->Emulator->取消勾选Launch in the Running Devices tool window --->点击右下角的OK按钮 ---> 重启Android Studio 再次启动模拟器

二维码基础学习指南

1.二维码基础原理 二维码生成原理及解析代码_二维码算法及原理-CSDN博客 2. 关于字符容纳的分辨 2.1 数字编码(Numeric Mode) 数字编码的范围为 0~9。 对于数字编码&#xff0c;统计需要编码数字的个数是否为 3 的倍数&#xff1a;如果不是 3 的倍数&#xff0c;则剩下的 1 …

ios safari 浏览器跳转页面没有自适应

今天开发遇到了一个问题&#xff0c;当用户点击浏览器中的表单进行注册时&#xff0c;表单元素会放大&#xff0c;随后跳转页面无法还原到初始状态。 这是因为如果 的 font-size 被设定为 16px 或更大&#xff0c;那么 iOS 上的 Safari 将正常聚焦到输入表单中。但是&#xff…

NSSCTF [BJDCTF 2020]easy_md5 md5实现sql

开局一个框 啥都没有用 然后我们进行抓包 发现存在提示 这里是一个sql语句 看到了 是md5加密后的 这里也是看了wp 才知道特殊MD5 可以被识别为 注入的万能钥匙 ffifdyopmd5 加密后是 276F722736C95D99E921722CF9ED621C转变为字符串 后是 or6 乱码这里就可以实现 注入 所…

一款构建Python命令行应用的开源库

1 简介 当我们编写 Python 程序时&#xff0c;我们经常需要与用户进行交互&#xff0c;接收输入并输出结果。Python 提供了许多方法来实现这一点&#xff0c;其中一个非常方便的方法是使用 typer 库。typer 是一个用于构建命令行应用程序的 Python 库&#xff0c;它使得创建命令…

Qt元对象系统 day4

Qt元对象系统 day4 元对象 元对象系统是一个基于标准C的扩展&#xff0c;为Qt提供了信号与槽机制、实时类型信息、动态属性系统。元对象可以操作、创建、描述或是执行其他对象&#xff0c;元对象又称为基对象元对象组成 QObject&#xff1a; QT 对象模型的核心&#xff0c;绝…

2023年9月:比特币逆势崛起!全球市场暴跌中的优异表现引人瞩目!

比特币在 9 月份上涨&#xff0c;而许多传统资产遭受了重大损失&#xff0c;凸显了加密货币的多元化特性。全球市场的压力似乎源于政府债券收益率上升和油价上涨。 随着比特币链上指标在本月的改善&#xff0c;强劲的基本面发挥了关键作用。稳定币市值在去年下降后趋于稳定&am…

RDkit的安装

1.一定要以管理员模式运行anaconda 2.用Anaconda创建一个新的虚环境 conda create -n my-rdkit python3.63.(进入)虚环境 #windows conda deactivate4.安装 conda install -c rdkit rdkit # 解释: -c 是选择channels, 选择从哪里下载, 第一个rdkit是通道,第二个是我们需要的…

SaaS和CRM软件系统间的关系

CRM系统和SaaS的概念是很多企业并不熟知的&#xff0c;CRM的字眼也是在数字化转型的浪潮下才渐渐出现在大家的眼前&#xff0c;让更多人认识到数字化工具的作用&#xff0c;但你清楚CRM系统和SaaS的关系吗&#xff1f; 什么是SaaS&#xff1f;SaaS可以理解为一种服务方式。厂商…

如何做好sop流程图?sop流程图用什么软件做?

5.如何做好sop流程图&#xff1f;sop流程图用什么软件做&#xff1f; 建立标准作业程序sop已经成为企业进步和发展的必经之路&#xff0c;不过&#xff0c;很多刚刚开始着手搭建sop的企业并不知道要如何操作&#xff0c;对于如何做sop流程图、用什么软件做sop流程图等问题充满…

数据中心负载测试中常见的挑战和解决方案有哪些?

数据中心负载测试中常见的挑战一个是搭建真实的测试环境&#xff0c;需要考虑到数据中心的规模、硬件设备、网络拓扑等因素&#xff0c;以确保测试的准确性和可靠性。在进行负载测试时&#xff0c;需要合理管理资源&#xff0c;包括服务器、存储设备、网络带宽等&#xff0c;以…

Selenium进行无界面爬虫开发

在网络爬虫开发中&#xff0c;利用Selenium进行无界面浏览器自动化是一种常见且强大的技术。无界面浏览器可以模拟真实用户的行为&#xff0c;解决动态加载页面和JavaScript渲染的问题&#xff0c;给爬虫带来了更大的便利。本文将为您介绍如何利用Selenium进行无界面浏览器自动…

C++ — 指针和数组的关系?

在本文中&#xff0c;您将了解数组与指针之间的关系&#xff0c;并在程序中有效地使用它们。 指针是保存地址的变量。指针不仅可以存储单个变量的地址&#xff0c;还可以存储数组单元的地址。 看以下示例&#xff1a; int* ptr; // 定义指针变量ptr int a[5]; ptr &a[2…

关于JDK于JRE路径配置问题

今天在配置tomcat时发现&#xff0c;无法找到jre的路径&#xff0c;在网上找了半天&#xff0c;才知道&#xff0c;JDK11版本之后&#xff0c;jre的路径默认和JDK路径一致&#xff0c;JDK11之后的文件夹中不再包含jre文件夹&#xff0c;由此在配置JRE环境变量时&#xff0c;只需…

194、SpringBoot --- 下载和安装 Erlang 、 RabbitMQ

本节要点&#xff1a; 一些命令&#xff1a; 小黑窗输入&#xff1a; rabbitmq-plugins enable rabbitmq_management 启动控制台插件 rabbitmq-server 启动rabbitMQ服务器 管理员启动小黑窗&#xff1a; rabbitmq-service install 添加rabbitMQ为本地服务 启动浏览器访问 htt…