Flink LookupJoin: Batching Queries


Background

When a lookup join is used to enrich a stream against a dimension table, every stream record must be joined with the dimension data in real time. Using a cache can make the join results stale and inaccurate, while disabling the cache puts heavy load on the database. Batched lookup means buffering records until a batch threshold is reached and issuing only one query per distinct lookup key, which reduces the number of queries. This yields a noticeable performance gain in scenarios where keys repeat frequently within a short window.

Technical Implementation

As the flowchart below shows, the implementation consists of two parts:

  1. Parse the hint options in the Flink SQL statement to decide whether batching should be enabled
  2. Implement the batched lookup logic itself, i.e. BatchLookupJoinRunner

[Figure: flowchart of the two-part implementation]

Parsing the Hint Options

The Flink documentation describes SQL hints in detail; see https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/table/sql/queries/hints/#lookup.

The LOOKUP hint lets users suggest configuration to the Flink optimizer, for example:

  1. Use a synchronous or asynchronous lookup function
  2. Configure async lookup parameters
  3. Enable the delayed-retry lookup strategy

Building on this hint mechanism, we can use hint options to decide whether batching is needed. Two options are involved:

  1. batch-size: number of records to buffer; the lookup is executed once this count is reached. The default is 0, which disables batching.
  2. batch-interval: batching interval; buffered records are flushed once this interval elapses. The default is 1s.

The end result looks like this (note that the table option in the LOOKUP hint must name the lookup table, which here is the alias c):

SELECT /*+ LOOKUP('table'='c','batch-size'='10000','batch-interval'='1s') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id

New class: BatchLookupOptions

This class describes the batching options and extracts them from the hints. The implementation is as follows:

package org.apache.flink.table.planner.plan.utils;

import org.apache.calcite.rel.hint.RelHint;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Objects;

import static org.apache.flink.configuration.ConfigOptions.key;

/** BatchLookupOptions holds the options that control batched lookups. */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("BatchOptions")
public class BatchLookupOptions {

    public static final String FIELD_NAME_BATCH_SIZE = "batch-size";
    public static final String FIELD_NAME_BATCH_INTERVAL = "batch-interval";


    public static final ConfigOption<Integer> BATCH_SIZE =
            key("batch-size")
                    .intType()
                    .defaultValue(0)
                    .withDescription("The batch size for batch lookup. If the batch size is 0, it means that the batch lookup is disabled.");

    public static final ConfigOption<Duration> BATCH_INTERVAL =
            key("batch-interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(1))
                    .withDescription("The batch interval for batch lookup.");

    @JsonProperty(FIELD_NAME_BATCH_SIZE)
    public final Integer batchSize;

    @JsonProperty(FIELD_NAME_BATCH_INTERVAL)
    public final Duration batchInterval;


    @JsonCreator
    public BatchLookupOptions(
            @JsonProperty(FIELD_NAME_BATCH_SIZE) Integer batchSize,
            @JsonProperty(FIELD_NAME_BATCH_INTERVAL) Duration batchInterval
    ) {
        this.batchSize = batchSize;
        this.batchInterval = batchInterval;
    }

    public boolean enabled() {
        return batchSize > 0;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BatchLookupOptions that = (BatchLookupOptions) o;
        return Objects.equals(batchSize, that.batchSize) && Objects.equals(batchInterval, that.batchInterval);
    }

    @Override
    public int hashCode() {
        return Objects.hash(batchSize, batchInterval);
    }

    @Override
    public String toString() {
        return "BatchLookupOptions{" +
                ", batchSize=" + batchSize +
                ", batchInterval=" + batchInterval +
                '}';
    }

    @Nullable
    public static BatchLookupOptions fromJoinHint(@Nullable RelHint lookupJoinHint) {
        if (null != lookupJoinHint) {
            Configuration conf = Configuration.fromMap(lookupJoinHint.kvOptions);
            Integer batchSize = conf.get(BATCH_SIZE);
            Duration batchInterval = conf.get(BATCH_INTERVAL);
            return new BatchLookupOptions(batchSize, batchInterval);
        }
        return null;
    }
}
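
As a sanity check, here is a minimal, hedged usage sketch (not part of the patch). In the planner the RelHint is produced from the /*+ LOOKUP(...) */ clause; here we build one by hand with Calcite's builder just to show how the options are extracted:

// Illustrative only: in a real job the planner supplies the RelHint.
RelHint hint = RelHint.builder("LOOKUP")
        .hintOption("table", "c")
        .hintOption("batch-size", "10000")
        .hintOption("batch-interval", "1s")
        .build();

BatchLookupOptions options = BatchLookupOptions.fromJoinHint(hint);
// options.enabled()     -> true, because batch-size > 0
// options.batchSize     -> 10000
// options.batchInterval -> Duration.ofSeconds(1)

BatchLookupOptions disabled = BatchLookupOptions.fromJoinHint(null);
// -> null: no LOOKUP hint means batching stays off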

Changes to StreamPhysicalLookupJoin

The goal is to derive the batch options from the parsed hints and pass them to StreamExecLookupJoin.

This class is written in Scala, so the change lives in the corresponding Scala package: org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin. Two changes are needed:

  1. Add a batchOptions member, derived from the existing lookupHint field on the class:
  // Support batch lookup
  lazy val batchOptions: Option[BatchLookupOptions] =
    Option(BatchLookupOptions.fromJoinHint(lookupHint.orNull))
  2. Pass batchOptions into the StreamExecLookupJoin constructor:
  override def translateToExecNode(): ExecNode[_] = {
    val (projectionOnTemporalTable, filterOnTemporalTable) = calcOnTemporalTable match {
      case Some(program) =>
        val (projection, filter) = FlinkRexUtil.expandRexProgram(program)
        (JavaScalaConversionUtil.toJava(projection), filter.orNull)
      case _ =>
        (null, null)
    }

    new StreamExecLookupJoin(
      tableConfig,
      JoinTypeUtil.getFlinkJoinType(joinType),
      remainingCondition.orNull,
      new TemporalTableSourceSpec(temporalTable),
      allLookupKeys.map(item => (Int.box(item._1), item._2)).asJava,
      projectionOnTemporalTable,
      filterOnTemporalTable,
      lookupKeyContainsPrimaryKey(),
      upsertMaterialize,
      asyncOptions.orNull,
      retryOptions.orNull,
      // add options for Batch
      batchOptions.orNull,
      inputChangelogMode,
      InputProperty.DEFAULT,
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
  }

Changes to CommonExecLookupJoin

The goal is to accept the BatchLookupOptions parameter and, based on it, create a BatchLookupJoinRunner instance.

Class path:

org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin

  1. Add a new field
    public static final String FIELD_NAME_BATCH_OPTIONS = "batchOptions";

    @JsonProperty(FIELD_NAME_BATCH_OPTIONS)
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private final @Nullable BatchLookupOptions batchOptions;
  2. Add a constructor that accepts the BatchLookupOptions parameter
    protected CommonExecLookupJoin(
            int id,
            ExecNodeContext context,
            ReadableConfig persistedConfig,
            FlinkJoinType joinType,
            @Nullable RexNode joinCondition,
            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
            TemporalTableSourceSpec temporalTableSourceSpec,
            Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
            @Nullable List<RexNode> projectionOnTemporalTable,
            @Nullable RexNode filterOnTemporalTable,
            @Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions,
            @Nullable LookupJoinUtil.RetryLookupOptions retryOptions,
            // new batch options parameter
            @Nullable BatchLookupOptions batchOptions,
            ChangelogMode inputChangelogMode,
            List<InputProperty> inputProperties,
            RowType outputType,
            String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        checkArgument(inputProperties.size() == 1);
        this.joinType = checkNotNull(joinType);
        this.joinCondition = joinCondition;
        this.lookupKeys = Collections.unmodifiableMap(checkNotNull(lookupKeys));
        this.temporalTableSourceSpec = checkNotNull(temporalTableSourceSpec);
        this.projectionOnTemporalTable = projectionOnTemporalTable;
        this.filterOnTemporalTable = filterOnTemporalTable;
        this.inputChangelogMode = inputChangelogMode;
        this.asyncLookupOptions = asyncLookupOptions;
        this.retryOptions = retryOptions;
        this.batchOptions = batchOptions;
    }
    /**
     * Constructor kept for backward compatibility with the previous signature.
     */
    protected CommonExecLookupJoin(
            int id,
            ExecNodeContext context,
            ReadableConfig persistedConfig,
            FlinkJoinType joinType,
            @Nullable RexNode joinCondition,
            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
            TemporalTableSourceSpec temporalTableSourceSpec,
            Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
            @Nullable List<RexNode> projectionOnTemporalTable,
            @Nullable RexNode filterOnTemporalTable,
            @Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions,
            @Nullable LookupJoinUtil.RetryLookupOptions retryOptions,
            ChangelogMode inputChangelogMode,
            List<InputProperty> inputProperties,
            RowType outputType,
            String description) {
        this(id,
                context,
                persistedConfig,
                joinType,
                joinCondition,
                temporalTableSourceSpec,
                lookupKeys,
                projectionOnTemporalTable,
                filterOnTemporalTable,
                asyncLookupOptions,
                retryOptions,
                null,
                inputChangelogMode,
                inputProperties,
                outputType,
                description);
    }
  3. Modify createSyncLookupJoinFunction to create the appropriate lookup runner based on the options
    private ProcessFunction<RowData, RowData> createSyncLookupJoinFunction(
            RelOptTable temporalTable,
            ExecNodeConfig config,
            ClassLoader classLoader,
            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
            TableFunction<?> syncLookupFunction,
            RelBuilder relBuilder,
            RowType inputRowType,
            RowType tableSourceRowType,
            RowType resultRowType,
            boolean isLeftOuterJoin,
            boolean isObjectReuseEnabled) {

        DataTypeFactory dataTypeFactory =
                ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();

        int[] orderedLookupKeys = LookupJoinUtil.getOrderedLookupKeys(allLookupKeys.keySet());

        GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
                LookupJoinCodeGenerator.generateSyncLookupFunction(
                        config,
                        classLoader,
                        dataTypeFactory,
                        inputRowType,
                        tableSourceRowType,
                        resultRowType,
                        allLookupKeys,
                        orderedLookupKeys,
                        syncLookupFunction,
                        StringUtils.join(temporalTable.getQualifiedName(), "."),
                        isObjectReuseEnabled);

        RelDataType projectionOutputRelDataType = getProjectionOutputRelDataType(relBuilder);
        RowType rightRowType =
                getRightOutputRowType(projectionOutputRelDataType, tableSourceRowType);
        GeneratedCollector<ListenableCollector<RowData>> generatedCollector =
                LookupJoinCodeGenerator.generateCollector(
                        new CodeGeneratorContext(config, classLoader),
                        inputRowType,
                        rightRowType,
                        resultRowType,
                        JavaScalaConversionUtil.toScala(Optional.ofNullable(joinCondition)),
                        JavaScalaConversionUtil.toScala(Optional.empty()),
                        true);
        ProcessFunction<RowData, RowData> processFunc;
        // if batch mode is enabled, use BatchLookupJoinRunner
        if (batchOptions != null && batchOptions.enabled()) {
            processFunc = new BatchLookupJoinRunner(
                    generatedFetcher,
                    generatedCollector,
                    LookupJoinUtil.getOrderedLookupKeys(allLookupKeys.keySet()),
                    tableSourceRowType,
                    isLeftOuterJoin,
                    rightRowType.getFieldCount(),
                    batchOptions.batchSize,
                    batchOptions.batchInterval.toMillis()
            );
        } else if (projectionOnTemporalTable != null) {
            // a projection or filter after table source scan
            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
                    LookupJoinCodeGenerator.generateCalcMapFunction(
                            config,
                            classLoader,
                            JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                            filterOnTemporalTable,
                            projectionOutputRelDataType,
                            tableSourceRowType);

            processFunc =
                    new LookupJoinWithCalcRunner(
                            generatedFetcher,
                            generatedCalc,
                            generatedCollector,
                            isLeftOuterJoin,
                            rightRowType.getFieldCount());
        } else {
            // right type is the same as table source row type, because no calc after temporal table
            processFunc =
                    new LookupJoinRunner(
                            generatedFetcher,
                            generatedCollector,
                            isLeftOuterJoin,
                            rightRowType.getFieldCount());
        }
        return processFunc;
    }
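
One design caveat worth noting: in this sketch the batch branch takes precedence, so when batching is enabled any projection or filter on the temporal table (the LookupJoinWithCalcRunner path) is silently skipped. Either restrict the batch hint to lookups without a calc on the temporal table, or extend BatchLookupJoinRunner to apply the generated calc function as well.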

Changes to StreamExecLookupJoin

The goal is to accept the BatchLookupOptions parameter.

Class path: org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin

  1. Add a new constructor
    public StreamExecLookupJoin(
            ReadableConfig tableConfig,
            FlinkJoinType joinType,
            @Nullable RexNode joinCondition,
            TemporalTableSourceSpec temporalTableSourceSpec,
            Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
            @Nullable List<RexNode> projectionOnTemporalTable,
            @Nullable RexNode filterOnTemporalTable,
            boolean lookupKeyContainsPrimaryKey,
            boolean upsertMaterialize,
            @Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions,
            @Nullable LookupJoinUtil.RetryLookupOptions retryOptions,
            // new batch options parameter
            @Nullable BatchLookupOptions batchOptions,
            ChangelogMode inputChangelogMode,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecLookupJoin.class),
                ExecNodeContext.newPersistedConfig(StreamExecLookupJoin.class, tableConfig),
                joinType,
                joinCondition,
                temporalTableSourceSpec,
                lookupKeys,
                projectionOnTemporalTable,
                filterOnTemporalTable,
                lookupKeyContainsPrimaryKey,
                upsertMaterialize,
                asyncLookupOptions,
                retryOptions,
                batchOptions,
                inputChangelogMode,
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }
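
A caveat if you use compiled plans (COMPILE PLAN / EXECUTE PLAN): the @JsonCreator-annotated constructor of StreamExecLookupJoin must also gain a @JsonProperty(FIELD_NAME_BATCH_OPTIONS) parameter that is forwarded to the CommonExecLookupJoin constructor above, otherwise batchOptions is lost when a plan is serialized and restored.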

Implementing the Batched Lookup Logic

Batching can be triggered both by record count and by time interval.

[Figure: batch flush triggered by record count or by timer interval]

Class path: org.apache.flink.table.runtime.operators.join.lookup.BatchLookupJoinRunner

Implementation:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.runtime.operators.join.lookup;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.table.runtime.generated.GeneratedCollector;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** The join runner that buffers input rows and performs batched lookups against the dimension table. */
public class BatchLookupJoinRunner extends ProcessFunction<RowData, RowData> implements CheckpointedFunction {
    private static final long serialVersionUID = -4521543015709964734L;

    private final GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher;
    private final GeneratedCollector<ListenableCollector<RowData>> generatedCollector;
    private final int[] lookupKeyIndicesInOrder;
    private final RowType tableRowType;
    protected final boolean isLeftOuterJoin;
    protected final int tableFieldsCount;
    private final List<RowData> recordBuffers;
    private transient ListState<RowData> bufferState;
    private transient FlatMapFunction<RowData, RowData> fetcher;

    protected transient JoinedRowData outRow;
    private transient GenericRowData nullRow;

    private transient RowData.FieldGetter[] keyFieldGetters;
    private transient Map<RowData, Collection<RowData>> cache;
    private transient CollectorWrapper collector;
    private transient ScheduledExecutorService executorService;
    private transient Collector<RowData> collectorHolder;


    private final int batchSize;
    private final long batchInterval;

    private transient Counter bufferCounter;

    public BatchLookupJoinRunner(
            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher,
            GeneratedCollector<ListenableCollector<RowData>> generatedCollector,
            int[] lookupKeyIndicesInOrder,
            RowType tableRowType,
            boolean isLeftOuterJoin,
            int tableFieldsCount,
            int batchSize,
            long batchInterval
    ) {
        this.generatedFetcher = generatedFetcher;
        this.generatedCollector = generatedCollector;
        this.isLeftOuterJoin = isLeftOuterJoin;
        this.tableFieldsCount = tableFieldsCount;
        this.lookupKeyIndicesInOrder = lookupKeyIndicesInOrder;
        this.tableRowType = tableRowType;
        this.batchSize = batchSize;
        this.batchInterval = batchInterval;
        this.recordBuffers = new ArrayList<>(batchSize);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.fetcher = generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
        // Create the collector first so the runtime-context wiring and open() calls
        // below operate on a non-null instance.
        this.collector = new CollectorWrapper(
                generatedCollector.newInstance(getRuntimeContext().getUserCodeClassLoader()));

        FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
        FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext());
        FunctionUtils.openFunction(fetcher, parameters);
        FunctionUtils.openFunction(collector, parameters);

        this.nullRow = new GenericRowData(tableFieldsCount);
        this.outRow = new JoinedRowData();

        // Field getters that extract the lookup key columns from the buffered rows.
        this.keyFieldGetters = Arrays.stream(lookupKeyIndicesInOrder)
                .mapToObj(i -> RowData.createFieldGetter(tableRowType.getTypeAt(i), i))
                .toArray(RowData.FieldGetter[]::new);

        this.cache = new HashMap<>(batchSize);

        // Start a timer that flushes the buffer when batch-interval elapses. The timer
        // thread and the task thread both synchronize on recordBuffers, which also
        // guards the shared output collector.
        this.executorService = Executors.newScheduledThreadPool(1);
        this.executorService.scheduleAtFixedRate(() -> {
            try {
                synchronized (recordBuffers) {
                    if (collectorHolder != null && !recordBuffers.isEmpty()) {
                        emit(collectorHolder);
                    }
                }
            } catch (Exception e) {
                throw new TableException("Failed to emit the buffered records.", e);
            }
        }, 0, batchInterval, TimeUnit.MILLISECONDS);

        bufferCounter = getRuntimeContext().getMetricGroup().counter("batchBuffers");
    }

    @Override
    public void processElement(RowData in, Context ctx, Collector<RowData> out) throws Exception {
        synchronized (recordBuffers) {
            bufferCounter.inc();
            recordBuffers.add(in);
            // Keep a handle on the collector so the interval timer can flush as well.
            this.collectorHolder = out;
            if (recordBuffers.size() >= batchSize) {
                emit(out);
            }
        }
    }

    public void emit(Collector<RowData> out) throws Exception {
        for (RowData input : recordBuffers) {
            // Build the lookup key for this record; records sharing the same key
            // within the batch hit the per-batch cache and query the source only once.
            GenericRowData key = new GenericRowData(lookupKeyIndicesInOrder.length);
            for (int i = 0; i < lookupKeyIndicesInOrder.length; i++) {
                key.setField(i, keyFieldGetters[i].getFieldOrNull(input));
            }
            prepareCollector(input, out);
            Collection<RowData> value = cache.get(key);
            if (value != null) {
                value.forEach(collector::collect);
            } else {
                doFetch(input);
                if (collector.isCollected()) {
                    cache.put(key, new ArrayList<>(collector.records));
                }
            }
            padNullForLeftJoin(input, out);
        }
        bufferCounter.dec(recordBuffers.size());
        recordBuffers.clear();
        // The cache only deduplicates keys within one batch; clear it so the next batch
        // reads fresh dimension data (the point of avoiding a long-lived cache).
        cache.clear();
    }

    public void prepareCollector(RowData in, Collector<RowData> out) {
        collector.setCollector(out);
        collector.setInput(in);
        collector.reset();
    }

    public void doFetch(RowData in) throws Exception {
        // fetcher has copied the input field when object reuse is enabled
        fetcher.flatMap(in, getFetcherCollector());
    }

    public void padNullForLeftJoin(RowData in, Collector<RowData> out) {
        if (isLeftOuterJoin && !collector.isCollected()) {
            outRow.replace(in, nullRow);
            outRow.setRowKind(in.getRowKind());
            out.collect(outRow);
        }
    }

    public Collector<RowData> getFetcherCollector() {
        return collector;
    }


    @Override
    public void close() throws Exception {
        if (fetcher != null) {
            FunctionUtils.closeFunction(fetcher);
        }
        if (collector != null) {
            FunctionUtils.closeFunction(collector);
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Guard against the timer thread mutating the buffer mid-snapshot.
        synchronized (recordBuffers) {
            bufferState.clear();
            bufferState.addAll(recordBuffers);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.bufferState = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("batch-lookup-buffers-state", RowData.class));
        // ListState#get returns an Iterable, not a List, so copy element by element.
        for (RowData row : bufferState.get()) {
            recordBuffers.add(row);
        }
    }

    public static class CollectorWrapper extends ListenableCollector<RowData> {
        private final ListenableCollector<RowData> delegate;

        private final List<RowData> records;

        public CollectorWrapper(ListenableCollector<RowData> delegate) {
            this.delegate = delegate;
            this.records = new ArrayList<>();
        }

        @Override
        public void setCollectListener(@Nullable CollectListener<RowData> collectListener) {
            this.delegate.setCollectListener(collectListener);
        }


        @Override
        public void setInput(Object input) {
            this.delegate.setInput(input);
        }

        @Override
        public Object getInput() {
            return this.delegate.getInput();
        }

        @Override
        public void setCollector(Collector<?> collector) {
            this.delegate.setCollector(collector);
        }

        @Override
        public void outputResult(Object result) {
            this.delegate.outputResult(result);
        }

        @Override
        public boolean isCollected() {
            return this.delegate.isCollected();
        }

        @Override
        public void close() {
            this.delegate.close();
        }

        @Override
        public void setRuntimeContext(RuntimeContext t) {
            this.delegate.setRuntimeContext(t);
        }

        @Override
        public RuntimeContext getRuntimeContext() {
            return this.delegate.getRuntimeContext();
        }

        @Override
        public IterationRuntimeContext getIterationRuntimeContext() {
            return this.delegate.getIterationRuntimeContext();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            this.delegate.open(parameters);
        }

        @Override
        public void reset() {
            // Reset the delegate so its "collected" flag is cleared for the next input
            // record, and drop the rows buffered for the per-batch cache.
            this.delegate.reset();
            this.records.clear();
        }

        @Override
        public void collect(RowData record) {
            this.records.add(record);
            delegate.collect(record);
        }

    }
}
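
Finally, a minimal, hedged end-to-end sketch of how the feature would be exercised. The connector choices, table names, and JDBC URL below are illustrative assumptions (any lookup-capable connector works); only the LOOKUP hint options come from the implementation above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchLookupJoinDemo {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Probe stream; proc_time is required for the FOR SYSTEM_TIME AS OF join.
        tEnv.executeSql(
                "CREATE TABLE Orders (" +
                        "  order_id BIGINT," +
                        "  customer_id BIGINT," +
                        "  total DOUBLE," +
                        "  proc_time AS PROCTIME()" +
                        ") WITH (" +
                        "  'connector' = 'datagen'," +
                        "  'rows-per-second' = '1000'" +
                        ")");

        // Dimension table served by a lookup-capable connector (JDBC here).
        tEnv.executeSql(
                "CREATE TABLE Customers (" +
                        "  id BIGINT," +
                        "  country STRING," +
                        "  zip STRING" +
                        ") WITH (" +
                        "  'connector' = 'jdbc'," +
                        "  'url' = 'jdbc:mysql://localhost:3306/demo'," +
                        "  'table-name' = 'customers'" +
                        ")");

        // The LOOKUP hint enables the batching runner: flush every 10000 rows or 1s.
        tEnv.executeSql(
                "SELECT /*+ LOOKUP('table'='c','batch-size'='10000','batch-interval'='1s') */"
                        + " o.order_id, o.total, c.country, c.zip"
                        + " FROM Orders AS o"
                        + " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c"
                        + " ON o.customer_id = c.id")
                .print();
    }
}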
