解决SeaTunnel 2.3.4版本写入S3文件报错问题

news2024/12/24 0:34:11

在使用Apache SeaTunnel时,我遇到了一个写入S3文件的报错问题。通过深入调试和分析,找到了问题所在,并提出了相应的解决方案。 file

本文将详细介绍报错情况、参考资料、解决思路以及后续研究方向,希望对大家有帮助!

一、详细报错

2024-04-12 20:44:18,647 ERROR [.c.FileSinkAggregatedCommitter] [hz.main.generic-operation.thread-43] - commit aggregatedCommitInfo error, aggregatedCommitInfo = FileAggregatedCommitInfo(transactionMap={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1/NON_PARTITION/output_params_0.json=/xugurtp/seatunnel/tmp/6af80b38f3434aceb573cc65b9cd12216a/39111/output_params_0.json}}, partitionDirAndValuesMap={}) java.lang.IllegalStateException: Connection pool shut down

二、参考资料

  • HADOOP-16027:https://issues.apache.org/jira/browse/HADOOP-16027
  • CSDN Blog:https://blog.csdn.net/a18262285324/article/details/112470363
  • AWS SDK Java Issue #2337:https://github.com/aws/aws-sdk-java/issues/2337
  • Amazon SQS Java Messaging Lib Issue #96:https://github.com/awslabs/amazon-sqs-java-messaging-lib/issues/96
  • 博客园:https://www.cnblogs.com/xhy-shine/p/10772736.html

三、解决思路

1. 远程调试

在本地IDEA中进行debug未发现报错,但在服务器上执行时却报错,因此决定进行远程debug。执行以下命令添加JVM参数:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

实际命令是:

 java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dhazelcast.client.config=/opt/module/seatunnel-2.3.4/config/hazelcast-client.yaml -Dseatunnel.config=/opt/module/seatunnel-2.3.4/config/seatunnel.yaml -Dhazelcast.config=/opt/module/seatunnel-2.3.4/config/hazelcast.yaml -Dlog4j2.configurationFile=/opt/module/seatunnel-2.3.4/config/log4j2_client.properties -Dseatunnel.logs.path=/opt/module/seatunnel-2.3.4/logs -Dseatunnel.logs.file_name=seatunnel-starter-client -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-client -XX:MaxMetaspaceSize=1g -XX:+UseG1GC -cp /opt/module/seatunnel-2.3.4/lib/*:/opt/module/seatunnel-2.3.4/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient -e local --config job/s3_sink.conf -cn xxx

2. 定位问题

通过调试发现问题出在hadoop-aws使用的缓存连接池对象。关键在于if判断部分,如果上游传递了fs.s3a.impl.disable.cache=true,则不使用缓存。深入debug发现:有时hadoopConf.getSchema获取的不是s3a而是s3n

s3和s3n / s3a的区别

  • s3:基于块的文件系统
  • s3n:基于对象存储的文件系统,支持高达5GB的对象
  • s3a:基于对象存储的文件系统,支持高达5TB的对象,并具有更高的性能

在配置文件中设置的是s3a,但实际获取到的是s3n,这显然不合理。

3. 深入挖掘

我仔细看了一下报错的截图发现:

file

确实是commit期间报的错:那么也就是说commit初始化s3conf并没有走buildWithConfig方法,而是用的默认值,而且我根本没找到commit里面有new s3Conf的代码,再次debug看看谁去重新初始化了S3Conf

file

定位到这里就很头疼了,已经涉及到引擎层而非插件层面了,涉及到classloader的使用以及反序列化操作:

file

反序列化代码:

        logicalDag =
                CustomClassLoadedObject.deserializeWithCustomClassLoader(
                        nodeEngine.getSerializationService(),
                        classLoader,
                        jobImmutableInformation.getLogicalDag());

很明显可以看出,S3Conf(静态类)被重新初始化了,导致SHEMA被重新赋值成s3n

file

因为s3conf它本身的属性都是静态的,而对classloader反序列化是时会重新加载静态属性的,所以导致shema被重新赋值为默认s3n

综上所述

除了sourcesink阶段,AggregatedCommit操作也会写入s3File。错误发生在commit期间,说明初始化S3Conf时并没有走buildWithConfig方法,而是使用了默认值。

由于S3Conf类的属性是静态的,反序列化时会重新加载静态属性,导致SCHEMA被重新赋值为默认的s3n

资料参考:https://wiki.apache.org/hadoop/AmazonS3

s3:基于Block块的文件系统

S3 Block FileSystem(URI scheme:s3)由S3支持的基于块的文件系统。 文件存储为块,就像HDFS一样。 这样可以有效地实现重命名。 此文件系统需要您为文件系统专用一个存储桶 - 您不应使用包含文件的现有存储桶,或将其他文件写入同一存储区。 此文件系统存储的文件大于5GB,但不能与其他S3工具进行互操作。

s3n:基于对象存储的文件系统

S3 Native FileSystem(URI scheme:s3n)用于在S3上读取和写入常规文件的本机文件系统。 这个文件系统的优点是您可以访问使用其他工具编写的S3上的文件。 相反,其他工具可以访问使用Hadoop编写的文件。 缺点是S3的文件大小限制为5GB。

s3a:基于对象存储的文件系统

S3A(URI方案:s3a)是S3 Native,s3n fs的继承者,S3a:系统使用Amazon的库与S3进行交互。 这允许S3A支持较大的文件(不超过5GB的限制),更高的性能操作等等。 文件系统旨在替代S3 Native:从s3n:// URL可访问的所有对象也应该通过替换URL模式从s3a访问。

public class S3Conf extends HadoopConf {
    private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
    private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
    private static final String S3A_SCHEMA = "s3a";
    private static final String DEFAULT_SCHEMA = "s3n";
    private static String SCHEMA = DEFAULT_SCHEMA;

    @Override
    public String getFsHdfsImpl() {
        return switchHdfsImpl();
    }

    @Override
    public String getSchema() {
        return SCHEMA;
    }

    private S3Conf(String hdfsNameKey) {
        super(hdfsNameKey);
    }

    public static HadoopConf buildWithConfig(Config config) {

        HadoopConf hadoopConf = new S3Conf(config.getString(S3ConfigOptions.S3_BUCKET.key()));
        String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());
        if (bucketName.startsWith(S3A_SCHEMA)) {
            SCHEMA = S3A_SCHEMA;
        }
        HashMap<String, String> s3Options = new HashMap<>();
        putS3SK(s3Options, config);
        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
        }

        s3Options.put(
                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                config.getString(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key()));
        s3Options.put(
                S3ConfigOptions.FS_S3A_ENDPOINT.key(),
                config.getString(S3ConfigOptions.FS_S3A_ENDPOINT.key()));
        hadoopConf.setExtraOptions(s3Options);
        return hadoopConf;
    }

    public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
        Config config = readonlyConfig.toConfig();
        HadoopConf hadoopConf = new S3Conf(readonlyConfig.get(S3ConfigOptions.S3_BUCKET));
        String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
        if (bucketName.startsWith(S3A_SCHEMA)) {
            SCHEMA = S3A_SCHEMA;
        }
        HashMap<String, String> s3Options = new HashMap<>();
        putS3SK(s3Options, config);
        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
        }

        s3Options.put(
                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                readonlyConfig.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());
        s3Options.put(
                S3ConfigOptions.FS_S3A_ENDPOINT.key(),
                readonlyConfig.get(S3ConfigOptions.FS_S3A_ENDPOINT));
        hadoopConf.setExtraOptions(s3Options);
        return hadoopConf;
    }

    private String switchHdfsImpl() {
        switch (SCHEMA) {
            case S3A_SCHEMA:
                return HDFS_S3A_IMPL;
            default:
                return HDFS_S3N_IMPL;
        }
    }

    private static void putS3SK(Map<String, String> s3Options, Config config) {
        if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())
                && !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {
            return;
        }
        String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());
        String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());
        if (S3A_SCHEMA.equals(SCHEMA)) {
            s3Options.put("fs.s3a.access.key", accessKey);
            s3Options.put("fs.s3a.secret.key", secretKey);
            return;
        }
        // default s3n
        s3Options.put("fs.s3n.awsAccessKeyId", accessKey);
        s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);
    }
}

参考了反序列的知识才了解到这个情况:

当对一个包含静态成员的类进行反序列化时,静态成员不会恢复为之前的状态,而是保持在其初始状态。任何静态变量的值都是与该类本身相关的,

4. 解决方案

  • 1.去掉stastic修饰,把有参构造换成无参构造和静态工厂方法:

  • 2.保留stastic静态方法,使用getSchema方法代替静态属性调用:

由此可见,代码中的细节问题,即使看似微不足道,也可能引发严重的后果。一个简单的静态修饰符的误用,不仅能导致程序行为异常,更可能导致系统稳定性和安全性的大问题。

相关的issues已提交,大家有兴趣可以查看:

  • [bigfix][S3 File]:Change the [SCHEMA] attribute of the [S3CONF class] to be non-static to avoid being reassigned after deserialization by LeonYoah · Pull Request #6717 · apache/seatunnel (github.com)

  • [Bug] [S3File] [zeta-local] Error writing to S3File in version 2.3.4:: Java lang. An IllegalStateException: Connection pool shut down · Issue #6678 · apache/seatunnel (github.com)

四、有待研究

1.为什么只有local模式会报错:

推测可能是cluster模式是分布式的,每个算子分布在不同的机器上,所以本地缓存不会被使用,类似于没有走缓存。

2.为什么本地IDEA执行local模式却没问题

可能是Windows和Linux的线程调度机制不同导致的。

结论

通过这次对Apache SeaTunnel S3 File写入报错问题的分析与解决,希望这些经验能帮助到遇到类似问题的开发者,同时也提醒大家在处理分布式系统时注意细节问题,以免引发不必要的故障。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1894538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

江门雅图仕职业技术学校领导一行莅临泰迪智能科技参观调研

7月2日&#xff0c;江门雅图仕职业技术学校总校长肖胜阳、校长许昌、办公室主任任志娟等莅临广东泰迪智能科技股份有限公司产教融合实训中心参观调研。泰迪智能科技董事长张良均、副总经理施兴、产品中心总监周东平、校企合作经理吴桂锋进行热情接待&#xff0c;双方就学校专业…

Python用于处理 DNS 查询库之Dnspython 使用详解

概要 Dnspython 是一个开源的 Python 库,专门用于处理 DNS 查询。它被设计为既简单易用又功能强大,可以满足从简单到复杂的各种 DNS 相关需求。无论是进行基础的 DNS 查询还是进行高级的 DNS 服务器管理,dnspython 都能提供相应的功能。 这个库支持包括 A、AAAA、MX、TXT …

汉光联创HGLM2200N黑白激光多功能一体机加粉及常见问题处理

基本参数&#xff1a; 机器型号&#xff1a;HGLM2200N 产品名称&#xff1a;A4黑白激光多功能一体机 基础功能&#xff1a;打印、扫描、复印 打印速度&#xff1a;22页/分钟 纸张输入容量&#xff1a;150-249页 单面支持纸张尺寸&#xff1a;A4、A5、A6 产品尺寸&#x…

拓展欧几里得和裴蜀定理

裴蜀定理&#xff08;或贝祖定理&#xff09;说明了对任何整数a、b和它们的最大公约数d&#xff0c;关于未知数x和y的线性不定方程&#xff08;称为裴蜀等式&#xff09;&#xff1a;若a,b是整数,且gcd(a,b)d&#xff0c;那么对于任意的整数x,y,axby都一定是d的倍数&#xff0c…

pytorch中的contiguous()

官方文档&#xff1a;https://pytorch.org/docs/stable/generated/torch.Tensor.contiguous.html 其描述contiguous为&#xff1a; Returns a contiguous in memory tensor containing the same data as self tensor. If self tensor is already in the specified memory forma…

音乐发行平台无加密开源源码

适用于唱片公司&#xff0c;用于接收物料&#xff0c;下载物料功能&#xff1a;个人或机构认证&#xff0c;上传专辑和歌曲&#xff0c;版税结算环境要求php7.4Nginx 1、导入数据库 2、/inc/conn.php里填写数据库密码等后台路径/admin&#xff08;可自行修改任意入口名称&…

Java中子类继承和方法重写_java重写父类方法参数变了怎么改

public(非私有)private私有()构造方法不能继承不能继承成员变量能继承能继承成员方法能继承不能继承 1.也不能继承父类的有参构造方法,具体看构造函数继承特点 2.私有的成员变量相当于从父类拷贝一份拿过来用的,不能直接用,需要get/set方法 继承特点 继承中 成员变量访问特点:如…

重参数化技巧

Q&#xff1a;标准正态分布 P&#xff1a;预期的分布&#xff08;假设符合正态分布&#xff09; 学习与 - 手推 Diffusion Model (DDPM) 1/3 &#xff1a;数学原理推导_哔哩哔哩_bilibili

【test】小爱同学通过esp32控制电脑开关

文章目录 一、环境准备二、开关机原理数据传输框架 三、环境搭建1.巴法云平台设置2.米家设置3.windows网络唤醒设置4.搭建esp32开发环境并部署&#xff08;1&#xff09;新建项目&#xff08;2&#xff09;导入esp32库&#xff08;3&#xff09; 添加库&#xff08;4&#xff0…

透过 Go 语言探索 Linux 网络通信的本质

大家好&#xff0c;我是码农先森。 前言 各种编程语言百花齐放、百家争鸣&#xff0c;但是 “万变不离其中”。对于网络通信而言&#xff0c;每一种编程语言的实现方式都不一样&#xff1b;但其实&#xff0c;调用的底层逻辑都是一样的。linux 系统底层向上提供了统一的 Sock…

君子签区块链+AI,驱动组织实现高效合同管理、精准风险控制

在传统合同签署的过程中&#xff0c;企业、组织、机构都面临着合同签署与管理的诸多问题和挑战&#xff1a;合同种类繁多、数量庞大导致起草效率低下&#xff1b;管理流程繁琐、权限分散使得审批周期冗长且效率低下&#xff1b;合同签订版本难以精准复核&#xff0c;风险防控更…

大型网站软件系统架构演进过程

在我们的生活中,通常会使用大型网站系统,比如购物网站淘宝,京东,阿里1688;大型搜索引擎网站百度,社交类的如腾讯旗下的微信,QQ及新浪旗下的微博等,他们通常都有一下特点: 高并发、大流量&#xff1a;这些系统必须能够处理成千上万甚至数百万的并发用户请求&#xff0c;以及持续…

深入理解pytest fixture:提升测试的灵活性和可维护性!

在现代软件开发中&#xff0c;测试是保证代码质量的重要环节。pytest作为一个强大的测试框架&#xff0c;以其灵活的fixture系统脱颖而出。本文将详细介绍pytest中的fixture概念&#xff0c;通过具体案例展示其应用&#xff0c;并说明如何利用fixture提高测试的灵活性和可维护性…

CVPR 2024最佳论文:“神兵”的组合器 Generative Image Dynamics

CVPR 2024的最佳论文来自谷歌、美国加州大学圣迭戈分校。两篇都来至于视频生成领域&#xff0c;可见今年外界对视频生成领域关注度很高。今天的这篇是“Generative Image Dynamics”&#xff0c;Google Research发布的。它的研究成果令人震惊&#xff0c;从单张RGB图像生成连续…

VIM介绍

VIM&#xff08;Vi IMproved&#xff09;是一种高度可配置的文本编辑器&#xff0c;用于有效地创建和更改任何类型的文本。它是从 vi 编辑器发展而来的&#xff0c;后者最初是 UNIX 系统上的一个文本编辑器。VIM 以其键盘驱动的界面和强大的文本处理能力而闻名&#xff0c;是许…

【pytorch14】感知机

单层感知机模型 对于单层的感知机&#xff0c;它的激活函数是一个sigmoid 对于符号的定义做一个规范化&#xff0c;输入层每一层进行一个编号 输入是第0层&#xff0c;上标0表示属于输入层&#xff0c;下标0到n表示一共有n个节点(这里严格来说应该是0~n-1&#xff0c;为了书写…

阿里Qwen2-72B大模型已是开源榜的王者,为什么还要推出其他参数模型,被其他模型打榜?

6 月 27 日&#xff0c;全球知名的开源平台 Hugging Face 的联合创始人兼首席执行官 Clem 在社交平台激动宣布&#xff0c;阿里 Qwen2-72B 成为了开源模型排行榜的王者。 这是一件大好事&#xff0c;说明了我们在大模型领域从先前的追赶&#xff0c;逐渐走向了领导&#xff0c;…

【 VIPKID-注册安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞 …

Redis 7.x 系列【17】四种持久化策略

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 概述2. 案例演示2.1 无持久化2.2 RDB2.3 AOF2.4 混合模式2.4.1 方式一&#xff1a;…

初学Spring之自动装配 Bean

Bean 的作用域&#xff1a; 1.单例模式&#xff08;Spring 默认机制&#xff09; scope“singleton” 2.原型模式&#xff1a;每次从容器中 get 时&#xff0c;都会产生一个新对象 scope"prototype" 3. request、session、application&#xff0c;只能在 web 开…