Kafka第四篇——生产数据总体概括,源码解析分区策略,数据收集器,Sender发送线程,key值

news2024/10/6 7:09:04

目录

流程图以及总体概述

拦截器

分区器以及分区计算策略

为啥进行分区计算?

producer生产者怎么知道有哪些分区?

分区计算

如何自定义实现分区器?

想说的在图里啦!宝宝!💡 ​编辑

如果key值忘记传递了呢!?

数据校验

数据收集器

注意

Sender发送线程


流程图以及总体概述

producer进行发送record,record对象包含topic,key,value,partition,时间戳,通过拦截器,将数据信息发送给broker,但是咱们也不知道把数据信息发送给哪个broker,而我们的Metadata就可以获取出来这个,如下面代码就是获取到9092.获取到缓存,放在底层。然后经过key对象的序列化,value对象的序列化,对应在代码中就是,configMap.put()那两行,并且这个是必须写的。然后经过分区器,partition,每个数据需要发送到broker中,每个消息发送到特定的主题,主题分为多个分区。kafka在发送数据时候,可以将数据发送到指定主题的指定分区,kafka会自动决定将消息发送到那个分区。分区器有那种判断发送给那个broker。然后进行数据校验。在数据收集器当中,相当于一个缓冲池,将同一个主题的数据可以存放在一个队列中,按“批”为单位进行发送,提高效率,并且指定了每批的大小是16K,

数据已经缓存到数据收集器后,就可以进行发送数据喽!此时就不会按topic为单位进行发送了,就可以重新整合,以节点为主!(why??因为不同的topic可以发送给同一个节点呀傻瓜!也就是说,在缓冲区以topic为单位,在发送线程中以节点为单位)封装请求,然后放在缓冲区中。再由网络通信从缓冲区中取出,发送给socket。在缓冲区,需要注意概念,在途请求缓冲区为5,表示同一个节点同一时间处理的请求数量。

拦截器

数据的规范化处理。可以有多个,可以按顺序执行数据的被拦截。和框架那块的一样。

onsend方法就是主要进行执行拦截规则的,for(ProducerInterceptor<K,V> interceptor:this.intercept)就可以循环执行多个拦截器,并且,看try,catch内容,无论当前拦截器发生什么异常,都不会影响到下一个拦截器的执行,更不会影响整个数据的发送。

自定义实现拦截器,帮助自己更好地了解拦截器。

java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class ValueInterceptorTest implements ProducerInterceptor<String, String> {

    /**
     * 实现拦截器规则
     *
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        
    }

    /**
     * 当记录被Broker确认接收时调用
     *
     * 
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 这个方法在记录被Broker确认接收时被调用
        // 根据确认情况实现自定义的处理逻辑
    }

    /**
     * 关闭拦截器时调用
     */
    @Override
    public void close() {
        
    }

    /**
     * 配置拦截器时调用
     *
     *configs 配置信息
     */
    @Override
    public void configure(Map<String, ?> configs) {
        
    }
}

分区器以及分区计算策略

为啥进行分区计算?

 数据发送给某个主题,主题会有很多分区,会在不同的broker当中,所以要算分区编号,不然连数据要发送给主题哪个节点都不知道。但是分区标号也得有范围呀!

producer生产者怎么知道有哪些分区?

从元数据缓存中获取到producer需要的主题相关信息

意味着只要元数据信息缓存了,主题的相关信息我们就可以拿到。

 分区器通过Matadata获取到分区,副本id,leadid之类的,

分区计算

¹²³⁴ 如果参数中指定了分区编号就直接返回

如何自定义实现分区器?

1.实现partitioner接口, 重写相关方法。感觉主要就是实现partition方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    /**
     * 配置分区器
     *
     * @param configs 配置信息
     */
    @Override
    public void configure(Map<String, ?> configs) {
       

    }

    /**
     * 计算分区
     *
     * @param topic       主题名称
     * @param key         消息键,可以为null
     * @param keyBytes    消息键的字节数组表示,可以为null
     * @param value       消息值
     * @param valueBytes  消息值的字节数组表示
     * @param cluster     Kafka集群信息
     * @return 分配的分区ID
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // 如果键为null,则使用轮询分区策略
        if (keyBytes == null) {
            return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
        }

        // 使用键的hashCode来计算分区
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    /**
     * 关闭分区器
     */
    @Override
    public void close() {
        // 可以在这里进行资源的清理操作,通常分区器不需要进行额外的关闭操作
    }
}

想说的在图里啦!宝宝!💡 

嘿嘿,这里解决了之前的问题,key并不像之前学到的hashmap中消费者用来消费的key,它的核心作用就是用来进行分区计算

这个点就可以从:没有指定特定的分区标号,并且分区标号没有超过范围!序列化key以及分区器不忽略key的情况下看出来。partitionForKey()方法中就用不加密的hash算法并且对分区数量进行取余处理计算。

如果key值忘记传递了呢!?

return RecordMetadata.UNKNOWN_PARTITION;(这是一个表示未知分区的常量)。表明当前生产者无法确定消息发送到哪个分区,可能需要进一步处理或记录错误信息。那感觉也不太对啊,不知道把key发送到哪一个分区!

其实他是在数据收集器那一步追加了,看这个accumulator.append方法!

点进去哦!分区标号计算:粘性分区策略

如果没有进行传递key参数,也就是当前分区是未知分区,就会根据当前主题的分区负载情况来动态获取分区标号。这就是一种优化后的粘性分区策略!如图1.1

🤔图1.1  当前分区是未知分区,就会根据当前主题的分区负载因子来动态获取分区标号。

会根据当前分区负载情况判断去那个分区!如图1.2

✅当分区负载情况为空,就动态去随机选择分区,然后就尽可能的给这个分区追加数据(粘性分区策略),并且也不能超过数值batch.size=16K。如果超过这个阈值就会切换到下一个分区。并且更新分区负载情况。

✅当前主题分区负载情况不为空,那就不用随机生成了。会根据分区负载使用频率随机生成一个随机权重,然后利用二分查找算法找与权重相近的值,根据这个值获取到相应的分区,就可以得到我们的分区标号啦!

图1.2

数据校验

当数据校验成功,数据就到达了数据收集器当中。数据收集器,生产的数据作为一个临时的存储。

数据收集器

 

如果直接生产一条数据就通过网络通信来发送,这样做效率很低哦!像javaio流读取文件一样,读一个字节写一个字节,性能很低呀!

所以就有了ProducerBatch双端队列,从很减少频繁的网络交互,提高传输效率!

在神魔时候真正进行网络交互呢??

嘿嘿,看最大范围,batch.size=16K。在前面分区计算中,有一个粘性分区策略(一旦确定了一个分区,就尽可能往这个分区中追加数据,追加数据就是往producebatch中追加数据,当到达16K,就会被sender检测到),里面就有“没有传递key,如果没有分区负载情况,就会随机生成分区,不能超过最大

注意

🤔而且这里的16k意思是超过16k就不再接收数据了,不意味着数据不能超过16k!比如数据是20k,kafka要保证数据的完整性,发现这个数据值大于16k,就立马关闭,不再接收!

Sender发送线程

 kafka底层就采用了很多生产者消费者模型,一个放一个取。数据收集器是按照主题分区来放数据,而Sender发送线程会按照broker重新整合。(主题的不同分区会放在不同的节点当中,所以有可能存在不同主题的分区在同一个节点当中)。

当整合好之后,就会封装成produceRequest,进而发送给网络客户端。

默认发送时间0,也就是消息取过来就可以直接发送了!

注意这个在途请求缓冲区数量:5

  • Broker 和 Topic:每个 broker 可以存储一个或多个 topic 的数据分片,Kafka 集群的每个 broker 都可以服务于多个 topic
  • Topic 和 分区:每个 topic 可以被分为多个分区,分区内的消息顺序是有序的,而不同分区之间的消息顺序则不保证,分区允许 Kafka 横向扩展和提高并行处理能力。
  • Broker 和 分区:每个 broker 可能会存储多个 topic 的多个分区数据,这样在整个 Kafka 集群中就形成了数据的分布式存储和处理能力。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1905212.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue移动端地图App:van-uploader导致的卡顿问题

问题描述 基于Vue3+Vant IU 4开发的移动端地图App,在进行地图点位上报、上报记录查看过程中,出现App卡顿、甚至闪退的问题,进行问题定位之后,发现是van-uploader组件导致的问题。 van-uploader文件上传组件 van-uploader组件用于将本地的图片或文件上传至服务器,并在上传…

园区、社区、乡村的智能管理

智慧园区、社区、乡村管理系统是现代信息技术在城市化进程中的重要应用,它们通过集成多种技术手段,实现对园区、社区、乡村的全面、高效、智能化管理。以下是对这三种管理系统的详细阐述: 一、智慧园区管理系统 1. 定义与目的 智慧园区管理系统是运用物联网、云计算、大数…

Canal架构以及使用规范

Canal架构以及使用规范 一、Canal的作用 相关文档&#xff1a;GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件 MySQL主备复制原理 MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events&#xff0c;可…

【做一道算一道】和为 K 的子数组

给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 该数组中和为 k 的子数组的个数 。 子数组是数组中元素的连续非空序列。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,1], k 2 输出&#xff1a;2 示例 2&#xff1a; 输入&#xff1a;nums [1,2,3],…

分布式整合

一、分布式架构介绍 什么是分布式系统 分布式系统指一个硬件或软件组件分布在不同的网络计算机上&#xff0c;彼此之间仅仅通过消息传递进行通信和协调的系统。 通俗的理解&#xff0c;分布式系统就是一个业务拆分成多个子业务&#xff0c;分布在不同的服务器节点&#xff0…

【数据结构与算法】快速排序霍尔版

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;《数据结构与算法》 期待您的关注 ​

找不到x3daudio1_7.dll怎么修复?一招搞定x3daudio1_7.dll丢失问题

当你的电脑突然弹出提示&#xff0c;“找不到x3daudio1_7.dll”&#xff0c;这时候你就需要警惕了。这往往意味着你的电脑中的程序出现了问题&#xff0c;你可能会发现自己无法打开程序&#xff0c;或者即便打开了程序也无法正常使用。因此&#xff0c;接下来我们要一起学习一下…

【简单介绍下Memcached】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

LabVIEW从测试曲线中提取特征值

在LabVIEW中开发用于从测试曲线中提取特征值的功能时&#xff0c;可以考虑以下几点&#xff1a; 数据采集与处理&#xff1a; 确保你能够有效地采集和处理测试曲线数据。这可能涉及使用DAQ模块或其他数据采集设备来获取曲线数据&#xff0c;并在LabVIEW中进行处理和分析。 特…

Wormhole Filters: Caching Your Hash on Persistent Memory——泛读笔记

EuroSys 2024 Paper 论文阅读笔记整理 问题 近似成员关系查询&#xff08;AMQ&#xff09;数据结构可以高效地近似确定元素是否在集合中&#xff0c;例如Bloom滤波器[10]、cuckoo滤波器[23]、quotient滤波器[8]及其变体。但AMQ数据结构的内存消耗随着数据规模的增长而快速增长…

Kubernetes集群性能测试之kubemark集群搭建

Kubernetes集群性能测试之kubemark集群搭建 Kubemark是K8s官方提供的一个对K8s集群进行性能测试的工具。它可以模拟出一个K8s cluster&#xff08;Kubemark cluster&#xff09;&#xff0c;不受资源限制&#xff0c;从而能够测试的集群规模比真实集群大的多。这个cluster中ma…

针对tcp不出网打——HTTP隧道代理(以CFS演示)

目录 上传工具到攻击机 使用说明 生成后门文件 由于电脑短路无法拖动文件&#xff0c;我就wget发送到目标主机tunnel.php文件​ 成功上传​ 可以访问上传的文件 启动代理监听 成功带出 后台私信获取弹药库工具reGeorg 上传工具到攻击机 使用说明 生成后门文件 pyt…

Android OpenGL ES 离屏幕渲染2——获取渲染结果并显示到ImageView控件中,使用最简模型展示

简介&#xff1a; 紧接上文&#xff0c;本文将用一个不包含顶点shader和片元shader的最小模型讲述如何把通过EGL创建的OpenGL ES环境渲染后的结果进行提取&#xff0c;单纯输出一片铺满视口的红色的像素。 EGL环境创建逻辑&#xff1a; 先看完整代码&#xff1a; package com.c…

异常组成、作用、处理方式(3种)、异常方法、自定义异常

目录 异常的组成&#xff1a;运行异常与编译异常 两者区别&#xff1a;编译异常用来提醒程序员&#xff0c;运行异常大部分是由于参数传递错误导致 异常作用&#xff1a; 作用1&#xff1a;就是平时的报错&#xff0c;方便我们找到报错的来源 作用2&#xff1a;在方法内部…

华为机试HJ51输出单向链表中倒数第k个结点

华为机试HJ51输出单向链表中倒数第k个结点 题目&#xff1a; 想法&#xff1a; 因为要用链表&#xff0c;且要找到倒数第k个结点&#xff0c;针对输入序列倒叙进行构建链表并找到对应的元素输出。注意因为有多个输入&#xff0c;要能接受多次调用 class Node(object):def __…

[Godot3.3.3] – 人物死亡动画 part-2

前言 在上一个 part 中已经完成了大部分的逻辑&#xff0c;现在进行一些新的修改。 增加重力 首先将 PlayerDeath 中的 AnimationPlayer 设置为自动播放。 返回 PlayerDeath.gd 并增加一个重力 300&#xff0c;防止玩家的尸体腾空运动。 var gravity 1000 _process 函数中…

某大会的影响力正在扩大,吞噬了整个数据库世界!

1.规模空前 你是否曾被那句“上有天堂&#xff0c;下有苏杭”所打动&#xff0c;对杭州的湖光山色心驰神往&#xff1f;7月&#xff0c;正是夏意正浓的时节&#xff0c;也是游览杭州的最佳时期。这座古典与现代交融的城市将迎来了第13届PostgreSQL中国技术大会。作为全球数据库…

基于深度学习LightWeight的人体姿态之行为识别系统源码

一. LightWeight概述 light weight openpose是openpose的简化版本&#xff0c;使用了openpose的大体流程。 Light weight openpose和openpose的区别是&#xff1a; a 前者使用的是Mobilenet V1&#xff08;到conv5_5&#xff09;&#xff0c;后者使用的是Vgg19&#xff08;前10…

二、Spring

二、Spring 1、Spring简介 1.1、Spring概述 官网地址&#xff1a;https://spring.io/ Spring 是最受欢迎的企业级 Java 应用程序开发框架&#xff0c;数以百万的来自世界各地的开发人员使用 Spring 框架来创建性能好、易于测试、可重用的代码。 Spring 框架是一个开源的 Jav…

Nginx七层(应用层)反向代理:HTTP反向代理proxy_pass篇

Nginx七层&#xff08;应用层&#xff09;反向代理 HTTP反向代理proxy_pass篇 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of thi…