Kafka学习笔记(三)Kafka分区和副本机制、自定义分区、消费者指定分区

news2024/9/30 20:39:41

文章目录

  • 前言
  • 7 分区和副本机制
    • 7.1 生产者分区写入策略
      • 7.1.1 轮询分区策略
      • 7.1.2 随机分区策略
      • 7.1.3 按key分区分配策略
      • 7.1.4 自定义分区策略
        • 7.1.4.1 实现`Partitioner`接口
        • 7.1.4.2 实现分区逻辑
        • 7.1.4.3 配置使用自定义分区器
        • 7.1.4.4 分区测试
    • 7.2 消费者分区分配策略
      • 7.2.1 RangeAssignor(范围分配策略)
      • 7.2.2 RoundRobinAssignor(轮询分配策略)
      • 7.2.3 StickyAssignor(粘性分配策略)
      • 7.2.4 消费者组的Reblance机制
    • 7.3 副本机制
      • 7.3.1 生产者的`acks`参数
      • 7.3.2 `acks`参数配置为0
      • 7.3.2 `acks`参数配置为1
      • 7.3.3 `acks`参数配置为-1或all
      • 7.3.4 基准测试
    • 7.4 消费指定分区数据

前言

Kafka学习笔记(一)Linux环境基于Zookeeper搭建Kafka集群、Kafka的架构
Kafka学习笔记(二)Kafka基准测试、幂等性和事务、Java编程操作Kafka

7 分区和副本机制

7.1 生产者分区写入策略

生产者写入消息到Topic,Kafka将依据不同的策略将数据分配到不同的分区中,主要有以下策略:

7.1.1 轮询分区策略

默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区。

7.1.2 随机分区策略

每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区,但后续轮询策略表现更佳,所以基本上很少会使用随机策略。

7.1.3 按key分区分配策略

根据key值,通过一定的算法将消费分配到不同分区。按key分配策略,有可能会出现「数据倾斜」,例如某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到这个分区中,造成该分区的消息数量远大于其他的分区。

7.1.4 自定义分区策略

轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储(分区内局部有序),但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。

7.1.4.1 实现Partitioner接口

在Java中,自定义分区需要实现org.apache.kafka.clients.producer.Partitioner接口,该接口定义了如下方法:

  • topic:针对特定Topic使用不同的分区规则。
  • keykeyBytes:针对特定key值使用不同的分区规则。
  • valuevalueBytes:针对特定的消息内容使用不同的分区规则。
  • cluster:Cluster对象提供了Topic的分区信息,可以据此动态调整分区策略。
7.1.4.2 实现分区逻辑

重写partition()方法,实现分区逻辑。例如:

/**
 * 自定义分区器
 */
public class MyKafkaPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if(key != null) {
            String keyString = (String) key;
            // key 以 animal 开头时分配到分区 0
            if(keyString.startsWith("animal")) {
                return 0;
            }
            // key 以 food 开头时分配到分区 1
            if(keyString.startsWith("food")) {
                return 1;
            }
        }
        // 默认分配到分区 0
        return 0;
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}
7.1.4.3 配置使用自定义分区器

在Kafka生产者配置中,使用自定义分区器的类名:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());
7.1.4.4 分区测试

向3分区1副本的Topic[topic_3_1]发送key值为animal_rabbit的消息:

执行结果如下:

将key值修改为food_apple,则分配的分区是1:

7.2 消费者分区分配策略

通过消费者组(Consumer Group),Kafka允许多个消费者共同处理某个Topic的消息,但生产者已经将消息写入了Topic的不同分区,因此首先要解决哪个消费者消费哪个分区的数据的问题,即消费者分区分配策略问题。

在Java中,ConsumerPartitionAssignor接口用来定制消费者的分区分配策略,该接口的3个子类实现分别对应3种消费者分区分配策略。

7.2.1 RangeAssignor(范围分配策略)

范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。

注意:范围分配策略是针对每个Topic的。

范围分配策略有两个算法公式:

  • n = 分区数量 / 消费者数量
  • m = 分区数量 % 消费者数量

策略结果是:前m个消费者消费n+1个分区,剩余消费者消费n个分区。如图:

7.2.2 RoundRobinAssignor(轮询分配策略)

轮询分配策略是将消费者组内所有消费者以及消费者所订阅的所有Topic的分区按照字典序排序(Topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区分配给每个消费者。

注意:轮询分配策略不局限于单个Topic。

如上图所示,3个消费者共订阅了2个Topic,共8个分区,将8个分区按照字典序排序后,开始轮询:

  • TopicA p0 → consumer0
  • TopicA p1 → consumer1
  • TopicA p2 → consumer2
  • TopicA p3 → consumer0
  • TopicB p0 → consumer1
  • TopicB p1 → consumer2
  • TopicB p2 → consumer0
  • TopicB p3 → consumer1
  • Topica p0 → consumer2

7.2.3 StickyAssignor(粘性分配策略)

从Kafka 0.11.x版本开始,引入此类分配策略。其主要目的在于使分区分配尽可能均匀,同时在Topic或消费者发送变动需要重新分配时,分区的分配尽可能与上一次分配保持相同。

粘性分配策略主要作用在需要重新分配的情况,而不需要重新分配时和轮询分配策略类似。如图:

如果consumer2崩溃了,此时需要进行重新分配。而粘性分配策略会保留重新分配之前的分配结果,只是将原先consumer2负责的两个分区再均匀分配给consumer0、consumer1。

例如:之前consumer0、consumer1正在消费某几个分区,但由于需要重新分配,导致consumer0、consumer1需要取消处理,之后重新消费之前正在处理的分区,导致不必要的系统开销。而粘性分配策略可以明显减少这样的系统资源浪费。

7.2.4 消费者组的Reblance机制

上面提到了消费者的分区重新分配,其实就是Kafka中的Rebalance机制,称之为再均衡

Reblance机制是Kafka中确保消费者组下所有的consumer如何达成一致,分配订阅的Topic的每个分区的机制。

Rebalance触发的时机有:

  • 1)消费者组中consumer的个数发生变化。例如:有新的consumer加入到消费者组,或者是某个consumer停止了。

  • 2)订阅的Topic个数发生变化。消费者可以订阅多个主题,假设当前消费者组订阅了三个主题,但有一个主题突然被删除了,此时也需要发生再均衡。

  • 3)订阅的Topic分区数发生变化。

当然,Reblance机制的不良影响也挺大的。发生Rebalance时,消费者组下的所有consumer都将停止工作,直到Rebalance完成。

7.3 副本机制

副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以从其他备份上读取,保障数据可用。

7.3.1 生产者的acks参数

生产者配置的acks参数,表示当生产者生产消息时,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。

例如,在之前的测试代码中有如下配置:

props.put("acks", "all");

7.3.2 acks参数配置为0

acks参数配置为0,生产者不会等到Broker确认,而直接发送下一条数据。因此它的性能最高,但有可能会丢失数据。

7.3.2 acks参数配置为1

acks参数配置为1,生产者会等待leader副本确认接收后,才会发送下一条数据,性能中等。

7.3.3 acks参数配置为-1或all

acks参数配置为1,生产者会等待所有副本同步完成并确认接收后,才会发送下一条数据,性能最低。

7.3.4 基准测试

分别对不同的acks参数进行基准测试,acks参数为0时的命令如下,其余类推:

bin/kafka-producer-perf-test.sh --topic topic_1_1 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks=0

基准测试结果如下:

指标(1分区1副本)ack=0ack=1ack=-1/all
吞吐量18299.132255 records/sec19160.979049 records/sec13137.876761 records/sec
吞吐速率17.45 MB/sec18.27 MB/sec12.53 MB/sec
平均延迟时间1769.71 ms1692.25 ms2473.96 ms
最大延迟时间5490.00 ms4455.00 ms10434.00 ms

由此可见,acks参数为0和1时性能相当,为-1/all时性能大幅下降。

7.4 消费指定分区数据

如上图所示的Kafka消费者代码,只需要指定Topic,就可以直接读取消息,而不需要管理分区、副本、offset等元数据,实现方便。

这是因为,Kafka的偏移量offset是由Zookeeper管理的,消费者会自动根据上一次在Zookeeper中保存的offset去接着获取数据。不同的消费者组,在Zookeeper中保存了不同的offset,这样不同消费者组读取同一个Topic就不会有任何影响。

但以上代码也有缺点,就是不能细化控制分区、副本、offset等,从而无法从指定位置读取数据。

如果想要手动指定消费分区,则不能再使用之前的subscribe()方法订阅主题,而是要用assign()方法:

// 3. 订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
// kafkaConsumer.subscribe(Arrays.asList("my_topic"));

String topic = "topic_3_1";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
TopicPartition partition2 = new TopicPartition(topic, 2);
// 手动指定只消费分区1的数据
kafkaConsumer.assign(Arrays.asList(partition1));

利用自定义分区策略(详见7.1.4节),向Topic[topic_3_1]的分区0、分区1分别写入数据:

但消费者只消费了分区1的数据:

本节完,更多内容请查阅分类专栏:微服务学习笔记

感兴趣的读者还可以查阅我的另外几个专栏:

  • SpringBoot源码解读与原理分析
  • MyBatis3源码深度解析
  • Redis从入门到精通
  • MyBatisPlus详解
  • SpringCloud学习笔记

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2180643.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MySQL】常见的SQL优化方式(二)

目录 1、limit 优化 &#xff08;1&#xff09;延迟关联&#xff08;索引覆盖子查询&#xff09; &#xff08;2&#xff09; 已知位置查询 2、group by 优化 &#xff08;1&#xff09;使用索引 &#xff08;2&#xff09;避免排序 &#xff08;3&#xff09;分析查询 …

LeetCode[中等] 739. 每日温度

给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answer[i] 是指对于第 i 天&#xff0c;下一个更高温度出现在几天后。如果气温在这之后都不会升高&#xff0c;请在该位置用 0 来代替。 思路 栈 暴力法为遍历列…

毕业论文不会写?教你如何利用AI来帮我完成初稿

AI是个工具&#xff0c;它能帮你提高效率&#xff0c;但可不能完全替代你的工作。所以&#xff0c;你要做的是学会如何利用AI来辅助你的毕业论文写作。 writehelp智能写作辅导&#xff1a;可节约1000小时写作时间帮助快速完成初稿的撰写。 传送门&#xff1a;http://www.write…

Qt Linguist手册-翻译员

翻译人员 Qt Linguist 是为 Qt 应用程序添加翻译的工具。一旦安装了 Qt&#xff0c;就可以像开发主机上的其他应用程序一样启动 Qt Linguist。 Qt Linguist 主窗口包含一个菜单栏和以下视图&#xff1a; 上下文 (F6) 用于从上下文列表中选择要翻译的字符串。字符串 (F7) 用于…

软考中级网络工程师下午题近五年笔记

刷题地址&#xff1a;软考达人—专业的软考刷题题库&#xff0c;软考历年真题&#xff0c;软考模拟考试&#xff0c;软考考前押题。柴丁科技 (ruankaodaren.com) 上面这个网站有的图不清晰&#xff0c;也可以看这个网站策未来网校——在线刷题、智能刷题 (ceweilai.cn) 2019…

[leetcode] 71. 简化路径

文章目录 题目描述解题方法栈java代码复杂度分析 题目描述 给你一个字符串 path &#xff0c;表示指向某一文件或目录的 Unix 风格 绝对路径 &#xff08;以 / 开头&#xff09;&#xff0c;请你将其转化为 更加简洁的规范路径。 在 Unix 风格的文件系统中规则如下&#xff1…

springboot+大数据+基于协同过滤算法的校园食堂订餐系统【内含源码+文档+部署教程】

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…

25:stm32的低功耗模式

低功耗模式 1、PWR电源控制2、低功耗模式 1、PWR电源控制 PWR&#xff08;Power Control&#xff09;电源控制。PWR负责管理STM32内部的电源供电部分&#xff0c;可以实现可编程电压监测器和低功耗模式的功能&#xff0c;这里我们只学习低功耗模式的功能&#xff0c;低功耗模式…

【AI】AIOT简介

随着技术的快速发展&#xff0c;人工智能AI和物联网IoT已经成为当今最热门的技术领域。AIOT是人工智能和物联网的结合&#xff0c;使物联网设备更加智能化&#xff0c;能够进行自主决策和学习的技术。 通过物联网产生、收集来自不同维度的、海量的数据存储于云端、边缘端&#…

Netty源码-业务流程之写数据

Netty基本介绍&#xff0c;参考 Netty与网络编程 1、源码分析&#xff0c;EchoServerHandler之Write流程 1.1 write流程入口 通常我们通过ChannelRead收到消息后&#xff0c;需要给一个响应&#xff0c;通过ctx.write()将响应返回客户端。 在自定义handler的channelRead方法…

极限电流型氧传感器的工作原理以及有哪些应用场景?

极限电流型氧传感器的工作原理&#xff1a; 极限电流型氧传感器的工作原理基于稳定ZrO2固体电解质的氧泵作用。在已稳定化ZrO2两侧被覆铂电极&#xff0c;阴极侧用有气体扩散孔的罩接合&#xff0c;形成阴极空腔。在一定的温度下&#xff0c;当ZrO2电极两侧加一定电压时&#…

使用AOP处理参数

说明&#xff1a;在一些时候&#xff0c;我们需要在接口介绍到参数前处理参数&#xff0c;像参数校验、参数转换等&#xff0c;本文介绍如何使用AOP来实现此需求。 场景 需求&#xff1a;有一批开放给第三方调用的接口&#xff0c;之前传递的都是用户表的ID&#xff0c;现在需…

vscode 内网不联网如何导入vscode插件

如果有小伙伴百度到这篇文章&#xff0c;那一定是遇到了在内网开发不能联网的问题&#xff0c;那就往下看看吧。 安装一个新的Visual Studio Code&#xff0c;需要必要的一些插件&#xff0c;但是不能联网&#xff0c;于是自带的扩展程序安装便不能用了。 1、在一台能访问外网…

Kali Linux入门教程(非常详细)从零基础入门到精通,看完这一篇就够了。

作为一名从事渗透测试的人员&#xff0c;不懂Kali Linux的话&#xff0c;就out了。它预装了数百种享誉盛名的渗透工具&#xff0c;使你可以更轻松地测试、破解以及进行与数字取证相关的任何其他工作。 今天给大家分享一套Kali Linux资料合集&#xff0c;包括12份Kali Linux渗透…

数据结构-栈(理解版)

一、栈的定义 相信大家对于栈或多或少有一些了解&#xff0c;可能大多数人会告诉你栈是一种先进后出的数据结构。这其实说了跟没说一样(❁◡❁)&#xff01;当然&#xff08;last in&#xff0c;first out&#xff09;是栈最有特色的性质。 这里可以给大家一些比较好理解的例…

车辆重识别(改进的去噪扩散概率模型)论文阅读2024/9/29

所谓改进的去噪扩散概率模型主要改进在哪些方面&#xff1a; ①对数似然值的改进 通过对噪声的那个方差和T进行调参&#xff0c;来实现改进。 ②学习 这个参数也就是后验概率的方差。通过数据分析&#xff0c;发现在T非常大的情况下对样本质量几乎没有影响&#xff0c;也就是说…

Python库matplotlib之四

Python库matplotlib之四 小部件(widget)RadioButtons构造器APIs应用实列 Slider构造器APIs应用实列 小部件(widget) 小部件(widget)可与任何GUI后端一起工作。所有这些小部件都要求预定义一个Axes实例&#xff0c;并将其作为第一个参数传递。 Matplotlib不会试图布局这些小部件…

基于Springboot+Vue的c语言学习辅导网站的设计与实现 (含源码数据库)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 系统中…

美洽客户服务AI Agent 1.0,全渠道多场景赋能业务增长

“到 2025 年&#xff0c;由 AI 驱动的客户服务交互将增长 400%。” ——Gartner “70% 的企业报告称&#xff0c;在实施 AI 驱动的客户服务平台后&#xff0c;客户满意度分值提升。” ——麦肯锡 在美洽 AI 中心负责人看来&#xff0c;未来几年&#xff0c;AI 之于企业将由辅助…

国内ChatGPT镜像网站整理汇总【OpenAI o1/GPT 4o】-2024/10月最新

一、中文镜像站 ①yixiaai.com 支持GPT4、4o以及o1&#xff0c;支持MJ绘画、文件上传 ②chat.lify.vip 支持通用全模型&#xff0c;支持文件读取、插件、绘画、AIPPT ③AI Chat 支持GPT3.5/4&#xff0c;4o以及MJ绘画 1. 什么是镜像站 镜像站&#xff08;mirrored site&am…