Kafka Producer之事务性

news2025/1/9 1:43:47

文章目录

  • 1. 跨会话幂等性失效
  • 2. 开启事务
  • 3. 事务流程原理

事务性可以防止跨会话幂等性失效,同时也可以保证单个生产者的指定数据,要么全部成功要么全部失败,不限分区。不可以多个生产者共用相同的事务ID。

1. 跨会话幂等性失效

幂等性开启后,broker会对每个分区记录生产者状态,并且生产者具有PID,消息被标记为PID加上序列号,数据重复和有序都是在其基础之上运作的。

生产者重启等因素会导致PID变化,导致幂等性短暂失效。

2. 开启事务

因为事务是基于幂等性的,所以幂等性的配置都要有。

package org.dragon.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerTransactionTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //创建producer
        HashMap<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //配置acks等级
        config.put(ProducerConfig.ACKS_CONFIG, "-1");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.RETRIES_CONFIG, 5);
        // 把buffer改小一点,让测试数据组成更多batch
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
        // 事务ID
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
        //初始化事务
        producer.initTransactions();

        try {
            // 开启事务
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                //创建record
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                        "test2",
                        "" + i,
                        "我是你爹" + i
                );
                //发送record
                Future<RecordMetadata> send = producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        System.out.println("回调信息:消息发送成功");
                    }
                });
                System.out.println("发送数据");
                send.get();
            }
            // 提交事务
            producer.commitTransaction();
        }catch(Exception e) {
            // 中止事务
            producer.abortTransaction();
            e.printStackTrace();
        }finally{
            //关闭producer
            producer.close();
        }
    }
}

3. 事务流程原理

在这里插入图片描述

  1. 查找联系事务管理器

  2. 根据设置的TRANSACTIONAL_ID_CONFIG计算PID,计算方式为哈希值%分区数量

  3. 初始化事务

  4. 将涉及到的分区信息发送给事务管理器,方便事务管理器管理和监控这些分区的事务状态。

  5. 生成数据,发送数据到对应Broker

  6. 对应Broker把分区信息发送给事务管理器,为了确认哪些分区确实已经收到了事务中的消息

  7. 对应Broker返回ACKS

  8. 生产者发起结束事务的请求

  9. 修改事务状态为准备提交

  10. 事务管理器将事务标记为成功或者失败,并通知对应broker。

  11. 修改事务状态为已提交

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1941954.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

redis的学习(二):常见数据结构及其方法

简介 redis常见的数据结构和他们的常用方法 redis的数据结构 redis是一个key-value的nosql&#xff0c;key一般是字符串&#xff0c;value有很多的类型。 j基本类型&#xff1a; stringhashlistsetsortedSet 特殊类型&#xff1a; GEOBitMapHyperLog key的结构 可以使用…

VScode连接虚拟机运行Python文件的方法

声明&#xff1a;本文使用Linux发行版本为rocky_9.4 目录 1. 在rocky_9.4最小安装的系统中&#xff0c;默认是没有tar工具的&#xff0c;因此&#xff0c;要先下载tar工具 2. 在安装好的vscode中下载ssh远程插件工具 3. 然后连接虚拟机 4. 查看python是否已经安装 5. 下载…

Maven的核心概念

Maven的核心概念 —2020年06月11日 什么是Maven Maven是一款服务于Java平台的自动化构建工具。 约定的目录结构 目录结构&#xff1a; 根目录&#xff1a;工程名src目录&#xff1a;源码pom.xml文件&#xff1a;Maven工程的核心配置文件main目录&#xff1a;存放主程序tes…

Zabbix监控系统:zabbix服务部署+基于Proxy分布式部署+zabbix主动与被动监控模式

一、Zabbix概述 1.1 简介 zabbix 是一个基于 Web 界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案。 zabbix 能监视各种网络参数&#xff0c;保证服务器系统的安全运营&#xff0c;提供灵活的通知机制以让系统管理员快速定位/解决存在的各种问题。 zabbix…

三、GPIO按键读取

在上一篇文章中&#xff0c;我们详细讲解了GPIO的写函数。万事万物都具有一定的相对性&#xff0c;GPIO的操作也不例外。既然有写操作&#xff0c;那么必然也有读操作。有了上一篇文章的基础&#xff0c;理解本篇内容将会更加容易。 一、这篇文章能了解什么 本篇文章将基于上一…

为什么用LeSS?

实现适应性 LeSS是一个产品开发的组织系统&#xff0c;旨在最大化一个组织的适应性。关于适应性&#xff08;或者敏捷性&#xff0c;也就是敏捷开发的初衷&#xff09;我们是指优化&#xff1a; 以相对低的成本改变方向的能力&#xff0c;主要是基于通过频繁交付产生的探索。从…

【Linux 驱动】IMX6ULL eLCDIF驱动

1. eLCDIF设备树 lcdif: lcdif021c8000 {compatible "fsl,imx6ul-lcdif", "fsl,imx28-lcdif"; //属性reg <0x021c8000 0x4000>; //起始地址 地址大小interrupts <GIC_SPI 5 IRQ_TYPE_LEVEL_HIGH>; …

1小时上手Alibaba Sentinel流控安全组件

微服务的雪崩效应 假如我们开发了一套分布式应用系统&#xff0c;前端应用分别向A/H/I/P四个服务发起调用请求&#xff1a; 但随着时间推移&#xff0c;假如服务 I 因为优化问题&#xff0c;导致需要 20 秒才能返回响应&#xff0c;这就必然会导致20秒内该请求线程会一直处于阻…

Prompt Enginnering(提示工程)

什么是提示工程 prompt enginnering是提示工程的意思&#xff0c;也有叫指令工程。 用白话讲&#xff1a;是我们对GPT说出的话&#xff0c;我们向它提问的信息&#xff0c;就是prompt。 官方一点&#xff1a;是我们使用自然语言提示来控制和优化生成式模型&#xff08;生成式…

《Milvus Cloud向量数据库指南》——SPLADE:基于BERT的Learned稀疏向量技术深度解析

在自然语言处理(NLP)领域,随着深度学习技术的飞速发展,预训练语言模型如BERT(Bidirectional Encoder Representations from Transformers)已成为推动研究与应用进步的重要基石。BERT通过其强大的上下文感知能力,在多项NLP任务中取得了显著成效,尤其是在文本表示和语义理…

深入理解Linux网络(五):TCP接收唤醒

深入理解Linux网络&#xff08;五&#xff09;&#xff1a;TCP接收唤醒 TCP接收唤醒由软中断提供服务。 软中断&#xff08;也就是 Linux ⾥的 ksoftirqd 进程&#xff09;⾥收到数据包以后&#xff0c;发现是 tcp 的包的话就会执⾏到 tcp_v4_rcv 函数。接着如果是 ESTABLISH…

GMSSL2.x编译鸿蒙静态库和动态库及使用

一、编译环境准备 1.1 开发工具 DevEco-Studio下载。 1.2 SDK下载 ​ 下载编译第三方库的SDK有两种方式&#xff0c;第一种方式从官方渠道根据电脑系统选择对应的SDK版本&#xff0c;第二种方式通过DevEco-Studio下载SDK。本文只介绍通过DevEco-Studio下载SDK的方式。 安装…

(十一)Spring教程——Bean基本配置与依赖注入之属性注入

1.Bean基本配置 在进行Bean配置的详细讲解之前&#xff0c;先来了解以下Bean配置的基础知识&#xff0c;以快速建立起Bean配置的初步概念。 1.1装配一个Bean 在Spring容器的配置文件中定义一个简要Bean的配置片段如下所示 <bean id”foo” class”com.smart.Foo”/> 一般…

【VSCode】安装 【ESP-IDF】插件及【ESP32-S3】新建工程和工程配置

一、搭建基础工程 二、基础工程的文件架构解析 三、调试相关工具介绍 1、串口下载2、JTAG 下载与调试 四、工程的文件架构解析 五、基础工程配置 一、搭建基础工程 在 VS Code 中新建 ESP-IDF 基础工程的步骤如下&#xff1a; 1、启动 VS Code 并打开命令面板 按下“Ctrl…

AI+BI结合,数据分析新方向 —— 奥威BI数据可视化引领未来

【AIBI结合&#xff0c;数据分析新方向 —— 奥威BI数据可视化引领未来】 在数字化浪潮汹涌的今天&#xff0c;企业对于数据的洞察力与决策效率的需求日益增长。奥威BI&#xff08;Business Intelligence&#xff09;数据可视化解决方案&#xff0c;以其独特的“AIBI”融合创新…

压缩视频在线免费 怎么免费压缩视频大小 哪个软件可以免费压缩视频

在数字媒体时代&#xff0c;视频文件的体积越来越大&#xff0c;这就需要我们找到高效的方式来压缩视频&#xff0c;以节省存储空间和提升分享速度。本文将为您介绍几款免费的视频压缩软件&#xff0c;帮助您轻松应对视频文件管理难题。 方法一、 安装并打开一款的视频软件。 …

通过iframe嵌套的不同域名的页面之间处理cookie存储失败的问题——js技能提升

最近同事在写mvc的后台管理系统&#xff0c;通过iframe实现不同域名的页面的嵌套。 但是有个问题&#xff0c;就是从父页面打开iframe的子页面时&#xff0c;需要登录子页面&#xff0c;此时需要将子页面登录后的token存储到子页面的cookie中&#xff0c;方便子页面的其他接口…

Python 全栈体系【三阶】(三)

第一章 Django 七、静态文件 1. 概述 静态文件是指在WEB应用中的图像文件、CSS文件、Javascript文件。 2. 静态文件的配置 settings.py中关于静态文件的配置如下&#xff1a; STATICFILES_DIRS [BASE_DIR , static, ]STATIC_URL /static/其中&#xff1a; STATICFILES…

Vue 3 + Vite 项目中安装 Tailwind CSS

官网&#xff1a;安装 - TailwindCSS中文文档 | TailwindCSS中文网 tips&#xff1a;只按照官网的配置可能会导致样式不加载/加载不生效的问题 1、正确安装指令 npm install -D tailwindcss postcss autoprefixer npx tailwindcss init -p 自动生成 ​tailwind.config.js​…

鱼眼摄像头-opencv校准(基于棋盘+畸变表)

一&#xff1a;主要参数说明 1&#xff1a;内参矩阵K 是3*3的矩阵&#xff0c;其类似格式 Knp.array([ [389.2109574522624, 0.0, 630.2525667489842], [0.0, 388.505701978078, 360.7886749292513], [0.0, 0.0, 1.0]]) 2&#xff1a;畸变系数 针对鱼眼相机&#xff1a;…