错误展示:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(CollectionDemo.java:33)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:543)
at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:192)
at org.apache.flink.streaming.api.datastream.KeyedStream.<init>(KeyedStream.java:117)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:292)
at com.example.bigdata.flink.dataStreamApi.source.CollectionDemo.main(CollectionDemo.java:38)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:560)
at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:177)
at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:611)
at com.example.bigdata.flink.dataStreamApi.source.CollectionDemo.main(CollectionDemo.java:33)
错误原因
The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
// 缺少“Collector”的泛型类型参数。在许多情况下,当涉及Java泛型时,lambda方法不能为自动类型提取提供足够的信息
原因描述
java 8 在使用Java API 写 Lambda 的时候,JVM 运行时会擦除类型(泛型类型)
Flink 无法准确获取到数据类型,此时就需要我们手动指定类型
处理方案
source.flatMap(()->{
...
//手动指定类型
},Types.类型)
- 案例:
SingleOutputStreamOperator<Tuple2<Object, Integer>> map = flatMap
.map(word -> Tuple2.of(word, 1)
,Types.TUPLE(Types.STRING, Types.INT));