kafka精准一次、事务、幂等性

news2025/1/11 10:04:54

Kafka事务

消息中间件的消息保障的3个级别

  1. At most once 至多一次。数据丢失。
  2. At last once 至少一次。数据冗余
  3. Exactly one 精准一次。好!!!

如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。

:先消费消息、再提交位移。

如果提交位移这一步挂了,就会再消费一遍消息。重复消费====》〉》至少一次

当:先提交位移、再消费消息。

提议位移成功、消费消息失败,那么数据就丢失了====》〉》至多一次

如何精准一次呢?

幂等和事务!

幂等

对接口的多次调用所产生的结果和一次调用的结果是一样的。

即:(第一次调用,中途挂了,再次调用==一次调用) 为true

如何实现?

在v2版本的消息存储格式用有两个字段。produce_id(简称pid) 、first sequence

在这里插入图片描述

每个新的生产者实例在初始化的时候都会被分配一个pid,每个pid,消息发送到每一个分区都有序列号 sequence,序列号会从0开始递增,每发送一条消息,<PID,分区> 对应的序列号的值会➕1。这个序列号值(SN)在broker的内存中维护。只有当SN_new=SN_old+1.

broker才会接收这个消息。

如SN_new < SN_old+1 说明消息重复了,这个消息可以直接丢掉。

如SN_new>SN_old+1 说明消息丢失了,有数据还没有卸写入。抛乱序异常OutOforderSequenceException。

即用序列号来保证消息的顺序消费。

注意 所记录的这个序列号是针对 每一对<PID,分区> 所以这个幂等实现的是单会话、单分区的。

如何保证多个分区之间的幂等性呢?

事务

保证对多个分区写入操作的原子性,要么全部成功、要么全部失败。将应用程序的生产消息、消费消息、提交消费位移当作原子操作来处理。

用户显示指定一个事务id: transactionalId。这个事务id是唯一的

从生产者角度来考虑,事务保证了生产者会话消息的幂等发送跨生产者会话的事务恢复.

  • 生产者会话消息的幂等发送:如有有两个相同事务id的生产者,新的创建了 旧的就会被kill
  • 某个生产者实例宕机了,新的生产者实例可以保证未完成的旧事务要么被提交 要没被中断

实现过程,以consume-transform-produce为例。

package com.hzbank.yjj.transaction;

import com.hzbank.yjj.producer.CustomerPartitioner;
import com.hzbank.yjj.producer.ProducerlnterceptorPrefix;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

public class TransactionConsumeTransformProduce {

    public static final String brokerList = "localhost:9092";

    public static Properties getConsumerProps(){
        Properties props =new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"groupId");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        return props;
    }

    public static Properties getProducerProps(){
        Properties props =new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionalId");

        return props;
    }

    public static void main(String[] args) {
        //初始化生产者和消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps());
        consumer.subscribe(Collections.singletonList("topic-source"));
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());

        //初始化事务
        producer.initTransactions();

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            if(!records.isEmpty()){
                HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                //开启事务
                producer.beginTransaction();

                try {
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            System.out.println("获取到了topic-source发送过来的数据"+record.value());
                            System.out.println("do some ");

                            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-sink", record.key(), record.value());
                            producer.send(producerRecord);
                        }
                        // 获取最近一次的消费位移
                        long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                        offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));
                    }
                    //提交消费位移
                    producer.sendOffsetsToTransaction(offsets,"groupId");
                    //提交事务
                    producer.commitTransaction();
                } catch (ProducerFencedException e) {
                    System.out.println("异常了");
                    producer.abortTransaction();
                }
            }
        }
    }


}

1. 找到TransactionCoordinator。

TransactionCoordinator负责分配和管理事务。
FindCoordinatorRequest 发送请求找到TransactionCoordinator所在的broker节点。返回其对应的node_id、 host、 port 信息

transactionalId 的哈希值计算主题_transaction_state 中的分区编号

根据分区leader副本找到所在的broker节点,极为Transaction Coordinator节点

2. 获取pid

通过InitProducerIdRequest向TransactionCoordinator 获取pid 为当前生产者分配一个pid。

String transactionalId; 事务id
int transactionTimeoutMs; 事务状态更新超时时间

3. 保存pid

TransactionCoordinator 第一次收到事务id会和对应pid保存下来,以消息(事务日志消息)的形式保存到主题_transaction_state中,实现持久化

InitProducerIdRequest还会出发一下任务:

- 增加pid对应的producer_epoch.具有相同 PID 但 producer_epoch 小 于该 producer_叩och 的其他生产者新开启的事务将被拒绝 。
- 恢复( Commit)或中止( Ab。此)之前的生 产 者未完成的 事务

4. 开启事务

通过 KafkaProduc町的 beginTransaction()方法。调用该方法后,生产者本 地会标记己经开启了 一个新的事务 ,只有在生产者发送第一条消息之后 TransactionCoordinator 才会认为该事务 己经开启 。

5. Consume-Transform-Produce

整个事务处理数据。

  • AddPartitionsToTxnRequest:让 TransactionCoordinator 将<transactionld, TopicPartition>的对应关系存储在主题

    transaction state 中

  • ProduceRequest:生产者通过 ProduceRequest 请求发送消息( ProducerBatch)到用户 自定义主题中

  • AddOffsetsToTxnRequest:TransactionCoordinator 收到这个AddOffsetsToTxnRequest请求,通过 groupId 来推导出在一consumer_offsets 中的分区

  • TxnOffsetCommitRequest:发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中 包含的消费位移信息 offsets 存储到主题 consumer offsets 中

6. 提交或者终止事务

KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法。

写不下去了,暂时就先理解这么多了,后面再多结合源码去看看。

参考:书籍《深入理解 Kafka:核心设计与实践原理》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1247968.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【鸿蒙应用ArkTS开发系列】- 云开发入门实战二 实现省市地区三级联动地址选择器组件(上)

目录 概述 云数据库开发 一、创建云数据库的对象类型。 二、预置数据&#xff08;为对象类型添加数据条目&#xff09;。 三、部署云数据库 云函数实现业务逻辑 一、创建云函数 二、云函数目录讲解 三、创建resources目录 四、获取云端凭据 五、导出之前创建的元数据…

java.lang.ArrayIndexOutOfBoundsException: (数组越界异常)

java.lang.ArrayIndexOutOfBoundsException: &#xff08;数组越界异常&#xff09; 如何解决数组越界异常&#xff1f;1.1条件判断1.2循环结构1.3 try-catch&#xff08;异常捕获&#xff09;避免数组越界异常的方法&#xff1a;数组越界异常的调试和排查技巧&#xff1a; 当我…

报错AttributeError: module ‘cv2‘ has no attribute ‘ximgproc‘

报错AttributeError: module ‘cv2’ has no attribute ‘ximgproc’ 首先查看是否安装opencv-contrib-python pip list | grep opencv显示 opencv-contrib-python 4.4.0.46 opencv-python 4.8.1.78 opencv-pyt…

Keil5MDK创建C51工程

Keil5MDK创建C51工程 1.概述 上篇文章介绍了安装Keil5MDK和C51工具&#xff0c;这篇文章介绍工具的使用&#xff0c;首先介绍如何创建一个51单片机工程&#xff0c;写一个demo程序通过编译&#xff0c;烧录到单片机。 第一篇安装工具文章地址&#xff1a;https://blog.csdn.ne…

Maven镜像仓库问题

1.pom文件远程仓库地址 <!--使用aliyun的Maven镜像源提升下载速度--><repositories><repository><id>aliyunmaven</id><name>aliyun</name><url>https://maven.aliyun.com/repository/public</url></repository>&…

Crypto(10)BUUCTF-RSA3(共模攻击)

一.共模攻击的现实意义 好奇一个问题&#xff0c;即共模攻击有什么现实意义&#xff1f; 发现也没有什么现实意义&#xff0c;因为&#xff08;n,e&#xff09;是已知的&#xff0c;通常每个用户的n是不同的&#xff0c;除非特殊情况吧 二.共模攻击的数学原理&#xff1a; 通…

去水印网站哪个好?试试这个去水印软件!

在工作中&#xff0c;我们都曾遇到过图片水印的困扰。在众多的在线水印去除工具中&#xff0c;虽然选择看似丰富&#xff0c;但往往很难找到完全满足我们需求的那一个。有些工具操作过程繁复&#xff0c;有些工具在处理复杂水印时力不从心&#xff0c;还有些工具在去水印的过程…

SpectralGPT: Spectral Foundation Model 论文翻译2

遥感领域的通用大模型 2023.11.13在CVPR发表 原文地址&#xff1a;[2311.07113] SpectralGPT: Spectral Foundation Model (arxiv.org) 实验 ​ 在本节中&#xff0c;我们将严格评估我们的SpectralGPT模型的性能&#xff0c;并对其进行基准测试SOTA基础模型&#xff1a;ResN…

Qt手写ListView

创建视图&#xff1a; QHBoxLayout* pHLay new QHBoxLayout(this);m_pLeftTree new QTreeView(this);m_pLeftTree->setEditTriggers(QAbstractItemView::NoEditTriggers); //设置不可编辑m_pLeftTree->setFixedWidth(300);创建模型和模型项&#xff1a; m_pLeftTree…

Go 语言中的 Switch 语句详解

switch语句 使用switch语句来选择要执行的多个代码块中的一个。 在Go中的switch语句类似于C、C、Java、JavaScript和PHP中的switch语句。不同之处在于它只执行匹配的case&#xff0c;因此不需要使用break语句。 单一case的switch语法 switch 表达式 { case x:// 代码块 cas…

2024年襄阳中级工程师职称评审条件及要求

想要评审襄阳市中级工程师职称的小伙伴看过来&#xff0c;襄阳人社局对于评审所需的条件及要求如下。秋禾火带大家详细来了解一下 评审范围和人员要求 评审所申报的企业必须是在襄阳市注册登记满一年以上&#xff0c;正常运作的非公有制企业&#xff08;也就是私企&#xff09…

五、双向NAT

学习防火墙之前&#xff0c;对路由交换应要有一定的认识 双向NAT1.1.基本原理1.2.NAT Inbound NAT Server1.3.域内NATNAT Server —————————————————————————————————————————————————— 双向NAT 经过前面介绍&#xff0c;…

Red Giant Trapcode Suite红巨星粒子插件合集震撼登场

无论是电影、电视剧、广告还是音乐视频&#xff0c;炫酷特效都是吸引观众眼球的重要元素。而在Adobe After Effects&#xff08;AE&#xff09;和Premiere Pro&#xff08;PR&#xff09;软件中&#xff0c;Red Giant Trapcode Suite&#xff08;红巨星粒子插件合集&#xff09…

ELK+kafka+filebeat企业内部日志分析系统

1、组件介绍 1、Elasticsearch&#xff1a; 是一个基于Lucene的搜索服务器。提供搜集、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎&#xff0c;基于RESTful web接口。Elasticsearch是用Java开发的&#xff0c;并作为Apache许可条款下的开放源码发布…

PyQt6库和工具库QTDesigner安装与配置

锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计12条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话版…

Selenium实战指南:安装、使用技巧和JavaScript注入案例解析

背景 ​ 最近一段时间我会重新开一个关于selenium的专题&#xff0c;由浅入深的给大家讲一下selenium&#xff0c;同时回顾一下之前学的内容&#xff0c;selenium可以实现模拟登录&#xff0c;动态数据获取&#xff0c;获取动态cookie等等&#xff0c;还有可以写一些抢p的脚本…

游戏测试大揭秘,帮你轻松过关!

游戏测试可以看作是软件测试的一个分支&#xff0c;黑盒测试最基本的要求是会玩游戏。小公司会要求测试能力更加全面的员工&#xff0c;其中除了功能测试还要会性能测试&#xff0c;兼容测试&#xff0c;弱网测试&#xff0c;自动化测试等。 游戏测试是游戏开发过程中必不可少…

Markdown如何自定义字体样式:字体颜色、背景、斜体、粗体

Markdown如何自定义字体样式&#xff1a;字体颜色、背景、斜体、粗体 文章目录 Markdown如何自定义字体样式&#xff1a;字体颜色、背景、斜体、粗体前言一、字体大小二、字体颜色1. 英文字母2. 十六进制颜色值 三、字体背景色三、字体类型四、字体加粗五、字体斜体六、混合使用…

2.19 keil里面工具EventCorder使用方法

设置方法如下&#xff1a; 添加初始化代码如下&#xff1a; eventRecord.c #include "eventRecord.h" #include "usart.h" extern UART_HandleTypeDef *pcControlUart;/* RecordEvent初始化 */ void InitEventRecorder(void) {#ifdef RTE_Compiler_Even…

【免费使用】基于PaddleSeg开源项目开发的人像抠图Web API接口

基于PaddleSeg开源项目开发的人像抠图API接口&#xff0c;服务器不存储照片大家可放心使用。 1、请求接口 请求地址&#xff1a;http://apiseg.hysys.cn/predict_img 请求方式&#xff1a;POST 请求参数&#xff1a;{"image":"/9j/4AAQ..."} 参数是jso…