【大数据学习 | kafka】kafka的ack和一致性

news2024/11/6 14:48:36

1. ack级别

上文中我们提到过kafka是存在确认应答机制的,也就是数据在发送到kafka的时候,kafka会回复一个确认信息,这个确认信息是存在等级的。

ack=0 这个等级是最低的,这个级别中数据sender线程复制完毕数据默认kafka已经接收到数据。

ack=1 这个级别中,sender线程复制完毕数据leader分区拿到数据放入到自己的存储并且返回确认信息

ack= -1 这个级别比较重要,sender线程复制完毕数据,主分区接受完毕数据并且从分区都同步完毕数据然后在返回确认信息

那么以上的等级在使用的时候都会出现什么问题呢?

ack = 0 会丢失数据

ack=0时,在异步复制过程中,leader可能会丢失leader分区和follower分区的数据。

ack=1

ack=1的时候leader虽然接收到数据存储到本地,但是没有同步给follower节点,这个时候主节点宕机,从节点重新选举新的主节点,主节点是不含有这个数据的,数据会丢失.

ack = -1

这个模式不会丢失数据,但是如果leader接受完毕数据并且将数据同步给不同的follower,从节点已经接受完毕,但是还没有返回给sender线程ack的时候,这个时候leader节点宕机了,sender没有接收到这个ack,它人为没有发送成功还会重新发送数据过来,会造成数据重复。

一般前两种都适合在数据并不是特别重要的时候使用,而最后一种效率会比较低下,但是适用于可靠性比较高的场景使用

所以一般使用我们都会使用ack = -1 retries = N 联合在一起使用

那么我们如何能够保证数据的一致性呢?

2. 幂等性

在kafka的0.10以后的版本中增加了新的特性,幂等性,主要就是为了解决kafka的ack = -1的时候,数据的重复问题,设计的原理就是在kafka中增加一个事务编号。

数据在发送的时候在单个分区中的seq事物编号是递增的,如果重复的在一个分区中多次插入编号一致的两个信息,那么这个数据会被去重掉

在单个分区中序号递增,也就是我们开启幂等性也只能保证单个分区的数据是可以去重的

整体代码如下:

pro.put(ProducerConfig.RETRIES_CONFIG,3);
pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);

设定retries = 3 ,enable.idempotence = true

幂等性开启的时候,ack默认设定为-1。

幂等性的工作原理很简单,每条消息都有一个「主键」,这个主键由 <PID, Partition, SeqNumber> 组成,他们分别是:

  • PID:ProducerID,每个生产者启动时,Kafka 都会给它分配一个 ID,ProducerID 是生产者的唯一标识,需要注意的是,Kafka 重启也会重新分配 PID
  • Partition:消息需要发往的分区号
  • SeqNumber:生产者,他会记录自己所发送的消息,给他们分配一个自增的 ID,这个 ID 就是 SeqNumber,是该消息的唯一标识

对于主键相同的数据,Kafka 是不会重复持久化的,它只会接收一条,但由于是原理的限制,幂等性也只能保证单分区、单会话内的数据不重复,如果 Kafka 挂掉,重新给生产者分配了 PID,还是有可能产生重复的数据,这就需要另一个特性来保证了 ——Kafka 事务。

3. kafka的事务

Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败。

Kafka 事务分为生产者事务消费者事务,但它们并不是强绑定的关系,消费者主要依赖自身对事务进行控制,因此这里我们主要讨论的是生产者事务。

3.1 如何开启事务

创建一个 Producer,指定一个事务 ID:

Properties properties = new Properties();

properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//设置事务ID,必须
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_1");
//创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

使用事务发送消息:

// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();

//发送10条消息往kafka,假如中间有异常,所有消息都会发送失败
try {
    for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<>("topic-test", "a message" + i));
    }
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
    // 终止事务
    producer.abortTransaction();
} finally {
    producer.close();
}

3.2 事务工作原理

1)启动生产者,分配协调器

在使用事务的时候,必须给生产者指定一个事务 ID,生产者启动时,Kafka 会根据事务 ID 来分配一个事务协调器(Transaction Coordinator) 。每个 Broker 都有一个事务协调器,负责分配 PID(Producer ID) 和管理事务。

事务协调器的分配涉及到一个特殊的主题 __transaction_state,该主题默认有 50 个分区,每个分区负责一部分事务;Kafka 根据事务ID的hashcode值%50 计算出该事务属于哪个分区, 该分区 Leader 所在 Broker 的事务协调器就会被分配给该生产者。

分配完事务协调器后,该事务协调器会给生产者分配一个 PID,接下来生产者就可以准备发送消息了。

2)发送消息

生产者分配到 PID 后,要先告诉事务协调器要把消息发往哪些分区,协调器会做一个记录,然后生产者就可以开始发送消息了,这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息。

当生产者事务内的消息发送完毕,会向事务协调器发送 Commit 或 Abort 请求,此时生产者的工作已经做完了,它只需要等待 Kafka 的响应

3)确认事务

当生产者开始发送消息时,协调器判定事务开始。它会将开始的信息持久化到主题 __transaction_state 中。

当生产者发送完事务内的消息,或者遇到异常发送失败,协调器会收到 Commit 或 Abort 请求,接着事务协调器会跟所有主题通信,告诉它们事务是成功还是失败的。

如果是成功,主题会汇报自己已经收到消息,协调者收到所有主题的回应便确认了事务完成,并持久化这一结果。

如果是失败的,主题会把这个事务内的消息丢弃,并汇报给协调者,协调者收到所有结果后再持久化这一信息,事务结束;整个放弃事务的过程消费者是无感知的,它并不会收到这些数据。

事物不仅可以保证多个数据整体成功失败,还可以保证数据丢失后恢复。

3.3 代码实现

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithTransaction {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaciton_test");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
        producer.initTransactions();
        producer.beginTransaction();
        try{
            for(int i=0;i<5;i++){
                producer.send(record);
            }
//            int a = 1/0;
            producer.commitTransaction();
        }catch (Exception e){
            producer.abortTransaction();
        }finally {
            producer.close();
        }

    }
}

4. 一致性语义

在大数据场景中存在三种时间语义,分别为

At Least Once 至少一次,数据至少一次,可能会重复

At Most Once 至多一次,数据至多一次,可能会丢失

Exactly Once 精准一次,有且只有一次,准确的消息传输

那么针对于以上我们学习了ack已经幂等性以及事务。

所以我们做以下分析:

如果设定ack = 0 或者是 1 出现的语义就是At Most Once 会丢失数据

如果设定ack = - 1 会出现At Least Once 数据的重复

在ack = -1的基础上开启幂等性会解决掉数据重复问题,但是不能保证一个批次的数据整体一致,所以还要开启事务才可以。

5. 参数调节

参数调节
buffer.memoryrecord accumulator的大小,适当增加可以保证producer的速度,默认32M
batch-size异步线程拉取的批次大小,适当增加可以提高效率,但是会增加延迟性
linger.ms异步线程等待时长一般根据生产效率而定,不建议太大增加延迟效果
acks确认应答一般设定为-1,保证数据不丢失
enable.idempotence开启幂等性保证数据去重,实现exactly once语义
retries增加重试次数,保证数据的稳定性
compression.type增加producer端的压缩
max.in.flight.requests.per.connectionsender线程异步复制数据的阻塞次数,当没收到kafka的ack之前可以最多发送五个写入请求,调节这个参数可以保证数据的有序性

全部代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithMultiConfig {
    public static void main(String[] args) throws InterruptedException {
        Properties pro = new Properties();
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
        pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);
        pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        pro.put(ProducerConfig.RETRIES_CONFIG, 3);
        pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
        producer.send(record);
        producer.close();
    }
}

其中max.in.flight.requests.per.connection参数设定后可以增加producer的阻塞大小

在未开启幂等性的时候,这个值设定为1,可以保证单个批次的数据有序,在分区内部有序

如果开启了幂等性可以设定最大值不超过5,可以保证五个request请求单个分区内有序

因为没有开启幂等性的时候如果第一个请求失败,第二个请求重新发送的时候需要二次排序

要是开启幂等性了会保留原来的顺序性,不需要重新排序

总而言之kafka可以保证单分区有序但是整体是无序的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2234343.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

完美解决mysql -u root -p ‘mysql‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件

如果你已经安装了mysql8.0&#xff0c;但是还出现是下面的问题&#xff0c;解决方法是从根目录打开或者配置环境变量。 遇到的错误主要是与命令行环境和 MySQL 命令无法识别有关。这里我会逐步分析问题&#xff0c;并给出可能的解决方法。 问题描述和分析&#xff1a; ‘my…

UE5 材质篇 1 如何偏移顶点

顶点偏移 start content里的plane长这样 我们进行一点顶点偏移就能长这样 XY加起来乘个缩放系数扔给sin结果乘个缩放系数即可

求助帖【如何学习核磁共振的原理】

最近提前进组了 我完全不懂磁共振的相关知识 想问问各位大佬有没有推荐的学习路线 或者是学习资料、论坛都可以的&#xff08;我做的方向是磁共振成像技术&#xff09; 老师给了一本书&#xff0c;但是有点看不懂&#xff0c;全英文的 叫Principles Of Magnetic Resonance …

vite+vue项目创建流程;npm error enoent Could not read package.json异常报错问题

前提概要&#xff1a;默认下载好node vue vite等等东西啊 新建文件夹&#xff0c;放项目管理员身份运行命令行&#xff0c;先转到所在大盘&#xff0c;然后再cd到具体的新建文件夹&#xff0c;执行npm init vitelatest命令。 管理员身份运行vscode&#xff0c;打开刚才新建的v…

[mysql]修改表和课后练习

目录 DDL数据定义语言 添加一个字段 添加一个字段到最后一个 添加到表中的第一个一个字段 选择其中一个位置: 修改一个字段:数据类型,长度,默认值(略) 重命名一个字段 删除一个字段 重命名表 删除表 清空表 DCL中事务相关内容 DCL中COMMIT和ROLLBACK的讲解 对比TR…

秒杀系统的设计与压测

环境准备 数据库 完成demo至少需要两个数据表&#xff0c;一个customer表示秒杀的用户&#xff0c;一个sec_product表示被秒杀的商品。 create database sec_kill;use sec_kill; create table customer(id int primary key auto_increment not null,name varchar(20),phone …

SpringBoot健身房管理系统:用户体验至上

4系统概要设计 4.1概述 本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式&#xff0c;是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示&#xff1a; 图4-1系统工作原理…

一:时序数据库-Influx应用

目录 0、版本号 1、登录页面 2、账号基本信息 3、数据库案例 4、可视化 5、java案例 0、版本号 InfluxDB v2.4.0 1、登录页面 http://127.0.0.1:8086/signin 账号&#xff1a;自己账号 密码&#xff1a;自己密码 2、账号基本信息 查看用户id和组织id&#xff01;&…

SpringBoot day 1105

ok了家人们&#xff0c;今天继续学习spring boot&#xff0c;let‘s go 六.SpringBoot实现SSM整合 6.1 创建工程&#xff0c;导入静态资源 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</…

深入实践:Langchain-Chatchat大型语言模型本地知识库的部署难题、解决方案及应用指南

检索增强生成(RAG)实践&#xff1a;基于LlamaIndex和Qwen1.5搭建智能问答系统 什么是 RAG LLM 会产生误导性的 “幻觉”&#xff0c;依赖的信息可能过时&#xff0c;处理特定知识时效率不高&#xff0c;缺乏专业领域的深度洞察&#xff0c;同时在推理能力上也有所欠缺。 正是…

鸿蒙5.0时代:原生鸿蒙应用市场引领开发者服务新篇章

前言 10月22日原生鸿蒙之夜发布会宣布HarmonyOS NEXT正式发布&#xff0c;首个版本号&#xff1a;鸿蒙5.0。这次“纯血鸿蒙”脱离了底层安卓架构成为纯国产的独立系统&#xff0c;仅凭这一点就有很多想象空间。 目前鸿蒙生态设备已超10亿&#xff0c;原生鸿蒙操作系统在中国市…

Spark的容错机制

1&#xff0c;Spark如何保障数据的安全 1、RDD容错机制&#xff1a;persist持久化机制 1&#xff09;cache算子 - 功能&#xff1a;将RDD缓存在内存中 - 语法&#xff1a;cache() - 本质&#xff1a;底层调用的还是persist&#xff08;StorageLevel.MEMORY_ONLY&#xff09;&…

Web3对社交媒体的影响:重新定义用户互动方式

随着互联网的发展和人们对隐私、安全、所有权的需求不断提高&#xff0c;Web3 的概念逐渐深入人心。Web3 的出现标志着一个去中心化、用户主导的网络时代的到来&#xff0c;这也将对社交媒体产生深远的影响。Web3 不仅推动社交媒体从中心化模式向用户主导的去中心化模式转变&am…

高通Quick板上安装编译Ros1 noetic,LeGO_LOAM,FAR_Planner和rslidar_sdk

环境要求&#xff1a; 这里quick板上安装的是Ubuntu20.04版本 Ros Noeti安装&#xff1a; 1.设置软件源&#xff1a; 官方提供的软件源&#xff1a; sudo sh -c echo "deb http://packages.ros.org/ros/ubuntu $(lsb_release -sc) main" > /etc/apt/sources.list.…

解决Knife4j 接口界面UI中文乱码问题

1、查看乱码情况 2、修改 编码设置 3、删除 target 文件 项目重新启动 被坑死了

HTML 标签属性——<a>、<img>、<form>、<input>、<table> 标签属性详解

文章目录 1. `<a>`元素属性hreftargetname2. `<img>`元素属性srcaltwidth 和 height3. `<form>`元素属性actionmethodenctype4. `<input>`元素属性typevaluenamereadonly5. `<table>`元素属性cellpaddingcellspacing小结HTML元素除了可以使用全局…

仿真APP助力汽车零部件厂商打造核心竞争力

汽车零部件是汽车工业的基石&#xff0c;是构成车辆的基础元素。一辆汽车通常由上万件零部件组成&#xff0c;包括发动机系统、传动系统、制动系统、电子控制系统等&#xff0c;它们共同确保了汽车的安全、可靠性及高效运行。 在汽车产业快速发展的今天&#xff0c;汽车零部件…

.NET周刊【11月第1期 2024-11-03】

国内文章 .NET 9 AOT的突破 - 支持老旧Win7与XP环境 https://www.cnblogs.com/lsq6/p/18519287 .NET 9 引入了 AOT 支持&#xff0c;使得应用程序能够在编译时优化&#xff0c;以在老旧 Windows 系统上运行。这项技术通过静态编译&#xff0c;消除运行时的 JIT 编译&#xf…

江协科技STM32学习- P36 SPI通信外设

&#x1f680;write in front&#x1f680; &#x1f50e;大家好&#xff0c;我是黄桃罐头&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流 &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd;​…

Type-C接口 PD 受电端(sink)快充协议芯片,XSP08Q应用小家电领域的方案

前言 在智能家居浪潮的推动下&#xff0c;小家电作为日常生活中不可或缺的一部分&#xff0c;其供电方式的创新与优化正逐步成为行业关注的焦点。随着快充技术的普及&#xff0c;特别是Power Delivery&#xff08;PD&#xff09;协议的广泛应用&#xff0c;一种新型供电模式—…