Kafka-生产者分区

news2024/11/13 9:26:01

一、分区的好处

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。

生产者分区.png

kafka默认的分区器DefaultPartitioner

package org.apache.kafka.clients.producer.internals;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
 *
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner {

二.分区策略

1.随机策略

指明partition的情况下,直接将指明的值作为partition值;

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, K key, V value) {}

例如partition=1,所有数据写入分区1:

// 指定发送到1号分区
kafkaProducer.send(new ProducerRecord<>("first", 1, "", "record" + i),
        (recordMetadata, exception) -> {
            if (exception == null) {
                System.out.println("主题:" + recordMetadata.topic() + ";分区:" + recordMetadata.partition());
            }
        });

实现随机策略版的 partition:

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        return ThreadLocalRandom.current().nextInt(partitions.size());
    }
      @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

2.按消息键保序策略

没有指明partition值但有key的情况下,将 key的hash值与topic的partition数进行取余得到partition值;

public ProducerRecord(String topic, K key, V value) {}

例如:key1的 hash值=5,key2的 hash值=6 ,topic的 partition数 =2,那么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

// 指定发送到1号分区
kafkaProducer.send(new ProducerRecord<>("first", "a", "record" + i),
        (recordMetadata, exception) -> {
            if (exception == null) {
                System.out.println("主题:" + recordMetadata.topic() + ";分区:" + recordMetadata.partition());
            }
        });

3.轮询策略

既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

public ProducerRecord(String topic, V value) {}

例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。

kafkaProducer.send(new ProducerRecord<>("first","record" + i),
        (recordMetadata, exception) -> {
            if (exception == null) {
                System.out.println("主题:" + recordMetadata.topic() + ";分区:" + recordMetadata.partition());
            }
        });

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

三、自定义分区器

1.需求

实现一个分区器实现,发送过来的数据中如果包含 tracy,就发往0号分区,不包含,就发往1号分区。

2.实现

  • 定义类实现 Partitioner 接口。
  • 重写 partition()方法。
  • 在生产者的配置中添加分区器参数。

MyPartitioner:

package kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @Title: MyPartitioner.java
 * @Package kafka
 * @Description: 自定义分区器
 * @Author: hongcaixia
 * @Date: 2023/1/21 21:24
 * @Version V1.0
 */
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取消息
        String msgValue = value.toString();
        // 创建 partition
        int partition;
        // 判断消息是否包含 tracy
        if (msgValue.contains("tracy")) {
            partition = 0;
        } else {
            partition = 1;
        }
        // 返回分区号
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

MyProducerPartition

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @Title: MyProducer.java
 * @Package kafka
 * @Description: 生产者使用自定义分区器
 * @Author: hongcaixia
 * @Date: 2023/1/20 21:24
 * @Version V1.0
 */
public class MyProducerPartition {

    public static void main(String[] args) {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092");
        // 指定序列化类型 key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //设置自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.MyPartitioner");

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "record" + i));
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}

实现基于地理位置的分区策略

这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。
假设集群中有一部分机器在北京,另外一部分机器在广州。
某机构计划为每个新注册用户提供一份注册礼品,比如南方的用户注册可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。
但是需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。换句话说,送甜豆腐脑的消费者程序只在广州机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向广州机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品!
可以根据 Broker 所在的 IP 地址实现定制化的分区策略:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/177904.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Git自学日记

添加暂存区 git add 提交本地库 git commit -m “日志信息” 修改文件 vim 修改文件名 按i进入编辑模式 按esc退出编辑摸模式 :wq 保存更改 历史版本 git reflog 查看版本信息 git log 查看版本详细信息 版本穿梭 git reset --hard 版本号 分支操作 创建分支: git br…

【数据结构】7.1 查找的基本概念

文章目录1. 查找表2. 关键字3. 查找4. 动态查找表和静态查找表5. 平均查找长度1. 查找表 问题&#xff1a;在哪里找&#xff1f; 答&#xff1a;在一个新的数据结构查找表上面找。 查找表&#xff1a; 查找表是由同一类型的数据元素&#xff08;或记录&#xff09;构成的集合…

操作系统真相还原_第5章第3节:加载内核(ELF格式分析)

文章目录用C语言写内核(例)二进制程序的运行方法ELF格式的二进制文件ELF文件格式数据类型ELF header的结构Elf32_Phdr的结构ELF文件实例分析将内核载入内存当前的OS信息当前内存规划源码boot.incmbr.sloader.s内核编译并写入硬盘用C语言写内核(例) 源码&#xff1a; int func…

图像处理 手写体英文字母的目标检测与识别 实验报告

获取本实验的项目代码和实验报告&#xff0c;请>点击此处< [0] 摘要 近年来&#xff0c;随着python的迅速崛起&#xff0c;人工智能、图像识别、计算机视觉等新兴学科变得火热起来。Python的发展也伴随着它的各种衍生库、衍生编辑器的发展&#xff0c;其中OpenCV是比较经…

恶意代码分析实战 7 WinDbg

配置WinDbg双机调试。 下载Windbg&#xff08;WDK&#xff09;。 事实上你自己的win10上应该会自带。 配置WinXP虚拟机的boot.ini 改成如图所示的样子 修改主机上Windb的属性。 修改成如图所示&#xff1a; 启动WinXP选择调试状态&#xff0c;启动Windbg即可开始调试。 7.…

【JavaWeb】JavaScript基础语法(下)

✨哈喽&#xff0c;进来的小伙伴们&#xff0c;你们好耶&#xff01;✨ &#x1f6f0;️&#x1f6f0;️系列专栏:【JavaWeb】 ✈️✈️本篇内容:JavaScript基础语法(上)&#xff01; &#x1f680;&#x1f680;代码托管平台github&#xff1a;JavaWeb代码存放仓库&#xff01…

程序员的自我修养第七章——动态链接

继续更新《程序员的自我修养》这个系列&#xff0c;主要是夏天没把它看完&#xff0c;补上遗憾。本篇来自书中第七章。 再说动态链接前&#xff0c;我们先阐明为什么要动态链接&#xff1a; 动态链接的产生来自静态链接的局限性。随着静态链接的发展&#xff0c;其限制也越来越…

zookeeper可视化工具

参考资料&#xff1a; 参考网址 使用过程&#xff1a; 首先打开网址&#xff0c;将资源克隆下来解压压缩包 打开压缩目录下 startup.bat ,填入对应地ip即可&#xff08;记得优先启动zookeeper&#xff0c;否则会报错&#xff09;

我的1周年创作纪念日

机缘 我目前还是一名六年级小学生&#xff0c;下半年便升入初中了。 我是在2021年上半年&#xff08;我四年级下期时&#xff09;开始学习C的。后来我自己想做一些小游戏&#xff0c;便经常要在百度上搜索&#xff0c;后来就发现CSDN中的内容比较全面&#xff0c;便加入了CSD…

MacOS下在Pycharm中配置Pyqt5工具(2023年新版教程)

前提&#xff1a;使用Anaconda的包管理工具进行管理。创建environment&#xff0c;然后在该Environment上进行下载操作&#xff01;&#xff01;&#xff01;一、安装相关模块安装pyqt5、pyqt5-tools两个基础包&#xff0c;命令如下&#xff1a;pip install -i https://pypi.tu…

CSS文本与字体(文本格式化/对齐/装饰/转换/间距/阴影/字体/样式/大小/简写属性)

目录 文本颜色 文本颜色和背景色 文本对齐 文本方向 垂直对齐 文字装饰 文本转换 文字缩进 字母间距 行高 字间距 空白 文本阴影 所有 CSS 文本属性 字体选择很重要 通用字体族 Serif 和 Sans-serif 字体之间的区别 一些字体的例子 CSS font-family 属性 字…

vue-countTo不兼容vue3解决方案

我们想要做一个数值增长的过度效果可以使用vue-count-to 官网的地址&#xff1a;https://www.npmjs.com/package/vue-count-to 官网的截图 vue2的使用方式 cnpm install -S vue-count-to在main.js import vueCountTo from "vue-count-to"; Vue.component("C…

【JavaEE】如何开始基础的Servlet编程(基于Tomcat服务器)

如何开始最简单的Servlet编程&#xff1f;&#xff08;基于Tomcat服务器&#xff09;知道了如何借助Tomcat开始进行最简单的Servlet编程后&#xff0c;我们就可以进一步完善功能制作一个基础的网站了。在此之前我们先了解一下Servlet的生命周期。Servlet的生命周期初始化init -…

C++ list

目录 一. 初步了解 1.构造、析构、赋值 2.容量 3.元素访问 4.增删 二. 模拟实现 框架 push_back 迭代器 带参构造、析构、赋值 增删 反向迭代器 所有代码 说白了&#xff0c;就是一个双向循环带头链表&#xff0c;由于我们在数据结构中已经学习过链表的知识&a…

macOS Big Sur 11.7.3 (20G1116) Boot ISO 原版可引导镜像

本站下载的 macOS Big Sur 软件包&#xff0c;既可以拖拽到 Applications&#xff08;应用程序&#xff09;下直接安装&#xff0c;也可以制作启动 U 盘安装&#xff0c;或者在虚拟机中启动安装。 请访问原文链接&#xff1a;https://sysin.org/blog/macOS-Big-Sur-boot-iso/&a…

Nginx学习整理|入门记录

目录 1. Nginx概述 1.1 Nginx介绍 1.2 Nginx下载和安装 1.3 Nginx目录结构 2. Nginx命令 3. Nginx配置文件结构 4. Nginx具体应用 4.1 部署静态资源 4.2 反向代理 4.3 负载均衡 1. Nginx概述 1.1 Nginx介绍 Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件…

积分电路的并联电阻、反向放大电路的并联电容的区别?

运放反相比例放大电路中反馈电阻两端经常并联一个电容&#xff0c;而运放积分电路的反馈电容上常常并联一个电阻&#xff0c;两者电路结构相似&#xff0c;如下所示&#xff08;隐去阻容值&#xff09;&#xff0c;二者有何区别呢&#xff1f;电阻、电容分别又起到什么作用&…

Catboost

CatBoost简介 CatBoost是俄罗斯搜索巨头Yandex在2017年开源的机器学习库&#xff0c;是Boosting算法的一种&#xff0c;CatBoost和XGBoost&#xff0c;Lightgbm并称为GBDT三大主流神器&#xff0c;都是在GBDT算法框架下的一种改进实现&#xff0c;XGBoost是被广泛应用于工业界…

使用jstack解决线程爆满问题

问题发现生产应用现存在问题&#xff0c;影响到系统的使用&#xff0c;前端页面只配置了35个派生指标&#xff0c;后台任务生成20000多线程任务&#xff0c;占用了全部资源&#xff0c;导致其他系统也没资源可用&#xff0c;指标工厂也无法进一步使用&#xff0c;今天上午发的死…

Email Signature Manager 9.3 Crack

概述 Email Signature Manager为所有用户创建和部署电子邮件签名 包括合并的联系方式、公司徽标、社交媒体图标 和链接&#xff0c;甚至个性化内容&#xff0c;如用户照片 创建和附加电子邮件活动&#xff0c;向所有人介绍奖项&#xff0c; 活动或促销&#xff0c;或设置运行的…