flink重温笔记(十二): flink 高级特性和新特性(1)——End-to-End Exactly-Once(端到端精确一致性语义)

news2025/1/7 7:22:04

Flink学习笔记

前言:今天是学习 flink 的第 12 天啦!学习了 flink 高级特性和新特性之 End-to-End Exactly-Once(端到端精确一致性语义),主要是解决大数据领域数据从数据源到数据落点的一致性,不会容易造成数据丢失的问题,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:端到端的一致性语义,说明每一步都算术,每一天的努力都不会白费,明天也要继续努力!


文章目录

  • Flink学习笔记
    • 四、Flink 高级特性和新特性
      • 1. End-to-End Exactly-Once
        • 1.1 流处理的数据语义
          • 1.1.1 At most once(最多一次)
          • 1.1.2 At least once(至少一次)
          • 1.1.3 Exactly once(精确一次)
          • 1.1.4 End to End Exactly once(端到端精确一次)
          • 1.1.5 流计算系统如何支持一次性语义
            • (1) At least once + 去重
            • (2) At least once + 幂等
            • (3) 分布式快照
            • (4) 方法汇总
        • 1.2 End-to-End Exactly-Once 实现
          • 1.2.1 Source
          • 1.2.2 Transformation
          • 1.2.3 sink
        • 1.3 Flink + Kafka 的 End-to-End Exactly Once
          • 1.3.1 版本声明
          • 1.3.2 两阶段提交-API
          • 1.3.3 两阶段提交-流程
        • 1.4 案例演示
          • 1.4.1 Flink + Kafka 实现 End-to-End Exactly Once
          • 1.4.2 Flink + MySQL 实现 End-to-End Exactly Once

四、Flink 高级特性和新特性

1. End-to-End Exactly-Once

1.1 流处理的数据语义

顺序:At most once(最多一次)< At least once(至少一次)< Exactly once(精确一次)< End to End Exactly once(端到端一次)


1.1.1 At most once(最多一次)

最简单的恢复方式,直接从失败的下个数据恢复程序,丢失刚刚失败的数据。


1.1.2 At least once(至少一次)

由于事件是可以重传的,可能造成数据重复。


1.1.3 Exactly once(精确一次)

依赖 checkpoint 机制,回滚恢复数据,保持所有记录仅影响内部状态一次,即不考虑部分数据泄露到下游。


1.1.4 End to End Exactly once(端到端精确一次)

Flink 应用从 Source 端开始到 Sink 端结束,保持所有记录影响内部和外部状态一次,即考虑部分数据泄露到下游。


1.1.5 流计算系统如何支持一次性语义
(1) At least once + 去重


(2) At least once + 幂等


(3) 分布式快照


(4) 方法汇总
Exactly Once 实现方式优点缺点
At least once + 去重1. 故障对性能的影响是局部的;
2. 故障的影响不一定随着拓扑大小而增加
1. 可能需要大量的存储和基础设施来支持;
2. 每个算子的每个事件都有资源开销
At least once + 幂等1. 实现简单,开销较低1. 依赖存储特性和数据特征
分布式快照1. 较小的性能和资源开销1. barrier 同步;
2. 任何算子发生故障都需要全局暂停和状态回滚;
3. 拓扑越大,对性能的潜在影响越大

1.2 End-to-End Exactly-Once 实现
1.2.1 Source

发生故障时需要支持重设数据的读取位置,如Kafka可以通过offset来实现(其他的没有offset系统,可以自己实现累加器计数)


1.2.2 Transformation
  • 分布式快照机制

    • 同 Spark 相比,Spark 仅仅是针对 Driver 的故障恢复 Checkpoint,
    • 而 Flink 的快照可以到算子级别,并且对全局数据也可以做快照,
    • Flink 的分布式快照受到 Chandy-Lamport 分布式快照算法启发,同时进行了量身定做。
  • Barrier

    • 数据栅栏是一个标记,不会干扰正常数据处理,
    • 一个数据源可以有多个 barrier,
    • 多个数据源,快流等慢流。
  • 异步和增量

    • 异步快照不会阻塞任务,
    • 增量快照,每次进行的全量快照是根据上一次更新的。

1.2.3 sink
  • 幂等写入

    • 任意多次向一个系统写入数据,只对目标系统产生一次结果影响。
    • key,和 value 可以控制不重复
  • 事务写入

    • 借鉴数据库的事务机制,结合自身 checkpoint 机制,

    • 分阶段快照,先保存数据不向外部系统提交,checkpoint 确认过上下游一致后,才向外部系统 commit。

    • 实现方式:

      • 1- 预写日志(Write-Ahead-Log,WAL)

        通用性强,但不能保证百分比,因为要写入内存这个易失介质。

      • 2- 两阶段提交(Two-Phase-Commit,2PC)

        如果外部系统自身支持事务(比如MySQL、Kafka),可以使用2PC方式,百分百端到端的Exactly-Once。

    • 缺点:

      • 牺牲了延迟
      • 输出不是实时写入,而是分批写入

1.3 Flink + Kafka 的 End-to-End Exactly Once
1.3.1 版本声明

Flink 1.4 版本之前,支持 Exactly Once 语义,仅限于应用内部。

Flink 1.4 版本之后,通过两阶段提交 (TwoPhaseCommitSinkFunction) 支持 End-To-End Exactly Once,而且要求 Kafka 0.11+。


1.3.2 两阶段提交-API

实现方法封装在抽象类:TwoPhaseCommitSinkFunction ,重写方法:

  • beginTransaction:

    开启事务前,在目标文件系统的临时目录中创建一个临时文件,处理数据时将数据写入此文件;

  • preCommit:

    在预提交阶段,刷写(flush)文件,然后关闭文件,之后就不能写入到文件了,将为下一检查点的任何后续写入启动新事务;

  • commit:

    在提交阶段,将预提交的文件原子性移动到真正的目标目录中,注意,会增加输出数据可见性的延迟;

  • abort:

    在中止阶段,删除临时文件。


1.3.3 两阶段提交-流程
  • 1- 数据源阶段

    对接数据源系统

  • 2- 预提交阶段(pre-commit)-内部状态

    Flink 开始 checkpoint,就会进入 pre-commit 阶段,同时 JobManager 的 Coordinator 会将 Barrier 注入数据流中

  • 3- 预提交阶段(pre-commit)-外部状态

    当所有的 barrier 在算子中成功进行一遍传递(就是 Checkpoint 完成),并完成快照后,则“预提交”阶段完成;

  • 4- commit 阶段

    所有算子完成“预提交”,就会发起一个commit“提交”动作,任何一个“预提交”失败都会回滚到最近的 checkpoint;


1.4 案例演示
1.4.1 Flink + Kafka 实现 End-to-End Exactly Once

例子1:普通方式——内部一致性语义,重点在生产者 API 设置上,只是简单序列化为字节流 SimpleStringSchema

package cn.itcast.day12.endtoend;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;


import java.util.Properties;

/**
 * @author lql
 * @time 2024-03-07 14:51:04
 * @description TODO:topic:test3 终端生产生数据,控制台打印 topic:test4数据
 */
public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce {
    public static void main(String[] args) throws Exception {

        //todo 1)初始化flink流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        
        //todo 2)判断当前的环境
        env.setStateBackend(new HashMapStateBackend());
        if(SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC){
            env.getCheckpointConfig().setCheckpointStorage("file:///D:\\checkpoint");
        }else{
            env.getCheckpointConfig().setCheckpointStorage(args[0]);
        }

        //todo 3)设置checkpoint的其他参数
        //设置checkpoint的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(2000L);
        //同一个时间只能有一个栅栏在运行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //设置checkpoint的执行模式。仅执行一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);


        //todo 4)接入数据源
        //指定topic的名称
        String topicName = "test";
        //实例化kafkaConsumer对象
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 消费最新的数据
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量offset
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); // 提交偏移量的时间间隔
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), props);
        //在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是true
        kafkaSource.setCommitOffsetsOnCheckpoints(true);

        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //todo 5)单词计数操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //todo 6)单词分组操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> result_1 = wordAndOne
                .keyBy(t -> t.f0)
                .sum(1);

        //todo 7)打印计算结果
        result_1.print();

        SingleOutputStreamOperator<String> result = result_1.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + "_" + value.f1;
            }
        });

        result.printToErr();

        //todo 8)创建kafka的生产者实例
        //指定topic的名称
        String distTopicName = "test1";
        //实例化FlinkKafkaProducer对象
        Properties distProps = new Properties();
        distProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");

        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
                distTopicName,
                new SimpleStringSchema(),
                distProps
        );// 容错
        //todo 4)将数据写入到kafka
        result.addSink(myProducer);

        //todo 8)启动作业
        env.execute();
    }
}

结果:在 node1 的 kafka 生产者模式终端输入数据到 test,词频统计结果写入到 topic:test1,但不保证外部一致性语义


例子2:超级方式——内部外部一致性语义

package cn.itcast.day12.endtoend;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @author lql
 * @time 2024-03-07 14:51:04
 * @description TODO:topic:test3 终端生产生数据,控制台打印 topic:test4数据
 */
public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce_pro {
    public static void main(String[] args) throws Exception {

        //todo 1)初始化flink流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 在这里就不能开启了,因为 kafka
        //env.enableCheckpointing(5000);
        //todo 2)判断当前的环境
        env.setStateBackend(new HashMapStateBackend());
        if(SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC){
            env.getCheckpointConfig().setCheckpointStorage("file:///D:\\checkpoint");
        }else{
            env.getCheckpointConfig().setCheckpointStorage(args[0]);
        }

        //todo 3)设置checkpoint的其他参数
        //设置checkpoint的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(2000L);
        //同一个时间只能有一个栅栏在运行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //设置checkpoint的执行模式。仅执行一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);


        //todo 4)接入数据源
        //指定topic的名称
        String topicName = "test";
        //实例化kafkaConsumer对象
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 消费最新的数据
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量offset
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); // 提交偏移量的时间间隔
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), props);
        //在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是true
        kafkaSource.setCommitOffsetsOnCheckpoints(true);

        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //todo 5)单词计数操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //todo 6)单词分组操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> result_1 = wordAndOne
                .keyBy(t -> t.f0)
                .sum(1);

        //todo 7)打印计算结果
        result_1.print();

        SingleOutputStreamOperator<String> result = result_1.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + "_" + value.f1;
            }
        });

        result.printToErr();

        //todo 8)创建kafka的生产者实例
        //指定topic的名称
        String distTopicName = "test1";
        //实例化FlinkKafkaProducer对象
        Properties distProps = new Properties();
        distProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
                distTopicName,
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                distProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
        //todo 4)将数据写入到kafka
        result.addSink(myProducer);

        //todo 8)启动作业
        env.execute();
    }
}

结果:在 node1 的 kafka 生产者模式终端输入数据到 test,词频统计结果写入到 topic:test1,保证了内外部一致性语义

总结:

  • 在普通模式设置下,需要提前开启 checkpoint 模式
  • 在超级模式设置下,不要提前开启 checkpoint 模式,不然写不进数据
  • 在超级模式设置下,不是简单序列化而是事务写入:
    • new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
    • FlinkKafkaProducer.Semantic.EXACTLY_ONCE

1.4.2 Flink + MySQL 实现 End-to-End Exactly Once

例子:读取 socket 数据,写入 MySQL 数据库,删除数据库数据,也能继续累加结果,实现端到端一致性。

SQL建表:

create table test.t_wordcount
(
    word   varchar(255) not null primary key,
    counts int default 0 null
);

代码:

package cn.itcast.day12.endtoend;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Kafka_Flink_MySQL_EndToEnd_ExactlyOnce {
    public static void main(String[] args) throws Exception {
        //todo 1)初始化flink流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        //todo 2)如果实现端对端一次性语义,必须要开启checkpoint
        env.enableCheckpointing(5000L);

        //todo 3)判断当前的环境
        env.setStateBackend(new HashMapStateBackend());
        if(SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC){
            env.getCheckpointConfig().setCheckpointStorage("file:///D:\\checkpoint");
        }else{
            env.getCheckpointConfig().setCheckpointStorage(args[0]);
        }

        //todo 4)设置checkpoint的其他参数
        //设置checkpoint的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(2000L);
        //同一个时间只能有一个栅栏在运行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //设置checkpoint的执行模式。仅执行一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);

        //todo 5)接入数据源,读取文件获取数据
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //todo 3)数据处理
        //  3.1:使用flatMap对单词进行拆分
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                //返回数据
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        //  3.2:对拆分后的单词进行记一次数
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        //  3.3:使用分组算子对key进行分组
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);

        //  3.4:对分组后的key进行聚合操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);

        //todo 6)将消费到的数据实时写入mysql
        sumed.addSink(new MysqlTwoPhaseCommitSink());

        //todo 7)运行作业
        env.execute();
    }

    /**
     * 通过两端递交的方式实现数据写入mysql
     */
    public static class MysqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>, ConnectionState, Void> {

        public MysqlTwoPhaseCommitSink() {
            super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        }

        /**
         * 每条数据执行一次该方法
         * @param connectionState
         * @param value
         * @param context
         * @throws Exception
         */
        @Override
        protected void invoke(ConnectionState connectionState, Tuple2<String, Integer> value, Context context) throws Exception {
            System.err.println("start invoke.......");
            Connection connection = connectionState.connection;
            // 插入一条记录,但如果该记录的主键或唯一键已经存在,则更新该记录。
            PreparedStatement pstm = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?");
            pstm.setString(1, value.f0);
            pstm.setInt(2, value.f1);
            pstm.setInt(3, value.f1);
            // 插入数据一定是 executeUpdate
            pstm.executeUpdate();
            pstm.close();
            //手动制造异常
            if(value.f0.equals("hive")) {
                System.out.println(1/0);
            }
        }

        /**
         * 开启事务
         * @return
         * @throws Exception
         */
        @Override
        protected ConnectionState beginTransaction() throws Exception {
            System.out.println("=====> beginTransaction... ");
            Class.forName("com.mysql.jdbc.Driver");
            //closing inbound before receiving peer's close_notify,链接地址中追加参数:useSSL=false
            Connection connection = DriverManager.getConnection("jdbc:mysql://node1:3306/test?characterEncoding=UTF-8&useSSL=false", "root", "123456");
            connection.setAutoCommit(false);
            return new ConnectionState(connection);
        }

        /**
         * 预递交
         * @param connectionState
         * @throws Exception
         */
        @Override
        protected void preCommit(ConnectionState connectionState) throws Exception {
            System.out.println("start preCommit...");
        }

        /**
         * 递交操作
         * @param connectionState
         */
        @Override
        protected void commit(ConnectionState connectionState) {
            System.out.println("start transaction...");
            Connection connection = connectionState.connection;
            try {
                connection.commit();
                connection.close();
            } catch (SQLException e) {
                throw new RuntimeException("提交事物异常");
            }
        }

        /**
         * 回滚操作
         * @param connectionState
         */
        @Override
        protected void abort(ConnectionState connectionState) {
            System.out.println("start abort...");
            Connection connection = connectionState.connection;
            try {
                connection.rollback();
                connection.close();
            } catch (SQLException e) {
                throw new RuntimeException("回滚事物异常");
            }
        }
    }

    static class ConnectionState {
        // transient 的变量能被忽略序列化
        private final transient Connection connection;
        ConnectionState(Connection connection) {
            this.connection = connection;
        }
    }
}

结果:mysql 数据库中删除数据后,再次添加数据后,仍会叠加数据。

总结:

  • 1- 两段递交:自定义 sink 中 需要继承 TwoPhaseCommitSinkFunction
  • 2- kyro 序列化连接状态,VoidSerializer 需要接上 INSTANCE 作为 Void 的序列化
  • 3- 开启事务时,要放弃自动提交
  • 4- transient 的变量能被忽略序列化,此处用于连接变量
  • 5- 数据库插入计算时,要使用 executeUpdate

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1500810.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL面试题-锁(答案版)

锁 1、MySQL 有哪些锁&#xff1f; &#xff08;1&#xff09;全局锁 加了全局锁之后&#xff0c;整个数据库就处于只读状态了&#xff0c;这时其他线程执行以下操作&#xff0c;都会被阻塞&#xff1a; 对数据的增删改操作&#xff0c;比如 insert、delete、update等语句&…

【OJ比赛日历】快周末了,不来一场比赛吗? #03.09-03.15 #13场

CompHub[1] 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…&#xff09;比赛。本账号会推送最新的比赛消息&#xff0c;欢迎关注&#xff01; 以下信息仅供参考&#xff0c;以比赛官网为准 目录 2024-03-09&#xff08;周六&#xff09; #6场比赛2024-03-10…

c# combox 行间距调整

初始化combox comboBox1.DropDownStyle ComboBoxStyle.DropDownList;comboBox1.ItemHeight 25; // 设置 combox 的行高comboBox1.DrawMode DrawMode.OwnerDrawVariable; 添加 DrawItem 事件 private void comboBox1_DrawItem(object sender, DrawItemEventArgs e){if (…

Web核心

JavaWeb技术栈 B/S架构&#xff1a;Browser/Server &#xff0c; 浏览器/服务器 架构模式&#xff0c;其特点为&#xff0c;客户端只需要浏览器&#xff0c;应用程序的逻辑和数据都存储在服务器端。浏览器只需要请求服务器&#xff0c;获取Web资源&#xff0c;服务器把Web资源…

RocketMQ快速入门_2. rocketmq 的应用场景、与其他mq的差异

0. 引言 之前我们讲解过rabbitMQ&#xff0c;本期我们将进入吞吐量更加强大的rocketMQ的学习。 1. 基础概念 如果你是刚接触MQ的同学&#xff0c;还不清楚消息队列的基础概念的&#xff0c;可以参考我之前这篇文章&#xff1a; https://wu55555.blog.csdn.net/article/deta…

凌鲨微应用开发流程

微应用开发流程 使用vite,nextjs等框架创建前端项目引入需要的api包通过调试界面进行调试 创建前端项目 vite yarn create vitenextjs yarn create next-app引入需要的api包 名称权限说明http跨域访问跨域http访问tauri提供的apilinksaas-minapp/api打开浏览器读本地文件…

每日OJ题_牛客CM24 最近公共祖先

目录 牛客CM24 最近公共祖先 解析代码 牛客CM24 最近公共祖先 最近公共祖先_牛客题霸_牛客网 解析代码 class LCA { public:int getLCA(int a, int b) {// 左孩 父 * 2 1&#xff0c;右孩 父 * 2 2&#xff1b;// 父 &#xff08;孩 - 1&#xff09; / 2&#xff1b;/…

掘根教你拿捏C++异常(try,catch,throw,栈解退,异常规范,异常的重新抛出)

在介绍异常之前&#xff0c;我觉得很有必要带大家了解一下运行时错误和c异常出现之前的处理运行时错误的方式。这样子能更深入的了解异常的作用和工作原理 运行阶段错误 我们知道&#xff0c;程序有时候会遇到运行阶段错误&#xff0c;导致程序无法正常运行下去 C在运行时可…

离散数学——特殊图思维导图

离散数学——特殊图思维导图 目录 前言 内容 大纲 参考 前言 这是当初学习离散数学时整理的笔记大纲&#xff0c;其中包含了自己对于一些知识点的体悟。现将其放在这里作为备份&#xff0c;也希望能够对你有所帮助。 当初记录这些笔记只是为了在复习时更快地找到对应的知…

跨境电商趋势解析:社交电商携手私域流量运营,精准触达与转化

随着全球化的深入发展&#xff0c;跨境电商逐渐成为全球贸易的重要组成部分。在这一背景下&#xff0c;社交电商作为一种新兴的商业模式&#xff0c;正逐渐在跨境电商领域崭露头角&#xff0c;并对私域流量的运营产生了深远的影响。本文Nox聚星将和大家分析社交电商在跨境电商中…

Postman中文文档——安装与更新

前言 postman好不好用&#xff0c;只有自己去用过了才知道&#xff0c;如果你之前没有使用过的&#xff0c;那我建议尝试去安装使用一下。 postman是一款支持http协议的接口调试与测试工具&#xff0c;其主要特点就是功能强大&#xff0c;使用简单且易用性好 。 无论是开发人…

程序运行的基本流程

操作系统&#xff08;应用程序&#xff09;&#xff1a; 装系统就是将操作系统安装到硬盘1中 计算机启动的基本过程&#xff1a; 总结&#xff1a; 程序一般保存在硬盘中&#xff0c;软件安装的过程就是将程序写入硬盘的过程程序在运行时会加载进入内存&#xff0c;然后由CPU…

浅谈碳化硅MOSFET TO-247封装单管引入开尔文管脚必要性

相较于传统的硅MOSFET和硅IGBT 产品&#xff0c;基于宽禁带碳化硅材料设计的碳化硅 MOSFET 具有耐压高、导通电阻低&#xff0c;开关损耗小的特点&#xff0c;可降低器件损耗、减小产品尺寸&#xff0c;从而提升系统效率。而在实际应用中&#xff0c;我们发现&#xff1a;带辅助…

Springboot + Vue用户管理系统

Springboot Vue用户管理系统 主要实现了管理员的登录&#xff0c;用户管理&#xff0c;用户的增删改查等操作&#xff0c; 技术实现&#xff0c;前端采用Vue 后端采用Springboot ,前后端分离系统&#xff0c;数据库使用mysql 还用到了redis,mybatis-plus。。。。。。。。。…

我从200条留言中,挑选出3款免费又实用的软件,且用且珍惜

在浩如烟海的软件世界中&#xff0c;真正免费且实用的软件如同珍珠般稀少&#xff0c;能够遇见它们实属不易。 1、OBS 当你在各种平台上欣赏那些清晰且色彩绚丽的短视频时&#xff0c;或许不会想到&#xff0c;它们很可能是由一款名为OBS Studio的免费开源录屏与直播软件制作…

[云原生] k8s之存储卷

一、emptyDir存储卷 当Pod被分配给节点时&#xff0c;首先创建emptyDir卷&#xff0c;并且只要该Pod在该节点上运行&#xff0c;该卷就会存在。正如卷的名字所述&#xff0c;它最初是空的。Pod 中的容器可以读取和写入emptyDir卷中的相同文件&#xff0c;尽管该卷可以挂载到每…

妇女节:打开AI视界,成就“她力量”

根据国内招聘平台猎聘发布的《2024女性人才数据洞察报告》&#xff0c;从2023年3月到2024年2月&#xff0c;女性在AIGC领域的求职人次同比增长了190.49%。随着人工智能时代的降临&#xff0c;女性正以前所未有的姿态&#xff0c;在技术的助力下&#xff0c;蜕变成为新生的力量。…

YOLOv9来了:实时目标检测新SOTA,完胜各种轻量或大型模型!

距离YOLOv8发布仅1年的时间&#xff0c;v9诞生了&#xff01; 这个新版本主打用“可编程梯度信息来学习你想学的任何内容”。 无论是轻量级还是大型模型&#xff0c;它都完胜&#xff0c;一举成为目标检测领域新SOTA&#xff1a; 网友的心情be like: 鉴于源码已经发布&#xf…

【Linux】--- Linux编译器-gcc/g++、调试器-gdb、项目自动化构建工具-make/Makefile 使用

目录 一、Linux编译器-gcc/g1.1 gcc/g 使用方法1.2 程序的翻译过程1.3 链接 -- 动静态链接特点及区别 二、Linux调试器-gdb2.1 背景2.2 使用方法 三、Linux项目自动化构建工具-make/Makefile3.1 背景3.2 原理3.3 项目清理 一、Linux编译器-gcc/g 1.1 gcc/g 使用方法 格式&…

JuiceSSH结合Cpolar实现公网远程SSH访问内网Linux系统

文章目录 1. Linux安装cpolar2. 创建公网SSH连接地址3. JuiceSSH公网远程连接4. 固定连接SSH公网地址5. SSH固定地址连接测试 处于内网的虚拟机如何被外网访问呢?如何手机就能访问虚拟机呢? cpolarJuiceSSH 实现手机端远程连接Linux虚拟机(内网穿透,手机端连接Linux虚拟机) …