kafka: 基础概念回顾(生产者客户端和机架感知相关内容)

news2024/9/27 9:28:41

一、kafka生产者客户端

在kafka体系结构中有如下几个重要的概念:

  • Producer:生产者,负责生产消息并投递到kafka broker的某个的分区中
  • Consumer:消费者,负责消费kafka若干个分区中的消息
  • Broker:kafka服务节点
1、整体架构:数据发送流程

在这里插入图片描述
(1)生产者

  • 拦截器
    生产者的拦截器可以在消息发送前做一些拦截工作对数据进行相应的处理,比如:消息过滤、消息内容修改等。
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable {
		//在将消息序列化和计算分区之前会调⽤该⽅法,⽤来对消息进⾏相应的定制化操作,如修改消息内容
		public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
		//在消息被应答之前或者消息发送失败时调⽤该⽅法,优先于⽤⼾设定的Callback之前执⾏,如统计消息发送成功或失败的次数
		public void onAcknowledgement(RecordMetadata metadata, Exception exception);
		public void close();
}
  • 序列化器
  • 分区器

二、kafka数据可靠性保证

1、LEO和HW
2、工作流程
3、Leader Epoch

三、粘性分区策略

四、机架感知

1、概念
2、机架感知分区分配策略
3、验证

(1)验证目标

  • 机架感知特性将同⼀分区的副本分散到不同的机架上
  • rack机制消费者可以消费到follower副本中的数据

(2)参数配置
broker端配置:

  • 配置名:broker.rack=my-rack-id
    • 解释:broker属于的rack
  • 配置名:replica.selector.class
    • 解释:ReplicaSelector实现类的全名,包括路径 (⽐如 RackAwareReplicaSelector 即按 rack id 指定消费)

Client端配置:
client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解释:这个参数需要和broker端指定的 broker.rack 相同,表⽰去哪个rack中获取数据。
  • 默认:null

(3)环境准备:kafka集群

  • kafka实例数: 4
  • 两个kafka实例broker.rack配置为0,另外两个kafka实例broker.rack配置为了2,broker端配置如下:
server1:
broker.id=0

broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

server2:
broker.id=1
broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

server3
broker.id=2
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

server4
broker.id=3
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

启动kafka集群,服务端⽇志信息:
在这里插入图片描述
在这里插入图片描述
验证一:机架感知特性将同一分区的副本分散到不同的机架上
在这里插入图片描述
创建topic rack02,副本被分配到了broker1和2
在这里插入图片描述
创建topic rack03 副本被分配到了0和3
在这里插入图片描述
在这里插入图片描述

验证二:客⼾端(消费者)验证:rack机制消费者可以消费到follower副本中的数据

验证代码如下:

package person.xsc.train.producer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import person.xsc.train.client.KafkaConsumerClient;
import person.xsc.train.constant.KafkaConstant;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Demo {
		public static KafkaConsumer<String, String> kafkaConsumer;
		public static void main(String[] args) {
				Properties properties = new Properties();
				properties.put(KafkaConstant.BOOTSTRAP_SERVERS, "localhost:9093,localhos
				properties.put(KafkaConstant.GROUP_ID, "test01");
				properties.put(KafkaConstant.ENABLE_AUTO_COMMIT, "true");
				properties.put(KafkaConstant.AUTO_COMMIT_INTERVAL_MS, "1000");
				properties.put(KafkaConstant.KEY_DESERIALIZER, StringDeserializer.class.
				properties.put(KafkaConstant.VALUE_DESERIALIZER, StringDeserializer.clas
				properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
				properties.put(ConsumerConfig.CLIENT_RACK_CONFIG, "0");
				properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
				kafkaConsumer = KafkaConsumerClient.createKafkaClient(properties);
				
				receiveMessage("rack02");
		}
		public static void receiveMessage(String topic) {
				TopicPartition topicPartition0 = new TopicPartition(topic, 0);
				kafkaConsumer.assign(Arrays.asList(topicPartition0));
				while(true) {
						// Kafka的消费者⼀次拉取⼀批的数据
						ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll
						//System.out.println("开始打印消息!");
						// 5.将将记录(record)的offset、key、value都打印出来
						for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
								// 主题
								String topicName = consumerRecord.topic();
								int partition = consumerRecord.partition();
								// offset:这条消息处于Kafka分区中的哪个位置
								long offset = consumerRecord.offset();
								// key\value
								String key = consumerRecord.key();
								String value = consumerRecord.value();
								System.out.println(String.format("topic: %s, partition: %s, offs
						}
				}
		}
}

前置背景:
Topic rack02的partition 0分区的副本为broker2(对应的rack为2)和broker1(对应的rack为0),其中broker2为leader(在⾮rack机制下仅能消费到leader中的数据)。

在上述代码中,消费者配置中限制了rack为0,消费的分区为0,因此映射到broker1。通过测试可验证在rack机制下消费者可以消费到folloer副本中的数据,测试如下:
在这里插入图片描述

五、机架感知存在的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1379878.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微软为Windows内置记事本应用开发AI功能;2024年15个 AI 语音生成器

&#x1f989; AI新闻 &#x1f680; 微软为Windows内置记事本应用开发AI功能 摘要&#xff1a;微软正在开发一个新的生成式AI功能&#xff0c;名为"Cowriter"&#xff0c;用于Windows内置的记事本应用。该功能类似于画图应用中的"Cocreator"功能&#x…

计算机导论04-操作系统

操作系统基础 操作系统及其特征 操作系统的概念 操作系统是&#xff1a; 管理和控制计算机硬件与软件资源的计算机程序的集合&#xff1b;操作系统直接运行在“裸机”之上&#xff0c;是最基本的系统软件&#xff0c;其他软件都必须在操作系统的支持下才能运行。 操作系统…

vue中使用js-doc

安装依赖 安装vue-template-compiler npm install ​vue-template-compiler​npm install ​vue-template-compiler​ 安装minami npm install minami 安装js-doc npm install js-doc 根目录下创建 .jsdoc.conf.json 内容&#xff1a; {"tags": {"all…

VMware Visio OmniGraffle模板和图标

VMware Visio OmniGraffle模板和图标 包含可用于Visio、omnigraffle的图标和SVG矢量图。 简介 这组资源适用于 IT 管理员、系统架构师、网络工程师和其他需要可视化 VMware 基础架构的专业人士创建精确的 VMware 网络和数据中心部署图&#xff0c;通过使用这些模板和图标&am…

国内开源环境漫谈

我国开源软件产业相较于欧美发达国家而言起步相对较晚&#xff0c;开源项目很少超过五年&#xff0c;开发者较年轻。国外很多开源项目都是10年以上的规划与投入。在开源社区发展初期、发展期、协作期、结晶期与流行期的五个阶段中&#xff0c;中国的开源社区平台大多处于前三个…

RabbitMQ解决消息丢失以及重复消费问题

文章目录 1、概念2、基于ACK/NACK机制2.1 基于Spring AMQP框架整合ACK/NACK机制2.2 测试消费失败1.02.3 测试结果1.02.4 测试MQ宕机2.5 测试结果2.0 3、RabbitMQ 如何实现幂等性设计3.1 幂等服务设计思路3.1.1 通过雪花算法生成分布式唯一ID3.1.2 通过枚举类&#xff0c;设计Me…

亚马逊怎么防止店铺关联?

亚马逊&#xff08;Amazon&#xff09;为了确保公平竞争和防止不当行为&#xff0c;采取了一些措施来防止店铺关联&#xff0c;即通过不同的方式将多个店铺相关联&#xff0c;以获取不正当的竞争优势。以下是一些亚马逊防止店铺关联的主要措施&#xff1a; 同一经营者规定&…

【TC3xx芯片】TC3xx芯片电源管理系统PMS详解

目录 前言 正文 1.供电模式选择&#xff08;Supply Mode Selection&#xff09; 1.1 供电域 1.2 供电模式 1.3 供电阈值 1.4 供电上升和下降行为Supply Ramp-up and Ramp-down Behavior 1.5 EVRC产生供电 2. 电源监控 2.1 电源监控原理 2.2 Primary低电压监控 2.3 …

轻松掌握构建工具:Webpack、Gulp、Grunt 和 Rollup 的使用技巧(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

spring常见漏洞(1)

简介 Spring的英文翻译为春天&#xff0c;可以说是给Java程序员带来了春天&#xff0c;因为它极大的简化了开发。得出一个公式&#xff1a;Spring 春天 Java程序员的春天 简化开发。最后的简化开发正是Spring框架带来的最大好处。 Spring是一个开放源代码的设计层面框架&a…

行为型设计模式——状态模式

状态模式 状态模式是比较简单的设计模式&#xff0c;它的主要作用是减少代码中大量的 if-else 或者 switch-case 等逻辑判断&#xff08;俗称屎山&#xff09;。它将每个状态定义为一个类&#xff0c;而每个状态类有自己对应的方法&#xff0c;因此当需要根据状态执行逻辑代码…

AI语音识别模块--whisper模块

1.下载 ffmpeg&#xff0c;挑一个自己电脑系统的版本&#xff0c;下载&#xff0c;如我win64&#xff1a; 地址&#xff1a; Releases BtbN/FFmpeg-Builds GitHub 下载压缩包zip&#xff0c;到本地 解压安装&#xff0c;其实无需安装&#xff0c;只需把对应的目录下的bin&…

采用不同的方式,合并多个文件为一个文件。其中包括:Java方法,Windows脚本,CMD命令

1. 批处理命令 可以实现不同文件的合并&#xff0c;将文件拖入这个命令即可。 echo off setlocal enabledelayedexpansionset "outputFilemerged_output.txt"rem Check if the output file already exists and delete it if exist "%outputFile%" del &qu…

阿尔泰科技——PXIe8912/8914/8916高速数据采集卡

阿尔泰科技PXIe8912/8914/8916高速数据采集卡是2通道同步采样数字化仪&#xff0c;专为输入信号高达 100M 的高频和高动态范围的信号而设计。 与Labview无缝连接&#xff0c;提供图形化API函数。模拟输入范围可以通过软件编程设置为1V 或者5V。配备了容量高达 2GB的板载内存。…

JAVA数组以及小练习

目录 数组的概述和静态初始化 数组的地址值和元素访问 数组的遍历 数组的动态初始化 数组练习 数组的概述和静态初始化 package 数组;public class array1 {public static void main(String[] args){//格式//静态初始化//数据类型 [] 数组名 new 数组类型[]{元素1&#xf…

第8章-第4节-Java中字节流的缓冲流

1、缓冲流&#xff1a;属于高级IO流&#xff0c;并不能直接读写数据&#xff0c;需要依赖于基础流。缓冲流的目的是为了提高文件的读写效率&#xff1f;那么是如何提高文件的读写效率的呢&#xff1f; 在内存中设置一个缓冲区&#xff0c;缓冲区的默认大小是8192字节&#xff…

Vue面试之v-if与v-show的区别

Vue面试之v-if与v-show的区别 DOM渲染初始渲染性能切换开销标签配合源码实现 最近在整理一些前端面试中经常被问到的问题&#xff0c;分为vue相关、react相关、js相关、react相关等等专题&#xff0c;可持续关注后续内容&#xff0c;会不断进行整理~ 作为Vue中两种条件性渲染元…

IaC基础设施即代码:Windows 部署 Terraform

目录 一、实验 1.环境 2.Windows 部署 Terraform 3.VS Code 部署 Terraform插件 二、问题 1.Terraform有哪些功能 2.Chocolatey有何作用 一、实验 1.环境 &#xff08;1&#xff09;主机 表1-1 主机 主机系统目标软件工具备注jia Windows 11 Terraform 1.6.6 Power…

推荐两款好用的卫星地图。

问题描述&#xff1a;推荐两款好用的卫星地图。 问题解决&#xff1a;谷歌地球、高德卫星地图。个人感觉谷歌地球好用一些。

inflate流程分析

一.inflate的三参数重载方法else里面逻辑 我们先看到setContentView里面的inflate的调用链&#xff1a; public View inflate(LayoutRes int resource, Nullable ViewGroup root) {return inflate(resource, root, root ! null);}public View inflate(LayoutRes int resource…