Apache Flink类型及序列化研读生产应用|得物技术

news2024/11/18 3:24:02

一、背景

序列化是指将数据从内存中的对象序列化为字节流,以便在网络中传输或持久化存储。序列化在Apache Flink中非常重要,因为它涉及到数据传输和状态管理等关键部分。Apache Flink以其独特的方式来处理数据类型以及序列化,这种方式包括它自身的类型描述符、泛型类型提取以及类型序列化框架。本文将简单介绍它们背后的概念和基本原理,侧重分享在DataStream、Flink SQL自定义函数开发中对数据类型和序列的应用,以提升任务的运行效率。

二、简单理论阐述(基于Flink 1.13)

主要参考Apache Flink 1.13

支持的数据类型

  • Java Tuples and Scala Case Classes
  • Java POJOs
  • Primitive Types
  • Regular Classes
  • Values
  • Hadoop Writables
  • Special Types

具体的数据类型定义在此就不详细介绍了,具体描述可以前往Flink官网查看。

TypeInformation

Apache Flink量身定制了一套序列化框架,好处就是选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好地选取序列化方式,进行数据布局,节省数据的存储空间,甚至直接操作二进制数据。

TypeInformation类是Apache Flink所有类型描述符的基类,通过阅读源码,我们可以大概分成以下几种数据类型。

  • Basic types:所有的Java类型以及包装类:void,String,Date,BigDecimal,and BigInteger等。
  • Primitive arrays以及Object arrays
  • Composite types
  • Flink Java Tuples(Flink Java API的一部分):最多25个字段,不支持空字段
  • Scala case classes(包括Scala Tuples):不支持null字段
  • Row:具有任意数量字段并支持空字段的Tuples
  • POJO 类:JavaBeans
  • Auxiliary types (Option,Either,Lists,Maps,…)
  • Generic types:Flink内部未维护的类型,这种类型通常是由Kryo序列化。

我们简单看下该类的方法,核心是createSerializer,获取org.apache.flink.api.common.typeutils.TypeSerializer,执行序列化以及反序列化方法,主要是:

  • org.apache.flink.api.common.typeutils.TypeSerializer#serialize
  • org.apache.flink.api.common.typeutils.TypeSerializer#deserialize(org.apache.flink.core.memory.DataInputView)

何时需要数据类型获取

在Apache Flink中,算子间的数据类型传递是通过流处理的数据流来实现的。数据流可以在算子之间流动,每个算子对数据流进行处理并产生输出。当数据流从一个算子流向另一个算子时,数据的类型也会随之传递。Apache Flink使用自动类型推断机制来确定数据流中的数据类型。在算子之间传递数据时,Apache Flink会根据上下文自动推断数据的类型,并在运行时保证数据的类型一致性。

举个例子:新增一个kafka source,这个时候我们需要指定数据输出类型。

@Experimental
public <OUT> DataStreamSource<OUT> fromSource(
        Source<OUT, ?, ?> source,
        WatermarkStrategy<OUT> timestampsAndWatermarks,
        String sourceName,
        TypeInformation<OUT> typeInfo) {

    final TypeInformation<OUT> resolvedTypeInfo =
            getTypeInfo(source, sourceName, Source.class, typeInfo);

    return new DataStreamSource<>(
            this,
            checkNotNull(source, "source"),
            checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"),
            checkNotNull(resolvedTypeInfo),
            checkNotNull(sourceName));
}

那输入类型怎么不需要指定呢?可以简单看下OneInputTransformation(单输入算子的基类)类的getInputType()方法,就是以输入算子的输出类型为输入类型的。

/** Returns the {@code TypeInformation} for the elements of the input. */
public TypeInformation<IN> getInputType() {
    return input.getOutputType();
}

这样source的输出类型会变成下一个算子的输入。整个DAG的数据类型都会传递下去。Apache Flink获取到数据类型后,就可以获取对应的序列化方法。

还有一种情况就是与状态后端交互的时候需要获取数据类型,特别是非JVM堆存储的后端,需要频繁的序列化以及反序列化,例如RocksDBStateBackend

举个例子,当我们使用ValueState时需要调用以下API:

org.apache.flink.streaming.api.operators.StreamingRuntimeContext#getState

@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
    KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
    stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
    return keyedStateStore.getState(stateProperties);
}

public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
    if (serializerAtomicReference.get() == null) {
        checkState(typeInfo != null, "no serializer and no type info");
        // try to instantiate and set the serializer
        TypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig);
        // use cas to assure the singleton
        if (!serializerAtomicReference.compareAndSet(null, serializer)) {
            LOG.debug("Someone else beat us at initializing the serializer.");
        }
    }
}

可以从org.apache.flink.api.common.state.StateDescriptor#initializeSerializerUnlessSet方法看出,需要通过传入的数据类型来获取具体的序列化器。来执行具体的序列化和反序列化逻辑,完成数据的交互。

数据类型的自动推断

乍一看很复杂,各个环节都需要指定数据类型。其实大部分应用场景下,我们不用关注数据的类型以及序列化方式。Flink会尝试推断有关分布式计算期间交换和存储的数据类型的信息。

这里简单介绍Flink类型自动推断的核心类:

org.apache.flink.api.java.typeutils.TypeExtractor

在数据流操作中,Flink使用了泛型来指定输入和输出的类型。例如,DataStream表示一个具有类型T的数据流。在代码中使用的泛型类型参数T会被TypeExtractor类解析和推断。在运行时,Apache Flink会通过调用TypeExtractor的静态方法来分析操作的输入和输出,并将推断出的类型信息存储在运行时的环境中。

举个例子:用的最多的flatMap算子,当我们不指定返回类型的时候,Flink会调用TypeExtractor类自动去推断用户的类型。

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);
    return this.flatMap(flatMapper, outType);
}

一般看开源框架某个类的功能我都会先看类的注释,也看TypeExtractor的注释,大概意思这是一个对类进行反射分析的实用程序,用于确定返回的数据类型。

/**
 * A utility for reflection analysis on classes, to determine the return type of implementations of
 * transformation functions.
 *
 * <p>NOTES FOR USERS OF THIS CLASS: Automatic type extraction is a hacky business that depends on a
 * lot of variables such as generics, compiler, interfaces, etc. The type extraction fails regularly
 * with either {@link MissingTypeInfo} or hard exceptions. Whenever you use methods of this class,
 * make sure to provide a way to pass custom type information as a fallback.
 */

我们来看下其中一个核心的静态推断逻辑,org.apache.flink.api.java.typeutils.TypeExtractor#getUnaryOperatorReturnType

@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
        Function function,
        Class<?> baseClass,
        int inputTypeArgumentIndex,
        int outputTypeArgumentIndex,
        int[] lambdaOutputTypeArgumentIndices,
        TypeInformation<IN> inType,
        String functionName,
        boolean allowMissing) {

    Preconditions.checkArgument(
            inType == null || inputTypeArgumentIndex >= 0,
            "Input type argument index was not provided");
    Preconditions.checkArgument(
            outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
    Preconditions.checkArgument(
            lambdaOutputTypeArgumentIndices != null,
            "Indices for output type arguments within lambda not provided");

    // explicit result type has highest precedence
    if (function instanceof ResultTypeQueryable) {
        return ((ResultTypeQueryable<OUT>) function).getProducedType();
    }

    // perform extraction
    try {
        final LambdaExecutable exec;
        try {
            exec = checkAndExtractLambda(function);
        } catch (TypeExtractionException e) {
            throw new InvalidTypesException("Internal error occurred.", e);
        }
        if (exec != null) {

            // parameters must be accessed from behind, since JVM can add additional parameters
            // e.g. when using local variables inside lambda function
            // paramLen is the total number of parameters of the provided lambda, it includes
            // parameters added through closure
            final int paramLen = exec.getParameterTypes().length;

            final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);

            // number of parameters the SAM of implemented interface has; the parameter indexing
            // applies to this range
            final int baseParametersLen = sam.getParameterTypes().length;

            final Type output;
            if (lambdaOutputTypeArgumentIndices.length > 0) {
                output =
                        TypeExtractionUtils.extractTypeFromLambda(
                                baseClass,
                                exec,
                                lambdaOutputTypeArgumentIndices,
                                paramLen,
                                baseParametersLen);
            } else {
                output = exec.getReturnType();
                TypeExtractionUtils.validateLambdaType(baseClass, output);
            }

            return new TypeExtractor().privateCreateTypeInfo(output, inType, null);
        } else {
            if (inType != null) {
                validateInputType(
                        baseClass, function.getClass(), inputTypeArgumentIndex, inType);
            }
            return new TypeExtractor()
                    .privateCreateTypeInfo(
                            baseClass,
                            function.getClass(),
                            outputTypeArgumentIndex,
                            inType,
                            null);
        }
    } catch (InvalidTypesException e) {
        if (allowMissing) {
            return (TypeInformation<OUT>)
                    new MissingTypeInfo(
                            functionName != null ? functionName : function.toString(), e);
        } else {
            throw e;
        }
    }
}
  • 首先判断该算子是否实现了ResultTypeQueryable接口,本质上就是用户是否显式指定了数据类型,例如我们熟悉的Kafka source就实现了该方法,当使用了JSONKeyValueDeserializationSchema,就显式指定了类型,用户自定义Schema就需要自己指定。
public class KafkaSource<OUT>
        implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
                ResultTypeQueryable<OUT>
//deserializationSchema 是需要用户自己定义的。
@Override
public TypeInformation<OUT> getProducedType() {
    return deserializationSchema.getProducedType();
}                
//JSONKeyValueDeserializationSchema
@Override
public TypeInformation<ObjectNode> getProducedType() {
    return getForClass(ObjectNode.class);
}
  • 未实现ResultTypeQueryable接口,就会通过反射的方法获取ReturnType,判断逻辑大概是从是否是Java 8 lambda方法开始判断的。获取到返回类型后再通过new TypeExtractor()).privateCreateTypeInfo(output,inType,(TypeInformation)null)封装成Flink内部能识别的数据类型;大致分为2类,泛型类型变量TypeVariable以及非泛型类型变量。这个封装的过程也是非常重要的,推断的数据类型是Flink内部封装好的类型,序列化基本都很高效,如果不是, 就会推断为GenericTypeInfo走Kryo等序列化方式。如感兴趣,可以看下这块的源码,在此不再赘述。

通过以上的代码逻辑的阅读,我们大概能总结出以下结论:Flink内部维护了很多高效的序列化方式,通常只有数据类型被推断为org.apache.flink.api.java.typeutils.GenericTypeInfo时我们才需要自定义序列化类型,否则性能就是灾难;或者无法推断类型的时候,例如Flink SQL复杂类型有时候是无法自动推断类型的,当然某些特殊的对象Kryo也无法序列化,比如之前遇到过TreeMap无法Kryo序列化 (也可能是自己姿势不对),建议在开发Apache Flink作业时可以养成显式指定数据类型的好习惯。

三、开发实践

Flink代码作业

如何显式指定数据类型

这个简单了,几乎所有的source、Keyby、算子等都暴露了指定TypeInformation typeInfo的构造方法,以下简单列举几个:

  • source
@Experimental
public <OUT> DataStreamSource<OUT> fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, TypeInformation<OUT> typeInfo) {
    TypeInformation<OUT> resolvedTypeInfo = this.getTypeInfo(source, sourceName, Source.class, typeInfo);
    return new DataStreamSource(this, (Source)Preconditions.checkNotNull(source, "source"), (WatermarkStrategy)Preconditions.checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"), (TypeInformation)Preconditions.checkNotNull(resolvedTypeInfo), (String)Preconditions.checkNotNull(sourceName));
}
  • map
public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
  • 自定义Operator
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
  • keyBy
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
    Preconditions.checkNotNull(key);
    Preconditions.checkNotNull(keyType);
    return new KeyedStream(this, (KeySelector)this.clean(key), keyType);
}
  • 状态后端
public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {
    super(name, typeInfo, (Object)null);
}

自定义数据类型&自定义序列化器

当遇到复杂数据类型,或者需要优化任务性能时,需要自定义数据类型,以下分享几种场景以及实现代码:

  • POJO类

例如大家最常用的POJO类,何为POJO类大家可以自行查询,Flink对POJO类做了大量的优化,大家使用Java对象最好满足POJO的规范。

举个例子,这是一个典型的POJO类:

@Data
public class BroadcastConfig implements Serializable {
    public String config_type;
    public String date;
    public String media_id;
    public String account_id;
    public String label_id;
    public long start_time;
    public long end_time;
    public int interval;
    public String msg;

    public BroadcastConfig() {
    }
    }

我们可以这样指定其数据类型,返回的数据类就是一个TypeInformation

HashMap<String, TypeInformation<?>> pojoFieldName = new HashMap<>();
pojoFieldName.put("config_type", Types.STRING);
pojoFieldName.put("date", Types.STRING);
pojoFieldName.put("media_id", Types.STRING);
pojoFieldName.put("account_id", Types.STRING);
pojoFieldName.put("label_id", Types.STRING);
pojoFieldName.put("start_time", Types.LONG);
pojoFieldName.put("end_time", Types.LONG);
pojoFieldName.put("interval", Types.INT);
pojoFieldName.put("msg", Types.STRING);

return Types.POJO(
        BroadcastConfig.class,
        pojoFieldName
);

如感兴趣,可以看下org.apache.flink.api.java.typeutils.runtime.PojoSerializer,看Flink本身对其做了哪些优化。

  • 自定义TypeInformation

某些特殊场景可能还需要复杂的对象,例如,需要极致的性能优化,在Flink Table Api中数据对象传输,大部分都是BinaryRowdata,效率非常高。我们在Flink Datastram代码作业中也想使用,怎么操作呢?这里分享一种实现方式——自定义TypeInformation,当然还有更优的实现方式,这里就不介绍了。

代码实现:本质上就是继承TypeInformation,实现对应的方法。核心逻辑是createSerializer()方法,这里我们直接使用Table Api中已经实现的BinaryRowDataSerializer,就可以达到同Flink SQL相同的性能优化。

public  class BinaryRowDataTypeInfo extends TypeInformation<BinaryRowData> {

    private static final long serialVersionUID = 4786289562505208256L;
    private final int numFields;
    private final Class<BinaryRowData> clazz;
    private final TypeSerializer<BinaryRowData> serializer;

    public BinaryRowDataTypeInfo(int numFields) {
        this.numFields=numFields;
        this.clazz=BinaryRowData.class;
        serializer= new BinaryRowDataSerializer(numFields);
    }

    @Override
    public boolean isBasicType() {
        return false;
    }

    @Override
    public boolean isTupleType() {
        return false;
    }

    @Override
    public int getArity() {
        return numFields;
    }

    @Override
    public int getTotalFields() {
        return numFields;
    }

    @Override
    public Class<BinaryRowData> getTypeClass() {
        return this.clazz;
    }

    @Override
    public boolean isKeyType() {
        return false;
    }

    @Override
    public TypeSerializer<BinaryRowData> createSerializer(ExecutionConfig config) {
        return serializer;
    }

    @Override
    public String toString() {
        return "BinaryRowDataTypeInfo<" + clazz.getCanonicalName() + ">";
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof BinaryRowDataTypeInfo) {

            BinaryRowDataTypeInfo that = (BinaryRowDataTypeInfo) obj;

            return that.canEqual(this)
                    && this.numFields==that.numFields;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.clazz,serializer.hashCode());
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof BinaryRowDataTypeInfo;
    }
}

所以这里建议Apache Flink代码作业开发可以尽可能使用已经优化好的数据类型,例如BinaryRowdata,可以用于高性能的数据处理场景,例如在内存中进行批处理或流式处理。由于数据以二进制形式存储,可以更有效地使用内存和进行数据序列化。同时,BinaryRowData还提供了一组方法,用于访问和操作二进制数据。

  • 自定义TypeSerializer

上面的例子只是自定义了TypeInformation,当然还会遇到自定义TypeSerializer的场景,例如Apache Flink本身没有封装的数据类型。

代码实现:这里以位图存储Roaring64Bitmap为例,在某些特殊场景可以使用bitmap精准去重,减少存储空间。

我们需要继承TypeSerializer,实现其核心逻辑也是serialize() 、deserialize() 方法,可以使用Roaring64Bitmap自带的序列化、反序列化方法。如果你使用的复杂对象没有提供序列化方法,你也可以自己实现或者找开源的序列化器。有了自定义的TypeSerializer就可以在你自定义的TypeInformation中调用。

public class Roaring64BitmapTypeSerializer extends TypeSerializer<Roaring64Bitmap> {
    /**
     * Sharable instance of the Roaring64BitmapTypeSerializer.
     */
    public static final Roaring64BitmapTypeSerializer INSTANCE = new Roaring64BitmapTypeSerializer();
    private static final long serialVersionUID = -8544079063839253971L;

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public TypeSerializer<Roaring64Bitmap> duplicate() {
        return this;
    }

    @Override
    public Roaring64Bitmap createInstance() {
        return new Roaring64Bitmap();
    }

    @Override
    public Roaring64Bitmap copy(Roaring64Bitmap from) {
        Roaring64Bitmap copiedMap = new Roaring64Bitmap();
        from.forEach(copiedMap::addLong);
        return copiedMap;
    }

    @Override
    public Roaring64Bitmap copy(Roaring64Bitmap from, Roaring64Bitmap reuse) {
        from.forEach(reuse::addLong);
        return reuse;
    }

    @Override
    public int getLength() {
        return -1;
    }

    @Override
    public void serialize(Roaring64Bitmap record, DataOutputView target) throws IOException {
        record.serialize(target);
    }

    @Override
    public Roaring64Bitmap deserialize(DataInputView source) throws IOException {
        Roaring64Bitmap navigableMap = new Roaring64Bitmap();
        navigableMap.deserialize(source);
        return navigableMap;
    }

    @Override
    public Roaring64Bitmap deserialize(Roaring64Bitmap reuse, DataInputView source) throws IOException {
        reuse.deserialize(source);
        return reuse;
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        Roaring64Bitmap deserialize = this.deserialize(source);
        copy(deserialize);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        } else if (obj != null && obj.getClass() == Roaring64BitmapTypeSerializer.class) {
            return true;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return this.getClass().hashCode();
    }

    @Override
    public TypeSerializerSnapshot<Roaring64Bitmap> snapshotConfiguration() {
        return new Roaring64BitmapTypeSerializer.Roaring64BitmapSerializerSnapshot();
    }

    public static final class Roaring64BitmapSerializerSnapshot
            extends SimpleTypeSerializerSnapshot<Roaring64Bitmap> {

        public Roaring64BitmapSerializerSnapshot() {
            super(() -> Roaring64BitmapTypeSerializer.INSTANCE);
        }
    }
}

Flink SQL自定义函数

如何显式指定数据类型

这里简单分享下,在自定义Function开发下遇到复杂数据类型无法在accumulator 或者input、output中使用的问题,这里我们只介绍使用复杂数据对象如何指定数据类型的场景。

我们可以先看下FunctionDefinitionConvertRule,这是Apache Flink中的一个规则(Rule),用于将用户自定义的函数定义转换为对应的实现。其中通过getTypeInference()方法返回用于执行对此函数定义的调用的类型推理的逻辑。

@Override
public Optional<RexNode> convert(CallExpression call, ConvertContext context) {
    FunctionDefinition functionDefinition = call.getFunctionDefinition();

    // built-in functions without implementation are handled separately
    if (functionDefinition instanceof BuiltInFunctionDefinition) {
        final BuiltInFunctionDefinition builtInFunction =
                (BuiltInFunctionDefinition) functionDefinition;
        if (!builtInFunction.getRuntimeClass().isPresent()) {
            return Optional.empty();
        }
    }

    TypeInference typeInference =
            functionDefinition.getTypeInference(context.getDataTypeFactory());
    if (typeInference.getOutputTypeStrategy() == TypeStrategies.MISSING) {
        return Optional.empty();
    }

    switch (functionDefinition.getKind()) {
        case SCALAR:
        case TABLE:
            List<RexNode> args =
                    call.getChildren().stream()
                            .map(context::toRexNode)
                            .collect(Collectors.toList());

            final BridgingSqlFunction sqlFunction =
                    BridgingSqlFunction.of(
                            context.getDataTypeFactory(),
                            context.getTypeFactory(),
                            SqlKind.OTHER_FUNCTION,
                            call.getFunctionIdentifier().orElse(null),
                            functionDefinition,
                            typeInference);

            return Optional.of(context.getRelBuilder().call(sqlFunction, args));
        default:
            return Optional.empty();
    }
}

那我们指定复杂类型也会从通过该方法实现,不多说了,直接上代码实现。

  • 指定accumulatorType

这是之前写的AbstractLastValueWithRetractAggFunction功能主要是为了实现具有local-global的逻辑的LastValue,提升作业性能。

accumulator对象:LastValueWithRetractAccumulator,可以看到该对象是一个非常复杂的对象,包含5个属性,还有List 复杂嵌套,以及MapView等可以操作状态后端的对象,甚至有Object这种通用的对象。

public static class LastValueWithRetractAccumulator {
    public Object lastValue = null;
    public Long lastOrder = null;
    public List<Tuple2<Object, Long>> retractList = new ArrayList<>();
    public MapView<Object, List<Long>> valueToOrderMap = new MapView<>();
    public MapView<Long, List<Object>> orderToValueMap = new MapView<>();

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LastValueWithRetractAccumulator)) {
            return false;
        }
        LastValueWithRetractAccumulator that = (LastValueWithRetractAccumulator) o;
        return Objects.equals(lastValue, that.lastValue)
                && Objects.equals(lastOrder, that.lastOrder)
                && Objects.equals(retractList, that.retractList)
                && valueToOrderMap.equals(that.valueToOrderMap)
                && orderToValueMap.equals(that.orderToValueMap)
                ;
    }

    @Override
    public int hashCode() {
        return Objects.hash(lastValue, lastOrder, valueToOrderMap, orderToValueMap, retractList);
    }

}

getTypeInference() 是FunctionDefinition接口的方法,而所有的用户自定义函数都实现了该接口,我们只需要重新实现下该方法就可以,以下是代码实现。

这里我们还需要用到工具类TypeInference,这是Flink中的一个模块,用于进行类型推断和类型推理。

可以看出我们在accumulatorTypeStrategy方法中传入了一个构建好的TypeStrategy;这里我们将LastValueWithRetractAccumulator定义为了一个STRUCTURED,不同的属性定义为具体的数据类型,DataTypes工具类提供了很多丰富的对象形式,还有万能的RAW类型。

public TypeInference getTypeInference(DataTypeFactory typeFactory) {

    return TypeInference.newBuilder()
            .accumulatorTypeStrategy(callContext -> {

                List<DataType> dataTypes = callContext.getArgumentDataTypes();

                DataType argDataType;
                if (dataTypes.get(0)
                        .getLogicalType()
                        .getTypeRoot()
                        .getFamilies()
                        .contains(LogicalTypeFamily.CHARACTER_STRING)) {
                    argDataType = DataTypes.STRING();
                } else
                    argDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));


                DataType accDataType = DataTypes.STRUCTURED(
                        LastValueWithRetractAccumulator.class,
                        DataTypes.FIELD("lastValue", argDataType.nullable()),
                        DataTypes.FIELD("lastOrder", DataTypes.BIGINT()),
                        DataTypes.FIELD("retractList", DataTypes.ARRAY(
                                DataTypes.STRUCTURED(
                                        Tuple2.class,
                                        DataTypes.FIELD("f0", argDataType.nullable()),
                                        DataTypes.FIELD("f1", DataTypes.BIGINT())
                                )).bridgedTo(List.class)),
                        DataTypes.FIELD(
                                "valueToOrderMap",
                                MapView.newMapViewDataType(
                                        argDataType.nullable(),
                                        DataTypes.ARRAY(DataTypes.BIGINT()).bridgedTo(List.class))),
                        //todo:blink 使用SortedMapView 优化性能,开源使用MapView key天然字典升序,倒序遍历性能可能不佳
                        DataTypes.FIELD(
                                "orderToValueMap",
                                MapView.newMapViewDataType(
                                        DataTypes.BIGINT(),
                                        DataTypes.ARRAY(argDataType.nullable()).bridgedTo(List.class)))
                );

                return Optional.of(accDataType);
            })
            .build()
            ;
}
  • 指定outputType

这个也很简单,直接上代码实现,主要就是outputTypeStrategy中传入需要输出的数据类型即可。

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {

    return TypeInference.newBuilder()
            .outputTypeStrategy(callContext -> {

                List<DataType> dataTypes = callContext.getArgumentDataTypes();

                DataType argDataType;

                if (dataTypes.get(0)
                        .getLogicalType()
                        .getTypeRoot()
                        .getFamilies()
                        .contains(LogicalTypeFamily.CHARACTER_STRING)) {
                    argDataType = DataTypes.STRING();
                } else
                    argDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));

                return Optional.of(argDataType);
            })
            .build()
            ;
}
  • 指定intputType

在此就不做介绍了,同以上类似,在inputTypeStrategy方法传入定义好的TypeStrategy就好。

  • 根据inputType动态调整outType或者accumulatorType

在某些场景下,我们需要让函数功能性更强,比如当我输入是bigint类型的时候,我输出bigint类型等,类似的逻辑。

大家可以发现outputTypeStrategy或者 accumulatorTypeStrategy的入参都是 实现了 TypeStrategy接口的对象,并且需要实现inferType方法。在Flink框架调用该方法的时候会传入一个上下文对象CallContext,提供了获取函数入参类型的api getArgumentDataTypes();

代码实现:这里的逻辑是将获取到的第一个入参对象的类型指定为输出对象的类型。

.outputTypeStrategy(callContext -> {

    List<DataType> dataTypes = callContext.getArgumentDataTypes();

    DataType argDataType;

    if (dataTypes.get(0)
            .getLogicalType()
            .getTypeRoot()
            .getFamilies()
            .contains(LogicalTypeFamily.CHARACTER_STRING)) {
        argDataType = DataTypes.STRING();
    } else
        argDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));

    return Optional.of(argDataType);
}

自定义DataType

可以发现以上分享几乎都是使用的DataTypes封装好的类型,比如DataTypes.STRING()、DataTypes.Long()等。那如果我们需要封装一些其他对象如何操作呢?上文提到DataTypes提供了一个自定义任意类型的方法。

/**
 * Data type of an arbitrary serialized type. This type is a black box within the table
 * ecosystem and is only deserialized at the edges.
 *
 * <p>The raw type is an extension to the SQL standard.
 *
 * <p>This method assumes that a {@link TypeSerializer} instance is present. Use {@link
 * #RAW(Class)} for automatically generating a serializer.
 *
 * @param clazz originating value class
 * @param serializer type serializer
 * @see RawType
 */
public static <T> DataType RAW(Class<T> clazz, TypeSerializer<T> serializer) {
    return new AtomicDataType(new RawType<>(clazz, serializer));
}

我们有这样的一个场景,需要在自定义的函数中使用bitmap计算UV值,需要定义Roaring64Bitmap为accumulatorType,直接上代码实现。

这里的Roaring64BitmapTypeSerializer已经在《自定义TypeSerializer》小段中实现,有兴趣的同学可以往上翻翻。

public TypeInference getTypeInference(DataTypeFactory typeFactory) {

    return TypeInference.newBuilder()
            .accumulatorTypeStrategy(callContext -> {
                DataType type = DataTypes.RAW(
                        Roaring64Bitmap.class,
                        Roaring64BitmapTypeSerializer.INSTANCE
                );
                return Optional.of(type);
            })
            .outputTypeStrategy(callContext -> Optional.of(DataTypes.BIGINT()))
            .build()
            ;
}

四、结语

本文主要简单分享了一些自身对Flink类型及序列化的认识和应用实践,能力有限,不足之处欢迎指正。

引用:
https://nightlies.apache.org/flink/flink-docs-release-1.13/

*文/ 木木

本文属得物技术原创,更多精彩文章请看:得物技术

未经得物技术许可严禁转载,否则依法追究法律责任!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1867803.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

彩虹PLM系统:引领汽车行业的数字化转型

彩虹PLM系统&#xff1a;引领汽车行业的数字化转型 彩虹PLM系统作为汽车行业数字化转型的引领者&#xff0c;凭借其卓越的技术实力和丰富的行业经验&#xff0c;为汽车行业带来了全面的解决方案。以下是彩虹PLM系统如何引领汽车行业数字化转型的详细分析&#xff1a; 一、整合全…

Redis 7.x 系列【7】数据类型之列表(List)

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 概述2. 常用命令2.1 RPUSH2.2 LPUSH2.3 LRANGE2.4 LINDEX2.6 LREM2.7 LLEN2.8 LPOP…

8.计算机视觉—增广和迁移

目录 1.数据增广数据增强数据增强的操作代码实现2.微调 迁移学习 Transfer learning(重要的技术)网络结构微调:当目标数据集比源数据集小得多时,微调有助于提高模型的泛化能力。训练固定一些层总结代码实现1.数据增广 CES上的真实故事 有一家做智能售货机的公司,发现他们…

glif: 爆火 MeMe 生成器

glif 是一个 Agent 搭建平台&#xff0c;有人用其搭建了一个 MeMe 生成器, 短时间内已经有 280k 次使用&#xff01; 如图所示&#xff0c;glif 最大的特点是有一个实验性支持的 Canvas 节点&#xff0c;可以将生成的内容任意布局输出&#xff0c;提升了可控性。

7.计算机视觉—硬件和训练

目录 1.深度学习硬件:CPU和GPUCPU内存结构提升CPU利用率提升GPU利用率CPU与GPU牌子CPU/GPU高性能计算编程2.深度学习硬件:TPU和其他DSPFPGAAI ASIC总结3.单机多卡并行:多GPU数据并行VS模型并行总结4.多GPU训练代码实现数据同步数据分发训练多GPU的简洁实现5.分布式训练1.深度…

ICRA 2024 基于transformer大模型实现机器人自主导航

在陌生环境中进行导航的机器人需要提供决策&#xff1a;面向任务的导航&#xff08;到达设定好机器人目标点&#xff09;&#xff0c;以及 与任务无关的探索&#xff08;在新颖的环境中寻找目标&#xff09;。通常&#xff0c;这些角色由单独的模型处理&#xff0c;例如通过使用…

“北京到底有谁在啊”影视APP开发,解锁最简单的快乐

随着电视剧《玫瑰的故事》在腾讯视频APP热播&#xff0c;APP也增加了很多热度&#xff0c;一款丰富的影视APP&#xff0c;无论是热门大片、经典影视剧、还是最新综艺节目&#xff0c;能畅享无限精彩的影视内容&#xff01; 开发影视APP&#xff0c;需要专业的技术服务商来解决…

DM达梦数据库转换、条件函数整理

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…

【分布式系列】分布式锁在 Redis 主从部署中的挑战

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

ABAP ALV报表性能优化 经验总结

优化ALV报表&#xff0c;最主要就是优化取数逻辑和数据库查询。因为几乎在所有的程序中都会用到数据库查询&#xff0c;所以这篇文章的内容也不仅局限于SAP、ABAP程序&#xff0c;虽然ABAP有其特殊之处。 优化的时候我遵从以下几个原则&#xff1a; 1.把数据库连接视为一种极其…

Nginx常见的基本配置(全网最详细!!!)

&#x1fa81;Nginx常用命令 &#x1f3a8;Nginx正向代理 &#x1f94f;Nginx反向代理 &#x1f52e;Nginx负载均衡 &#x1f381;Nginx基本配置详解 ## # 全局配置 ##user www-data; ## 配置 worker 进程的用户和组 worker_processes auto; ## 配置 worke…

最新版Git安装指南使用指南

首先&#xff0c;访问Git的官方网站https://git-scm.com下载适用于您操作系统的安装包。您也可以选择使用阿里云镜像来加速下载过程。 也可以用国内地址下载https://pan.quark.cn/s/0293d76e58bchttps://pan.quark.cn/s/0293d76e58bc安装过程 在这里插入图片描述 2、点击“…

前缀和算法系列|概念讲解|应用场景|大量例题讲解

大家好,我是LvZi,今天带来前缀和算法系列|概念讲解|应用场景|大量例题讲解 一.模版解析 1.一维前缀和 一维前缀和就是一个简单的dp问题 状态表示:dp[i]:以i位置为结尾的所有元素的和状态转移方程:dp[i] dp[i - 1] arr[i] 链接:一维前缀和(模版题) 代码: import java.ut…

短信群发70字计费标准及内容编辑规范详解

短信群发营销&#xff0c;作为一种简单高效的推广方式&#xff0c;备受企业和个人青睐。然而&#xff0c;在启动短信群发营销前&#xff0c;了解相关常识同样重要。 一、短信字数限制与计费标准 短信群发时&#xff0c;字数限制是一个关键因素。每个汉字、字母、数字、符号均计…

虚拟机字节码执行引擎之运行时栈帧结构

概述 执行引擎是Java虚拟机核心的组成部分之一。“虚拟机”是一个相对于“物理机”的概念,这两种机器都有代码执行能力,其区别是物理机的执行引擎是直接建立在处理器、缓存、指令集和操作系统层面上的,而虚拟机的执行引擎则是由软件自行实现的,因此可以不受物理条件制约地定…

艺术与科技的精湛融合:探讨AI绘画与AI动画的交汇点

前言 艺术与科技的精湛融合&#xff1a;探讨AI绘画与AI动画的交汇点 在当代社会中&#xff0c;艺术和科技的结合呈现出了从来灭有的创新和可能性。随着人工智能技术的不断发展&#xff0c;AI绘画与AI动画的融合愈发引人瞩目。这一融合不仅给艺术家们带来了更多创作的可能&…

【项目实训】数据库内容丰富

经团队讨论&#xff0c;对前端页面展示数据进行了增加&#xff0c;于是相应的修改数据库 经团队成员使用大模型对各公司面试经验中问题的总结优化&#xff0c;我们打算将大模型的回答存储到数据库中&#xff0c;以显示在前端页面 于是在数据库中存储大模型的回答&#xff1a;…

什么是代码签名证书?有什么作用?

代码签名证书是一种特殊的数字证书&#xff0c;主要用于软件发布领域&#xff0c;旨在确保软件代码的完整性和来源的真实性。它是由可信赖的证书颁发机构&#xff08;CA&#xff09;签发的&#xff0c;为软件开发者提供了一种方式来证明其身份&#xff0c;并保证软件在发布后未…

科技未来·无限可能“2024世亚智博会”

随着科技的飞速发展&#xff0c;人类社会正以前所未有的速度迈向一个全新的时代。科学技术作为第一生产力&#xff0c;不仅极大地推动了经济和社会的发展&#xff0c;更在不断地改变着我们的生活方式和思维方式。特别是在人工智能、物联网等前沿科技领域&#xff0c;其创新和应…

酷开系统丨开启家庭智能教育让学习成为一种乐趣

在数字化时代&#xff0c;孩子们接触的信息日益增多&#xff0c;而酷开系统洞察到了家长对孩子成长环境的关切。酷开系统&#xff0c;作为家庭娱乐与教育的融合平台&#xff0c;不仅注重提供丰富的教育资源&#xff0c;更致力于创造一个健康、有益的学习和娱乐环境。 在酷开系…