Flink+Pulsar、Kafka问题分析及方案 -- 事务阻塞

news2025/1/12 18:57:59

Pulsar、Kafka的事务设计

Pulsar跟Kafka在设计事务功能时,在消费者读取消息的顺序方面,都采用了类似的设计。
比如说,先创建txn1,然后创建txn2,这两个事务生产消息到同一个topic/partition里,但是txn2比txn1先完成了,这个时候该不该让txn2生产的消息给consumer读取到?

Kafka设计文档中介绍如下:
Discussion on Transaction Ordering. In this design, we are assuming that the consumer delivers messages in offset order to preserve the behavior that Kafka users currently expect. A different way is to deliver messages in “transaction order”: as the consumer fetches commit markers, it enables the corresponding messages to be consumed.
在这里插入图片描述
Kafka采用的是offset order,就是说按照消息持久化的顺序来分发给consumer,而不是事务完成的顺序。
即txn2比txn1先完成了,也不会让txn2的消息立刻让consumer读取到,必须等到txn1完成才行。这种事务之间可能会有阻塞的行为,是用户必须要知晓的。
另外,这种阻塞的现象仅限于在同一个topic/partition,多个topic/partition之间的事务不会互相阻塞。

Pulsar也采用了类似的设计。
They are dispatched in published order instead of committed order. Since the consumer can only read messages before maxReadPosition, it increases end-to-end latency.
deep-dive-into-transaction-buffer-apache-pulsar

可以使用下面的测试代码进行验证:

public class TransactionTest {
    private static final String topicName = "persistent://test/tb1/testTxn1";
    static PulsarAdmin admin;

    static PulsarClient client;

    static ProducerBuilder<byte[]> producerBuilder;

    static ConsumerBuilder<byte[]> consumerBuilder;

    @BeforeClass
    public static void initialize() throws PulsarClientException {
        admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://164.90.77.83:8081")
                .build();

        client = PulsarClient.builder()
                .serviceUrl("pulsar://164.90.77.83:6650")
                .enableTransaction(true)
                .build();

        producerBuilder = client.newProducer()
                .sendTimeout(0,TimeUnit.SECONDS)
                .topic(topicName);
        consumerBuilder = client.newConsumer()
                .topic(topicName);

    }

    @AfterClass
    public static void end() throws PulsarClientException {
        admin.close();
        client.close();
    }

    /**
     * 使用两个事务txn1, txn2, txn2完成后查看是否能读取数据。
     * 结论:
     * 1. txn2由于在txn1后面创建,所以尽管txn2完成了,但是txn1没完成,就会阻塞txn2,txn2的数据不会被读取到。
     * 2. txn1完成后,txn1、txn2的数据都能读取到,而且数据的顺序跟数据的produce顺序是相同的。(注意:不是跟事务的创建顺序相同)
     */
    @Test
    public void task() throws Exception{
        admin.topics().resetCursor(topicName + "-partition-0", "my-subscription", MessageId.latest);
        Consumer<byte[]> consumer=consumerBuilder.clone()
                .subscriptionName("my-subscription")
                .subscribe();
        Producer<byte[]> producer=producerBuilder.clone()
                .create();
        Transaction transaction1 = client.newTransaction().build().join();
        Transaction transaction2 = client.newTransaction().build().join();
        producer.newMessage(transaction1).value("transaction1 test".getBytes()).send();
        producer.newMessage(transaction2).value("transaction2 test".getBytes()).send();
        Message<byte[]> message;

        // commit txn2, but not commit txn1
        transaction2.commit().join();
        message = consumer.receive(8, TimeUnit.SECONDS);
        assert message == null;

        // commit txn1
        transaction1.commit().join();
        message = consumer.receive(8, TimeUnit.SECONDS);
        assert message != null;
        assert new String(message.getData()).equals("transaction1 test");
        consumer.acknowledge(message);

        message = consumer.receive(8, TimeUnit.SECONDS);
        assert message != null;
        assert new String(message.getData()).equals("transaction2 test");
        consumer.acknowledge(message);
    }
}


下面是Kafka的测试代码:

public class KafkaProducerTransactionalUnitTest {
    private static final String TOPIC_NAME = "test_topic";
    private static final String KAFKA_ADDRESS = ":9092";
    private static final String CLIENT_ID = "test_client_id";
    private static final int TRANSACTION_TIMEOUT_IN_MS = 3000;

private static KafkaProducer<String, String> kafkaProducer;
    @BeforeAll
    public static void init() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_ADDRESS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka_producer_id");
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_IN_MS);

        kafkaProducer = new KafkaProducer<>(props);
        kafkaProducer.initTransactions();
    }

    @AfterAll
    public static void cleanup() {
        kafkaProducer.close();
}

    /**
     * start two producer with different transactionalId, produce messages to the same topic.
     * producer1 start txn1 -> producer2 start txn2 -> producer2 commit txn2 -> consumer consume messages.
     * check if consumer can consume messages in txn2.
     * result: txn1 will block the messages in txn2.
     */
    @Test
    public void testTwoProducerStuck() {
        // Set up producer2 properties
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_ADDRESS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka_producer2_id");
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30000);

        KafkaProducer<String, String> kafkaProducer2 = new KafkaProducer<>(props);
        kafkaProducer2.initTransactions();

        // set up consumer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_ADDRESS);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test_transaction");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        try {
            // start txn1
            kafkaProducer.beginTransaction();
            String message = "Hello, Kafka";
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            kafkaProducer.send(record);
            // kafkaProducer.commitTransaction();

            // start txn2 and commit
            kafkaProducer2.beginTransaction();
            message = "Hello, Kafka2";
            record = new ProducerRecord<>(TOPIC_NAME, message);
            kafkaProducer2.send(record);
            kafkaProducer2.commitTransaction();

            // consume messages and check
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            assert records.isEmpty();

            // commit txn1
            kafkaProducer.commitTransaction();

            // consume messages and check
            records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record1 : records) {
                System.out.printf("offset = %d, value = %s%n", record1.offset(), record1.value());
            }
            assert !records.isEmpty();
        } catch (Exception ex) {
            System.out.println(ex);
            Assertions.fail("Failed to produce message with transaction");
        } finally {
            kafkaProducer2.close();
        }
    }

}

在这里插入图片描述

问题分析及方案

问题

Pulsar、Kafka这么设计的原因当然还是为了流式场景,因此,当我们尝试在其他场景中使用Kafka、Pulsar,那么这种设计就可能造成不少的麻烦。

就Flink而言,问题主要在于残留某些OPEN的事务没完结,导致后续的事务数据无法读取到。
有下面一些情形:

  • case1:
    Flink可能有不从Checkpoint/Savepoint启动的场景:

    • 比如说用户创建一个topic,进行一些测试工作,这个时候会残留一些事务没结束掉。测试完成后准备用于线上生产,这个时候就会不从Checkpoint/Savepoint启动,而如果该topic还残留了OPEN的事务,就会导致线上生产的事务数据都无法读取到。
    • 还有可能前面启动过任务,但是从来没成功打过checkpoint,这也会残留OPEN的事务
  • case2:
    Flink从Checkpoint/Savepoint启动时,对于前面没完结的事务都会存储在Checkpoint/Savepoint里,启动时会调用recoverAndCommit/recoverAndAbort方法来处理掉,一般这是没啥问题的。但是如果Flink任务成功执行了initializeState方法,即成功创建了新事务,但是在成功执行snapshotState前就失败挂掉了,则这个新创建的事务也是无法记录到Checkpoint/Savepoint里的,因此也会导致遗漏某些事务没有被完结。这种case也是很常见的。
    在这里插入图片描述

简单分析

  • Kafka因为使用的事务ID号都是固定的,因此使用固定的事务ID号去abort残留的事务即可。
  • Pulsar事务ID号是不断变化的,不从checkpoint/snapshot启动就无法得知事务ID号,也就无法执行abort操作。
    因为Pulsar事务commit操作幂等性的PR引入了一个clientName的配置每个客户端使用的clientName都是固定的,因此我们可以增加一个接口,abort掉clientName对应的所有事务。

细节分析

Kafka方案

参考FlinkKafkaProducer 源码分析的 Abort残留的事务小节。

Pulsar方案

Pulsar由于是根据clientName去abort事务,而且一个subtask只有一个clientName,即
prefix + “-” + subtaskIndex (同一个job的多个并发子任务的prefix值都是相同的)
在这里插入图片描述
因此直接根据这一个clientName去abort即可。
所有并发子任务的clientName范围为[0 , parallelism)。

针对前面两个case进行处理:

  • case1:从checkpoint/savepoint中恢复
    只需要abort掉当前ClientName对应的事务即可,即下面第一个红框部分代码。
    在这里插入图片描述

  • case2:不从checkpoint/savepoint中恢复
    跟kafka一样,可能会发生重启任务后并发度增大的情况,假设前面启动时的并发度为P1,当前启动的并发度为P2,因此前面的任务执行使用的ClientName范围为[0,P1 ),我们需要把这些ClientName对应的事务都abort一次,但是P1是不可知的。

    • 如果P2大于P1,即增加并发度,则[0,P2 )肯定包含[0,P1 ),此时对[0,P2 ) 遍历abort一次即可。
    • 如果P1大于P2,即降低并发度,则[0,P2 )[0,P1 )的子集,此时无法猜测要abort多少事务ID。
      我们采用跟Kafka一样的策略:设定一个参数safeScaleDownFactor,即Flink任务减低并发度的比例不能超过这个,默认值为5。
      比如说,第一次启动Flink任务的并发度为10,则第二次启动Flink任务的并发度至少为10/5=2。
      通过这种方式,我们得知P1、P2的关系:P2*5>=P1
      [0,P2 * 5 )肯定包含[0,P1 )
      因此,我们对[0,P2 * 5 )范围内的ClientName都abort一次即可。
      [0,P2 * 5 )范围内的ClientName还会平均分摊到当前P1个子任务去abort,各个子任务之间不会互相干扰,操作的ClientName都是不重叠的。
      在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/541259.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【前端知识】常见的加密算法介绍

【前端知识】常见的加密算法介绍 1 常见的加密算法&#xff08;1&#xff09;哈希函数&#xff08;2&#xff09;对称加密&#xff08;3&#xff09;非对称加密&#xff08;4&#xff09;消息认证码&#xff08;MAC&#xff09; 2.总结 1 常见的加密算法 略微介绍一下前端中常…

Kerberos

序言 kerberos 除了说帮我们验证Java程序是否具有权限来请求Hadoop的服务,也可以来帮助我们检查新增的节点是是否是真实的节点,还是黑客为了套取数据的节点. 比如为HDFS新增一个DataNode节点,如果没有Kerberos验证, 随便一个节点只要连接上NameNode就会存储数据,黑客就可以获…

LeetCode:23. 合并 K 个升序链表

23. 合并 K 个升序链表 1&#xff09;题目2&#xff09;过程3&#xff09;代码1. 最开始2.初步优化 4&#xff09;结果1. 最开始2. 初步优化 1&#xff09;题目 给你一个链表数组&#xff0c;每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中&#xff0c;返回合…

机器学习基础认识(一)

机器学习应用 机器学习的应用&#xff0c;主要分为两类&#xff1a;预测、分类 预测&#xff0c;一般是指&#xff1a;根据数据&#xff0c;预测数值 分类&#xff0c;一般是指&#xff1a;根据数据&#xff0c;进行分类 预测与分类的关系【个人理解】 分类&#xff0c;本质…

零基础怎么入门网络安全?看这篇就够啦!

由于我之前写了不少网络安全技术相关的故事文章&#xff0c;不少读者朋友知道我是从事网络安全相关的工作&#xff0c;于是经常有人在微信里问我&#xff1a; 我刚入门网络安全&#xff0c;该怎么学&#xff1f;要学哪些东西&#xff1f;有哪些方向&#xff1f;怎么选&#xff…

Centos7.6部署postgresql15主从

目录 安装pg15&#xff08;master和standby&#xff09;主数据库配置(master)初始化数据库创建归档日志目录设置数据库访问权限修改数据库配置文件开启数据库 从数据库配置(standby)同步主库的数据文件创建文件standby.signal启动从数据库 主从状态验证master上验证standby上验…

H5性能测试怎么做?这些关键指标你得搞清楚

目录 01、Http相关 02、组件是否压缩 03、图片格式和大小是否合适 04、CSS放在顶部 05、JS放在底部 06、JS &CSS压缩 07、是否添加缓存 08、避免非200返回值 09、使用CDN 03、WebView相关 学习资源分享 软件测试面试小程序 01、Http相关 01、Http请求个数 有…

新星计划 Electron+vue2 桌面应用 1 基础

/(ㄒoㄒ)/~~报名了两个新星计划&#xff0c;工作之余写博客…… 另外一个是uniapp的属于个人兴趣&#xff0c;这个桌面应用正好符合工作需要。 活动地址&#xff1a;https://marketing.csdn.net/p/1738cda78d47b2ebb920916aab7c3584 教程地址&#xff1a; 2023新星导师活动…

Java实现PDF导出/预览

网上有很多关于PDF导出的文章&#xff0c;但是个人感觉实现的过于复杂&#xff0c;又是模板又是html的&#xff0c;有的还需要字体模板的支持&#xff0c;本片文章只是实现简单的PDF表格导出&#xff0c;可以实现PDF动态表格导出/预览&#xff0c;这类文章网上很少&#xff0c;…

实践「容器镜像扫描」,Get 云原生应用的正确打开方式

&#x1f31f; 容器技术的兴起&#xff0c;让应用程序更加轻量化和可移植&#xff0c;大大提高了应用交付效率。但容器中的应用也面临各种安全威胁&#xff0c;容器及其镜像安全不可小觑。 近日&#xff0c;在「DevSecOps 软件安全开发实践」课程上&#xff0c;极狐(GitLab) 高…

Linux设置系统时间(上海时区、硬件时间、重启有效)

#查看时间 date#删除当前时区 rm -rf /etc/localtime #修改默认时区为上海 ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime #设置硬件时间 月/日/年 时:分:秒 hwclock --set --date"05/18/2023 17:11:15"#设置系统时间和硬件时间同步 hwclock --hctosys#保…

大数据发展前沿复习

对抗学习 生成对抗网络&#xff08;GAN&#xff09;是非监督式学习的一种方法&#xff0c;透过两个神经网络相互博弈的方式进行学习。生成对抗网络由一个生成网络与一个判别网络组成。生成网络以随机取样作为输入&#xff0c;其输出结果需要尽量模仿训练集中的真实样本。判别网…

vmware17pro安装激活ubuntu22版本最新教程无废话

第一步&#xff1a;下载 下载很方便 官方一键下载链接 第二步 安装 点下一步&#xff0c;一键安装即可&#xff0c;有可能会重启电脑&#xff0c;没关系的&#xff0c;是安全的 第三步&#xff1a;ji活 懂得都懂这是什么 JU090-6039P-08409-8J0QH-2YR7F 4A4RR-813DK-M81A9…

C语言算法--快速排序法

C语言算法–快速排序法 1-什么是快速排序法 快速排序&#xff08;Quicksort&#xff09;是一种常用的排序算法&#xff0c;它基于分治的思想。它的核心思想是选择一个基准元素&#xff0c;将数组划分为两个子数组&#xff0c;使得左边的子数组中的所有元素都小于等于基准元素…

【Flutter开发】Navigator2.0介绍及使用

目录 Navigator1.0Navigator2.0APPRouteInformationParserRouterDelegate 问题The Navigator.pages must not be empty to use the Navigator.pages API浏览器的回退按钮 总结 Navigator1.0 我们学习flutter一开始接触的路由管理就是Navigator1.0&#xff0c;它非常方便&#…

JAVA-Activiti 7与达梦、人大金仓兼容-nacos、服务pom文件配置(2)

目录 第一步,修改nacos服务配置 >需注意< 第二步,pom.xml依赖包配置 Activiti的源码包解决之后,接下来就好做很多了 第一步,修改nacos服务配置 spring:datasource:url: jdbc:kingbase8://127.0.0.1:54321/progress?currentSchemaprogress,productNamePostgreSQL,SYS…

保密+完整+可用+安全,规避代码安全「马奇诺防线」,构建软件供应链整体安全

近日&#xff0c;在「江狐会」广州站上&#xff0c;极狐(GitLab) 高级解决方案架构师武让分享了如何通过三大阶段 四大要点&#xff0c;规避代码安全「马奇诺防线」&#xff0c;真正确保软件供应链安全。以下内容整理自本次演讲。Enjoy&#xff5e; 先跟大家分享一个故事 一战…

计算机体系结构|MIT6.175和MIT6.375学习笔记

在2023年初&#xff0c;达坦科技发起成立硬件设计学习社区&#xff0c;邀请所有有志于从事数字芯片设计的同学加入我们的学习互助自学小组&#xff0c;以理解数字芯片设计的精髓&#xff0c;强化理论知识的同时提升实操技能&#xff0c;继而整体提升设计能力。现在&#xff0c;…

Vmware虚拟机安装MacOS13-Ventura详细教程

小编亲测 前提准备 功能强大的 Windows 电脑&#xff08;不能太差&#xff0c;不然会卡&#xff09;至少8GB内存默认是80GB的存储空间VMWare Workstation&#xff08;版本应该没什么需求&#xff0c;我装的是VMware Workstation 17 Pro&#xff09;Unlocker解锁软件MacOS Ventu…

最快的 Houdini 和 V-Ray 云渲染服务

Houdini是SideFX开发的一款3D动画软件应用。Houdini 最常用于 FX 部门&#xff0c;用于在电影和游戏中创建视觉效果。它被主要的 VFX 公司使用&#xff0c;例如 Walt Disney Animation Studios、Pixar、DreamWorks Animation、Double Negative、ILM、MPC、Framestore 等。Houdi…