Table of Contents
- Navigation
- clean()
- Case Study
- Source Code
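Navigation
Related articles:
Flink: From Getting Started to Giving Up - Stream API - Join Implementation (Multi-Stream Operations)
Flink: From Getting Started to Giving Up - Stream API - Common Operators (map and flatMap)
Flink: From Getting Started to Giving Up - Stream API - Common Operators (filter and keyBy)
Flink: From Getting Started to Giving Up - Stream API - Common Operators (reduce)
Flink: From Getting Started to Giving Up - Stream API - Common Operators (window and windowAll)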
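If you read the articles above, you will have noticed that the operator source code keeps calling clean(), a step commonly known as closure cleaning.
Flink serializes operators and ships them to the worker nodes, so every operator object must be serializable. Many developers like to implement operators as anonymous inner classes, and that creates a closure problem: an anonymous inner class created in an instance method keeps a hidden reference to its enclosing object, so if that enclosing object does not implement Serializable, the inner class cannot be serialized either. The Flink framework therefore has to clean up such references under the hood.
The map and flatMap operators, for example, both pass the user function through clean():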
//map
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
return this.map(mapper, outType);
}
//flatmap
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);
return this.flatMap(flatMapper, outType);
}
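There are many more: every operator calls it several times along its call chain. With clean() showing up this often, it is natural to ask what it actually affects.
Tracing the code from addSource all the way to the sink, clean() is used throughout, so it is worth looking at how it is implemented.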
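Before reading the Flink source, here is a minimal plain-Java sketch (no Flink dependency) of the closure problem itself. MyJob and SerializableMapper are hypothetical names; SerializableMapper stands in for a Flink function interface, which likewise extends Serializable. The anonymous class never touches MyJob, yet javac typically still gives it a synthetic this$0 field pointing at the enclosing MyJob, so serializing the function fails.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;

public class ClosureProblemDemo {

    // Hypothetical stand-in for a Flink function interface (Flink's Function also extends Serializable).
    interface SerializableMapper extends Serializable {
        String map(String value);
    }

    // Hypothetical job class that does NOT implement Serializable.
    static class MyJob {
        SerializableMapper buildMapper() {
            // Anonymous class created in an instance method: javac adds a synthetic
            // this$0 field pointing at the enclosing MyJob, although map() never uses it.
            return new SerializableMapper() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            };
        }
    }

    // Plain Java serialization check, similar in spirit to ClosureCleaner.ensureSerializable.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) { // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        SerializableMapper mapper = new MyJob().buildMapper();
        // List the fields javac generated for the anonymous class
        for (Field f : mapper.getClass().getDeclaredFields()) {
            System.out.println("field " + f.getName() + " of type " + f.getType().getSimpleName());
        }
        System.out.println("serializable: " + isSerializable(mapper));
    }
}
```

Running main should list the synthetic this$0 field and report that the function is not serializable; that hidden reference is exactly what Flink's clean() tries to remove.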
clean()
This clean() method is protected, which limits its use to the same package and subclasses:
protected <F> F clean(F f) {
return this.getExecutionEnvironment().clean(f);
}
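It simply delegates to the clean() method of the execution environment. Almost every operator reaches closure cleaning through this path, apart from a few calls the framework makes internally: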
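// Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}.
@Internal
public <F> F clean(F f) {
// Closure cleaning runs only if it has not been disabled
if (this.getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(f, this.getConfig().getClosureCleanerLevel(), true);
}
// Always make sure the object is serializable, even if the cleaning step above was skipped
// Throws an exception if the object cannot be serialized
ClosureCleaner.ensureSerializable(f);
return f;
}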
Next, the logic behind the closure-cleaner flag:
this.getConfig().isClosureCleanerEnabled();
// Returns whether the ClosureCleaner is enabled.
public boolean isClosureCleanerEnabled() {
return !(closureCleanerLevel == ClosureCleanerLevel.NONE);
}
ClosureCleanerLevel is an enum whose default value is RECURSIVE; the text descriptions make the meaning of its three values fairly clear.
public enum ClosureCleanerLevel implements DescribedEnum {
NONE(text("Disables the closure cleaner completely.")),
TOP_LEVEL(text("Cleans only the top-level class without recursing into fields.")),
RECURSIVE(text("Cleans all fields recursively."));
}
Now let's go deeper into clean().
// In the ClosureCleaner class: this overload only creates an identity-based visited set and delegates, so go straight to the overload below
public static void clean(Object func, ClosureCleanerLevel level, boolean checkSerializable) {
clean(func, level, checkSerializable, Collections.newSetFromMap(new IdentityHashMap<>()));
}
This is the method that does the real work. It does not delegate any further, but it calls itself recursively depending on the ClosureCleanerLevel value:
private static void clean(
Object func,
ExecutionConfig.ClosureCleanerLevel level,
boolean checkSerializable,
Set<Object> visited) {
// Null check: simply return; serializability is verified afterwards (ensureSerializable), which throws on failure
if (func == null) {
return;
}
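// Set of already-visited objects; child objects are added during recursion, so no object is processed twice (this also stops reference cycles)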
if (!visited.add(func)) {
return;
}
// Get the runtime class of the object (used for reflection below)
final Class<?> cls = func.getClass();
// Primitive and wrapper types need no cleaning, so return immediately
if (ClassUtils.isPrimitiveOrWrapper(cls)) {
return;
}
// Skip classes that use a custom serialization method
if (usesCustomSerialization(cls)) {
return;
}
// First find the field name of the "this$0" field, this can
// be "this$x" depending on the nesting
boolean closureAccessed = false;
// Iterate over all fields declared by cls
for (Field f : cls.getDeclaredFields()) {
if (f.getName().startsWith("this$")) {
// found a closure referencing field - now try to clean
closureAccessed |= cleanThis0(func, cls, f.getName());
} else {
Object fieldObject;
try {
f.setAccessible(true);
fieldObject = f.get(func);
} catch (IllegalAccessException e) {
throw new RuntimeException(
String.format(
"Can not access to the %s field in Class %s",
f.getName(), func.getClass()));
}
/*
* we should do a deep clean when we encounter an anonymous class, inner class and local class, but should skip the class with custom serialize method.
* There are five kinds of classes (or interfaces):
* a) Top level classes
* b) Nested classes (static member classes)
* c) Inner classes (non-static member classes)
* d) Local classes (named classes declared within a method)
* e) Anonymous classes
*/
if (level == ExecutionConfig.ClosureCleanerLevel.RECURSIVE
&& needsRecursion(f, fieldObject)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
}
// Deep clean: recurse into the field's value
clean(
fieldObject,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
true,
visited);
}
}
}
// Only if the caller asked for a serializability check
if (checkSerializable) {
try {
// Serialize directly; ensureSerializable(Object obj) relies on the same underlying method
InstantiationUtil.serializeObject(func);
} catch (Exception e) {
String functionType = getSuperClassOrInterfaceName(func.getClass());
String msg =
functionType == null
? (func + " is not serializable.")
: ("The implementation of the "
+ functionType
+ " is not serializable.");
if (closureAccessed) {
msg +=
" The implementation accesses fields of its enclosing class, which is "
+ "a common reason for non-serializability. "
+ "A common solution is to make the function a proper (non-inner) class, or "
+ "a static inner class.";
} else {
msg += " The object probably contains or references non serializable fields.";
}
throw new InvalidProgramException(msg, e);
}
}
}
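The comment inside this method lists five kinds of classes. As an illustration (not Flink's actual recursion check), the standard reflection methods Class.isAnonymousClass(), Class.isLocalClass() and Class.isMemberClass(), plus the static modifier, are enough to tell them apart; ClassKindDemo, Nested and Inner are hypothetical names. Only inner, local and anonymous classes declared in a non-static context can carry a this$ reference to an enclosing instance, which is why they are the candidates for a deep clean.

```java
import java.lang.reflect.Modifier;

public class ClassKindDemo {

    // b) nested class (static member class)
    static class Nested {}

    // c) inner class (non-static member class)
    class Inner {}

    // Classifies a class into the five kinds listed in the ClosureCleaner comment.
    static String kindOf(Class<?> cls) {
        if (cls.isAnonymousClass()) {
            return "anonymous";                      // e)
        }
        if (cls.isLocalClass()) {
            return "local";                          // d)
        }
        if (cls.isMemberClass()) {
            return Modifier.isStatic(cls.getModifiers())
                    ? "nested (static member)"       // b)
                    : "inner (non-static member)";   // c)
        }
        return "top-level";                          // a)
    }

    public static void main(String[] args) {
        class Local {}                      // d) named class declared inside a method
        Object anonymous = new Object() {}; // e) anonymous class

        System.out.println("ClassKindDemo: " + kindOf(ClassKindDemo.class));
        System.out.println("Nested: " + kindOf(Nested.class));
        System.out.println("Inner: " + kindOf(Inner.class));
        System.out.println("Local: " + kindOf(Local.class));
        System.out.println("anonymous: " + kindOf(anonymous.getClass()));
    }
}
```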
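The private method cleanThis0 holds the actual closure-cleaning logic. It scans the class bytecode with This0AccessFinder (an ASM class visitor) to see whether the class accesses the this$ field, and only if it does not does it set the field to null: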
private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {
This0AccessFinder this0Finder = new This0AccessFinder(this0Name);
getClassReader(cls).accept(this0Finder, 0);
final boolean accessesClosure = this0Finder.isThis0Accessed();
if (LOG.isDebugEnabled()) {
LOG.debug(this0Name + " is accessed: " + accessesClosure);
}
if (!accessesClosure) {
Field this0;
try {
this0 = func.getClass().getDeclaredField(this0Name);
} catch (NoSuchFieldException e) {
// has no this$0, just return
throw new RuntimeException("Could not set " + this0Name + ": " + e);
}
try {
this0.setAccessible(true);
// The actual cleaning: drop the reference to the enclosing instance
this0.set(func, null);
} catch (Exception e) {
// should not happen, since we use setAccessible
throw new RuntimeException(
"Could not set " + this0Name + " to null. " + e.getMessage(), e);
}
}
return accessesClosure;
}
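Putting clean() and cleanThis0() together, the following is a simplified, Flink-free sketch of the same idea: walk the declared fields, null out this$ fields, and, when recursion is requested, descend into field values that are themselves inner, local or anonymous classes. The big simplification (an assumption of this sketch) is that it nulls every this$ field unconditionally, whereas Flink's cleanThis0 first uses This0AccessFinder to make sure the field is not accessed. MiniClosureCleaner, MyJob and SerializableMapper are hypothetical names, and the boolean recursive only roughly mirrors TOP_LEVEL versus RECURSIVE.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class MiniClosureCleaner {

    interface SerializableMapper extends Serializable {
        String map(String value);
    }

    // Hypothetical enclosing class that does NOT implement Serializable.
    static class MyJob {
        SerializableMapper buildMapper() {
            return new SerializableMapper() { // synthetic this$0 -> MyJob
                @Override
                public String map(String value) {
                    return value.trim();
                }
            };
        }

        SerializableMapper buildPipeline() {
            SerializableMapper first = buildMapper();
            // Own this$0 -> MyJob, plus a captured field holding `first` (which has its own this$0)
            return new SerializableMapper() {
                @Override
                public String map(String value) {
                    return first.map(value) + "!";
                }
            };
        }
    }

    static boolean isInnerLocalOrAnonymous(Class<?> cls) {
        return cls.isAnonymousClass() || cls.isLocalClass()
                || (cls.isMemberClass() && !Modifier.isStatic(cls.getModifiers()));
    }

    static Set<Object> newVisitedSet() {
        return Collections.newSetFromMap(new IdentityHashMap<>()); // identity semantics, as in Flink
    }

    // recursive == false roughly corresponds to TOP_LEVEL, recursive == true to RECURSIVE.
    static void clean(Object func, boolean recursive, Set<Object> visited) {
        if (func == null || !visited.add(func)) {
            return; // nothing to clean, or already visited (also stops reference cycles)
        }
        for (Field f : func.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers())) {
                continue; // static fields are not part of the serialized state
            }
            try {
                f.setAccessible(true);
                if (f.getName().startsWith("this$")) {
                    // SIMPLIFICATION: Flink's cleanThis0 nulls this$ only if the class never accesses it
                    f.set(func, null);
                } else if (recursive) {
                    Object child = f.get(func);
                    if (child != null && isInnerLocalOrAnonymous(child.getClass())) {
                        clean(child, true, visited);
                    }
                }
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Cannot access field " + f.getName(), e);
            }
        }
    }

    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        SerializableMapper shallow = new MyJob().buildPipeline();
        clean(shallow, false, newVisitedSet());
        System.out.println("top-level only: serializable = " + isSerializable(shallow));

        SerializableMapper deep = new MyJob().buildPipeline();
        clean(deep, true, newVisitedSet());
        System.out.println("recursive:      serializable = " + isSerializable(deep));
        System.out.println("result = " + deep.map("  flink  "));
    }
}
```

In this sketch buildPipeline() returns an anonymous function that captures another anonymous function, so a top-level-only pass leaves the nested this$0 in place and serialization still fails, while the recursive pass clears both references.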
Case Study
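Source code alone is rather dry. It tells us how closure cleaning works, but what happens if clean() is not executed? Let's work through a case to find out.
Flink jobs often implement business logic with inner classes. When the code is compiled, an inner class by default holds a reference to its enclosing object. If the enclosing object does not implement Serializable, serializing the inner class object fails. The clean() method sets the inner class's reference to the enclosing class to null, ensuring that serialization succeeds. Inside clean(), ClosureCleaner.clean() is called first, followed by ClosureCleaner.ensureSerializable(f).
The paragraph above is quoted from: https://blog.csdn.net/qq_20064763/article/details/116857794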
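One caveat: nulling the outer reference is only safe when the inner class never uses it, which is why cleanThis0 checks for accesses to this$0 before touching it. The plain-Java sketch below (hypothetical Config and Greeter types, no Flink dependency) nulls this$0 on an anonymous class that does read its enclosing instance; the call that worked before then fails with a NullPointerException.

```java
import java.lang.reflect.Field;

public class UnsafeNullingDemo {

    interface Greeter {
        String greet(String name);
    }

    // Hypothetical enclosing class whose state the inner class DOES use.
    static class Config {
        String prefix = "Hello, ";

        Greeter buildGreeter() {
            return new Greeter() {
                @Override
                public String greet(String name) {
                    return prefix + name; // reads Config.this.prefix through this$0
                }
            };
        }
    }

    // Nulls every this$ field without checking whether it is used (the check cleanThis0 performs).
    static void nullOuterReference(Object obj) {
        for (Field f : obj.getClass().getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                try {
                    f.setAccessible(true);
                    f.set(obj, null);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public static void main(String[] args) {
        Greeter greeter = new Config().buildGreeter();
        System.out.println(greeter.greet("Flink")); // this$0 still set, works normally

        nullOuterReference(greeter);
        try {
            greeter.greet("Flink");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: the enclosing Config is gone");
        }
    }
}
```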
Source Code
package com.happy.core.clean.entity;
import java.io.Serializable;
/**
* @author DeveloperZJQ
* @since 2022-12-14
*/
public class Human implements Serializable{
private String name;
private String gender;
private Integer age;
private IdentityInfo identityInfo;
public class IdentityInfo implements Serializable {
private String idCard;
private Student.Clazz clazz = new Student().new Clazz();
}
}
package com.happy.core.clean.entity;
import java.io.Serializable;
/**
* @author DeveloperZJQ
* @since 2022-12-14
*/
public class Student implements Serializable{
class Clazz implements Serializable {
}
}
package com.happy.core.clean;
import com.happy.core.clean.entity.Human;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
/**
* @author DeveloperZJQ
* @link org.apache.flink.api.java.ClosureCleaner
* @since 2022-12-14
*/
public class LearnClosureCleanerClient {
public static void main(String[] args) {
Human human = new Human();
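// IdentityInfo is an inner (non-static) class, so this instance holds a hidden this$0 reference to human
Human.IdentityInfo identityInfo = human.new IdentityInfo();
// Scenario 2: uncomment the next line to null out this$0 before the serializability check
// ClosureCleaner.clean(identityInfo, ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL, false);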
ClosureCleaner.ensureSerializable(identityInfo);
}
}
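- Scenario 1: all classes implement Serializable, so no clean() call is needed.
  In this scenario, simply copy the code above and run it; it executes normally.
- Scenario 2: Human does not implement Serializable while its inner class IdentityInfo does, so clean() is needed.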
Remove "implements Serializable" from Human. Without the clean() call, running the client fails with the following error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.happy.core.clean.entity.Human$IdentityInfo@458c1321 is not serializable
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:195)
at com.happy.core.clean.LearnClosureCleanerClient.main(LearnClosureCleanerClient.java:17)
Caused by: java.io.NotSerializableException: com.happy.core.clean.entity.Human
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:193)
... 1 more
Process finished with exit code 1
With the clean() call added, the program runs normally.
To find out why it failed without it, set a breakpoint in IDEA and debug:
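Clearly, the failure happens when ClosureCleaner.ensureSerializable(identityInfo); serializes the identityInfo object. Surprising, isn't it? IdentityInfo does implement Serializable, so why does serialization still fail? Evidently the identityInfo object contains something that cannot be serialized. Debugging shows that identityInfo actually holds a this$0 field whose value points to the enclosing Human object (Human@738 in the debugger).
As the figure above shows, the Human class does not implement Serializable. That is the cause of the serialization failure, exactly as the exception stack says: Caused by: java.io.NotSerializableException: com.happy.core.clean.entity.Human.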
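After adding the statement ClosureCleaner.clean(identityInfo, ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL, false); and running again, the program completes without errors: clean() has set identityInfo's this$0 to null, so the non-serializable Human object is no longer reached during serialization.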
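Flink's own error message in the source above ("make the function a proper (non-inner) class, or a static inner class") points to a fix that avoids clean() altogether. A minimal plain-Java variant of the case, assuming IdentityInfo is simply declared static (the Student.Clazz field is left out for brevity, and the idCard value is a placeholder), serializes fine even though Human still does not implement Serializable, because a static nested class has no this$0 field.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticNestedFixDemo {

    // Simplified variant of the case's Human class, deliberately NOT Serializable (scenario 2).
    static class Human {
        private String name;

        // The only real change is "static": a static nested class has no hidden this$0 field,
        // so serializing it never drags a Human instance along.
        static class IdentityInfo implements Serializable {
            private String idCard = "hypothetical-id";
        }
    }

    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) { // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        // No "human.new IdentityInfo()" needed: a static nested class has no enclosing instance
        Human.IdentityInfo identityInfo = new Human.IdentityInfo();
        System.out.println("serializable without clean(): " + isSerializable(identityInfo));
    }
}
```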