Flink从入门到放弃—Stream API—clean()方法

news2025/1/27 12:04:05

文章目录

  • 导航
  • clean()
  • 案例
    • 贴上源码

导航

涉及到文章:
Flink从入门到放弃—Stream API—Join实现(即多流操作)
Flink从入门到放弃—Stream API—常用算子(map和flatMap)
Flink从入门到放弃—Stream API—常用算子(filter 和 keyBy)
Flink从入门到放弃—Stream API—常用算子(reduce
Flink从入门到放弃—Stream API—常用算子(window和windowAll)

在咱们看上面文章的时候,看到的那些算子源码里面总会用到clean(),业界称为闭包清除。

大家都知道Flink中算子都是通过序列化分发到各节点上,所以要确保算子对象是可以被序列化的,很多时候大家比较喜欢直接用匿名内部类实现算子,而匿名内部类就会带来闭包问题,当匿名内部类引用的外部对象没有实现序列化接口时,就会导致内部类无法被序列化,因此Flink框架底层必须做好清除工作,

如下源码所示:

//map
 public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
        TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
        return this.map(mapper, outType);
    }

//flatmap
 public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);
        return this.flatMap(flatMapper, outType);
    }

太多了,每一个算子在追源码的时候都会出现好几次,既然出现了这么多次,那就很感兴趣到底这个clean操作会影响什么呢?

我从addSource源码追到sink源码clean()方法都有被用到,值得我们去看一下源码是怎么实现的。

clean()

同一包的clean方法,这个限制了使用范围

protected <F> F clean(F f) {
        return this.getExecutionEnvironment().clean(f);
    }

直接调用到环境类,这个clean方法,几乎所有的算子都是从这里调用clean方法的,当然除了一些框架本身的调用哈

// 返回给定函数的“闭包清理”版本。仅在{org.apache.flink.api.common.ExecutionConfig}中未禁用闭包清理时才清除。
  @Internal
    public <F> F clean(F f) {
    // 闭包清除禁用标识
        if (this.getConfig().isClosureCleanerEnabled()) {
            ClosureCleaner.clean(f, this.getConfig().getClosureCleanerLevel(), true);
        }

	// 再次确保对象可序列化,防止上一步执行无效
	// 确认序列化失败会抛出异常:对象不能被序列化
        ClosureCleaner.ensureSerializable(f);
        return f;
    }

看下这个闭包清除标识逻辑

this.getConfig().isClosureCleanerEnabled()// Returns whether the ClosureCleaner is enabled.
public boolean isClosureCleanerEnabled() {
        return !(closureCleanerLevel == ClosureCleanerLevel.NONE);
    }

ClosureCleanerLevel 枚举类型,默认是RECURSIVE,根据text里的内容大概可以了解到这个三个枚举的意思。

public enum ClosureCleanerLevel implements DescribedEnum {
        NONE(text("Disables the closure cleaner completely.")),
        TOP_LEVEL(text("Cleans only the top-level class without recursing into fields.")),
        RECURSIVE(text("Cleans all fields recursively."));
    }

然后继续深入clean()方法

// 追到ClosureCleaner闭包类,重载了一下,没有卵用,直接看下面的重载方法
 public static void clean(Object func, ClosureCleanerLevel level, boolean checkSerializable) {
        clean(func, level, checkSerializable, Collections.newSetFromMap(new IdentityHashMap()));
    }

这才是正主,不会再嵌套下去了,但是会根据清理闭包枚举值ClosureCleanerLevel 递归调用clean()

private static void clean(
            Object func,
            ExecutionConfig.ClosureCleanerLevel level,
            boolean checkSerializable,
            Set<Object> visited) {
            // 判空处理,跳出本方法之后,进入到确认是否可以序列化逻辑,异常则抛出
        if (func == null) {
            return;
        }
		// 序列化对象集合,递归的时候会将子对象也加入到Set
        if (!visited.add(func)) {
            return;
        }
		// 利用类反射机制,通过调用运行时类的生成对象实例
        final Class<?> cls = func.getClass();
		// 是否是原始类型和包装类型,是就直接返回
        if (ClassUtils.isPrimitiveOrWrapper(cls)) {
            return;
        }
		// 跳过使用自定义序列化方法的类
        if (usesCustomSerialization(cls)) {
            return;
        }

        // First find the field name of the "this$0" field, this can
        // be "this$x" depending on the nesting
        boolean closureAccessed = false;
		// 遍历cls所包含的字段信息
        for (Field f : cls.getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                // found a closure referencing field - now try to clean
                // 翻译上面英文:发现一个闭包引用字段-现在尝试清理
                closureAccessed |= cleanThis0(func, cls, f.getName());
            } else {
                Object fieldObject;
                try {
                    f.setAccessible(true);
                    fieldObject = f.get(func);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(
                            String.format(
                                    "Can not access to the %s field in Class %s",
                                    f.getName(), func.getClass()));
                }

                /*
                 * we should do a deep clean when we encounter an anonymous class, inner class and local class, but should skip the class with custom serialize method.
                 * 翻译:当遇到匿名类、内部类和局部类时,我们应该做一个深度清理,但应该跳过使用自定义序列化方法的类。
                 * There are five kinds of classes (or interfaces):
                 * a) Top level classes(顶级类)
                 * b) Nested classes (static member classes 静态成员类)
                 * c) Inner classes (non-static member classes 内部类(非静态成员类))
                 * d) Local classes (named classes declared within a method) 局部类(在方法中声明的命名类)
                 * e) Anonymous classes(匿名类)
                 */
                if (level == ExecutionConfig.ClosureCleanerLevel.RECURSIVE
                        && needsRecursion(f, fieldObject)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
                    }
					// 深度清理
                    clean(
                            fieldObject,
                            ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
                            true,
                            visited);
                }
            }
        }
		// 序列化标识
        if (checkSerializable) {
            try {
            // 直接序列化和ensureSerializable(Object obj)是一个底层实现方法
                InstantiationUtil.serializeObject(func);
            } catch (Exception e) {
                String functionType = getSuperClassOrInterfaceName(func.getClass());

                String msg =
                        functionType == null
                                ? (func + " is not serializable.")
                                : ("The implementation of the "
                                        + functionType
                                        + " is not serializable.");

                if (closureAccessed) {
                    msg +=
                            " The implementation accesses fields of its enclosing class, which is "
                                    + "a common reason for non-serializability. "
                                    + "A common solution is to make the function a proper (non-inner) class, or "
                                    + "a static inner class.";
                } else {
                    msg += " The object probably contains or references non serializable fields.";
                }

                throw new InvalidProgramException(msg, e);
            }
        }
    }

cleanThis0这个私有方法是真正的闭包清除逻辑

private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {

        This0AccessFinder this0Finder = new This0AccessFinder(this0Name);
        getClassReader(cls).accept(this0Finder, 0);

        final boolean accessesClosure = this0Finder.isThis0Accessed();

        if (LOG.isDebugEnabled()) {
            LOG.debug(this0Name + " is accessed: " + accessesClosure);
        }

        if (!accessesClosure) {
            Field this0;
            try {
                this0 = func.getClass().getDeclaredField(this0Name);
            } catch (NoSuchFieldException e) {
                // has no this$0, just return
                throw new RuntimeException("Could not set " + this0Name + ": " + e);
            }

            try {
            
                this0.setAccessible(true);
            	// 真正的清理动作
                this0.set(func, null);
            } catch (Exception e) {
                // should not happen, since we use setAccessible
                throw new RuntimeException(
                        "Could not set " + this0Name + " to null. " + e.getMessage(), e);
            }
        }

        return accessesClosure;
    }

案例

光讲源码太枯燥,通过源码咱们能知道闭包清除逻辑,但是如果不执行这个clean动作会发生什么事情呢?咱们通过一个案例尝试理解一下。

Flink任务常使用内部类来完成业务逻辑开发,在编译代码的时候,默认内部类会持有一个外部对象的引用。如果外部对象没有实现序列化接口,序列化内部类对象就会失败。clean()方法就是将内部类指向外部类的引用设置为null,确保序列化过程的成功。在clean()方法中首先调用ClosureCleaner.clean()方法,然后再调用ClosureCleaner.ensureSerializable(f);

上面这句话援引自: https://blog.csdn.net/qq_20064763/article/details/116857794

贴上源码

package com.happy.core.clean.entity;

import java.io.Serializable;

/**
 * @author DeveloperZJQ
 * @since 2022-12-14
 */
public class Human implements Serializable{

    private String name;
    private String gender;
    private Integer age;
    private IdentityInfo identityInfo;


    public class IdentityInfo implements Serializable {
        private String idCard;
        private Student.Clazz clazz = new Student().new Clazz();
    }
}

package com.happy.core.clean.entity;

import java.io.Serializable;

/**
 * @author DeveloperZJQ
 * @since 2022-12-14
 */
public class Student implements Serializable{
    class Clazz implements Serializable {

    }
}

package com.happy.core.clean;

import com.happy.core.clean.entity.Human;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;

/**
 * @author DeveloperZJQ
 * @link org.apache.flink.api.java.ClosureCleaner
 * @since 2022-12-14
 */
public class LearnClosureCleanerClient {
    public static void main(String[] args) {
        Human human = new Human();
        Human.IdentityInfo identityInfo = human.new IdentityInfo();
//        ClosureCleaner.clean(identityInfo, ExecutionConfig.ClosureCleanerLevel.NONE, false);
        ClosureCleaner.ensureSerializable(identityInfo);

    }
}
  • 场景1:全部类都实现序列化接口,不用clean操作
    该场景,直接复制上面代码运行即可看到是正常执行。

  • 场景2:Human 类没实现序列化接口,子类实现序列化接口的时候,需要clean操作
    去掉Human的实现序列化接口操作,在没添加clean操作的前提下,会报下面的错误:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.happy.core.clean.entity.Human$IdentityInfo@458c1321 is not serializable
	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:195)
	at com.happy.core.clean.LearnClosureCleanerClient.main(LearnClosureCleanerClient.java:17)
Caused by: java.io.NotSerializableException: com.happy.core.clean.entity.Human
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
	at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
	at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:193)
	... 1 more

Process finished with exit code 1

当添加了clean操作之后,执行正常,代码如下:
在这里插入图片描述

通过idea打下断点debug下究其原因:
在这里插入图片描述

很显然,在调用ClosureCleaner.ensureSerializable(identityInfo);序列化identityInfo对象出现了问题。是不是很意外,为什么IdentityInfo类明明实现了Serializable接口,为什么还会序列化失败呢?很显然,identityInfo对象中包含了某些导致序列化失败。通过debug方式我们可以知道identityInfo对象实际上包含了this$0变量,并且该值指向外部引用对象Human@738。
在这里插入图片描述
通过上图可以看到报错了,Human类并没有实现Serializable,显然,序列化失败的原因找到了,也正如异常栈所说的Caused by: java.io.NotSerializableException: com.happy.core.clean.entity.Human。

再次加上ClosureCleaner.clean(identityInfo, ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL, false);语句后,再次执行,顺利执行没有报错了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/91951.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

讲解Redis的主从复制

Redis 主从复制1 主从复制2 作用3 主从复制架构图4 搭建主从复制5 使用规则1 主从复制 主从复制架构仅仅用来解决数据的冗余备份,从节点仅仅用来同步数据 无法解决: 1.master节点出现故障的自动故障转移 主从复制&#xff0c;是指将一台Redis服务器的数据&#xff0c;复制到…

数据挖掘的步骤有哪些?

所谓数据挖掘就是从海量的数据中&#xff0c;找到隐藏在数据里有价值的信息。因为这个数据是隐式的&#xff0c;因此想要挖掘出来并不简单。那么&#xff0c;如何进行数据挖掘呢&#xff1f;数据挖掘的步骤有哪些呢&#xff1f;一般来讲&#xff0c;数据挖掘需要经历数据收集、…

openEuler 倡议建立 eBPF 软件发布标准

eBPF 是一个能够在内核运行沙箱程序的技术&#xff0c;提供了一种在内核事件和用户程序事件发生时安全注入代码的机制&#xff0c;使得非内核开发人员也可以对内核进行控制。随着内核的发展&#xff0c;eBPF 逐步从最初的数据包过滤扩展到了网络、内核、安全、跟踪等&#xff0…

软件测试之测试用例评审

一、评审目的 一般来说&#xff0c;参加测试用例评审的人员包括对应项目的产品人员、设计人员、开发人员和测试人员。 图1-1 测试用例评审相关人员 测试用例评审会议的发起者一般是测试人员&#xff0c;既然我们是发起者&#xff0c;那我们发起这个会议的目的是什么呢&#x…

【Java开发】 Spring 11 :Spring Boot 配置 Web 运行参数并部署至服务器

Spring Boot 项目开发结束后的工作便是运维&#xff0c;简单来说需要配置 Web 运行参数和项目部署两大工作&#xff0c;本文将尽可能详细地给大家讲全&#xff01; 目录 1 定制 Web 容器运行参数 1.1 运行参数介绍 1.2 项目搭建 ① 通过 IDEA Spring Initializr 创建项目 …

使用nginx代理服务器上的docker容器接口

假如项目通过自动化部署在了docker容器里面&#xff1a;教程在这里vue使用dockernodenginxlinux自动化部署_1024小神的博客-CSDN博客 怎么将nginx通过域名绑定到这个容器呢&#xff1f; 例如我们将一个二级域名绑定second.1024shen.com/#/ 到我们的服务 我们需要先在域名解析…

阿里P8专家整理的面试秘籍,我看了三个月,成功入职京东,税前30K

入职京东&#xff0c;月薪30K 今年三月份的时候&#xff0c;我从上一家外包公司裸辞。那时正值疫情期间&#xff0c;实在受不了公司的压迫&#xff0c;毅然决然的选择了离职。 卧薪尝胆三个月后&#xff0c;才收到京东的offer&#xff0c;谈到了30k的月薪顺利入职。 本来那时…

DPDK源码分析之l2fwd

什么是L2转发 2层转发&#xff0c;即对应OSI模型中的数据链路层&#xff0c;该层以Mac帧进行传输&#xff0c;运行在2层的比较有代表性的设备就是交换机了。 当交换机收到数据时&#xff0c;它会检查它的目的MAC地址&#xff0c;然后把数据从目的主机所在的接口转发出去。 交…

SSM 学习管理系统

SSM 学习管理系统 SSM 学习管理系统 功能介绍 首页 图片轮播展示 网站公告 学生注册 教师注册 课程资料 视频学习 友情链接 资料详情 学习进度 评论 收藏 后台管理 登录 管理员管理 修改密码 网站公告管理 友情链接管理 轮播图管理 学生管理 班级管理 我的班级管理 教师管理…

微信小程序 | 小程序的内置组件

&#x1f5a5;️ 微信小程序专栏&#xff1a;微信小程序 | 小程序的内置组件 &#x1f9d1;‍&#x1f4bc; 个人简介&#xff1a;一个不甘平庸的平凡人&#x1f36c; ✨ 个人主页&#xff1a;CoderHing的个人主页 &#x1f340; 格言: ☀️ 路漫漫其修远兮,吾将上下而求索☀️…

【轻量级开源ROS 的机器人设备(4)】--(3)通信实现

前文链接 【轻量级开源ROS 的机器人设备&#xff08;4&#xff09;】--&#xff08;2&#xff09;通信实现_无水先生的博客-CSDN博客 六、数据流 数据流 虽然 XML-RPC 为远程方法调用提供了一种简单而干净的协议&#xff0c;但其冗长和以文本为中心的编码使其不适合高带宽和低…

手把手分享:如何将小程序游戏引入自有APP?(Android篇)

上一期的为大家分享了&#xff1a;如何在iOS中引入FinClip SDK&#xff0c;并将小程序游戏运行到自有App 中。点击查看&#xff1a;&#x1f449;手把手系列&#xff1a;如何将小程序游戏引入自有APP&#xff1f;&#xff08;iOS篇&#xff09; 本周继续分享如何在Android系统…

小红书如何做推广增粉?怎样让小红书快速增加粉丝?

互联网时代&#xff0c;任何什么平台的推广都需要流量&#xff0c;没有流量的账号是做不起来的&#xff0c;也就没有宣传或是转化的效果。 小红书账号粉丝数量和权重是成正比的&#xff0c;涨粉越多的账号&#xff0c;说明越受到用户的喜欢&#xff0c;账号笔记的数据就会越好…

5G“新引擎”,助力矿山向无人化、智慧化转型

导语 | 5G 商用已过去三年&#xff0c;其发展已步入规模化应用的关键期。无论是在诸如矿山、港口远程驾驶的行业应用领域&#xff0c;还是在面向C端的智能汽车、自动驾驶方面&#xff0c;都得到了广泛应用。今天&#xff0c;我们特邀了三一智矿的董事长、腾讯云 TVP 行业大使马…

java面试强基(23)

什么是线程死锁?如何避免死锁? ​ 线程死锁描述的是这样一种情况&#xff1a;多个线程同时被阻塞&#xff0c;它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞&#xff0c;因此程序不可能正常终止。 上面的例子符合产生死锁的四个必要条件&#xff1…

提高短视频的收藏下载和转发率的方法,我们可以不做但不能不会

想要提高收藏和下载&#xff0c;那就要提高你内容的价值感和获得感。 我们要清晰的知道收获感不等同于真正的收获。那我们的做法就是给出冗余的有用&#xff0c;给出熟悉的陌生&#xff0c;给出精准的表达。那这是提高收藏转发的方法。 我们看到好看的视频&#xff0c;想分享视…

Hack the box -- Responder靶机

这个靶机记录一下。。感觉会用到。 任务1 当使用IP地址访问web服务时&#xff0c;我们被重定向到的域是什么? 这里我们访问一下ip&#xff0c;然后重定向了 FLAG:unika.htb 任务2 服务器上使用哪种脚本语言生成网页? 这里因为重定向域的问题&#xff0c;我们是访问不了的会报…

408 | 【2022年】计算机统考真题 自用回顾知识点整理

一、数据结构 T1:时间复杂度 ——直接求程序执行的次数 T5:哈夫曼树(最优二叉树)与哈夫曼编码 定义 结点带权路径长度:从根到任一节点的路径长度(经过的边数)与该结点权值的乘积树的带权路径长度WPL:所有叶节点的带权路径长度之和 哈夫曼树:WPL最小的二叉树哈夫曼树的…

PHPMailer发送邮件(PHP发送电子邮件)

很多网站注册时都会要求输入电子邮箱&#xff0c;其应用场景是比较广的&#xff0c;例如注册账号接收验证码、注册成功通知、登录通知、找回密码验证通知等。本文将介绍如何使用PHP实现发送邮件。 开源项目PHPMailer 使用了开源项目PHPMailer&#xff0c;本文使用163邮箱作为…

挑战一天速通python基础语法

挑战一天速通python基础语法 文章目录挑战一天速通python基础语法0. 防止某人健忘1. 一些小点2. 输入输出2.1 输出2.2 输入3. 一些基础语法3.1 条件语句3.2 循环语句3.3 空语句4. 函数5. 列表和元组5.1 列表5.2 元组6. 字典7. 文件操作再有一个月参加一个NTU的线上科研项目。。…