kafka学习笔记--如何保证生产者数据可靠、不重复、有序

news2024/11/27 10:23:33

本文内容来自尚硅谷B站公开教学视频,仅做个人总结、学习、复习使用,任何对此文章的引用,应当说明源出处为尚硅谷,不得用于商业用途。
如有侵权、联系速删
视频教程链接:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)


PS:本节内容尚硅谷的视频讲的不太友好,又查了很多资料才搞明白

文章目录

  • 数据可靠性
  • 数据不重复
  • 数据有序性

数据可靠性

首先数据的可靠性指的是:

  • 消息不会意外丢失
  • 消息不会重复传递

那回顾我们的数据发送流程,在确认数据发送成功的这一步,也就是ack应答这里,不同的参数对应着不同的策略,如果选择了0和1,则存在丢数的问题,如图:
0: 如果数据发送到某个主题的leader时,leader所在节点挂了,那么这条消息就丢失了
在这里插入图片描述
1: 同理,leader收到了,还没应答时挂了,也会丢数据
在这里插入图片描述
-1(all): 使用-1能保证数据落配盘后才回答,保证数据不丢失
在这里插入图片描述
但是,如果Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

这就引出了ISR队列的概念了

ISR,是一个机制,也代表着一个同步合集,是由Leader维护的一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。它包含着所有处于同步状态的副本。当一个副本和Leader副本的差距超过一定程度时,这个副本就会被认为是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不变的


那么,既然ISR是动态的,那哪些副本会被包含在ISR中呢?


主要依据就是 副本需要保证能够及时地接收并复制Leader副本的消息,也就是需要保证与leader副本的消息同步延迟在一定的时间范围内(默认情况下是10秒钟,由参数 replica.lag.time.max.ms 控制)。

换而言之,因为分区与ISR机制,我们的消息一旦被Kafka 接收后,就会复制多份并很快落盘。这意味着,即使某一台Broker节点宕机乃至硬盘损毁,也不会导致数据丢失。

我们将ISR与ACK应答结合起来使用,就形成了数据可靠条件

  • 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

数据不重复

上面讲解的,只能保证数据可靠,但是这又引出了一个新的问题
如果,leader在同步完成之后,向生产者回答时,挂掉了,这时候剩下的备份分区会自动选举出一个新leader出来,但是生产者并不知道它挂掉了,只会以为是消息发送失败了,触发重试,又将数据发送了一遍,然后新的leader就又接受了一遍消息,然后在备份分区上再存一遍。这就导致了这条消息存在两份,产生数据重复问题。
在这里插入图片描述
那么kafka是怎么保证数据不重复的呢?
其实这就是数据的幂等性问题了,幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

kafka默认启用数据幂等性,即设置 enable.idempotence = true

在生产者发消息时,这条消息是有它自己的属性的,其中有三个数据被拿来作为数据的主键,kafka会以此来判断这条消息是否重复,若重复,则只保留一条

PID:又叫生产者编号(producerid), Producer在初始化的时候(只有初始化的时候会随机生成PID,也就是重启就会再次生成)会被分配一个PID

Partition:又叫分区编号,即这条消息要发往的分区的paritionid

SeqNumber:又叫序列号,发往同一Partition的消息会附带Sequence Number(即发送数据的编号,代表着向分区发送的第几条消息)

这样<PID, PartitionID, SeqNumber>就相当于构成了一个主键。Broker端会对<PID, PartitionID, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条,这样就保证了数据的唯一,不重复。

但是幂等性只能保证的是在单分区单会话内不重复,如果发消息时生产者挂掉了,重启后它不知道是否发送成功了,又将这个消息再发送一遍,此时它的PID发生变化,那么这条消息就被认为是一条新的消息,导致重复存储,这种情况怎么解决呢?

这就要引入kafka的事务机制了,事务这个东西大家都知道啥意思,不再重复解释

我们通过事务,让客户端挂掉后继续处理,而不是重新从头来过,保证消息的仅一次发送

注意:开启事务,必须开启幂等性。

请添加图片描述

kafka使用事务,有5个API

// 初始化事务
void initializeTransactions ();

// 开启事务
void beginTransaction () throws ProducerFencedException;

// 在事务中提交已消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets, String consumerGroupId) throws ProducerFencedException;

// 提交事务
void commitTransaction () throws ProducerFencedException;

// 放弃事务(类似于回滚事务的操作)
void abortTransaction () throws ProducerFencedException;

举个例子:

package com.atguigu.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Test {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息
        properties.put("bootstrap.servers", "hadoop102:9092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        properties.put("transactional.id", "transaction_id_0");

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 初始化事务
        kafkaProducer.initTransactions();

        // 开启事务
        kafkaProducer.beginTransaction();

        try {
            // 4. 调用 send 方法,发送消息
            // 发送消息
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
            }

            // int i = 1 / 0;

            // 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            // 终止事务
            kafkaProducer.abortTransaction();
        } finally {
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
}

数据有序性

如果某主题TOPIC只有一个分区,那么它天生有序,因为分区其实就是一个有序队列

如果是多分区的,kafka是通过滑动窗口的思想解决这个问题的

我们知道kafka发送请求时,最多缓存5个,其实在发送时,每个请求都有自己的单调递增编号,kafka broker在接收数据时,会自动按照编号将数据排序,并且如果其中一个编号的请求失败时,后续再次成功,数据过来后,会自动的根据编号插入到应该在的位置上
请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1300743.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

改进YOLOv8注意力系列一:结合ACmix、Biformer、BAM注意力机制

🗝️改进YOLOv8注意力系列一:结合ACmix、Biformer、BAM注意力机制 代码ACmixBiFormerBAMBlock加入方法各种yaml加入结构本文提供了改进 YOLOv8注意力系列包含不同的注意力机制以及多种加入方式,在本文中具有完整的代码和包含多种更有效加入YOLOv8中的yaml结构,读者可以获…

特征驱动开发

FDD 方法来自于一个大型的新加坡银行项目。FDD 的创立者 Jeff De Luca 和 Peter Coad 分别是这个项目的项目经理和首席架构设计师。在 Jeff 和 Peter 接手项目时&#xff0c;客户已经经历了一次项目的失败&#xff0c;从用户到高层都对这个项目持怀疑的态度&#xff0c;项目组士…

WLAN配置实验

本文记录了WLAN配置实践的过程&#xff0c;该操作在华为HCIA中属于相对较复杂的实验&#xff0c;记录过程备忘。这里不就WLAN原理解释&#xff0c;仅进行配置实践&#xff0c;可以作为学习原理时候的参考。本文使用华为ENSP进行仿真。实验拓扑图如下&#xff1a; 1.WLAN工作流程…

基于OpenCV的流水线包装箱检测计数应用(附源码)

导 读 本文主要介绍基于OpenCV的流水线包装箱检测计数应用,并给出源码。 资源下载 完整代码和视频下载地址: https://github.com/freedomwebtech/rpi4-conveyor-belt-boxces-counter 核心代码如下(cboxtest.py): import cv2import numpy as npfrom tracker import*cap=c…

javaSwing酒店管理

一、介绍 在这篇博客中&#xff0c;我们将介绍一个基于MySQL数据库、Java编程语言和Swing图形用户界面的简单酒店管理系统。该系统包括了查询房客信息、查询房客状态、修改房客信息、添加房间信息、添加住户、退房管理、预定管理、退订管理、入账管理、出账管理、修改资料等多…

HarmonyOS4.0从零开始的开发教程11给您的应用添加弹窗

HarmonyOS&#xff08;十&#xff09;给您的应用添加弹窗 概述 在我们日常使用应用的时候&#xff0c;可能会进行一些敏感的操作&#xff0c;比如删除联系人&#xff0c;这时候我们给应用添加弹窗来提示用户是否需要执行该操作&#xff0c;如下图所示&#xff1a; 弹窗是一种…

5.题目:编号1624 小蓝吃糖果

题目: ### 这道题主要考察poriority_queue优先队列 #include<bits/stdc.h> using lllong long; using namespace std; int main(){ios::sync_with_stdio(0),cin.tie(0),cout.tie(0);int n;cin>>n;priority_queue<int> pq;ll sum0,x;for(int i1;i<n;i){c…

基于SSM的java衣服商城

基于SSM的java衣服商城 一、系统介绍二、功能展示四、其他系统实现五、获取源码 一、系统介绍 项目类型&#xff1a;Java EE项目 项目名称&#xff1a;基于SSM的美衣商城 项目架构&#xff1a;B/S架构 开发语言&#xff1a;Java语言 前端技术&#xff1a;Layui等 后端技术…

数据结构:数和二叉树

树概念及结构 树的结构&#xff1a; 树是一种非线性的数据结构&#xff0c;它是由n个有限节点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;即根朝上&#xff0c;叶朝下。 有一个特殊的节点&#xff0c;成为根节点&#xff0c;根节点没有…

详解ZNS SSD基本原理

ZNS SSD的原理是把namespace空间划分多个zone空间&#xff0c;zone空间内部执行顺序写。这样做的优势&#xff1a; 降低SSD内部的写放大&#xff0c;提升SSD的寿命 降低OP空间&#xff0c;host可以获得更大的使用空间 降低SSD内部DRAM的容量&#xff0c;降低整体的SSD成本 降…

15、lambda表达式、右值引用、移动语义

前言 返回值后置 auto 函数名 (形参表) ->decltype(表达式) lambda表达式 lambda表达式的名称是一个表达式 (外观类似函数)&#xff0c;但本质绝非如此 语法规则 [捕获表] (参数表) 选项 -> 返回类型 { 函数体; }lambda表达式的本质 lambda表达式本质其实是一个类…

Windows汇编调用printf

VS2022 汇编 项目右键 生成依赖项 生成自定义 勾选masm 链接器 高级 入口点 main X86 .686 .model flat,stdcall option casemap:none includelib ucrt.lib includelib legacy_stdio_definitions.libEXTERN printf:proc.data szFormat db %s,0 szStr db hello,0.code main…

AI 绘画 | Stable Diffusion 艺术字与光影效果

前言 这篇文章教会你如何使用Stable Diffusion WEB UI扩展插件ControlNet控制光影模型实现艺术字与图片的光影效果。艺术字主要原理是用到了Depth (深度)算法和模型,光影效果是用到了control_v1p_sd15_brightness(亮度)和control_v1p_sd15_illumination(光亮)两个模型其中…

力扣每日一题day32[104. 二叉树的最大深度]

给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3示例 2&#xff1a; 输入&#xff1a;root [1,null,2] 输出…

Spring Boot 3 集成 Druid 连接池详解

在现代的Java应用中&#xff0c;使用一个高效可靠的数据源是至关重要的。Druid连接池作为一款强大的数据库连接池&#xff0c;提供了丰富的监控和管理功能&#xff0c;成为很多Java项目的首选。本文将详细介绍如何在Spring Boot 3项目中配置数据源&#xff0c;集成Druid连接池&…

普冉(PUYA)单片机开发笔记(7): ADC-轮询式多路采样

概述 应用中经常会有使用单片机进行模数转换的需求。PY32F003 具有 1 个 12 位的模拟数字转换器&#xff08;ADC&#xff09;&#xff0c;今天我们一起来使用一下这个 ADC。 数据手册中对 ADC 简介如下。 SAR ADC&#xff1a;逐次逼近式 ADC&#xff0c;原理参见“参考链接&a…

六何分析法分析uniApp

一、什么是 uniApp&#xff08;What&#xff09; uni-app 是一个使用 Vue.js 开发所有前端应用的框架&#xff0c;开发者编写一套代码&#xff0c;可发布iOS、Android、H5、以及各种小程序( 微信/支付宝/百度/头条/00/钉钉/淘宝)、快应用等多个平台。uni-app 在手&#xff0c;…

AI隆重软件,AI原创文章隆重软件

随着信息量的急剧增加&#xff0c;许多写作者、网站管理员和内容创作者们纷纷感受到了文章降重的压力。原始文本的降重&#xff0c;需要保留关键信息的同时避免重复&#xff0c;这是一项既繁琐又耗时的任务。 改写软件的批量降重功能 147SEO改写软件在降重领域的卓越表现主要体…

小目标检测模型设计的一点思考

1. 小目标的特性 目标之间的交叠概率比较低&#xff0c;即使有交叠&#xff0c;其IoU多数情况下也是比较小的 AI-TOD Tiny Person Dateset 小目标自身的纹理显著度有强弱区别&#xff0c;但是总体来说纹理特征都较弱&#xff0c;很多时候需要借助一定的图像上下文来帮助确认 …

自动驾驶学习笔记(十七)——视觉感知

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《Apollo 社区开发者圆桌会》免费报名—>传送门 文章目录 前言 分类 目标检测 语义分割 实例分割 …