极客时间Kafka - 02 为什么要分区|生产者的分区策略|轮询策略|随机策略|消息键保序策略

news2024/11/23 7:30:10

文章目录

      • 1. 为什么分区?
      • 2. Kafka 生产者的分区策略
        • 1. 轮询策略 RoundRobinPartitioner
        • 2. 随机策略 UniformStickyPartitioner
        • 3. 按消息键保序策略 DefaultPartitioner

我们在使用 Apache Kafka 生产和消费消息的时候,肯定是希望能够将数据均匀地分配到所有服务器上。比如很多公司使用 Kafka 收集应用服务器的日志数据,这种数据都是很多的,特别是对于那种大批量机器组成的集群环境,每分钟产生的日志量都能以 GB 数,因此如何将这么大的数据量均匀地分配到 Kafka 的各个 Broker 上,就成为一个非常重要的问题。

1. 为什么分区?

Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份,如下所示:

在这里插入图片描述

现在我抛出一个问题你可以先思考一下:你觉得为什么Kafka 要做这样的设计?为什么使用分区的概念而不是直接使用多个主题呢?

其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性
(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。

值得注意的是,不同的分布式系统对分区的叫法也不尽相同。比如在 Kafka 中叫分区,在 MongoDB 和Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来
它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。

除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如实现业务级别的消息顺序的问题

2. Kafka 生产者的分区策略

生产者发送的消息实体 ProducerRecord 的构造方法:

在这里插入图片描述

我们发送消息时可以指定分区号,如果不指定那就需要分区器,这个很重要,一条消息该发往哪一个分区,关系到顺序消息问题。下面我们说说 Kafka 生产者的分区策略。所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。

① Partitioner 接口源码:

public interface Partitioner extends Configurable, Closeable {
    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
 
    void close();
 
    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。只要你自己的实现类定义好了 partition 方法,同时设置partitioner.class 参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。

② 实现自定义分区策略 DefinePartitioner:

public class DefinePartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
        int size = partitionInfos.size();
        if(null==keyBytes){
            return counter.getAndIncrement() % size;
        }else{
            return Utils.toPositive(Utils.murmur2(keyBytes) % size);
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

③ 显式地配置生产者端的参数 partitioner.class:

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // kafka生产者属性配置
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                       StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                       StringSerializer.class.getName());
        // 使用自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                       DefinePartitioner.class.getName());
        // kafka生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String,String>("test","hello,kafka");
        kafkaProducer.send(producerRecord).get();
        // 关闭资源
        kafkaProducer.close();
    }
}

虽说可以有无数种分区的可能,但比较常见的分区策略也就那么几种,分析源码我们查看到提供了三种分区策略:

在这里插入图片描述

1. 轮询策略 RoundRobinPartitioner

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。

在这里插入图片描述

这就是所谓的轮询策略,轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

RoundRobinPartitioner 轮询策略实现源码:

/**
   The "Round-Robin" partitioner
   
   This partitioning strategy can be used when user wants to distribute the writes to all    partitions equally. This is the behaviour regardless of record key hash. 
   这种分区策略可以用于用户想要消息被平均分配到所有分区,这是种与key的哈希值无关的行为
 */
public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取主题中所有的分区列表
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // 根据主题获取下一个原子数+1
        int nextValue = nextValue(topic);
        // 获取可用的分区列表
        List<PartitionInfo> availablePartitions 
            					= cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            // 取余,这样获取的就是一个轮询的方式,从可用的分区列表中获取分区
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            // 取余,这样获取的就是一个轮询的方式,从分区列表中获取分区
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        // 根据Topic获取Map中的AtomicInteger的值,获取不到就默认是0
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {}
}

2. 随机策略 UniformStickyPartitioner

也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。

在这里插入图片描述

如果要实现随机策略版的 partition 方法,很简单,只需要两步即可:先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

UniformStickyPartitioner 随机策略实现源码:

public class UniformStickyPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}
am cluster The current cluster metadata

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return stickyPartitionCache.partition(topic, cluster);
    }

    public void close() {}
    
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

其核心的方法是 stickyPartitionCache.partition(topic, cluster),分析 StickyPartitionCache 源码:

public class StickyPartitionCache {
    
    private final ConcurrentMap<String, Integer> indexCache;
    
    public StickyPartitionCache() {
        this.indexCache = new ConcurrentHashMap<>();
    }

    public int partition(String topic, Cluster cluster) {
        Integer part = indexCache.get(topic);
        if (part == null) {
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        // 获取所有的分区列表
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        // 获取上一次发送消息的的分区系数
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        if (oldPart == null || oldPart == prevPartition) {
            // 获取所有可用的分区列表
            List<PartitionInfo> availablePartitions 
                				= cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                // 获取随机数
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                // 根据随机数和分区总数取余,获取系数
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                // 如果只有一个可用的分区,那么默认只能选择这个分区
                newPart = availablePartitions.get(0).partition();
            } else {
                // 获取的分区系数和上一次的不能一样
                while (newPart == null || newPart.equals(oldPart)) {
                    // 获取随机数
                    int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    // 根据随机数和分区总数取余,获取系数
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Topic 对应的信息如果不存在 MapCache 就放入,存在就替换
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }
}

3. 按消息键保序策略 DefaultPartitioner

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。

在这里插入图片描述

Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,版本不同则不一样0.9.0~2.3版本采用轮询策略,目前2.4、2.5采用随机算法

/**
   The default partitioning strategy:
    If a partition is specified in the record, use it
    If no partition is specified but a key is present choose a partition based on a hash of the key
    If no partition or key is present choose the sticky partition that changes when the batch is full.
 */
public class DefaultPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
        // key如果不存在使用随机算法
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        // 根据key的字节数计算获取分区
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() {}
    
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

我曾经给一个国企进行过 Kafka 培训,碰到的一个问题就是如何实现消息的顺序问题。这家企业发送的 Kafka 的消息是有因果关系的,故处理因果关系也必须要保证有序性,否则先处理了“果”后处理“因”必然造成业务上的混乱。

当时那家企业的做法是给 Kafka 主题设置单分区,也就是 1 个分区。这样所有的消息都只在这一个分区内读写,因此保证了全局的顺序性。这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势。

后来经过了解和调研,发现这种具有因果关系的消息都有一定的特点,比如在消息体中都封装了固定的标志位,后来我就建议他们对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区,这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利。

这种基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把标志位数据提取出来统一放到Key 中,这样更加符合 Kafka 的设计思想。经过改造之后,这个企业的消息处理吞吐量一下提升了 40 多倍,从这个案例你也可以看到自定制分区策略的效果可见一斑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/65029.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Jenkins-jenkins凭证管理与代码拉取

什么是凭证&#xff1f; Jenkins经常与第三方插件如git&#xff0c;docker等交互&#xff0c;需要提供第三方的凭证&#xff0c;比如access token&#xff0c;用户名和密码等 可以使用插件Credentials Binding Plugin来管理这些凭证 jenkins凭证类型 jenkins可以管理以下凭证…

UEFI的一点点概识

最近看了一篇Blog讲的是关于PC安全的&#xff0c;其中很多的地方还是有一定相似之处。其中这个UEFI引起了我兴趣&#xff0c;以前安装系统的时候听说过这个名词。这里于是便来认识一下什么是UEFI。 前言 大多数人接触UEFI都是在PC的应用场景上&#xff0c;有在PC上安装过多操…

关闭二维码

关闭二维码 结果演示 概述 通过事件的绑定来实现&#xff0c;关闭二维码的效果。 构建HTML框架 <body><div class"box">二维码<img src"images/tao.png" alt""><i class"close-btn"></i></div&g…

第四十一篇 指令中的VNode

VNode 前面讲到了自定义指令的引入使用&#xff0c;以及结合封装swiper组件一起进行结合使用&#xff0c;还记在inserted 指令生命周期当中使用的参数吗&#xff1f;第一个参数是可以拿到DOM节点&#xff08;el&#xff09;&#xff0c;第二个参数是可以拿到使用自定义指令绑定…

NLP-信息抽取-三元组-联合抽取-多任务学习-2019:spERT【采用分类的思想实现联合抽取,实体抽取和关系抽取模型均为分类模型】

论文题目&#xff1a;Span-based Joint Entity and Relation Extraction with Transformer Pre-trainin 论文链接&#xff1a;https://arxiv.org/abs/1909.07755 论文代码&#xff1a;https://github.com/markus-eberts/spert SpERT模型是联合式抽取模型&#xff0c;同时抽取…

消息队列RabbitMQ核心:简单(Hello World)模式、队列(Work Queues)模式、发布订阅模式

文章目录一、简单模式&#xff08;Hello World&#xff09;代码实现二、队列模式&#xff08;Work Queues&#xff09;轮训分发消息代码实现消息应答概述RabbitMQ持久化不公平分发三、发布订阅模式原理概述发布确认策略单个确认发布批量确认发布异步确认发布三种发布确认速度对…

MongoDB_实战部分(二)

目录一、MongoDB CRUD操作MongoDB 插入文档MongoDB 查询文档MongoDB 修改文档MongoDB 删除文档练习题二、Mongoose三、VSCode连接MongoDB模块化一、MongoDB CRUD操作 MongoDB 插入文档 /*向数据库插入文档db.<collection>.insert()db.<collection>.insertOne() 插…

SDK 2019.1 - GNU Debugger (GDB) 不正常工作

报错截图 报错显示 warning: Can not parse XML target description; XML support was disabled at compile time warning: No executable has been specified and target does not support determining executable automatically. Try using the “file” command. " 解…

ROS service简单使用示例

1、为什么要使用ROS service 之前写过一篇关于ROS topic的内容。对于实时性、周期性的消息&#xff0c;使用topic来传输是最佳的选择。topic是一种点对点的单向通信方式&#xff0c;这里的“点”指的是node&#xff0c;也就是说node之间可以通过topic方式来传递信息。topic要经…

详细设计阶段复习

详细设计详细设计:确定具体实现方案,得出精确描述任务:结构程序设计:三种基本控制结构(选择[if]/顺序/循环[while|for])实现任何单入单出的程序人机界面设计:属于接口设计的重要组成问题设计指南设计工具:描述处理过程的工具程序流程图(历史悠久)盒图(N-S图): 不违背结构程序设…

[附源码]计算机毕业设计JAVA疫情环境下的酒店管理系统

[附源码]计算机毕业设计JAVA疫情环境下的酒店管理系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM…

机器学习实战——股票close预测

前言 用股票历史的close预测未来的close。 另一篇用深度学习搞得&#xff0c;见&#xff1a;深度学习实战——CNNLSTMAttention预测股票 技术栈 xgboostpython 原理 都是很简单的小玩意&#xff0c;试了下发现预测的还不错&#xff0c;先上效果图&#xff1a; 有点惊讶&a…

CSS中 设置( 单行、多行 )超出显示省略号

1. 设置超出显示省略号 css设置超出显示省略号可分两种情况&#xff1a; 单行文本溢出显示省略号…多行文本溢出显示省略号… 但使用的核心代码是一样的&#xff1a;需要先使用 “overflow:hidden;” 来把超出的部分隐藏&#xff0c;然后使用“text-overflow:ellipsis;”当文…

Java进阶架构师之如何画好架构图?阿里大神手把手教你!

1、什么是架构 架构就是对系统中的实体以及实体之间的关系所进行的抽象描述&#xff0c;是一系列的决策。 架构是结构和愿景。 系统架构是概念的体现&#xff0c;是对物/信息的功能与形式元素之间的对应情况所做的分配&#xff0c;是对元素之间的关系以及元素同周边环境之间…

基于灰狼算法优化的lssvm回归预测-附代码

基于灰狼算法优化的lssvm回归预测 - 附代码 文章目录基于灰狼算法优化的lssvm回归预测 - 附代码1.数据集2.lssvm模型3.基于灰狼算法优化的LSSVM4.测试结果5.Matlab代码摘要&#xff1a;为了提高最小二乘支持向量机&#xff08;lssvm&#xff09;的回归预测准确率&#xff0c;对…

Java基础:Collection、泛型

第一章 Collection集合 1.1 集合概述 在前面使用过集合ArrayList&#xff0c;那么集合到底是什么呢&#xff1f; 集合&#xff1a;集合是java中提供的一种容器&#xff0c;可以用来存储多个数据。 集合和数组既然都是容器&#xff0c;它们有啥区别呢&#xff1f; 数组的长…

DPDK 数据传输流程

在进行正式的收发包之前&#xff0c;DPDK需要做一些初始化操作&#xff0c;包括&#xff1a; 初始化一个或多个mbuf_pool&#xff0c;用来存储从网卡中接受的数据包修改网卡配置&#xff0c;指定其接受队列的个数&#xff08;通常每个转发核一个&#xff09;&#xff0c;长度&…

【Hadoop 2.7.1】HDFS Shell操作的简单试验

【Hadoop 2.7.1】HDFS Shell操作的简单试验 HDFS提供了多种数据访问的方式&#xff0c;其中&#xff0c;命令行的形式是最简单的&#xff0c;同时也是开发者最容易掌握的方式 文章目录【Hadoop 2.7.1】HDFS Shell操作的简单试验HDFS Shell命令的使用上传文件(put)查看文件列表(…

全网最详细Centos7搭建Redis集群

1、准备三台服务器 没有服务器的话&#xff0c;虚拟机也一样 2、每台服务器安装上redis 相关网址&#xff1a; CentOS7安装Redis完整教程_长头发的程序猿的博客-CSDN博客_centos7 redis安装 3、修改“139.196.105.140&#xff08;主机&#xff09;”的配置文件 vim /etc/r…

路由策略简介、配置举例

路由策略简介、配置举例 定义 路由策略主要实现了路由过滤和路由属性等设置功能&#xff0c;他通过改变路由属性&#xff08;包括可达性&#xff09;来改变网络流量所经过的路径。 目的优势 目的 路由协议在发布、接收和引入路由信息时&#xff0c;根据实际组网需求实施一些策…