kafka事务(伪事务)

news2025/1/18 10:42:51

事务要点知识

  • Kafka的事务控制原理

主要原理: 开始事务-->发送一个ControlBatch消息(事务开始)

提交事务-->发送一个ControlBatch消息(事务提交)

放弃事务-->发送一个ControlBatch消息(事务终止)

  • 开启事务的必须配置参数(我不支持数据得回滚,但是我能做到,一荣俱荣,一损俱损)

Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// acks
props.setProperty(ProducerConfig.ACKS_CONFIG,"-1");
// 生产者的重试次数
props.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
// 飞行中的请求缓存最大数量
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3");
// 开启幂等性
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
// 设置事务id
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"trans_001");

 事务控制的代码模板

// 初始化事务
producer.initTransaction( )

// 开启事务    
producer.beginTransaction( )
    
// 干活

// 提交事务
producer.commitTransaction( )


// 异常回滚(放弃事务) catch里面
producer.abortTransaction( )

消费者api是会拉取到尚未提交事务的数据的;只不过可以选择是否让用户看到!

是否让用户看到未提交事务的数据,可以通过消费者参数来配置:

isolation.level=read_uncommitted(默认值)

isolation.level=read_committed

  • kafka还有一个“高级”事务控制,只针对一种场景:

用户的程序,要从kafka读取源数据,数据处理的结果又要写入kafka

kafka能实现端到端的事务控制(比起上面的“基础”事务,多了一个功能,通过producer可以将consumer的消费偏移量绑定到事务上提交)

producer.sendOffsetsToTransaction(offsets,consumer_id)

事务api示例

为了实现事务,应用程序必须提供唯一transactional.id,并且开启生产者的幂等性

properties.put ("transactional.id","transactionid00001");
properties.put ("enable.idempotence",true);

kafka生产者中提供的关于事务的方法如下:

 “消费kafka-处理-生产结果到kafka”典型场景下的代码结构示例:

package com.doit.day04;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Exercise_kafka2kafka {
    public static void main(String[] args) {

        Properties props = new Properties();
        //消费者的
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "shouwei");
        //自动提交偏移量
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        //写生产者的一些属性
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //设置ack 开启幂等性必须设置的三个参数
        props.setProperty(ProducerConfig.ACKS_CONFIG,"-1");
        props.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
        props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3");
        //开启幂等性
        props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
        //开启事务
        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"doit40");

        //消费数据
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //初始化事务
        producer.initTransactions();
        //订阅主题
        consumer.subscribe(Arrays.asList("eventlog"));
        while (true){
            //拉取数据
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            try {
                //开启事务
                producer.beginTransaction();
                for (ConsumerRecord<String, String> record : poll) {
                    String value = record.value();
                    //将value的值写入到另外一个topic中
                    producer.send(new ProducerRecord<String,String>("k2k",value));
                }
                producer.flush();
                //提交偏移量
                consumer.commitAsync();
                //提交事务
                producer.commitTransaction();

            } catch (ProducerFencedException e) {
                //放弃事务
                producer.abortTransaction();
            }
        }
    }
}

事务实战案例

在实际数据处理中,consume-transform-produce是一种常见且典型的场景;

 在此场景中,我们往往需要实现,从“读取source数据,至业务处理,至处理结果写入kafka”的整个流程,具备原子性:

要么全流程成功,要么全部失败!

(处理且输出结果成功,才提交消费端偏移量;处理或输出结果失败,则消费偏移量也不会提交)

要实现上述的需求,可以利用Kafka中的事务机制

它可以使应用程序将消费消息生产消息提交消费位移当作原子操作来处理,即使该生产或消费会跨多个topic分区;

在消费端有一个参数isolation.level,与事务有着莫大的关联,这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。

 控制消息(ControlBatch:COMMIT/ABORT)表征事务是被提交还是被放弃

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/619628.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

瓦坎达的科技真的很厉害吗

就漫威电影宇宙来说&#xff0c;瓦坎达的科技真的很厉害吗 厉不厉害我不敢保证&#xff0c;但是这个IP段的服务器是真的好用 43.241.19.1 43.241.19.2 43.241.19.3 43.241.19.4 43.241.19.5 43.241.19.6 43.241.19.7 43.241.19.8 43.241.19.9 43.241.19.10 43.241.19.11 43.2…

使用ETLCloud强大的自定义规则实现自定义数据处理算法

实时数据处理规则有什么作用&#xff1f; 在大数据中的实时数据采集、ETL批量数据传输过程中很多数据处理过程以及数据质量都希望实时进行处理和检测并把不符合要求的脏数据过滤掉或者进行实时的数据质量告警等。 在数据仓库建设过程中&#xff0c;每家企业的数据处理过程中肯…

Java调用Groovy动态加载接口实现类,热部署不需要编译

pom <dependency><groupId>org.codehaus.groovy</groupId><artifactId>groovy-all</artifactId><version>2.4.3</version></dependency> Java接口&#xff1a; public interface Run {public void speed(int s); } Groovy实…

基于SSM的图书馆借阅管理系统

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#xff1a; 随着社会的发展和信息…

十二、Docker日志管理

Docker日志管理 Docker的日志大致有两种&#xff0c;一是Docker 引擎日志&#xff0c;也就是 dockerd服务自身运行时的日志&#xff1b;二是容器内的服务产生的日志。后一种有一定使用经验的童鞋应该发现有时候我们能通过docker logs查看容器日志&#xff0c;有时候又不能&…

就这水平也去大厂面试?你是怎么敢的啊

面试一直都是一个热门话题&#xff0c;软件测试员当然也逃不过~纵使你是一个技能全部满点的超优秀软件测试员&#xff0c;卡在面试这一关也是万万不可的。特别是大厂的测试员&#xff0c;他们面试所问的东西&#xff0c;你在学校通常接触不道&#xff0c;所以没有哪个应届生是一…

国内元宇宙游戏陆续开放,将带来科技娱乐新鲜感

自元宇宙概念兴起至今&#xff0c;国内多家互联网厂商纷纷参与其中&#xff0c;除了在区块链、人工智能、图像处理、云计算等专业技术领域深入布局&#xff0c;也通过大范围投资或收购的方式来扩大自身的元宇宙游戏版图。围绕这一新兴概念&#xff0c;一场产、学、研的实践正在…

高效科研工具(三):高效检索阅读paper-arXiv网站(arXiv使用大全、掌握研究热点、前沿动态)

高效科研工具&#xff08;三&#xff09;&#xff1a;高效检索阅读paper-arXiv网站&#xff08;arXiv使用大全、掌握研究热点、前沿动态&#xff09; 目录 0、前言&#x1f60f; 1、arXiv网站介绍&#x1f9d0; 2、arXiv网站使用&#x1f60e;&#x1f60e; 3、arXiv网站使用…

语义分割结果可视化(原图+语义掩码+图例)

语义分割结果可视化&#xff08;原图语义掩码图例&#xff09; 由于实习工作需要把语义分割结果可视化出来&#xff0c;要使用自定义颜色来区分不同的label&#xff0c;并绘制出图例并插入在图像右端。本文将介绍如何实现这样的语义分割结果图。 文章目录 语义分割结果可视化&a…

Mysql执行计划怎么看

执行计划就是sql的执行查询的顺序&#xff0c;以及如何使用索引查询&#xff0c;返回的结果集的行数 EXPLAIN SELECT * from A where X? and Y? 1.id :是一个有顺序的编号&#xff0c;是查询的顺序号&#xff0c;有几个 select 就显示几行。id的顺序是按 select 出现的顺序增…

13 【代理配置 插槽】

1.Vue脚手架配置代理 本案例需要下载axios库npm install axios 配置参考文档 Vue-Cli devServer.proxy vue.config.js 是一个可选的配置文件&#xff0c;如果项目的 (和 package.json 同级的) 根目录中存在这个文件&#xff0c;那么它会被 vue/cli-service自动加载。你也可以…

老人跌倒检测识别预警算法 yolov7

老人跌倒检测识别预警系统采用yolov7网络模型技术&#xff0c;老人跌倒检测识别预警算法对老人的行为进行实时监测。当老人发生跌倒时&#xff0c;系统将自动发出警报&#xff0c;及时通知现场护理人员进行处理。YOLOv7 的发展方向与当前主流的实时目标检测器不同&#xff0c;研…

MySQL表操作:提高数据处理效率的秘诀(进阶)(1)

&#x1f495;**“生命不在于相信奇迹&#xff0c;而在于创造奇迹。”——朱学恒**&#x1f495; &#x1f43c;作者&#xff1a;不能再留遗憾了&#x1f43c; &#x1f386;专栏&#xff1a;MySQL学习&#x1f386; &#x1f697;本文章主要内容&#xff1a;MySQL对表操作进阶…

Node.js从基础到项目实践

摘要&#xff1a; Node.js是一个基于Chrome V8引擎的JavaScript运行时环境&#xff0c;它允许开发者使用JavaScript构建高性能的服务器端应用程序。本文将介绍Node.js的基础知识&#xff0c;并通过项目实践的方式帮助读者深入理解Node.js的用法和功能。从基础到项目实践&#x…

建造者模式的运用

文章目录 一、建造者模式的运用1.1 介绍1.2 建造者模式结构1.3 建造者模式类图1.4 组装自行车案例1.4.1 组装自行车案例类图1.4.2 代码 1.5 优缺点1.6 使用场景1.7 模式扩展1.7.1 原始代码1.7.2 重构代码 一、建造者模式的运用 1.1 介绍 将一个复杂对象的构建与表示分离&…

Linux系统下SQLite基础使用命令

1.选择数据库 sqlite 数据库.db 当出现sqlite> 表示成功 2.列出数据库中的表 .table 这一步是在上一步成功选择数据库的基础上进行的, 当出现event, 表示event-20230407这个数据库中有且只有一张表, 表名叫event 3.查询表中数据 select * from 表名; (此处注意记得一定…

【假捻发加工生产工单下达】

假捻工单是需要下到工作中心的,比如A01机台或者A02机台。 所以下工单之前要先查询A01机台上的最新工单量。 查询结果如下: 她会按照创建时间进行排序,后下的工单排在最前面 (如果下了个新工单,那么前一个工单的执行状态会自动改为关闭。) 因此查询结果,最上面的工单执…

外贸跨境商城app,多语言,多货币,跨境电商系统开发

外贸跨境商城app是一种在线购物平台&#xff0c;专门为海外买家提供跨境电商服务。用户可以在该app上浏览和购买来自全球各地的商品&#xff0c;以及与卖家进行沟通和交流。这些商品可能包括服装、鞋子、家居用品、电子产品等等&#xff0c;价格也跨越了各个层次。外贸跨境商城…

正确认识糖化学试剂:120173-57-1,Fmoc-Ser(Ac3GalNAcα)-OH的参数和保存方法

&#xff08;文章资料汇总来源于&#xff1a;陕西新研博美生物科技有限公司小编MISSwu&#xff09;​ 【中文名称】N-芴甲氧羰基-O-(2-乙酰氨基-3,4,6-三-O-乙酰基-2-脱氧-α-D-吡喃半乳糖基)-L-丝氨酸 【英文名称】 Fmoc-Ser(Ac3GalNAcα)-OH 【结 构 式】 【CAS号】120173-…

AWTK实现汽车仪表Cluster/DashBoard嵌入式GUI开发(四):拖拽式GUI开发

前言: 如何做出炫酷的嵌入式GUI界面?GUI(Graphical User Interface) :图形化操作界面。新能源汽车对于炫酷GUI的需求也是越来越强烈。 AWTK全称为Toolkit AnyWhere,是ZLG倾心打造的一套基于C语言开发的GUI框架。旨在为用户提供一个功能强大、高效可靠、简单易用、可轻松…