flink sink kafka的事务提交现象猜想

news2024/12/13 20:29:59

现象 

查看flink源码时 sink kafka有事务提交机制,查看源码发现是使用两阶段提交策略,而事务提交是checkpoint完成后才执行,那么如果checkpoint设置间隔时间比较长时,事务未提交之前,后端应该消费不到数据,而观察实际现象为写入kafka的消费数据可以立马消费。

测试用例

测试流程

  1. 编写任务1,设置较长的checkpoint时间,并且指定 CheckpointingMode.EXACTLY_ONCE,输出输出到kafka。
  2. 编写任务2消费任务的结果topic,打印控制台,验证结果。
  3. 根据现象查看源码,分析原因。

测试用例

测试任务1

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(1000*60l, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");

//        超时时间,checkpoint没在时间内完成则丢弃
        env.getCheckpointConfig().setCheckpointTimeout(50000L); //10秒
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
        //最小间隔时间(前一次结束时间,与下一次开始时间间隔)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
//        当 Flink 任务取消时,保留外部保存的 checkpoint 信息

        
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("127.0.0.1:9092")
                .setTopics("test001")
                .setGroupId("my-group")
//                .setStartingOffsets(OffsetsInitializer())
                .setStartingOffsets(OffsetsInitializer.committedOffsets())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();


        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // 从文件读取数据
//        DataStream<SensorReading> dataStream = env.addSource( new SourceTest4.MySensorSource() );
        DataStream<String> map = kafkaSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
            
                return s;
            }
        });
        Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可
        properties.setProperty("transaction.timeout.ms","900000");
//        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.65.128:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("test002")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setKafkaProducerConfig(properties)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
//                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("flink-xhaodream-")
                .build();
        map.sinkTo(sink);
        // 打印输出
        env.execute();

测试任务2

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(1000*150l, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");
//        当 Flink 任务取消时,保留外部保存的 checkpoint 信息
        Properties properties1 = new Properties();
//        properties1.put("isolation.level","read_committed");
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("127.0.0.1:9092")
                .setTopics("test002")
                .setGroupId("my-group2")
                .setProperties(properties1)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafkaSource.print(" test2接受数据");
        // 打印输出
        env.execute();

测试结果分析

测试结果:

任务1开启后,无论是否执行checkpoint,任务checkpoint都可以正常消费数据,与预期不符合。

原因排查

查看kafkaSink 的源码,找到跟与两阶段提交相关的代码,1.18源码中TwoPhaseCommittingSink有重构。kafkasink实现TwoPhaseCommittingSink接口实现,创建Commiter和Writer。

@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
    PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;

    Committer<CommT> createCommitter() throws IOException;

    SimpleVersionedSerializer<CommT> getCommittableSerializer();

    @PublicEvolving
    public interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
    }
}

--------------------------------------
public class KafkaSink<IN>
        implements StatefulSink<IN, KafkaWriterState>,
                TwoPhaseCommittingSink<IN, KafkaCommittable> {

    private final DeliveryGuarantee deliveryGuarantee;

    private final KafkaRecordSerializationSchema<IN> recordSerializer;
    private final Properties kafkaProducerConfig;
    private final String transactionalIdPrefix;

    KafkaSink(
            DeliveryGuarantee deliveryGuarantee,
            Properties kafkaProducerConfig,
            String transactionalIdPrefix,
            KafkaRecordSerializationSchema<IN> recordSerializer) {
        this.deliveryGuarantee = deliveryGuarantee;
        this.kafkaProducerConfig = kafkaProducerConfig;
        this.transactionalIdPrefix = transactionalIdPrefix;
        this.recordSerializer = recordSerializer;
    }

    /**
     * Create a {@link KafkaSinkBuilder} to construct a new {@link KafkaSink}.
     *
     * @param <IN> type of incoming records
     * @return {@link KafkaSinkBuilder}
     */
    public static <IN> KafkaSinkBuilder<IN> builder() {
        return new KafkaSinkBuilder<>();
    }
-- 创建Committer

    @Internal
    @Override
    public Committer<KafkaCommittable> createCommitter() throws IOException {
        return new KafkaCommitter(kafkaProducerConfig);
    }

    @Internal
    @Override
    public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {
        return new KafkaCommittableSerializer();
    }
-- 创建writer

    @Internal
    @Override
    public KafkaWriter<IN> createWriter(InitContext context) throws IOException {
        return new KafkaWriter<IN>(
                deliveryGuarantee,
                kafkaProducerConfig,
                transactionalIdPrefix,
                context,
                recordSerializer,
                context.asSerializationSchemaInitializationContext(),
                Collections.emptyList());
    }

    @Internal
    @Override
    public KafkaWriter<IN> restoreWriter(
            InitContext context, Collection<KafkaWriterState> recoveredState) throws IOException {
        return new KafkaWriter<>(
                deliveryGuarantee,
                kafkaProducerConfig,
                transactionalIdPrefix,
                context,
                recordSerializer,
                context.asSerializationSchemaInitializationContext(),
                recoveredState);
    }

    @Internal
    @Override
    public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
        return new KafkaWriterStateSerializer();
    }

    @VisibleForTesting
    protected Properties getKafkaProducerConfig() {
        return kafkaProducerConfig;
    }
}
KafkaWriter和KafkaCommitter源码,

在KafkaWriter中snapshotState方法中发现如果deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE的开启事务的判断逻辑。

class KafkaWriter<IN>
        implements StatefulSink.StatefulSinkWriter<IN, KafkaWriterState>,
                TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, KafkaCommittable> {
.... 省略代码  
 @Override
    public Collection<KafkaCommittable> prepareCommit() {
        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
            return Collections.emptyList();
        }

        // only return a KafkaCommittable if the current transaction has been written some data
        if (currentProducer.hasRecordsInTransaction()) {
            final List<KafkaCommittable> committables =
                    Collections.singletonList(
                            KafkaCommittable.of(currentProducer, producerPool::add));
            LOG.debug("Committing {} committables.", committables);
            return committables;
        }

        // otherwise, we commit the empty transaction as is (no-op) and just recycle the producer
        currentProducer.commitTransaction();
        producerPool.add(currentProducer);
        return Collections.emptyList();
    }

    @Override
    public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
-- 开启事务判断        
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            currentProducer = getTransactionalProducer(checkpointId + 1);
            currentProducer.beginTransaction();
        }
        return Collections.singletonList(kafkaWriterState);
    }
。。。。。
}

 查看 KafkaCommitter的commit()方法发现producer.commitTransaction();操作

/**
 * Committer implementation for {@link KafkaSink}
 *
 * <p>The committer is responsible to finalize the Kafka transactions by committing them.
 */
class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
    public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE =
            "because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\n"
                    + "To avoid data loss, the application will restart.";

    private final Properties kafkaProducerConfig;

    @Nullable private FlinkKafkaInternalProducer<?, ?> recoveryProducer;

    KafkaCommitter(Properties kafkaProducerConfig) {
        this.kafkaProducerConfig = kafkaProducerConfig;
    }

    @Override
    public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
            throws IOException, InterruptedException {
        for (CommitRequest<KafkaCommittable> request : requests) {
            final KafkaCommittable committable = request.getCommittable();
            final String transactionalId = committable.getTransactionalId();
            LOG.debug("Committing Kafka transaction {}", transactionalId);
            Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> recyclable =
                    committable.getProducer();
            FlinkKafkaInternalProducer<?, ?> producer;
            try {
                producer =
                        recyclable
                                .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                .orElseGet(() -> getRecoveryProducer(committable));
  --- 事务提交
                producer.commitTransaction();
                producer.flush();
                recyclable.ifPresent(Recyclable::close);
            } catch (RetriableException e) {
                LOG.warn(
                        "Encountered retriable exception while committing {}.", transactionalId, e);
                request.retryLater();
            } catch (ProducerFencedException e) {
              ......
            }
        }
    }
。。。。
}
分析结果

发现除了设置checkpoint还需要kafkasink单独设置.才会实现输出端的开启事务,因此在任务1中添加设置setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)

 KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.65.128:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("test002")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setKafkaProducerConfig(properties)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
//                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("flink-xhaodream-")
                .build();

再次验证任务任务2依然可以正常消费。这是有一点头大,不明白为什么?想到既然开启事务肯定有事务的隔离级别,查询了kafka的事务隔离级别,有两种,分别是读已提交和读未提交,默认消费事务是读未提交。

​
kafka的事务隔离级别:
读已提交(Read committed):此隔离级别保证消费者只能读取已经提交的消息。这意味着事务中的消息在提交之前对消费者是不可见的。使用此隔离级别可以避免消费者读取到未提交的事务消息,确保消费者只读取到已经持久化的消息。

读未提交(Read Uncommitted):此隔离级别允许消费者读取未提交的消息。这意味着事务中的消息在提交之前就对消费者可见。使用此隔离级别可以实现更低的延迟,但可能会导致消费者读取到未提交的事务消息。

​

在任务2中添加isolation.level="read_committed",设定读取消费事务级别为读已提交,再次测试,发现任务1执行完checkpoint前任务2消费不到数据。而命令行可以及时消费任务1的输出topic可可以消费到数据。结果与预期相同。

 Properties properties1 = new Properties();
      properties1.put("isolation.level","read_committed");
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("127.0.0.1:9092")
                .setTopics("test002")
                .setGroupId("my-group2")
                .setProperties(properties1)

注意事项

Kafka | Apache Flink

FlinkKafkaProducer 已被弃用并将在 Flink 1.15 中移除,请改用 KafkaSink

官网文档信息

Kafka | Apache Flink

Kafka Consumer 提交 Offset 的行为配置 #

Flink Kafka Consumer 允许有配置如何将 offset 提交回 Kafka broker 的行为。请注意:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证。提交的 offset 只是一种方法,用于公开 consumer 的进度以便进行监控。

配置 offset 提交行为的方法是否相同,取决于是否为 job 启用了 checkpointing。

  • 禁用 Checkpointing: 如果禁用了 checkpointing,则 Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。 因此,要禁用或启用 offset 的提交,只需将 enable.auto.commit 或者 auto.commit.interval.ms 的Key 值设置为提供的 Properties 配置中的适当值。

  • 启用 Checkpointing: 如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。 这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。 用户可以通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(boolean) 方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。 注意,在这个场景中,Properties 中的自动定期 offset 提交设置会被完全忽略。

kafkasink支持语义保证

kafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee)。对于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCE,Flink checkpoint 必须启用。默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。 以下是对不同语义保证的解释:

  • DeliveryGuarantee.NONE 不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
  • DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
  • DeliveryGuarantee.EXACTLY_ONCE: 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。

推荐查看1.14版本和1.18版本结合起来看,在一些细节处理上有差异。

Kafka | Apache Flink

其他源码简介

如果查看1.18版本源码不太好理解两阶段提交,可以查看1.14.5的源码,发现FlinkKafkaProducer被标记废除请改用 KafkaSink,并将在 Flink 1.15 中移除, 在1.14.5中TwoPhaseCommitSinkFunction为抽象类,有明确定开启事务、预提交和提交的抽象方法,比较好理解。

 查看1.14.5版本的KafkaSink 的依赖,发现没有直接使用TwoPhaseCommitSinkFunction,但是查看源码可以看到使用了commiter和kafkawriter对象

public class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> {   
 public static <IN> KafkaSinkBuilder<IN> builder() {
        return new KafkaSinkBuilder<>();
    }
-- KafkaWriter 中会判断是否需要开启事务

    @Override
    public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter(
            InitContext context, List<KafkaWriterState> states) throws IOException {
        final Supplier<MetricGroup> metricGroupSupplier =
                () -> context.metricGroup().addGroup("user");
        return new KafkaWriter<>(
                deliveryGuarantee,
                kafkaProducerConfig,
                transactionalIdPrefix,
                context,
                recordSerializer,
                new InitContextInitializationContextAdapter(
                        context.getUserCodeClassLoader(), metricGroupSupplier),
                states);
    }

-- 事务提交在kafkaCommitter
    @Override
    public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException {
        return Optional.of(new KafkaCommitter(kafkaProducerConfig));
    }

    @Override
    public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter()
            throws IOException {
        return Optional.empty();
    }

...
}

KafkaWriter源码


    @Override
    public List<KafkaCommittable> prepareCommit(boolean flush) {
        if (deliveryGuarantee != DeliveryGuarantee.NONE || flush) {
            currentProducer.flush();
        }
        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            final List<KafkaCommittable> committables =
                    Collections.singletonList(
                            KafkaCommittable.of(currentProducer, producerPool::add));
            LOG.debug("Committing {} committables, final commit={}.", committables, flush);
            return committables;
        }
        return Collections.emptyList();
    }
-- 快照状态开启事务
    @Override
    public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            currentProducer = getTransactionalProducer(checkpointId + 1);
            currentProducer.beginTransaction();
        }
        return ImmutableList.of(kafkaWriterState);
    }

1.14.5 版本TwoPhaseCommitSinkFunction是一个抽象类 在1.18 中是接口

/**
 * Flink Sink to produce data into a Kafka topic. By default producer will use {@link
 * FlinkKafkaProducer.Semantic#AT_LEAST_ONCE} semantic. Before using {@link
 * FlinkKafkaProducer.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation.
 *
 * @deprecated Please use {@link org.apache.flink.connector.kafka.sink.KafkaSink}.
 */
@Deprecated
@PublicEvolving
public class FlinkKafkaProducer<IN>
        extends TwoPhaseCommitSinkFunction<
                IN,
                FlinkKafkaProducer.KafkaTransactionState,
                FlinkKafkaProducer.KafkaTransactionContext> {
 。。。}
-- 1.14 版本TwoPhaseCommitSinkFunction 为抽象类

@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener { }


-- 1.18 版本
@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
    PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;

    Committer<CommT> createCommitter() throws IOException;

    SimpleVersionedSerializer<CommT> getCommittableSerializer();

    @PublicEvolving
    public interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
    }
}

FlinkKafkaProducer继承TwoPhaseCommitSinkFunction,会重写其中的方法,查看重写开启事务的方法
  -- FlinkKafkaProducer 中重写beginTransaction 方法
 @Override
    protected FlinkKafkaProducer.KafkaTransactionState beginTransaction()
            throws FlinkKafkaException {
        switch (semantic) {
            case EXACTLY_ONCE:
                FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
-- 开启kafka的procder的事务
                producer.beginTransaction();
                return new FlinkKafkaProducer.KafkaTransactionState(
                        producer.getTransactionalId(), producer);
            case AT_LEAST_ONCE:
            case NONE:
                // Do not create new producer on each beginTransaction() if it is not necessary
                final FlinkKafkaProducer.KafkaTransactionState currentTransaction =
                        currentTransaction();
                if (currentTransaction != null && currentTransaction.producer != null) {
                    return new FlinkKafkaProducer.KafkaTransactionState(
                            currentTransaction.producer);
                }
                return new FlinkKafkaProducer.KafkaTransactionState(
                        initNonTransactionalProducer(true));
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

只有当FlinkKafkaProducer.Semantic 为EXACTLY_ONCE时才会开启事务,查看其构造方法

 public FlinkKafkaProducer(
            String topicId,
            SerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            @Nullable FlinkKafkaPartitioner<IN> customPartitioner,
            FlinkKafkaProducer.Semantic semantic,
            int kafkaProducersPoolSize) {
        this(
                topicId,
                null,
                null,
                new KafkaSerializationSchemaWrapper<>(
                        topicId, customPartitioner, false, serializationSchema),
                producerConfig,
                semantic,
                kafkaProducersPoolSize);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2258943.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

推送(push)项目到gitlab

文章目录 1、git init1.1、在当前目录中显示隐藏文件&#xff1a;1.2、查看已有的远程仓库1.3、确保你的本地机器已经生成了 SSH 密钥&#xff1a;1.4、将生成的公钥文件&#xff08;通常位于 ~/.ssh/id_rsa.pub&#xff09;复制到 GitLab 的 SSH 设置中&#xff1a;1.5、测试 …

7.Vue------$refs与$el详解 ------vue知识积累

$refs 与 $el是什么&#xff1f; 作用是什么? ref&#xff0c;$refs&#xff0c;$el &#xff0c;三者之间的关系是什么&#xff1f; ref (给元素或者子组件注册引用信息) 就像你要给元素设置样式&#xff0c;就需要先给元素设定一个 class 一样&#xff0c;同理&#xff0c;…

通俗易懂的 Nginx 反向代理 配置

通俗易懂的 Nginx 反向代理 配置 首先 root 与 alias 的区别 root 是直接拼接 root location location /i/ {root /data/w3; }当请求 /i/top.gif &#xff0c;/data/w3/i/top.gif 会被返回。 alias 是用 alias 替换 location location /i/ {alias /data/w3/images/; }当请…

git 导出某段时间修改的文件 windows

第一步&#xff1a;列出两次commitID之间的文件变动 git diff oldid newid --name-only// 例如 git diff 4a886c57a8b5611a2abcfcd120461c2e92f7029a HEAD --name-only 4a886c57a8b5611a2abcfcd120461c2e92f7029a 代表之前 HEAD 代表最新或者换成某次commitID 例如&#xf…

若依集成Uflo2工作流引擎

文章目录 1. 创建子模块并添加依赖1.1 新建子模块 ruoyi-uflo1.2 引入 Uflo2 相关依赖 2. 配置相关 config2.1 配置 ServletConfig2.2 配置 UfloConfig2.3 配置 TestEnvironmentProvider 3. 引入Uflo配置文件4. 启动并访问 Uflo2 是由 BSTEK 自主研发的一款基于 Java 的轻量级工…

BERT:用于语言理解的深度双向 Transformer 的预训练。

文章目录 0. 摘要1. 介绍2. 相关工作2.1 无监督的基于特征的方法2.3 无监督微调方法2.3 从受监督数据中迁移学习 3. BERT3.1 预训练 BERT3.2 微调 BERT 4. 实验4.1 GLUE4.2 SQuAD v1.14.3 SQuAD v2.04.4 SWAG 5. 消融研究5.1 预训练任务的影响5.2 模型大小的影响5.3 使用 BERT …

如何快速批量把 PDF 转为 JPG 或其它常见图像格式?

在某些特定场景下&#xff0c;将 PDF 转换为 JPG 图片格式却具有不可忽视的优势。例如&#xff0c;当我们需要在不支持 PDF 查看的设备或软件中展示文档内容时&#xff0c;JPG 图片能够轻松被识别和打开&#xff1b;此外&#xff0c;对于一些网络分享或社交媒体发布的需求&…

如何在项目中使用人大金仓替换mysql

文章目录 数据库连接配置调整驱动和连接字符串修改&#xff1a;用户名和密码&#xff1a; SQL 语法兼容性检查数据类型差异处理&#xff1a;函数差异&#xff1a;SQL语句客户端 SQL 交互工具 数据迁移数据库、用户移植数据迁移工具使用&#xff1a;迁移过程中的问题及解决方案 …

【DVWA】XSS(Stored)

倘若人生一马平川&#xff0c;活着还有什么意思呢。 1.XSS(Stored)(Low) 相关代码分析 trim(string,charlist) 函数移除字符串两侧的空白字符或其他预定义字符&#xff0c;预定义字符包括、\t、\n、\x0B、\r以及空格&#xff0c;可选参数charlist支持添加额外需要删除的字符…

数据分析python小工具录入产品信息到Excel

在没有后台管理系统的时候&#xff0c;有时候为了方便起见&#xff0c;想提供一个输入框让运营人员直接输入&#xff0c;然后数据就会以数据库的形式存进数据库 效果图&#xff1a; 输入用户名 输入数据 输入信息后点击添加到表格&#xff0c;检查后方便批量保存到excel …

HTML和JavaScript实现商品购物系统

下面是一个更全面的商品购物系统示例&#xff0c;包含新增商品、商品的增加删除以及结算找零的功能。这个系统使用HTML和JavaScript实现。 1.功能说明&#xff1a; 这个应用程序使用纯HTML和JavaScript实现。 包含一个商品列表和一个购物车区域。商品列表中有几个示例商品&a…

C# 探险之旅:第三节 - 有趣的变量命名

欢迎再次回到我们的C#魔法森林。今天&#xff0c;我们要一起探索一个既有趣又实用的技能——变量命名。想象一下&#xff0c;你正在为你的小精灵们&#xff08;变量&#xff09;起名字&#xff0c;好的名字不仅能让它们更容易被识别&#xff0c;还能让你的魔法书&#xff08;代…

JavaEE 【知识改变命运】04 多线程(3)

文章目录 多线程带来的风险-线程安全线程不安全的举例分析产出线程安全的原因&#xff1a;1.线程是抢占式的2. 多线程修改同一个变量&#xff08;程序的要求&#xff09;3. 原子性4. 内存可见性5. 指令重排序 总结线程安全问题产生的原因解决线程安全问题1. synchronized关键字…

ASP.NET|日常开发中连接Sqlite数据库详解

ASP.NET&#xff5c;日常开发中连接Sqlite数据库详解 前言一、安装和引用相关库1.1 安装 SQLite 驱动1.2 引用命名空间 二、配置连接字符串2.1 连接字符串的基本格式 三、建立数据库连接3.1 创建连接对象并打开连接 四、执行数据库操作4.1 创建表&#xff08;以简单的用户表为例…

DS记录中

DataX/hdfswriter/doc/hdfswriter.md at master alibaba/DataX GitHub DataX 写入文档 https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/architecture/task-structure DS文档 DS 项目举例 流程 数据库(Datax) -> ODS &#xff08;shell&#xff09;->ADS(…

Node.js express

1. express 介绍 express 是一个基于 Node.js 平台的极简、灵活的 WEB 应用开发框架&#xff0c;官方网址&#xff1a;https://www.expressjs.com.cn/简单来说&#xff0c;express 是一个封装好的工具包&#xff0c;封装了很多功能&#xff0c;便于我们开发 WEB 应用&#xff…

网络应用技术 实验八:防火墙实现访问控制(华为ensp)

目录 一、实验简介 二、实验目的 三、实验需求 四、实验拓扑 五、实验步骤 1、设计全网 IP 地址 2、设计防火墙安全策略 3、在 eNSP 中部署园区网 4、配置用户主机地址 5、配置网络设备 配置交换机SW-1~SW-5 配置路由交换机RS-1~RS-5 配置路由器R-1~R-3 6、配置仿…

分布式日志系统设计

一、分布式日志系统定义 分布式日志系统是一种用于收集、存储和分析大规模分布式系统日志的系统。它可以帮助开发人员和系统管理员实时监控和调试系统&#xff0c;提高系统可靠性和可用性&#xff0c;同时也可以用于日志分析和故障排查。 二、简单设计思路 日志收集&#xff…

详解RabbitMQ在Ubuntu上的安装

​​​​​​​ 目录 Ubuntu 环境安装 安装Erlang 查看Erlang版本 退出命令 ​编辑安装RabbitMQ 确认安装结果 安装RabbitMQ管理界面 启动服务 查看服务状态 通过IP:port访问 添加管理员用户 给用户添加权限 再次访问 Ubuntu 环境安装 安装Erlang RabbitMq需要…

java+springboot+mysql私人会所管理系统

项目介绍&#xff1a; 使用javaspringbootmysql开发的私人会所管理系统&#xff0c;系统包含管理员、技师、用户角色&#xff0c;功能如下&#xff1a; 管理员&#xff1a;用户管理&#xff1b;服务项目&#xff1b;技师管理&#xff1b;房间管理&#xff1b;预约管理&#x…