【Kafka】Kafka生产者数据重复、数据有序、数据乱序-07

news2024/12/26 22:28:16

【Kafka】Kafka生产者数据重复、数据有序、数据乱序-07

  • 1. 数据重复
    • 1.1 数据传递语义
    • 1.2 幂等性
      • 1.2.1 如何开启幂等性
      • 1.2.2 同一个消息,多个分区都会存在吗?
    • 1.3 事务
      • 1.3.1 Kafka 事务原理
      • 1.3.2 Kafka事务的作用和意义
        • 作用
        • 具体应用场景
  • 2. 数据有序
  • 3. 数据乱序

1. 数据重复

1.1 数据传递语义

  • 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

  • 最多一次(At Most Once)= ACK级别设置为0

  • 总结:
    At Least Once可以保证数据不丢失,但是不能保证数据不重复;
    At Most Once可以保证数据不重复,但是不能保证数据不丢失。

  • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。

1.2 幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID(Producer Id)是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

当幂等性Producer开启时,Kafka通过以下机制来保证消息的幂等性:

  1. Producer ID(PID)和Sequence Number:
    每个幂等性Producer在初始化时都会分配一个唯一的Producer ID(PID)。
    每条消息在发送时会被分配一个递增的Sequence Number(序列号)。
    Kafka Broker通过PID和Sequence Number来判断消息是否重复。
  2. 去重机制:
    当Broker收到一条消息时,会检查消息的PID和Sequence Number。如果消息的PID和Sequence Number已经存在,Broker会认为这是一个重复的消息,并且不会再次写入。
    这种机制只在单个分区内有效。如果消息发送到不同的分区,Kafka无法保证幂等性。

在这里插入图片描述

1.2.1 如何开启幂等性

开启方法:

  1. 二次开发代码中添加 “props.put(“enable.idempotence”,true)”。
  2. 客户端配置文件中添加 “enable.idempotence = true”。
// 初始化配置,开启事务特性
Properties props = new Properties();
props.put("enable.idempotence", true);
props.put("transactional.id", "transaction1");
...

KafkaProducer producer = new KafkaProducer<String, String>(props);

1.2.2 同一个消息,多个分区都会存在吗?

在Kafka中,同一个消息在多个分区中一般不会存在。Kafka的设计原则之一是消息在分区间是分布的,而不是复制的。以下是一些关键点:

Kafka消息分区

  1. 分区(Partition):
    每个Kafka主题(Topic)可以有多个分区(Partitions),消息在这些分区之间分布。每个消息会被发送到一个特定的分区,而不是所有分区。
    分区可以提高并行处理能力和扩展性,因为不同的分区可以由不同的消费者并行处理。

  2. 消息键(Message Key):
    当你向Kafka发送消息时,可以指定一个键(Key)。Kafka使用这个键来决定消息应该被写入哪个分区。相同键的消息会被写入同一个分区,从而保证了消息的顺序性。
    如果没有指定键,Kafka会使用轮询(Round-Robin)或者其他算法来将消息分配到不同的分区。

  3. 副本(Replica):
    虽然同一个消息不会被写入多个分区,但Kafka有一个副本机制(Replication),用于提高数据的可靠性和容错性。每个分区有一个主副本(Leader)和多个从副本(Follower),这些副本会在不同的Broker上保存相同的数据。
    当Producer发送消息到一个分区的主副本时,主副本会将消息复制到从副本中,以保证数据的高可用性。

1.3 事务

1.3.1 Kafka 事务原理

在这里插入图片描述

Kafka 的事务一共有如下5个API

// 1 初始化事务
void initTransactions();

// 2 开启事务
void beginTransaction() throws ProducerFencedException;

// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 4 提交事务
void commitTransaction() throws ProducerFencedException;

// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerTranactions {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
		
		// 初始化事务
        kafkaProducer.initTransactions();
		// 开启事务
        kafkaProducer.beginTransaction();

        try {
            // 2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
            }
			
			// 模拟失败
            int i = 1 / 0;
			
			// 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
        	// 放弃事务
            kafkaProducer.abortTransaction();
        } finally {
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
}

1.3.2 Kafka事务的作用和意义

作用
  1. 保证消息的原子性:
    事务可以保证一组消息的写入要么全部成功,要么全部失败。对于需要在多个分区或多个主题上写入数据的场景,事务能够确保数据的原子性。

  2. 避免数据丢失和重复:
    通过事务机制,Kafka可以避免消息在网络或系统故障时出现丢失或重复的情况。事务保证了每条消息的唯一性和可靠性。

  3. 支持跨分区和跨主题的操作:
    事务支持跨多个分区和多个主题的原子操作,使得Kafka在处理复杂数据流时更加灵活和可靠。

  4. 简化一致性处理:
    使用事务,开发者可以更简单地实现分布式系统中的数据一致性,而不需要手动处理分布式事务协调和一致性检查。

  5. 支持幂等性:
    事务机制基于幂等性,确保每条消息在分区内唯一,不会因重试操作导致重复消息。

具体应用场景
  1. 金融交易:
    在金融系统中,事务可以确保交易数据的完整性和一致性,避免资金损失和数据错乱。
  2. 订单处理:
    电商平台中的订单处理需要保证多个步骤(如库存检查、支付处理、订单确认)的原子性,事务可以确保订单处理的可靠性。
  3. 日志聚合:
    在日志收集和处理系统中,事务可以保证多条相关日志的完整性,避免丢失或重复。
  4. 数据同步:
    在多数据中心或多系统的数据同步中,事务可以确保数据的同步操作原子性,避免数据不一致。

2. 数据有序

在这里插入图片描述

3. 数据乱序

  1. kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
  1. kafka在1.x及以后版本保证数据单分区有序,条件如下:
    a.未开启幂等性 : max.in.flight.requests.per.connection需要设置为1
    b.开启幂等性: max.in.flight.requests.per.connection需要设置小于等于5
    原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,
    故无论如何,都可以保证最近5个request的数据都是有序的。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1843042.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LogicFlow 学习笔记——10. LogicFlow 进阶 边

我们可以基于 Vue 组件自定义边&#xff0c;可以在边上添加任何想要的 Vue 组件&#xff0c;甚至将原有的边通过样式隐藏&#xff0c;重新绘制。 如 Example3 中所示&#xff1a; 锚点 默认情况下&#xff0c;LogicFlow 只记录节点与节点的信息。但是在一些业务场景下&#…

易兆微电子_嵌入式软件工程师笔试题

易先电子 嵌入式软件工程师笔试题(十七) 1.关键字 extern是什么含义, 请举例说明。 修饰符extern用在变量或者函数的声明前&#xff0c;用来说明 “ 此变量 / 函数是在别处定义的&#xff0c;要在此处引用 ”。 //main.c #include <stdio.h>int main() {extern int num…

HTML播放flv

页面效果&#xff1a; 代码如下&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" …

Object类hashCode方法和equals方法源码

hashCode方法 顶级类Object里面的方法&#xff0c;所有类都是继承Object的&#xff0c;返回值int类型 根据一定的hash规则&#xff08;存储地址、字段、或者长度等&#xff09;&#xff0c;映射成一个数值&#xff0c;即散列值 public static int hashCode(Object a[]) {if (a…

windows系统下安装redis,并进行密码配置

一、windows系统下安装redis Redis&#xff08;Remote Dictionary Server &#xff0c;远程字典服务&#xff09; 是一个高性能的key-value数据格式的内存数据库&#xff0c;是NoSQL数据库。redis的出现主要是为了替代早起的Memcache缓存系统的。 内存型(数据存放在内存中)的非…

MPI并行计算关键点讲解及使用入门

MPI&#xff08;Message Passing Interface&#xff09;是并行计算领域的一个关键标准&#xff0c;它定义了一套用于在多个计算节点间进行高效消息传递和数据交换的通信协议和库。在高性能计算&#xff08;HPC&#xff09;领域&#xff0c;MPI尤为重要&#xff0c;特别是在处理…

Nuxt3 实战 (十一):添加路由 Transition 过渡效果和 Loading 动画

页面过渡效果 Nuxt3 利用 Vue 的 组件 在页面和布局之间应用过渡效果。 nuxt.config.ts 文件配置&#xff1a; export default defineNuxtConfig({app: {pageTransition: { name: page, mode: out-in }}, })在页面之间添加过渡效果&#xff0c;在 app.vue 文件中添加以下 CS…

opencv 打开图片后,cv::mat存入共享内存的代码,实现消费者与生产者模型。XSI信号量和POSIX 信号量

文章目录 基于 sys 系统信号量(XSI信号量)常用api参考 基于 POSIX 信号量有名信号量常用 api 无名信号量常用 api 参考 实践-基于POSIX有名信号量生产者消费者模型任务说明同步关系互斥关系 设置一个互斥信号量&#xff0c;实现对共享内存的互斥访问设置两个信号量&#xff0c;…

ESP32 矩阵键盘 4*3状态机

简洁高效的ESP32处理矩阵键盘代码… /**********矩阵键盘IO映射***************3(9) 1(8) 5(4)2(13)7(12)6(18)4(19)*************************************/ uint8_t Trg0,Cont0; void Key_Task(void) {uint8_t ReadData,ColumnData,RowData;pinMode(9,INPUT_PULLUP);pin…

[面试题]RabbitMQ

[面试题]Java【基础】[面试题]Java【虚拟机】[面试题]Java【并发】[面试题]Java【集合】[面试题]MySQL[面试题]Maven[面试题]Spring Boot[面试题]Spring Cloud[面试题]Spring MVC[面试题]Spring[面试题]MyBatis[面试题]Nginx[面试题]缓存[面试题]Redis[面试题]消息队列[面试题]…

候选键的确定方法-如何判断属性集U的子集K是否为候选键、如何找到关系模式的候选键

一、候选键的定义 在关系模式R(U,F)中&#xff0c;若&#xff0c;且K满足&#xff0c;则K为关系模式R的候选键 关系模式R的候选键必须满足以下两个条件&#xff1a; &#xff08;1&#xff09;必须是属性集U的子集 &#xff08;2&#xff09;完全函数决定属性集U 二、如何…

使用opencv合并两个图像

本节的目的 linear blending&#xff08;线性混合&#xff09;使用**addWeighted()**来添加两个图像 原理 (其实我也没太懂&#xff0c;留个坑&#xff0c;感觉本科的时候线代没学好。不对&#xff0c;我本科就没学线代。) 源码分析 源码链接 #include "opencv2/imgc…

工控 UI 风格美轮美奂

工控 UI 风格美轮美奂

Docker 部署项目,真的太雅了~

大家好&#xff0c;我是南城余&#xff01; 最近在找工作&#xff0c;正好手里有台服务器&#xff0c;之前项目上线用的宝塔部署项目上线&#xff0c;在公司实习了一年后&#xff0c;发现如今项目部署都使用的是容器化部署方案&#xff0c;也就是类似于和 Docker 一样的部署方案…

PFC 离散元数值模拟仿真技术与应用

近几年&#xff0c;随着计算能力的提高和算法的优化&#xff0c;离散元仿真技术得到了快速发展&#xff0c;并在学术界产生了大量研究成果。在 PFC 离散元计算中无需给定材料的宏观本构关系和对应的参数&#xff0c;这些传统的参数和力学特性在程序中可以自动得到。据调查&…

【绝对有用】刚刚开通的GPT-4o计算这种数学题目出现问题了

欢迎关注如何解决以上问题的方法&#xff1a;查看个人简介中的链接的具体解决方案

Matlab数学建模实战应用:案例2 - 传染病传播

目录 前言 一、问题分析 二、模型建立 三、Matlab代码实现 四、模型验证 灵敏度分析 五、模型应用 实例总结 总结 前言 传染病传播模型是公共卫生和流行病学的重要研究内容&#xff0c;通过数学建模可以帮助我们理解传染病的传播规律和趋势&#xff0c;以便制定有效的…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] 机器人搬砖(100分) - 三语言AC题解(Python/Java/Cpp)

&#x1f36d; 大家好这里是清隆学长 &#xff0c;一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 &#x1f4bb; ACM银牌&#x1f948;| 多次AK大厂笔试 &#xff5c; 编程一对一辅导 &#x1f44f; 感谢大家的订阅➕ 和 喜欢&#x1f497; &#x1f…

全网最易懂,开源时序数据库influxDB,实际应用评测

前言&#xff1a; 当今是信息爆炸的时代&#xff0c;在处理高频数据时&#xff0c;关系型数据库oracle/mysql明显表现出乏力&#xff0c;因秒级、毫秒级高频数据&#xff0c;分分钟可以把关系型数据库的表塞爆。在日常生活工作中&#xff0c;我们经常会遇到哪些需要高频分析的场…

令人震撼的人类智慧的科学领域-AI技术

AI&#xff0c;全称为人工智能&#xff08;Artificial Intelligence&#xff09;&#xff0c;是一门致力于让机器模仿人类智慧的科学领域。其核心技术涵盖了机器学习、自然语言处理、计算机视觉及专家系统等多个方面。AI旨在开发能够感知环境、进行逻辑推理、自主学习并做出决策…