Kafka简单入门02——ISR机制

news2024/11/22 4:39:04

目录

ISR机制

ISR 关键概念

HW和LEO

Java使用Kafka通信

Kafka 生产者示例

Kafka 消费者示例


ISR机制

Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。

ISR 关键概念
  1. 领导者和追随者:每个分区有一个领导者和零个或多个追随者。领导者负责处理客户端的写请求,而追随者主要用于数据复制。

  2. ISR 集合:ISR 集合是分区领导者的一组追随者副本,它们与领导者保持数据同步。只有在 ISR 集合中的追随者副本可以参与数据的写入和读取操作。

  3. 数据复制:领导者将消息写入其本地日志,并定期将这些消息发送给 ISR 集合中的追随者。追随者接收消息后,将其写入本地日志,以保持数据同步。

  4. Leader Epoch 和 Log Start Offset:ISR 集合中的每个追随者都维护了领导者的日志信息,包括领导者的 Leader Epoch 和 Log Start Offset。这些信息用于确保数据的正确复制和同步。

  5. 数据一致性:只有在 ISR 集合中的所有追随者都成功复制了一条消息后,领导者才会将该消息标记为已提交,确保数据的一致性。

  6. 故障处理:如果某个追随者发生故障或者追赶进度过慢,那么该追随者可能会被从 ISR 集合中移除。这有助于保持数据的可靠性和避免影响性能。

其中,需要注意的的概念:

  • 分区中的所有副本统称为AR(Assigned Replicas)。

  • 所有Leader副本加上和Leader副本保持同步的Follower副本组成ISR(In-Sync Replicas)。

  • 所有没有保持同步的Follower副本组成OSR(Out-of-Sync Replicas)。

  • AR = ISR + OSR。正常情况下,所有Follower副本都应该和Leader副本一致,即AR=ISR。

  • 当Leader故障时,在ISR集合中的Follower才有资格被选举为新的Leader。

HW和LEO

在 Kafka 中,HW(High Watermark)和 LEO(Log End Offset)是与数据复制和消费有关的两个重要概念。

HW(High Watermark):HW 是指在分区中,已经被所有追随者(Follower)副本复制的消息的位置。HW 是每个分区的属性,它表示已经提交的消息。只有在 HW 之前的消息才被认为是已经提交的,这些消息已经被写入分区的所有追随者副本,并且被认为是安全的,不会丢失。HW 是为了确保数据一致性和可靠性而引入的。

LEO(Log End Offset):LEO 是指在分区中当前最新消息的位置。LEO 表示分区日志中的最后一条消息的偏移量。LEO 包括已经被写入但尚未被所有追随者副本复制的消息,以及正在等待被写入的消息。LEO 是一个动态的属性,它会随着新消息的写入而逐渐增加。

HW 和 LEO 之间的关系非常重要,它们可以帮助确保数据的可靠性和一致性:

  • HW 之前的消息是已经提交的消息,它们在数据复制中是安全的,不会丢失。

  • LEO 之前的消息是已经写入但尚未被所有追随者副本复制的消息。这些消息可能会在 HW 之前被提交,也可能会在之后被提交。

  • 一旦 HW 追赶上 LEO,表示所有的消息都已经提交,分区的数据一致性得到了保障。

Kafka的消息同步流程:

  1. 初始状态,HW和LEO在同一位置。消费者可以读取的有效消息为0,1,2,3.

  2. 消息写入Leader,LEO位置改变。Follower进行同步。

  3. Follower同步进度决定HW位置,消费者可读的有效消息0,1,2,3,4。

  4. 完成同步,消费者可读的有效消息0,1,2,3,4,5,6。

可以看出,Kafka的复制机制既不是完全的同步复制,也不是单纯异步复制。

  • 同步复制要求所有Follower副本都复制完,太影响性能了。

  • 异步复制只要数据被写入Leader副本就认为提交成功,在这种情况下,如果Leader宕机时候Follower还是落后于Leader就会造成数据丢失。

而Kafka使用的ISR机制则有效地权衡了数据可靠性和性能之间的关系。

Java使用Kafka通信

以下是 Kafka 生产者和消费者的简单示例,使用 Kafka 的 Java 客户端库(Kafka Producer 和 Kafka Consumer)来创建一个基本的消息传递示例。

Kafka 生产者示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
​
public class KafkaProducerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092"; // Kafka 服务器地址
        String topic = "my-topic"; // Kafka 主题名称
​
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
        Producer<String, String> producer = new KafkaProducer<>(properties);
​
        // 发送消息
        producer.send(new ProducerRecord<>(topic, "key", "Hello, Kafka!"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
            } else {
                System.err.println("Error sending message: " + exception.getMessage());
            }
        });
​
        producer.close();
    }
}
Kafka 消费者示例
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.time.Duration;
import java.util.Collections;
​
public class KafkaConsumerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092"; // Kafka 服务器地址
        String groupId = "my-group"; // 消费者组 ID
        String topic = "my-topic"; // Kafka 主题名称
​
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("group.id", groupId);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
​
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1126464.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CCF CSP认证历年题目自练Day38

题目 试题编号&#xff1a; 201409-3 试题名称&#xff1a; 字符串匹配 时间限制&#xff1a; 1.0s 内存限制&#xff1a; 256.0MB 问题描述&#xff1a; 问题描述   给出一个字符串和多行文字&#xff0c;在这些文字中找到字符串出现的那些行。你的程序还需支持大小写敏感…

进程之操作系统的概念

再小的努力&#xff0c;乘以365都很明显。文章目录 操作系统操作系统的概念设计操作系统的目的 管理 ps:如何理解管理如何进行管理 操作系统管理软硬件资源小总结系统调用和库函数的概念小总结 操作系统 在讲述进程的时候我们先讲述一下操作系统&#xff08;os&#xff09;,因…

基于Java的小说下载网站管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

支持多用户协作的API测试工具:Apipost

在当今快速发展的数字化时代&#xff0c;API已成为企业与开发者实现数据互通、应用集成的重要桥梁。然而&#xff0c;随着API数量的不断增加&#xff0c;API开发、调试、测试、文档等工作也变得越来越复杂。为了解决这一痛点&#xff0c;一款名为Apipost的API协同研发工具应运而…

Python基础入门例程6-NP6 牛牛的小数输出

目录 描述 输入描述&#xff1a; 输出描述&#xff1a; 示例1 解答&#xff1a; 说明&#xff1a; 描述 牛牛正在学习Python的输出&#xff0c;他想要使用print函数控制小数的位数&#xff0c;你能帮助它把所有读入的数据都保留两位小数输出吗&#xff1f; 输入描述&a…

006:vue使用lottie-web实现web动画

文章目录 1. 简介2. 优点3. 效果4. 安装使用5. lottie-web 常用方法6. Lottie-web 常用的事件 1. 简介 官方介绍&#xff1a;Lottie 是一个库&#xff0c;可以解析使用AE制作的动画&#xff08;需要用bodymovie导出为json格式&#xff09;,支持web、ios、android、flutter和re…

LeetCode 22. 括号生成【字符串,回溯;动态规划】中等

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

基于Java的线上花店管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

OJ题之反转链表

hello ~~~每日一练的分享来了。 今天up主将为大家分享一个 OJ题之反转链表 题目&#xff1a;将链表实现如下的变化 1.思路的讲解&#xff1a;对于原链表我们只需改变指针的指向&#xff08;箭头&#xff09;即可 那么问题就来了&#xff0c;我们如何实现此操作&#xff1f;…

Redis设计与实现(2)链表和链表节点

每一个链表节点 typedef struct listNode{//前置节点struct listNode *prev;//后置节点struct listNode *next;//节点值void *value }lisNode; 多个listNode可以通过pre和next指针组成双端链表 虽然只要使用多个listNode结构就可以组成链表&#xff0c;但使用adlist.h/list来…

NLP入门——语言结构/语言建模

一、Linguistics 语言学 wordsmorphology 形态学&#xff1a;词的构成和内部结构研究。如英语的dog、dogs和dog-catcher有相当的关系morpheme 语素&#xff1a;最小的语法单位&#xff0c;是最小的音义结合体lexeme 词位&#xff1a;词的意义的基本抽象单位&#xff0c;是一组…

C语言_字符串和内存函数

文章目录 前言一. strlen二. strcpy三.strcat四. strcmp &#xff08;字符串比较&#xff09;五. strncpy六. strncmp七. strstr八. strtok九 . strerror perror十. 字符分类函数十一. memcpy (内存拷贝&#xff09;十二. memmove(可以重叠拷贝 也可以实现不重叠的内存拷贝) 前…

CentOS7安装部署CDH6.2.1

文章目录 CentOS7安装部署CDH6.2.1一、前言1.简介2.架构3.环境 二、环境准备1.部署服务器2.安装包准备3.修改机器名4.关闭防火墙5.关闭 SELinux6.Hosts文件7.limits文件8.设置swap空间9.关闭透明巨页内存10.免密登录 三、安装CM管理端1.安装第三方依赖包2.安装Oracle的JDK3.安装…

Rockchip RK3399 - DRM crtc基础知识

一、LCD硬件原理 1.1 CRT介绍 CRT是阴极射线管(Cathode Ray Tube)的缩写&#xff0c;它是一种使用电子束在荧光屏上创建图像的显示设备。CRT显示器在过去很长一段时间内是主流的显示技术&#xff0c;现已被液晶显示屏或其他新兴技术所替代。 在CRT显示器中&#xff0c;扫描电子…

k8s-----6、pod的镜像拉取、重启策略、资源限制

镜像拉取、重启策略、资源限制 1、镜像拉取2、资源限制3、重启机制 1、镜像拉取 [rootmaster ~]# cat nginx.yaml apiVersion: v1 kind: Pod metadata:name: mypod spec:containers:- name: nginximage: nginx:1.14imagePullPolicy: Always# IfNotPresent: 默认值&#xff0c…

CPO是啥?

CPO是啥&#xff1f; CPO通常是“Chief Product Officer”&#xff08;首席产品官&#xff09;的缩写&#xff0c;是企业高层管理团队中负责产品管理和战略规划的主要负责人。CPO通常负责制定公司的产品战略、管理产品组合、带领产品团队以及推动产品的创新和优化。他或她需要有…

引用类型的按值传递

按值传递时&#xff0c;传递过去的是该引用类型实例的引用的一个拷贝&#xff0c;这样说可能不是很清楚&#xff0c;而且容易引起误解。所谓引用&#xff0c;就是分配在栈上的一小块内存区域&#xff0c;里面存放着该引用类型实例在托管堆上的地址。引用类型在按值传递的时候&a…

CUDA学习笔记(十三) Shared Memory

CUDA SHARED MEMORY shared memory在之前的博文有些介绍&#xff0c;这部分会专门讲解其内容。在global Memory部分&#xff0c;数据对齐和连续是很重要的话题&#xff0c;当使用L1的时候&#xff0c;对齐问题可以忽略&#xff0c;但是非连续的获取内存依然会降低性能。依赖于…

codeshell安装配置

codeshell安装配置 1 注意事项1.1 Python版本问题 2 codeshell环境搭建2.1 codeshell使用软件各版本2.2 软件下载2.3 codeshell使用环境安装2.3.1 python-3.10.9-amd64.exe安装2.3.2 Anaconda3-2022.10-Windows-x86_64.exe安装2.3.3 创建环境2.3.4 Pytorch安装2.3.5 transforme…

C++中的多态以及如何实现多态(近万字图文详解)

C中的多态 1. 多态的概念1.1 概念 2. 多态的定义及实现2.1多态的构成条件&#xff08;重点&#xff09;2.2 虚函数2.3 虚函数的重写(重点)2.4 C11 override 和 final2.5 重载、覆盖(重写)、隐藏(重定义)的对比 3. 抽象类3.1 概念3.2 接口继承和实现继承 4. 多态的原理4.1虚函数…