Debeizum 增量快照

news2025/1/19 22:20:12

 在Debeizum1.6版本发布之后,成功推出了Incremental Snapshot(增量快照)的功能,同时取代了原有的实验性的Parallel Snapshot(并行快照)。在本篇博客中,我将介绍全新快照方式的原理,以及深入研究其实现细节。

1、快照机制

  在以往的Debezium的中,我们需要借助其提供的Snapshot机制来获取数据源中的历史数据。以MySQL为例,Debezium提供了多种锁表方式(snapshot.locking.mode),其中minimal是最小化的锁表方式,connector会在初始化过程中读取database schemas和其他元数据时获取全局读锁,耗时一般不超过1s。然后使用REPEATABLE READS的方式读取表中的记录完成后续的操作。

  看上去这种方式和mysqldump的逻辑差不多,但这种方式还是有一些硬通病:

  • 这种快照方式依然不能中断,无法暂停和恢复,一旦失败就要重新开始,这种语义类似事务机制(必须完全执行或者根本不执行);
  • 如果是运行了一段时间的connector需要重新同步历史数据,需要暂停当前增量任务并新建新的全量任务,在全量结束后重新配置增量任务并且重启;
  • 在快照生成的过程中,任何对表中进行的操作变更都无法捕获,直到快照完成。这种情况特别是在历史数据非常大时尤其严重;
  • 无法在connector运行过程中添加新表。

  直到2019年底,Netfix开发了一套参考流式系统中Watermark(水位)概念的数据捕获框架,并在DBLog: A Watermark Based Change-Data-Capture Framework 该篇论文中介绍了该框架的详细设计。其原理简单来就是将增量任务和全量任务一起执行,框架将高水位标识和低水位标识插入到事务日志中(例如MySQL的binlog),并且在二者发生在同一水位区间时做合并。

   Debezium 采取了这个思路,实现了一套增量快照机制。新的增量快照一次只读取部分数据,不需要从头到尾、持续运行,并且支持随时增加新表,还可以随时触发快照,而不是只在任务开始时执行。更重要的是,快照过程中有数据变更,它也可以近乎实时地把变更也打入Kafka流之中。下面将来介绍这一实现细节。

2、增量快照

  下面我们以Debezium-MySQL的视角介绍他们是增量快照的实现。当一个表需要获取其当前快照的时候,Debeizum会做两件事:

  1. 获取当前表中最大的主键,作为快照结束的标准,并且将该值存储在connector offset中;
  2. 根据主键的顺序,以及increment.snapshot.chunk.size配置的大小将表分成多个块(chunk)

  当查询一个块时将构建一个动态SQL语句,选择下一个increment.snapshot.chunk.size数量记录,其最小的主键大于前一个块的最后一个主键,并且小于或等于快照初始化时记录的表中最大的主键。除此之外,当增量快照异常停止恢复后,可以从记录的执行过的主键开始重新执行。

  Debezium读取到一个chunk之后,并不着急立即发送,而是将chunk放在一个叫snapshot-window的内存窗口中间。参考以下过程:

  1. 发送一个snapshot-window-open的信号;
  2. 读取当前表中的一个chunk,并记录到内存的缓冲区中;
  3. 发送一个snapshot-window-close的信号。

snapshot-window可以是需要进行快照的数据库中一个表,这里的发送信号也只是往这个表里插入一条数据。时间线可以参考下图:

   图中T1~T6分别表示数据库当前执行的事务从prepare到commit所经历的时间,注意在MySQL中只有commit的事务才会被记录到Binlog中,Debezium从发出OPEN信号到发送CLOSE信号的过程中,只有T1~T5能够被监听到。T6因为是在CLOSE信号之外提交的,所以没法监听到。(OPEN和CLOSE两个信号也属于事务,有自己的binlog记录以及commit时间)

  Debezium并不是访问数据库的唯一进程。我们可以预期大量进程同时访问数据库,可能访问当前被快照的相同主键记录。如上图所示,对数据的任何更改都会根据提交顺序写入事务日志(例如MySQL的binlog)。由于不可能精确地确定块读事务的时间以识别潜在冲突,因此添加了打开和关闭窗口事件来划分冲突可能发生的时间。Debezium的任务就是消除这些冲突。

  为此,Debezium将块生成的所有事件记录到缓冲区中。当接收到snapshot-window-open信号时,将检查来自事务日志的所有事件是否属于快照表。如果是,则检查缓冲区是否包含了事务日志中相同记录的主键。如果是,则快照事件重复主键的记录将从缓冲区中删除,因为这是一个潜在的冲突。由于不可能对快照和事务日志事件进行正确排序,因此只保留事务日志事件(事务日志新于快照日志)。当接收到快照窗口关闭信号时,缓冲区中剩余的快照事件被发送到下游。如下图所示:

  上图表示,数据库中存在了K2、K3和K4三条记录。在OPEN信号发送前,插入了一条K1记录,更新了K2记录和删除了K3记录,所以当前数据库的情况是包含了K1、K2和K4三条记录。然后在OPEN信号发送直到CLOSE信号发送这段时间里,事务日志里面包含了K4被删除、K5插入以及K6插入三个事件,而内存缓冲区里面则是读取了K1、K2、K4和刚刚插入的K5总共4条记录(没有加上锁的情况,所以在读取快照的过程中是可以读到窗口打开时插入的数据)。在窗口打开的范围内,存在K4和K5重复的主键,所以从缓冲区中删除这两条消息,然后把事务日志刷到下游(注意没有清空事务日志中的同ID记录,事务日志还是原封不动刷到下游的),遇到CLOSE事件之后,将当前缓冲区中的快照数据刷到下游去,并清空缓冲区。这里有几个注意点:

  1. 事务日志和读取快照时间不可能保持一致,所以这里一旦事务日志和缓冲区内存在了相同ID冲突,Debezium保留了事务日志刷到下游,不然可能会丢失部分删除恢复事件。(举个例子,在A窗口内K4记录被删除并发送到事务日志中,在B窗口中K4记录重新插入进数据库,但是因为增量延迟导致读取快照时增量快照只读到A窗口所在时间,这里保留了事务日志,那么会发送删除事件到下游,恢复事件在下次读取时发送)
  2. 快照事件应该有别于INSERT操作,DEBEZIUM用op:r(有的版本是op:c)表示。

3、实现分析

以下代码分析基于Debezium1.9版本介绍MySQL快照,区别于一开始的全量数据同步,增量快照是在运行增量同步的同时运行的,在Debezium运行的过程中,允许通过外部信号的方式触发增量快照,默认情况下是通过监听某个Kafka的topic获取信号的。

Debezium的源码实现中,会通过Source表示事件源。例如MySQL的增量事件源是MysqlStreamChangeEventSource,而增量快照事件源的实现放在MysqlReadOnlyIncrementalSnapshotChangeEventSource。不过,要知道如何在增量执行同时,执行全量快照,需要我们回到增量发送数据到下游时,也就是EventDispatcher.dispatchDataChangeEvent的逻辑中。

    public boolean dispatchDataChangeEvent(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter) throws InterruptedException {
        try {
            boolean handled = false;
            // 如果从binlog中获取到的数据不需要被订阅,则忽略
            if (!filter.isIncluded(dataCollectionId)) {
                LOGGER.trace("Filtered data change event for {}", dataCollectionId);
                eventListener.onFilteredEvent(partition, "source = " + dataCollectionId, changeRecordEmitter.getOperation());
                dispatchFilteredEvent(changeRecordEmitter.getPartition(), changeRecordEmitter.getOffset());
            }
            else {
                // 拿到表结构
                DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);

                // TODO handle as per inconsistent schema info option
                if (dataCollectionSchema == null) {
                    final Optional<DataCollectionSchema> replacementSchema = inconsistentSchemaHandler.handle(partition,
                            dataCollectionId, changeRecordEmitter);
                    if (!replacementSchema.isPresent()) {
                        return false;
                    }
                    dataCollectionSchema = replacementSchema.get();
                }

                // 发送到下游
                changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver<P>() {

                    @Override
                    public void changeRecord(P partition,
                                             DataCollectionSchema schema,
                                             Operation operation,
                                             Object key, Struct value,
                                             OffsetContext offset,
                                             ConnectHeaders headers)
                            throws InterruptedException {
                        if (operation == Operation.CREATE && connectorConfig.isSignalDataCollection(dataCollectionId) && sourceSignalChannel != null) {
                            sourceSignalChannel.process(value);

                            if (signalProcessor != null) {
                                // This is a synchronization point to immediately execute an eventual stop signal, just before emitting the CDC event
                                // in this way the offset context updated by signaling will be correctly saved
                                signalProcessor.processSourceSignal();
                            }
                        }

                        if (neverSkip || !skippedOperations.contains(operation)) {
                            transactionMonitor.dataEvent(partition, dataCollectionId, offset, key, value);
                            eventListener.onEvent(partition, dataCollectionId, offset, key, value, operation);
                            if (incrementalSnapshotChangeEventSource != null) {
                                // 交给下游的snapshot,但是如果window没有打开的话,这里是不会传输给snapshot的
                                // 注意这里只需要传递Key就行,因为如果value一样的话,默认忽略,由stream传递给下游
                                // 但我看了下这里是共用同一个dispatcher,所以会影响到增量的发送
                                incrementalSnapshotChangeEventSource.processMessage(partition, dataCollectionId, key, offset);
                            }
                            // 交给下游的stream增量数据
                            streamingReceiver.changeRecord(partition, schema, operation, key, value, offset, headers);
                        }
                    }
                });
                handled = true;
            }
            ...
         

注意一个binlog的event中可能会存在修改多个row,所以这里是每发送一个row在下游之前,就z需要执行一下incrementalSnapshotChangeEventSource.processMessage

    // MySqlReadOnlyIncrementalSnapshotChangeEventSource
    public void processMessage(MySqlPartition partition, DataCollectionId dataCollectionId, Object key, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, key, window);
        // 如果当前snapshot的窗口已经关闭了,则立即发送当前window里面的event
        boolean windowClosed = getContext().updateWindowState(offsetContext);
        if (windowClosed) {
            sendWindowEvents(partition, offsetContext);
            // 重新再读一个chunk的数据
            readChunk(partition, offsetContext);
        }
        // 如果还没关闭,则delete掉重复的key数据
        else if (!window.isEmpty() && getContext().deduplicationNeeded()) {
            deduplicateWindow(dataCollectionId, key);
        }
    }

增量快照会先检测到当前读取数据窗口是否已经关闭了,如果已经关闭了则立即发送当前窗口中的所有snapshotEvent到下游中,然后读取下一个chunk的数据。

但是这里笔者在阅读时候想到一个问题,这里是在一个线程中执行的操作,检测到一个row,然后检查窗口是否关闭,关闭了就立即发送并读取下一个chunk的数据。这样就很奇怪,它这样操作会加大发送延迟不说,每次只能去检测一个row是否在一个chunk中,这样未免效率有点低。

所以这里的windowClosed,我们来看下这里的updateWindowState实现:

    /**
     * 如果一个高低水印的GTID集合不包含一个binlog事件的GTID,那么这个水印被传递并且窗口处理模式被更新。多个binlog事件可以具有相同的GTID,
     * 这就是为什么算法等待在水印的GTID之外的binlog事件来关闭窗口,而不是在达到最大事务id时立即关闭它。
     * 重复数据删除从低水位之后的第一个事件开始,因为直到GTID包含在低水位(在chunk select语句之前捕获的executed_gtid_set)。
     * 低水位之后的COMMIT用于确保块选择看到在执行之前提交的更改。
     * 所有高水位的事件继续重复数据删除。重复数据删除的块事件插入在高水位之外的第一个事件之前。
     */
    public boolean updateWindowState(OffsetContext offsetContext) {
        // 获取当前处理了的event对应的binlog中gtid的值
        String currentGtid = getCurrentGtid(offsetContext);
        // windowOpened这个可不是chunk的window打开的标志,每一个chunk读取的时候都是直接读取然后关闭的
        // 所以不需要这个值,这个值默认为false,只有在监听消息topic收到openWindow的时候这个值才会设置为true(这里不讨论这个场景)
        // 因为前面如果读过一个chunk,那么这里的lowWatermark不会为空,而是当时读取前的gtid的值
        if (!windowOpened && lowWatermark != null) {
            // 如果当前stream处理的gtid不存在于增量快照的低水位中且低水位不为空,则表示window打开,设置windowOpened为true
            // 注意这里的gtid是一个范围,类似1-100这种,所以这里的contain只需判断是否在当前低水位的范围内
            boolean pastLowWatermark = !lowWatermark.contains(currentGtid);
            if (pastLowWatermark) {
                LOGGER.debug("Current gtid {}, low watermark {}", currentGtid, lowWatermark);
                windowOpened = true;
            }
        }
        // 如果windowOpened为true,而且chunk读取完了,那么这里的highWatermark就是读取完后的gtid
        // 否则返回false,表示chunk窗口没关闭,全量还没执行完
        if (windowOpened && highWatermark != null) {
            // 正常这里读取了一大批数据的话,高水位应该是不包含当前stream处理的gtid,应该为true
            boolean pastHighWatermark = !highWatermark.contains(currentGtid);
            if (pastHighWatermark) {
                LOGGER.debug("Current gtid {}, high watermark {}", currentGtid, highWatermark);
                // 关闭窗口,同时情况高低水位信息
                closeWindow();
                return true;
            }
        }
        return false;
    }


    // GtidSet MySQL水位用gtid表示高低水位
    public boolean contains(String gtid) {
        // split获取出serverId和transactionId范围
        String[] split = GTID_DELIMITER.split(gtid);
        // 这里叫serverId才对
        String sourceId = split[0];
        // 根据serverId拿到transactionId,我估计这里用Map存储的原因是因为有可能主从切换后
        // 一个gtid里面会存在多个serverId以及对应的transactionId
        // gtid类似这样 4160e9b3-58d9-11e8-b174-005056af6f24:1-19,甚至可以是多个8eed0f5b-6f9b-11e9-94a9-005056a57a4e:1-3:11:47-49
        // GTID = server_uuid :transaction_id
        UUIDSet uuidSet = forServerWithId(sourceId);
        if (uuidSet == null) {
            return false;
        }
        // 你用show master status看的话可能是连着的多个,8eed0f5b-6f9b-11e9-94a9-005056a57a4e:1-3:11:47-49
        // 但是一个行的话只能是一个8eed0f5b-6f9b-11e9-94a9-005056a57a4e:23
        long transactionId = Long.parseLong(split[1]);
        return uuidSet.contains(transactionId);
    }

    // GtidSet
    public boolean contains(long transactionId) {
        for (Interval interval : this.intervals) {
            if (interval.contains(transactionId)) {
                return true;
            }
        }
        return false;
    }

    // GtidSet
    public boolean contains(long transactionId) {
        return getStart() <= transactionId && transactionId <= getEnd();
    }

updateWindowState返回true的时候,就会尝试发送快照窗口中的所有数据到下游,然后重新读取一个chunk的数据,否则调用deduplicateWindow删除窗口中与当前row同个ID的快照数据。

首先,通过SHOW MASTER STATUS获取到GTID,并设置为低水位,当时获取到的GTID集合应该是类似xxx:1-465,也就是在当前集群应用过的事务合集。而从binlog拿出的每一个row,其GTID应该是xxx:467这样的类型。这里的updateWindowState的逻辑,主要是用于判断当前ROW是否在低水位的后面,或者在高水位的后面,以此检测row是否在窗口的范围之内的流式数据。

一旦当前row不在低水位的范围内,那么表示窗口打开(windowOpen=true),而如果row在高水位的范围内,那么当前row应该是窗口的增量数据,直到不在这个范围里面则表示关闭且应该flush掉这些窗口中的数据到下游。所以updateWindowState的作用就是检测增量数据是否在窗口的高低水位范围内。对于在范围内的,会采用dedeplicateWindow的逻辑剔除出窗口里的快照数据。

    protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key) {
        if (context.currentDataCollectionId() == null || !context.currentDataCollectionId().getId().equals(dataCollectionId)) {
            return;
        }
        if (key instanceof Struct) {
            // 直接remove掉
            if (window.remove((Struct) key) != null) {
                LOGGER.info("Removed '{}' from window", key);
            }
        }
    }

最后看下readChunk的逻辑,这里是每次去源集群中获取足够多的数据。

    // AbstractIncrementalSnapshotChangeEventSource
    protected void readChunk(P partition, OffsetContext offsetContext) throws InterruptedException {
        if (!context.snapshotRunning()) {
            LOGGER.info("Skipping read chunk because snapshot is not running");
            postIncrementalSnapshotCompleted();
            return;
        }
        if (context.isSnapshotPaused()) {
            LOGGER.info("Incremental snapshot was paused.");
            return;
        }
        try {
            preReadChunk(context);
            // This commit should be unnecessary and might be removed later
            jdbcConnection.commit();
            // 开始读取一个新的chunk
            context.startNewChunk();
            // 打开一个新的窗口,这在Mysql中是设置GTID为一个窗口的低水位
            emitWindowOpen();
            while (context.snapshotRunning()) {
                if (isTableInvalid(partition, offsetContext)) {
                    continue;
                }
                if (connectorConfig.isIncrementalSnapshotSchemaChangesEnabled() && !schemaHistoryIsUpToDate()) {
                    // Schema has changed since the previous window.
                    // Closing the current window and repeating schema verification within the following window.
                    break;
                }
                final TableId currentTableId = (TableId) context.currentDataCollectionId().getId();
                // 当前上下文中没有关于currentTableId的key最大值
                if (!context.maximumKey().isPresent()) {
                    // 重新获取表结构
                    currentTable = refreshTableSchema(currentTable);
                    Object[] maximumKey;
                    try {
                        // 获取当前表的最大key,作为快照结束的标志
                        maximumKey = jdbcConnection.queryAndMap(
                                buildMaxPrimaryKeyQuery(currentTable, context.currentDataCollectionId().getAdditionalCondition()), rs -> {
                                    if (!rs.next()) {
                                        return null;
                                    }
                                    return keyFromRow(jdbcConnection.rowToArray(currentTable, rs,
                                            ColumnUtils.toArray(rs, currentTable)));
                                });
                        context.maximumKey(maximumKey);
                    }
                    catch (SQLException e) {
                        LOGGER.error("Failed to read maximum key for table {}", currentTableId, e);
                        nextDataCollection(partition, offsetContext);
                        continue;
                    }
                    if (!context.maximumKey().isPresent()) {
                        LOGGER.info(
                                "No maximum key returned by the query, incremental snapshotting of table '{}' finished as it is empty",
                                currentTableId);
                        nextDataCollection(partition, offsetContext);
                        continue;
                    }
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Incremental snapshot for table '{}' will end at position {}", currentTableId,
                                context.maximumKey().orElse(new Object[0]));
                    }
                }
                // 获取关于该表的dataEvent,从这里开始读取表中的数据
                if (createDataEventsForTable(partition)) {

                    String dataCollections = context.getDataCollections().stream()
                            .map(DataCollection::getId)
                            .map(DataCollectionId::identifier).collect(
                                    Collectors.joining(","));

                    // 如果窗口中捕获不到任何数据,则立即开始关于下一个dataCollection的数据获取
                    if (window.isEmpty()) {
                        LOGGER.info("No data returned by the query, incremental snapshotting of table '{}' finished",
                                currentTableId);

                        notificationService.notify(buildNotificationWith(SnapshotStatus.TABLE_SCAN_COMPLETED,
                                Map.of(
                                        "data_collections", dataCollections,
                                        "total_rows_scanned", String.valueOf(totalRowsScanned)),
                                offsetContext),
                                Offsets.of(partition, offsetContext));

                        tableScanCompleted(partition);
                        // 开始下一个表dataCollection的获取
                        nextDataCollection(partition, offsetContext);
                    }
                    else {
                        // 事件通知
                        notificationService.notify(buildNotificationWith(SnapshotStatus.IN_PROGRESS,
                                Map.of(
                                        "data_collections", dataCollections,
                                        "current_collection_in_progress", context.currentDataCollectionId().getId().identifier(),
                                        "maximum_key", context.maximumKey().orElse(new Object[0])[0].toString(),
                                        "last_processed_key", context.chunkEndPosititon()[0].toString()),
                                offsetContext),
                                Offsets.of(partition, offsetContext));
                        break;
                    }
                }
                else {
                    context.revertChunk();
                    break;
                }
            }
            // 关闭当前窗口,设置gtid为高水位
            emitWindowClose(partition, offsetContext);
        }
        catch (SQLException e) {
            throw new DebeziumException(String.format("Database error while executing incremental snapshot for table '%s'", context.currentDataCollectionId()), e);
        }
        finally {
            postReadChunk(context);
            if (!context.snapshotRunning()) {
                postIncrementalSnapshotCompleted();
            }
        }
    }

这里去读取快照数据之前,会先获取到当前table最大的主键的值,作为增量快照结束的点。关键是在这里的createDataEventsForTable(partition)这里。

    // AbstractIncrementalSnapshotChangeEventSource
    private boolean createDataEventsForTable(P partition) {
        long exportStart = clock.currentTimeInMillis();
        LOGGER.debug("Exporting data chunk from table '{}' (total {} tables)", currentTable.id(), context.dataCollectionsToBeSnapshottedCount());

        // 构建chunk查询sql
        final String selectStatement = buildChunkQuery(currentTable, context.currentDataCollectionId().getAdditionalCondition());
        LOGGER.debug("\t For table '{}' using select statement: '{}', key: '{}', maximum key: '{}'", currentTable.id(),
                selectStatement, context.chunkEndPosititon(), context.maximumKey().get());

        final TableSchema tableSchema = databaseSchema.schemaFor(currentTable.id());

        try (PreparedStatement statement = readTableChunkStatement(selectStatement);
                ResultSet rs = statement.executeQuery()) {
            // 检查表结构是否发生变化,如果失败应该返回false,并重新读取表结构和最大key
            if (checkSchemaChanges(rs)) {
                return false;
            }
            final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, currentTable);
            long rows = 0;
            Timer logTimer = getTableScanLogTimer();

            Object[] lastRow = null;
            Object[] firstRow = null;
            while (rs.next()) {
                rows++;
                // 这里是取出表中的记录的所有字段
                final Object[] row = jdbcConnection.rowToArray(currentTable, rs, columnArray);
                if (firstRow == null) {
                    firstRow = row;
                }
                // 将获取到的快照数据塞入window这个值中,后续发送和删除重复key都是在这个值中操作
                final Struct keyStruct = tableSchema.keyFromColumnData(row);
                window.put(keyStruct, row);
                if (logTimer.expired()) {
                    long stop = clock.currentTimeInMillis();
                    LOGGER.debug("\t Exported {} records for table '{}' after {}", rows, currentTable.id(),
                            Strings.duration(stop - exportStart));
                    logTimer = getTableScanLogTimer();
                }
                lastRow = row;
            }
            final Object[] firstKey = keyFromRow(firstRow);
            // 获取到的数据都是根据id严格排序的,所以这里的lastKey可以作为下一次读取chunk的查询条件
            final Object[] lastKey = keyFromRow(lastRow);
            if (context.isNonInitialChunk()) {
                progressListener.currentChunk(partition, context.currentChunkId(), firstKey, lastKey);
            }
            else {
                progressListener.currentChunk(partition, context.currentChunkId(), firstKey, lastKey, context.maximumKey().orElse(null));
            }
            // 记录lastKey,作为下一次chunk的查询条件
            context.nextChunkPosition(lastKey);
            if (lastRow != null) {
                LOGGER.debug("\t Next window will resume from {}", (Object) context.chunkEndPosititon());
            }

            LOGGER.debug("\t Finished exporting {} records for window of table table '{}'; total duration '{}'", rows,
                    currentTable.id(), Strings.duration(clock.currentTimeInMillis() - exportStart));
            incrementTableRowsScanned(partition, rows);
        }
        catch (SQLException e) {
            throw new DebeziumException("Snapshotting of table " + currentTable.id() + " failed", e);
        }
        return true;
    }

    // AbstractIncrementalSnapshotChangeEventSource
    protected PreparedStatement readTableChunkStatement(String sql) throws SQLException {
        final PreparedStatement statement = jdbcConnection.readTablePreparedStatement(connectorConfig, sql,
                OptionalLong.empty());
        if (context.isNonInitialChunk()) {
            final Object[] maximumKey = context.maximumKey().get();
            final Object[] chunkEndPosition = context.chunkEndPosititon();
            // Fill boundaries placeholders
            int pos = 0;
            for (int i = 0; i < chunkEndPosition.length; i++) {
                for (int j = 0; j < i + 1; j++) {
                    statement.setObject(++pos, chunkEndPosition[j]);
                }
            }
            // Fill maximum key placeholders
            for (int i = 0; i < chunkEndPosition.length; i++) {
                for (int j = 0; j < i + 1; j++) {
                    statement.setObject(++pos, maximumKey[j]);
                }
            }
        }
        return statement;
    }

这里作者考虑到表的主键可能是复合主键,在每一次重新去读取chunk的时候,都需要读取比上一次读取的最大主键大一定数量的快照数据

    // AbstractIncrementalSnapshotChangeEventSource
    protected String buildChunkQuery(Table table, int limit, Optional<String> additionalCondition) {
        String condition = null;
        // Add condition when this is not the first query
        if (context.isNonInitialChunk()) {
            final StringBuilder sql = new StringBuilder();
            // Window boundaries
            addLowerBound(table, sql);
            // Table boundaries
            sql.append(" AND NOT ");
            addLowerBound(table, sql);
            condition = sql.toString();
        }
        final String orderBy = getQueryColumns(table).stream()
                .map(c -> jdbcConnection.quotedColumnIdString(c.name()))
                .collect(Collectors.joining(", "));
        return jdbcConnection.buildSelectWithRowLimits(table.id(),
                limit,
                buildProjection(table),
                Optional.ofNullable(condition),
                additionalCondition,
                orderBy);
    }

    // AbstractIncrementalSnapshotChangeEventSource
    private void addLowerBound(Table table, StringBuilder sql) {
        // To make window boundaries working for more than one column it is necessary to calculate
        // with independently increasing values in each column independently.
        // For one column the condition will be (? will always be the last value seen for the given column)
        // (k1 > ?)
        // For two columns
        // (k1 > ?) OR (k1 = ? AND k2 > ?)
        // For four columns
        // (k1 > ?) OR (k1 = ? AND k2 > ?) OR (k1 = ? AND k2 = ? AND k3 > ?) OR (k1 = ? AND k2 = ? AND k3 = ? AND k4 > ?)
        // etc.
        // 获取pk column
        final List<Column> pkColumns = getQueryColumns(table);
        if (pkColumns.size() > 1) {
            sql.append('(');
        }
        // 这里的两个i,j循环的意思是,根据主键列用OR拼接出主键列数量的条件,例如主键有3个,分别是pk1,pk2,pk3
        // 那么拼接出来的条件就是 (pk1 > ?) OR (pk1 = ? AND pk2 > ?) OR (pk1 = ? AND pk2 = ? AND pk3 > ?)
        // 后面还有limit,以此获取足够多的chunk,而且根据逐渐数量递增
        for (int i = 0; i < pkColumns.size(); i++) {
            // 是否是最后一列
            final boolean isLastIterationForI = (i == pkColumns.size() - 1);
            sql.append('(');
            for (int j = 0; j < i + 1; j++) {
                final boolean isLastIterationForJ = (i == j);
                // quotedColumnIdString 是避免用户用关键字作为字段,所以加上开闭服务,类似MySQL可以用`columnName`
                sql.append(jdbcConnection.quotedColumnIdString(pkColumns.get(j).name()));
                // 这里加上  > 是用于保证id大于某个值?
                sql.append(isLastIterationForJ ? " > ?" : " = ?");
                if (!isLastIterationForJ) {
                    sql.append(" AND ");
                }
            }
            sql.append(")");
            if (!isLastIterationForI) {
                sql.append(" OR ");
            }
        }
        if (pkColumns.size() > 1) {
            sql.append(')');
        }
    }

      

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/813521.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

S32K14x FlexNVM介绍(flexible Non-volatile memory)

S32K14x是一款NXP推出的32位汽车级微控制器&#xff0c;其存储结构相对复杂。下面是对其存储结构的中文介绍&#xff1a; S32K14x采用了分层存储结构&#xff0c;包括Flash存储器和SRAM存储器。Flash存储器用于存储程序代码和常量数据&#xff0c;而SRAM存储器用于存储变量数据…

常见的几种排序

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练&#xff0c;题解C&#xff0c;C的使用文章&#xff0c;「初学」C &#x1f525;座右铭&#xff1a;“不要等到什么都没有了&#xff0c;才下…

【分布式系统】分布式系统的8个谬误

网络可靠 对于分布式系统来说&#xff0c;网络、计算、存储是三大基石&#xff0c;系统之间进行拆分隔离之后&#xff0c;那么必定存在网络通讯&#xff0c;而网络是最不可靠的。 不管是从硬件层面还是软件层面来说&#xff0c;网络是不可靠的。&#xff08;断电、配置错误、ID…

ChatGPT结合知识图谱构建医疗问答应用 (一) - 构建知识图谱

一、ChatGPT结合知识图谱 在本专栏的前面文章中构建 ChatGPT 本地知识库问答应用&#xff0c;都是基于词向量检索 Embedding 嵌入的方式实现的&#xff0c;在传统的问答领域中&#xff0c;一般知识源采用知识图谱来进行构建&#xff0c;但基于知识图谱的问答对于自然语言的处理…

《JavaSE-第二十一章》之线程的状态与中断

前言 在你立足处深挖下去,就会有泉水涌出!别管蒙昧者们叫嚷:“下边永远是地狱!” 博客主页&#xff1a;KC老衲爱尼姑的博客主页 博主的github&#xff0c;平常所写代码皆在于此 共勉&#xff1a;talk is cheap, show me the code 作者是爪哇岛的新手&#xff0c;水平很有限&…

Ctfshow web入门 sqli-labs特性篇 web517-web568 详细题解 全

web517 输入?id1 正常 输入?id1 报错 .0 输入?id1-- 正常判断是字符型注入&#xff0c;闭合方式是这里插一句。limit 100,1是从第100条数据开始&#xff0c;读取1条数据。limit 6是读取前6条数据。 ?id1 order by 3-- 正常判断回显位有三个。?id1 and 12 union se…

json-server详解

零、文章目录 json-server详解 1、简介 Json-server 是一个零代码快速搭建本地 RESTful API 的工具。它使用 JSON 文件作为数据源&#xff0c;并提供了一组简单的路由和端点&#xff0c;可以模拟后端服务器的行为。github地址&#xff1a;https://github.com/typicode/json-…

RWEQ模型——土壤风蚀模拟

详情点击链接&#xff1a;基于“RWEQ”集成技术在土壤风蚀模拟与风蚀模数估算、变化归因分析中的实践应用及SCI论文撰写 前沿 土壤风蚀是一个全球性的环境问题。中国是世界上受土壤风蚀危害最严重的国家之一&#xff0c;土壤风蚀是中国干旱、半干旱及部分湿润地区土地荒漠化的…

解读Spring-context的property-placeholder

在spring中&#xff0c;如果要给程序定义一些参数&#xff0c;可以放在application.properties中&#xff0c;通过<context:property-placeholder>加载这个属性文件&#xff0c;然后就可以通过value给我们的变量自动赋值&#xff0c;如果你们的程序可能运行在多个环境中&…

Android 面试题 应用程序结构 九

&#x1f525; 核心应用程序 Activity五个状态&#x1f525; Starting-> running-> paused-> stopped-> killed 启动状态&#xff08;Starting&#xff09;&#xff1a;Activity的启动状态很短暂&#xff0c;当Activity启动后便会进入运行状态&#xff08;Running…

大数据Flink(五十四):Flink用武之地

文章目录 Flink用武之地 一、Event-driven Applications【事件驱动】 二、Data Analytics Applications【数据分析】 三、​​​​​​​Data Pipeline Applications【数据管道】 Flink用武之地 应用场景 | Apache Flink 从很多公司的应用案例发现&#xff0c;其实Flink主…

tinkerCAD案例:25. 量角器 - 测量角度

tinkerCAD案例&#xff1a;25. 量角器 - 测量角度 原文 Now we’re going to make a protractor! A Protractor is one of the most basic, but essential, tools for making measurements. It is, then, surprising that the modern protractor is barely over 200 years ol…

简单实现jdk1.7HashMap

1.定义一个Map接口,Entry<K,V>对象为Map的元素 package test;public interface Map<K,V> {V put(K k,V v);V get(K k);int size();interface Entry<K,V>{K getKey();V getValue();}}2.主要实现了put,get以及size()方法 package test;public class HashMap&…

uniapp小程序,根据小程序的环境版本,控制的显页面功能按钮的示隐藏

需求&#xff1a;根据小程序环境控制控制页面某个功能按钮的显示隐藏&#xff1b; 下面是官方文档和功能实现的相关代码&#xff1a; 实现上面需要&#xff0c;用到了uni.getAccountInfoSync()&#xff1a; uni.getAccountInfoSync() 是一个 Uniapp 提供的同步方法&#xff0c…

零代码编程:用ChatGPT对Excel表格进行批量自动化处理

F盘的“北交所上市公司全部发明专利”文件夹里面有几百个这样的Excel表格&#xff0c;格式一致&#xff0c;需要合并所有表格内容到一个表格&#xff0c;方便查找内容&#xff0c;但是不要前面两行。 可以在ChatGPT中这样输入&#xff1a; 写一段Python程序&#xff1a; F盘的…

基于opencv与机器学习的摄像头实时识别数字!附带完整的代码、数据集和训练模型!!

前言 使用摄像头实时识别数字算是目标检测任务&#xff0c;总体上分为两步&#xff0c;第一步是检测到数字卡片的位置&#xff0c;第二步是对检测到的数字卡片进行分类以确定其是哪个数字。在第一步中主要涉及opencv的相关功能&#xff0c;第二步则使用机器学习的方式进行分类…

求三个球面交点的高效解法

文章目录 一、问题描述二、推导步骤代数法几何法 三、MATLAB代码 一、问题描述 如图&#xff0c;已知三个球面的球心坐标分别为 P 1 ( x 1 , y 1 , z 1 ) , P 2 ( x 2 , y 2 , z 2 ) , P 3 ( x 3 , y 3 , z 3 ) P_1(x_1,y_1,z_1),P_2(x_2,y_2,z_2),P_3(x_3,y_3,z_3) P1​(x1​,…

浏览器访问nginx转发打开oss上的html页面默认是下载,修改为预览

使用阿里云盒OSS上传了html页面&#xff0c;在nginx里配置跳转访问该页面时&#xff0c;在浏览器里直接默认下载了该页面&#xff0c;现在想实现预览功能&#xff0c;只需在nginx里的location里修改消息头的Content-Disposition为inline即可 注意要隐藏头信息proxy_hide_header…

【机器学习】西瓜书习题3.3Python编程实现对数几率回归

参考代码 结合自己的理解&#xff0c;添加注释。 代码 导入相关的库 import numpy as np import pandas as pd import matplotlib from matplotlib import pyplot as plt from sklearn import linear_model导入数据&#xff0c;进行数据处理和特征工程 # 1.数据处理&#x…

ChatGPT炒股:爬取股票官方微信公众号的新闻资讯

上市公司的微信公众号&#xff0c;现在已经成为官网之外最重要的官方信息发布渠道。有些不会在股票公告中发布的消息&#xff0c;也会在微信公众号进行发布。所以&#xff0c;跟踪持仓股票的公众号信息&#xff0c;非常重要。 下面&#xff0c;以贝特瑞的官方公众号“贝特瑞新…