DataX Secondary Development (9): Adding the s3reader and s3writer Modules

2025/2/21 19:53:44

1 Background

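DataX 3.0 can read from and write to Alibaba Cloud OSS, but it has no S3 reader or writer. Although OSS is itself built on the S3 protocol, some parameters differ, so S3Reader and S3Writer were developed by modeling them on Alibaba's OSSReader and OSSWriter.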

2 Code development

2.1 s3reader

2.1.1 Project structure

2.1.2 Code

package.xml

<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/reader/s3reader</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>s3reader-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/reader/s3reader</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/reader/s3reader/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
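Once the assembly is built, the descriptor above should place `plugin.json`, `plugin_job_template.json`, the plugin jar and a `libs/` directory of runtime dependencies under `plugin/reader/s3reader`. The following is a minimal, JDK-only sketch, assuming that directory has been copied into a DataX home directory, of a check for missing entries; `PluginLayoutCheck` and `missingEntries` are hypothetical names, not part of DataX.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: checks the layout that package.xml is expected to produce. */
public class PluginLayoutCheck {

    /**
     * Returns the entries missing under <dataxHome>/plugin/<kind>/<name>.
     * kind is "reader" or "writer"; jarName is e.g. "s3reader-0.0.1-SNAPSHOT.jar".
     */
    public static List<String> missingEntries(Path dataxHome, String kind, String name, String jarName) {
        Path pluginDir = dataxHome.resolve("plugin").resolve(kind).resolve(name);
        // Files from src/main/resources, the built jar, and the dependency directory
        List<String> expected = Arrays.asList(
                "plugin.json", "plugin_job_template.json", jarName, "libs");
        List<String> missing = new ArrayList<>();
        for (String entry : expected) {
            if (!Files.exists(pluginDir.resolve(entry))) {
                missing.add(entry);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Pass the DataX home as the first argument; otherwise an empty temp dir is checked
        Path home = args.length > 0 ? Paths.get(args[0]) : Files.createTempDirectory("datax");
        List<String> missing = missingEntries(home, "reader", "s3reader", "s3reader-0.0.1-SNAPSHOT.jar");
        System.out.println(missing.isEmpty() ? "layout OK" : "missing: " + missing);
    }
}
```

The same check applies to the writer by passing `"writer"`, `"s3writer"` and the writer jar name.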

Constant.java

package com.alibaba.datax.plugin.reader.s3reader;

public class Constant {
    public static final String OBJECT = "object";
    public static final int CONNECT_TIMEOUT = 50000;
    public static final String SIGNER_OVERRIDE = "S3SignerType";
}

Key.java

package com.alibaba.datax.plugin.reader.s3reader;

public class Key {
    public static final String ENDPOINT = "endpoint";

    public static final String ACCESSKEY = "accessKey";

    public static final String SECRETKEY = "secretKey";

    public static final String BUCKET = "bucket";

    public static final String OBJECT = "object";
}
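`S3Reader.Job.validate()` rejects a job whose `endpoint`, `accessKey`, `secretKey`, `bucket` or `object` is blank. The same rule can be sketched without DataX on the classpath; here a plain `Map` stands in for DataX's `Configuration`, and `RequiredKeyCheck` is a hypothetical class used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: a plain Map stands in for DataX's Configuration. */
public class RequiredKeyCheck {
    // Same literal values as the constants in Key.java
    static final List<String> REQUIRED =
            Arrays.asList("endpoint", "accessKey", "secretKey", "bucket", "object");

    /** Returns the required keys whose value is missing or blank, in declaration order. */
    public static List<String> blankKeys(Map<String, Object> job) {
        List<String> blank = new ArrayList<>();
        for (String key : REQUIRED) {
            Object value = job.get(key);
            // approximates StringUtils.isBlank: null, empty or whitespace-only
            if (value == null || value.toString().trim().isEmpty()) {
                blank.add(key);
            }
        }
        return blank;
    }

    public static void main(String[] args) {
        Map<String, Object> job = new HashMap<>();
        job.put("endpoint", "http://x.x.x.x:9383");
        job.put("accessKey", "admin");
        job.put("bucket", "  ");
        System.out.println("blank keys: " + blankKeys(job)); // blank keys: [secretKey, bucket, object]
    }
}
```

Reporting every blank key at once, rather than stopping at the first, is a design choice of this sketch; the plugin itself throws on the first missing value.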

S3Reader.java

package com.alibaba.datax.plugin.reader.s3reader;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.Sets;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;


public class S3Reader extends Reader {
    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration readerOriginConfig = null;

        @Override
        public void init() {
            LOG.debug("init() begin...");
            this.readerOriginConfig = this.getPluginJobConf();
            this.validate();
            LOG.debug("init() ok and end...");
        }

        private void validate() {
            String endpoint = this.readerOriginConfig.getString(Key.ENDPOINT);
            if (StringUtils.isBlank(endpoint)) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
                        "You must specify endpoint");
            }

            String secretKey = this.readerOriginConfig.getString(Key.SECRETKEY);
            if (StringUtils.isBlank(secretKey)) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
                        "You must specify secretKey");
            }

            String accessKey = this.readerOriginConfig.getString(Key.ACCESSKEY);
            if (StringUtils.isBlank(accessKey)) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
                        "You must specify accessKey");
            }

            String bucket = this.readerOriginConfig.getString(Key.BUCKET);
            if (StringUtils.isBlank(bucket)) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
                        "You must specify bucket");
            }

            String object = this.readerOriginConfig.getString(Key.OBJECT);
            if (StringUtils.isBlank(object)) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
                        "You must specify object");
            }

            String fieldDelimiter = this.readerOriginConfig
                    .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.FIELD_DELIMITER);
            // warn: need length 1
            if (null == fieldDelimiter || fieldDelimiter.length() == 0) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
                        "You must specify fieldDelimiter");
            }

            String encoding = this.readerOriginConfig
                    .getString(
                            com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING,
                            com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_ENCODING);
            try {
                Charsets.toCharset(encoding);
            } catch (UnsupportedCharsetException uce) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.ILLEGAL_VALUE,
                        String.format("Unsupported encoding: [%s]", encoding), uce);
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.ILLEGAL_VALUE,
                        String.format("Invalid runtime configuration: %s", e.getMessage()), e);
            }

            // check whether column is ["*"]; if so, replace it with an empty list
            List<Configuration> column = this.readerOriginConfig
                    .getListConfiguration(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
            if (null != column
                    && 1 == column.size()
                    && ("\"*\"".equals(column.get(0).toString()) || "'*'"
                    .equals(column.get(0).toString()))) {
                readerOriginConfig
                        .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN,
                                new ArrayList<String>());
            } else {
                // each column: 1. index or 2. value, plus type; 3. a Date type may also have a format
                List<Configuration> columns = this.readerOriginConfig
                        .getListConfiguration(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);

                if (null == columns || columns.size() == 0) {
                    throw DataXException.asDataXException(
                            S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
                            "You must specify columns");
                }

                if (null != columns && columns.size() != 0) {
                    for (Configuration eachColumnConf : columns) {
                        eachColumnConf
                                .getNecessaryValue(
                                        com.alibaba.datax.plugin.unstructuredstorage.reader.Key.TYPE,
                                        S3ReaderErrorCode.REQUIRED_VALUE);
                        Integer columnIndex = eachColumnConf
                                .getInt(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.INDEX);
                        String columnValue = eachColumnConf
                                .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.VALUE);

                        if (null == columnIndex && null == columnValue) {
                            throw DataXException.asDataXException(
                                    S3ReaderErrorCode.NO_INDEX_VALUE,
                                    "Since type is configured, you must also configure either index or value");
                        }

                        if (null != columnIndex && null != columnValue) {
                            throw DataXException.asDataXException(
                                    S3ReaderErrorCode.MIXED_INDEX_VALUE,
                                    "You configured both index and value; each column may use only one of them");
                        }

                    }
                }
            }

            // only support compress: gzip,bzip2,zip
            String compress = this.readerOriginConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS);
            if (StringUtils.isBlank(compress)) {
                this.readerOriginConfig.set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS, null);
            } else {
                Set<String> supportedCompress = Sets.newHashSet("gzip", "bzip2", "zip");
                compress = compress.toLowerCase().trim();
                if (!supportedCompress.contains(compress)) {
                    throw DataXException.asDataXException(
                            S3ReaderErrorCode.ILLEGAL_VALUE,
                            String.format(
                                    "Only gzip, bzip2 and zip compression are supported; unsupported compression: [%s]",
                                    compress));
                }
                this.readerOriginConfig
                        .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS,
                                compress);
            }
        }

        @Override
        public void prepare() {
            LOG.debug("prepare()");
        }

        @Override
        public void post() {
            LOG.debug("post()");
        }

        @Override
        public void destroy() {
            LOG.debug("destroy()");
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            LOG.debug("split() begin...");
            List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();

            // treat each individual object as one slice
            List<String> objects = parseOriginObjects(readerOriginConfig
                    .getList(Constant.OBJECT, String.class));
            if (0 == objects.size()) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.EMPTY_BUCKET_EXCEPTION,
                        String.format(
                                "No object to read was found; please check the configured bucket: %s and object: %s",
                                this.readerOriginConfig.get(Key.BUCKET),
                                this.readerOriginConfig.get(Key.OBJECT)));
            }

            for (String object : objects) {
                Configuration splitedConfig = this.readerOriginConfig.clone();
                splitedConfig.set(Constant.OBJECT, object);
                readerSplitConfigs.add(splitedConfig);
                LOG.info(String.format("S3 object to be read:%s", object));
            }
            LOG.debug("split() ok and end...");
            return readerSplitConfigs;
        }

        private List<String> parseOriginObjects(List<String> originObjects) {
            List<String> parsedObjects = new ArrayList<String>();

            for (String object : originObjects) {
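                // index of the first wildcard: the smaller non-negative position of '*' and '?'
                int starIndex = object.indexOf('*');
                int questionIndex = object.indexOf('?');
                int firstMetaChar = (starIndex < 0) ? questionIndex
                        : (questionIndex < 0) ? starIndex
                        : Math.min(starIndex, questionIndex);

                if (firstMetaChar != -1) {
                    // S3 keys always use '/', independent of the local OS separator
                    int lastDirSeparator = object.lastIndexOf('/', firstMetaChar);
                    String parentDir = object
                            .substring(0, lastDirSeparator + 1);
                    List<String> remoteObjects = getRemoteObjects(parentDir);
                    // glob -> regex: '*' matches any run of characters, '?' exactly one
                    Pattern pattern = Pattern.compile(object.replace("*", ".*")
                            .replace("?", "."));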

                    for (String remoteObject : remoteObjects) {
                        if (pattern.matcher(remoteObject).matches()) {
                            parsedObjects.add(remoteObject);
                        }
                    }
                } else {
                    parsedObjects.add(object);
                }
            }
            return parsedObjects;
        }

        private List<String> getRemoteObjects(String parentDir) {

            LOG.debug(String.format("parent directory: %s", parentDir));
            List<String> remoteObjects = new ArrayList<String>();
            AmazonS3 client = S3Util.initS3Client(readerOriginConfig);
            try {
                ListObjectsRequest listObjectsRequest = new ListObjectsRequest(
                        readerOriginConfig.getString(Key.BUCKET), parentDir, null, null, null
                );
                ObjectListing objectList;
                do {
                    objectList = client.listObjects(listObjectsRequest);
                    for (S3ObjectSummary objectSummary : objectList
                            .getObjectSummaries()) {
                        LOG.debug(String.format("found object: %s",
                                objectSummary.getKey()));
                        remoteObjects.add(objectSummary.getKey());
                    }
                    listObjectsRequest.setMarker(objectList.getNextMarker());
                    LOG.debug(listObjectsRequest.getMarker());
                    LOG.debug(String.valueOf(objectList.isTruncated()));

                } while (objectList.isTruncated());
            } catch (IllegalArgumentException e) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.S3_EXCEPTION, e.getMessage());
            }

            return remoteObjects;
        }
    }

    public static class Task extends Reader.Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        private Configuration readerSliceConfig;

        @Override
        public void startRead(RecordSender recordSender) {
            LOG.debug("read start");
            String bucket = readerSliceConfig.getString(Key.BUCKET);
            String object = readerSliceConfig.getString(Key.OBJECT);
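            AmazonS3 client = S3Util.initS3Client(readerSliceConfig);

            // close the S3Object (and its HTTP connection) even if parsing fails
            try (S3Object s3Object = client.getObject(bucket, object)) {
                InputStream objectStream = s3Object.getObjectContent();
                UnstructuredStorageReaderUtil.readFromStream(objectStream, object,
                        this.readerSliceConfig, recordSender,
                        this.getTaskPluginCollector());
                recordSender.flush();
            } catch (java.io.IOException e) {
                throw DataXException.asDataXException(
                        S3ReaderErrorCode.RUNTIME_EXCEPTION,
                        String.format("failed to close S3 object [%s]", object), e);
            }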
        }

        @Override
        public void init() {
            this.readerSliceConfig = this.getPluginJobConf();
        }

        @Override
        public void destroy() {

        }
    }
}
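`parseOriginObjects()` lists every key under the directory prefix that precedes the first wildcard and keeps the keys that match the pattern. Two details are easy to get wrong: the *first* wildcard is the smaller non-negative index of `*` and `?`, and literal characters such as `.` should be quoted before the wildcards are turned into regex. The following JDK-only sketch illustrates both; `GlobSketch`, `listPrefix`, `toRegex` and `match` are hypothetical names, and `?` is treated as exactly one character, the usual glob meaning.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of wildcard handling for S3 object keys. */
public class GlobSketch {

    /** Index of the first '*' or '?', or -1 if the key has no wildcard. */
    static int firstMetaChar(String key) {
        int star = key.indexOf('*');
        int question = key.indexOf('?');
        if (star < 0) return question;
        if (question < 0) return star;
        return Math.min(star, question);
    }

    /** Listing prefix: everything up to and including the last '/' before the first wildcard. */
    static String listPrefix(String key) {
        int meta = firstMetaChar(key);
        if (meta < 0) return key;
        return key.substring(0, key.lastIndexOf('/', meta) + 1);
    }

    /** Converts a glob to a regex: '*' -> ".*", '?' -> ".", everything else quoted literally. */
    static Pattern toRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*' || c == '?') {
                if (literal.length() > 0) {
                    regex.append(Pattern.quote(literal.toString()));
                    literal.setLength(0);
                }
                regex.append(c == '*' ? ".*" : ".");
            } else {
                literal.append(c);
            }
        }
        if (literal.length() > 0) {
            regex.append(Pattern.quote(literal.toString()));
        }
        return Pattern.compile(regex.toString());
    }

    /** Keeps the already-listed keys that match the glob. */
    static List<String> match(String glob, List<String> listedKeys) {
        Pattern pattern = toRegex(glob);
        List<String> matched = new ArrayList<>();
        for (String key : listedKeys) {
            if (pattern.matcher(key).matches()) matched.add(key);
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("logs/2024/a.csv", "logs/2024/ab.csv", "logs/2024/axcsv");
        System.out.println(listPrefix("logs/2024/?.csv")); // logs/2024/
        System.out.println(match("logs/2024/?.csv", keys)); // [logs/2024/a.csv]
    }
}
```

Without quoting, the `.` in `.csv` would also match `logs/2024/axcsv`; quoting the literal runs keeps the match exact.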

S3ReaderErrorCode.java

package com.alibaba.datax.plugin.reader.s3reader;

import com.alibaba.datax.common.spi.ErrorCode;

public enum S3ReaderErrorCode implements ErrorCode {
    // TODO: review the error code types
    RUNTIME_EXCEPTION("S3Reader-00", "Runtime exception"),
    S3_EXCEPTION("S3Reader-01", "S3 configuration error"),
    CONFIG_INVALID_EXCEPTION("S3Reader-02", "Invalid parameter configuration"),
    NOT_SUPPORT_TYPE("S3Reader-03", "Unsupported type"),
    CAST_VALUE_TYPE_ERROR("S3Reader-04", "Cannot convert to the specified type"),
    SECURITY_EXCEPTION("S3Reader-05", "Missing permission"),
    ILLEGAL_VALUE("S3Reader-06", "Illegal value"),
    REQUIRED_VALUE("S3Reader-07", "Required value"),
    NO_INDEX_VALUE("S3Reader-08", "No index"),
    MIXED_INDEX_VALUE("S3Reader-09", "Mixed index and value"),
    EMPTY_BUCKET_EXCEPTION("S3Reader-10", "The bucket you are trying to read is empty");

    private final String code;
    private final String description;

    private S3ReaderErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s].", this.code,
                this.description);
    }
}

S3Util.java

package com.alibaba.datax.plugin.reader.s3reader;

import com.alibaba.datax.common.util.Configuration;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description: S3 utility class
 * @Author: chenweifeng
 * @Date: 2022-11-18 13:56
 **/
public class S3Util {
    public static void main(String[] args) {
        Map<String, Object> map = new HashMap<>();
        map.put(Key.ENDPOINT, "http://x.x.x.x:9383");
        map.put(Key.ACCESSKEY, "admin");
        map.put(Key.SECRETKEY, "123456");
        Configuration conf = Configuration.from(map);
        initS3Client(conf);
    }

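    /**
     * Initializes an S3 client from the job configuration.
     *
     * @param conf job configuration containing endpoint, accessKey and secretKey
     * @return an AmazonS3 client bound to the configured endpoint
     */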
    public static AmazonS3 initS3Client(Configuration conf) {
        String endpoint = conf.getString(Key.ENDPOINT);
        String accessKey = conf.getString(Key.ACCESSKEY);
        String secretKey = conf.getString(Key.SECRETKEY);

        AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder.standard();

        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setProtocol(com.amazonaws.Protocol.HTTPS);
        clientConfiguration.setConnectionTimeout(Constant.CONNECT_TIMEOUT);
        clientConfiguration.setSignerOverride(Constant.SIGNER_OVERRIDE);
        amazonS3ClientBuilder.setClientConfiguration(clientConfiguration);

        AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
        AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
        amazonS3ClientBuilder.setCredentials(awsCredentialsProvider);

        AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(endpoint, null);
        amazonS3ClientBuilder.setEndpointConfiguration(endpointConfiguration);

        AmazonS3 amazonS3 = amazonS3ClientBuilder.build();
        return amazonS3;
    }
}
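`initS3Client()` passes `endpoint` to `EndpointConfiguration` unchanged and also sets `Protocol.HTTPS` on the `ClientConfiguration`. As far as we understand the AWS SDK for Java v1, that protocol setting is only used when the endpoint carries no scheme, so an endpoint written as `host:port` would be contacted over HTTPS even if the server, like the `http://x.x.x.x:9383` endpoint in `main()`, only speaks HTTP. Making the scheme explicit before building the client avoids that ambiguity. A minimal JDK-only sketch, assuming HTTPS as the default; `EndpointSketch.normalize` is a hypothetical helper, not part of DataX or the AWS SDK.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper: makes the endpoint scheme explicit before building the S3 client. */
public class EndpointSketch {

    /** Prepends defaultScheme when the endpoint has none; accepts only http and https. */
    public static String normalize(String endpoint, String defaultScheme) {
        if (endpoint == null || endpoint.trim().isEmpty()) {
            throw new IllegalArgumentException("endpoint must not be blank");
        }
        String candidate = endpoint.trim();
        if (!candidate.contains("://")) {
            candidate = defaultScheme + "://" + candidate;
        }
        try {
            URI uri = new URI(candidate);
            String scheme = uri.getScheme();
            if (scheme == null || uri.getHost() == null) {
                throw new IllegalArgumentException("endpoint needs a scheme and a host: " + endpoint);
            }
            scheme = scheme.toLowerCase();
            if (!scheme.equals("http") && !scheme.equals("https")) {
                throw new IllegalArgumentException("unsupported scheme: " + scheme);
            }
            return candidate;
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("malformed endpoint: " + endpoint, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(normalize("x.x.x.x:9383", "https"));        // https://x.x.x.x:9383
        System.out.println(normalize("http://x.x.x.x:9383", "https")); // http://x.x.x.x:9383
    }
}
```

The normalized string could then be passed to `new AwsClientBuilder.EndpointConfiguration(...)` in place of the raw value.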

plugin.json

{
    "name": "s3reader",
    "class": "com.alibaba.datax.plugin.reader.s3reader.S3Reader",
    "description": "",
    "developer": "alibaba"
}

2.2 s3writer

2.2.1 Project structure

2.2.2 Code

package.xml

<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/s3writer</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>s3writer-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/s3writer</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/s3writer/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

Constant.java

package com.alibaba.datax.plugin.writer.s3writer;


public class Constant {
    public static final String OBJECT = "object";
    public static final int CONNECT_TIMEOUT = 50000;
    public static final String SIGNER_OVERRIDE = "S3SignerType";
}

Key.java

package com.alibaba.datax.plugin.writer.s3writer;

public class Key {
    public static final String ENDPOINT = "endpoint";

    public static final String ACCESSKEY = "accessKey";

    public static final String SECRETKEY = "secretKey";

    public static final String BUCKET = "bucket";

    public static final String OBJECT = "object";

}
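S3Writer does not let tasks share one key: `S3Writer.Job.split()` gives each task the configured `object` as a prefix, followed by `__` and a dash-free UUID, and regenerates the suffix if that key already exists. A JDK-only sketch of this naming rule, assuming the existing keys have already been listed into a set; `ObjectNameSketch.taskObjectNames` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

/** Hypothetical sketch of the per-task object naming used when splitting a write job. */
public class ObjectNameSketch {

    /** One key per task: prefix + "__" + UUID without dashes, never reusing a taken key. */
    public static List<String> taskObjectNames(String prefix, int taskCount, Set<String> existingKeys) {
        Set<String> taken = new HashSet<>(existingKeys); // copy so the caller's set is not modified
        List<String> names = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            String name;
            do {
                String suffix = UUID.randomUUID().toString().replace("-", "");
                name = prefix + "__" + suffix;
            } while (taken.contains(name)); // retry on the (very unlikely) collision
            taken.add(name);
            names.add(name);
        }
        return names;
    }

    public static void main(String[] args) {
        for (String name : taskObjectNames("export/orders", 3, new HashSet<String>())) {
            System.out.println(name);
        }
    }
}
```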

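S3Util.java: identical to the s3reader version above, except that its package is com.alibaba.datax.plugin.writer.s3writer, so that it uses this module's Key and Constant.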

S3Writer.java

package com.alibaba.datax.plugin.writer.s3writer;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.datax.plugin.unstructuredstorage.writer.TextCsvWriterManager;
import com.alibaba.datax.plugin.unstructuredstorage.writer.UnstructuredStorageWriterUtil;
import com.alibaba.datax.plugin.unstructuredstorage.writer.UnstructuredWriter;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;


public class S3Writer extends Writer {
    public static class Job extends Writer.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration writerSliceConfig = null;
        private AmazonS3 s3client = null;

        @Override
        public void init() {
            this.writerSliceConfig = this.getPluginJobConf();
            this.validateParameter();
            this.s3client = S3Util.initS3Client(this.writerSliceConfig);
        }

        private void validateParameter() {
            this.writerSliceConfig.getNecessaryValue(Key.ENDPOINT,
                    S3WriterErrorCode.REQUIRED_VALUE);
            this.writerSliceConfig.getNecessaryValue(Key.SECRETKEY,
                    S3WriterErrorCode.REQUIRED_VALUE);
            this.writerSliceConfig.getNecessaryValue(Key.ACCESSKEY,
                    S3WriterErrorCode.REQUIRED_VALUE);
            this.writerSliceConfig.getNecessaryValue(Key.BUCKET,
                    S3WriterErrorCode.REQUIRED_VALUE);
            this.writerSliceConfig.getNecessaryValue(Key.OBJECT,
                    S3WriterErrorCode.REQUIRED_VALUE);

            // warn: do not support compress!!
            String compress = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.COMPRESS);
            if (StringUtils.isNotBlank(compress)) {
                String errorMessage = String.format("S3Writer does not support compression yet; the compress setting [%s] cannot take effect", compress);
                LOG.error(errorMessage);
                throw DataXException.asDataXException(S3WriterErrorCode.ILLEGAL_VALUE, errorMessage);
            }

            UnstructuredStorageWriterUtil.validateParameter(this.writerSliceConfig);
        }

        @Override
        public void prepare() {
            LOG.info("begin do prepare...");
            String bucket = this.writerSliceConfig.getString(Key.BUCKET);
            String object = this.writerSliceConfig.getString(Key.OBJECT);
            String writeMode = this.writerSliceConfig
                    .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.WRITE_MODE);
            // warn: if the bucket does not exist, fail instead of creating it
            try {
                // warn: do not create bucket for user
                if (!this.s3client.doesBucketExist(bucket)) {
                    // this.s3client.createBucket(bucket);
                    String errorMessage = String.format(
                            "The configured bucket [%s] does not exist; please check your configuration.", bucket);
                    LOG.error(errorMessage);
                    throw DataXException.asDataXException(
                            S3WriterErrorCode.ILLEGAL_VALUE, errorMessage);
                }
                LOG.info(String.format("access control details [%s].",
                        this.s3client.getBucketAcl(bucket).toString()));

                // truncate option handler
                if ("truncate".equals(writeMode)) {
                    LOG.info(String
                            .format("writeMode is truncate: deleting objects in bucket [%s] whose keys start with [%s]",
                                    bucket, object));
                    // warn: one listObjects call returns at most 1,000 keys (100 by default on OSS), so list and delete repeatedly until nothing is left
                    while (true) {
                        ObjectListing listing = null;
                        LOG.info("list objects with listObject(bucket, object)");
                        listing = this.s3client.listObjects(bucket, object);
                        List<S3ObjectSummary> objectSummarys = listing.getObjectSummaries();
                        for (S3ObjectSummary objectSummary : objectSummarys) {
                            LOG.info(String.format("delete s3 object [%s].", objectSummary.getKey()));
                            this.s3client.deleteObject(bucket, objectSummary.getKey());
                        }
                        if (objectSummarys.isEmpty()) {
                            break;
                        }
                    }
                } else if ("append".equals(writeMode)) {
                    LOG.info(String
                            .format("writeMode is append: no cleanup before writing; data goes to bucket [%s] with object prefix [%s]",
                                    bucket, object));
                } else if ("nonConflict".equals(writeMode)) {
                    LOG.info(String
                            .format("writeMode is nonConflict: checking bucket [%s] for objects whose keys start with [%s]",
                                    bucket, object));
                    ObjectListing listing = this.s3client.listObjects(bucket,
                            object);
                    if (0 < listing.getObjectSummaries().size()) {
                        StringBuilder objectKeys = new StringBuilder();
                        objectKeys.append("[ ");
                        for (S3ObjectSummary ossObjectSummary : listing.getObjectSummaries()) {
                            objectKeys.append(ossObjectSummary.getKey() + " ,");
                        }
                        objectKeys.append(" ]");
                        LOG.info(String.format("object with prefix [%s] details: %s", object, objectKeys.toString()));
                        throw DataXException.asDataXException(
                                S3WriterErrorCode.ILLEGAL_VALUE,
                                String.format("Bucket [%s] already contains objects with prefix [%s].", bucket, object));
                    }
                }
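            } catch (DataXException e) {
                // keep the specific error code (e.g. ILLEGAL_VALUE) raised above
                throw e;
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        S3WriterErrorCode.S3_COMM_ERROR, e.getMessage(), e);
            }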
        }

        @Override
        public void post() {

        }

        @Override
        public void destroy() {

        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            LOG.info("begin do split...");
            List<Configuration> writerSplitConfigs = new ArrayList<Configuration>();
            String object = this.writerSliceConfig.getString(Key.OBJECT);
            String bucket = this.writerSliceConfig.getString(Key.BUCKET);

            Set<String> allObjects = new HashSet<String>();
            try {
                // only keys starting with the object prefix can collide; page through all of them
                ObjectListing listing = this.s3client.listObjects(bucket, object);
                while (true) {
                    for (S3ObjectSummary objectSummary : listing.getObjectSummaries()) {
                        allObjects.add(objectSummary.getKey());
                    }
                    if (!listing.isTruncated()) {
                        break;
                    }
                    listing = this.s3client.listNextBatchOfObjects(listing);
                }
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        S3WriterErrorCode.S3_COMM_ERROR, e.getMessage(), e);
            }

            String objectSuffix;
            for (int i = 0; i < mandatoryNumber; i++) {
                // handle same object name
                Configuration splitedTaskConfig = this.writerSliceConfig
                        .clone();

                String fullObjectName = null;
                objectSuffix = StringUtils.replace(
                        UUID.randomUUID().toString(), "-", "");
                fullObjectName = String.format("%s__%s", object, objectSuffix);
                while (allObjects.contains(fullObjectName)) {
                    objectSuffix = StringUtils.replace(UUID.randomUUID()
                            .toString(), "-", "");
                    fullObjectName = String.format("%s__%s", object,
                            objectSuffix);
                }
                allObjects.add(fullObjectName);

                splitedTaskConfig.set(Key.OBJECT, fullObjectName);

                LOG.info(String.format("splited write object name:[%s]",
                        fullObjectName));

                writerSplitConfigs.add(splitedTaskConfig);
            }
            LOG.info("end do split.");
            return writerSplitConfigs;
        }
    }

    public static class Task extends Writer.Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        private AmazonS3 s3client;
        private Configuration writerSliceConfig;
        private String bucket;
        private String object;
        private String nullFormat;
        private String encoding;
        private char fieldDelimiter;
        private String dateFormat;
        private DateFormat dateParse;
        private String fileFormat;
        private List<String> header;
        private Long maxFileSize;// MB
        private String suffix;

        @Override
        public void init() {
            this.writerSliceConfig = this.getPluginJobConf();
            this.s3client = S3Util.initS3Client(this.writerSliceConfig);
            this.bucket = this.writerSliceConfig.getString(Key.BUCKET);
            this.object = this.writerSliceConfig.getString(Key.OBJECT);
            this.nullFormat = this.writerSliceConfig
                    .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.NULL_FORMAT);
            this.dateFormat = this.writerSliceConfig
                    .getString(
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Key.DATE_FORMAT,
                            null);
            if (StringUtils.isNotBlank(this.dateFormat)) {
                this.dateParse = new SimpleDateFormat(dateFormat);
            }
            this.encoding = this.writerSliceConfig
                    .getString(
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Key.ENCODING,
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.DEFAULT_ENCODING);
            this.fieldDelimiter = this.writerSliceConfig
                    .getChar(
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FIELD_DELIMITER,
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.DEFAULT_FIELD_DELIMITER);
            this.fileFormat = this.writerSliceConfig
                    .getString(
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_FORMAT,
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.FILE_FORMAT_TEXT);
            this.header = this.writerSliceConfig
                    .getList(
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Key.HEADER,
                            null, String.class);
            this.maxFileSize = this.writerSliceConfig
                    .getLong(
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Key.MAX_FILE_SIZE,
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.MAX_FILE_SIZE);
            this.suffix = this.writerSliceConfig
                    .getString(
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Key.SUFFIX,
                            com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.DEFAULT_SUFFIX);
            this.suffix = this.suffix.trim();// warn: need trim
        }

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
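            // size of each uploaded part: 10 MB (S3 requires every part except the last to be at least 5 MB);
            // it is compared with the number of buffered characters, so multi-byte encodings give larger parts
            final long partSize = 1024 * 1024 * 10L;
            long partCount = (this.maxFileSize * 1024 * 1024L) / partSize;
            final long maxPartNumber = partCount >= 1 ? partCount : 1;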
            int objectRollingNumber = 0;
            //warn: may be StringBuffer->StringBuilder
            StringWriter sw = new StringWriter();
            StringBuffer sb = sw.getBuffer();
            UnstructuredWriter unstructuredWriter = TextCsvWriterManager
                    .produceUnstructuredWriter(this.fileFormat,
                            this.fieldDelimiter, sw);
            Record record = null;

            LOG.info(String.format(
                    "begin do write, each object maxFileSize: [%s]MB...",
                    maxPartNumber * 10));
            String currentObject = this.object;
            InitiateMultipartUploadRequest currentInitiateMultipartUploadRequest = null;
            InitiateMultipartUploadResult currentInitiateMultipartUploadResult = null;
            boolean gotData = false;
            List<PartETag> currentPartETags = null;
            // TODO:
            // part-level retries could be driven by currentPartNumber; within one multipart upload, uploading the same currentPartNumber again overwrites the existing part
            int currentPartNumber = 1;
            try {
                // warn
                boolean needInitMultipartTransform = true;
                while ((record = lineReceiver.getFromReader()) != null) {
                    gotData = true;
                    // init:begin new multipart upload
                    if (needInitMultipartTransform) {
                        if (objectRollingNumber == 0) {
                            if (StringUtils.isBlank(this.suffix)) {
                                currentObject = this.object;
                            } else {
                                currentObject = String.format("%s%s",
                                        this.object, this.suffix);
                            }
                        } else {
                            // currentObject is like(no suffix)
                            // myfile__9b886b70fbef11e59a3600163e00068c_1
                            if (StringUtils.isBlank(this.suffix)) {
                                currentObject = String.format("%s_%s",
                                        this.object, objectRollingNumber);
                            } else {
                                // or with suffix
                                // myfile__9b886b70fbef11e59a3600163e00068c_1.csv
                                currentObject = String.format("%s_%s%s",
                                        this.object, objectRollingNumber,
                                        this.suffix);
                            }
                        }
                        objectRollingNumber++;
                        currentInitiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
                                this.bucket, currentObject);
                        currentInitiateMultipartUploadResult = this.s3client
                                .initiateMultipartUpload(currentInitiateMultipartUploadRequest);
                        currentPartETags = new ArrayList<PartETag>();
                        LOG.info(String
                                .format("write to bucket: [%s] object: [%s] with s3 uploadId: [%s]",
                                        this.bucket, currentObject,
                                        currentInitiateMultipartUploadResult
                                                .getUploadId()));

                        // each object's header
                        if (null != this.header && !this.header.isEmpty()) {
                            unstructuredWriter.writeOneRecord(this.header);
                        }
                        // warn
                        needInitMultipartTransform = false;
                        currentPartNumber = 1;
                    }

                    // write: upload data to current object
                    UnstructuredStorageWriterUtil.transportOneRecord(record,
                            this.nullFormat, this.dateParse,
                            this.getTaskPluginCollector(), unstructuredWriter);

                    if (sb.length() >= partSize) {
                        this.uploadOnePart(sw, currentPartNumber,
                                currentInitiateMultipartUploadResult,
                                currentPartETags, currentObject);
                        currentPartNumber++;
                        sb.setLength(0);
                    }

                    // save: end current multipart upload
                    if (currentPartNumber > maxPartNumber) {
                        LOG.info(String
                                .format("current object [%s] size > %s, complete current multipart upload and begin new one",
                                        currentObject, currentPartNumber
                                                * partSize));
                        CompleteMultipartUploadRequest currentCompleteMultipartUploadRequest = new CompleteMultipartUploadRequest(
                                this.bucket, currentObject,
                                currentInitiateMultipartUploadResult
                                        .getUploadId(), currentPartETags);
                        CompleteMultipartUploadResult currentCompleteMultipartUploadResult = this.s3client
                                .completeMultipartUpload(currentCompleteMultipartUploadRequest);
                        LOG.info(String.format(
                                "final object [%s] etag is:[%s]",
                                currentObject,
                                currentCompleteMultipartUploadResult.getETag()));
                        // warn
                        needInitMultipartTransform = true;
                    }
                }

                if (!gotData) {
                    LOG.info("Receive no data from the source.");
                    // each object's header
                    if (null != this.header && !this.header.isEmpty()) {
                        unstructuredWriter.writeOneRecord(this.header);
                    }
                    // S3 rejects a CompleteMultipartUpload request without parts,
                    // so the empty (or header-only) object is written with one PUT
                    byte[] content = sw.toString().getBytes(this.encoding);
                    com.amazonaws.services.s3.model.ObjectMetadata metadata =
                            new com.amazonaws.services.s3.model.ObjectMetadata();
                    metadata.setContentLength(content.length);
                    this.s3client.putObject(this.bucket, currentObject,
                            new ByteArrayInputStream(content), metadata);
                    sb.setLength(0);
                    LOG.info(String.format("final object [%s] written without multipart upload",
                            currentObject));
                } else if (!needInitMultipartTransform) {
                    // the last object is still open: flush data left in sb, then complete it
                    if (0 < sb.length()) {
                        this.uploadOnePart(sw, currentPartNumber,
                                currentInitiateMultipartUploadResult,
                                currentPartETags, currentObject);
                    }
                    CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(
                            this.bucket, currentObject,
                            currentInitiateMultipartUploadResult.getUploadId(),
                            currentPartETags);
                    CompleteMultipartUploadResult completeMultipartUploadResult = this.s3client
                            .completeMultipartUpload(completeMultipartUploadRequest);
                    LOG.info(String.format("final object [%s] etag is:[%s]",
                            currentObject, completeMultipartUploadResult.getETag()));
                }
                // otherwise the rolling branch in the loop has already completed the last object
            } catch (Exception e) {
                // dirty records are already collected by
                // UnstructuredStorageWriterUtil.transportOneRecord; the header
                // consists of plain strings and is never treated as dirty data
                throw DataXException.asDataXException(
                        S3WriterErrorCode.Write_OBJECT_ERROR, e.getMessage(), e);
            }
            LOG.info("end do write");
        }

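        /**
         * Within one uploadId, the part number both uniquely identifies a part
         * and fixes its relative position in the final object. Uploading new
         * data with an existing part number overwrites the part already stored
         * on S3 under that number.
         *
         * @throws Exception if the part still fails after all retries
         */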
        private void uploadOnePart(
                final StringWriter sw,
                final int partNumber,
                final InitiateMultipartUploadResult initiateMultipartUploadResult,
                final List<PartETag> partETags, final String currentObject)
                throws Exception {
            final String encoding = this.encoding;
            final String bucket = this.bucket;
            final AmazonS3 s3client = this.s3client;
            RetryUtil.executeWithRetry(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    byte[] byteArray = sw.toString().getBytes(encoding);
                    InputStream inputStream = new ByteArrayInputStream(
                            byteArray);
                    // build an UploadPartRequest and upload this part
                    UploadPartRequest uploadPartRequest = new UploadPartRequest();
                    uploadPartRequest.setBucketName(bucket);
                    uploadPartRequest.setKey(currentObject);
                    uploadPartRequest.setUploadId(initiateMultipartUploadResult
                            .getUploadId());
                    uploadPartRequest.setInputStream(inputStream);
                    uploadPartRequest.setPartSize(byteArray.length);
                    uploadPartRequest.setPartNumber(partNumber);
                    UploadPartResult uploadPartResult = s3client
                            .uploadPart(uploadPartRequest);
                    partETags.add(uploadPartResult.getPartETag());
                    LOG.info(String
                            .format("upload part [%s] size [%s] Byte has been completed.",
                                    partNumber, byteArray.length));
                    IOUtils.closeQuietly(inputStream);
                    return true;
                }
            }, 3, 1000L, false);
        }

        @Override
        public void prepare() {

        }

        @Override
        public void post() {

        }

        @Override
        public void destroy() {

        }
    }
}
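To make the rolling rule in Task.startWrite() easier to check without an S3 endpoint, here is a minimal, hypothetical simulation of it. The class RollingSplitSketch and its methods are illustrative names, not part of the plugin. It assumes, as in the code above, that a part is cut once the buffer reaches partSize characters, that an object is closed once maxPartNumber parts have been uploaded, and that rolled objects are named object_1, object_2 and so on, with the optional suffix appended; the header row and the real upload calls are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, S3-free simulation of the rolling logic in Task.startWrite().
 * Records are buffered, a "part" is cut once the buffer reaches partSize
 * characters, and a new object is started after maxPartNumber parts.
 */
public class RollingSplitSketch {

    /** Same arithmetic as startWrite(): maxFileSize (MB) / partSize, at least 1. */
    public static long maxPartNumber(long maxFileSizeMb, long partSizeBytes) {
        long partCount = (maxFileSizeMb * 1024 * 1024L) / partSizeBytes;
        return partCount >= 1 ? partCount : 1;
    }

    /** Object name for a rolling index, mirroring the naming in startWrite(). */
    public static String objectName(String object, int rollingNumber, String suffix) {
        boolean noSuffix = suffix == null || suffix.trim().isEmpty();
        if (rollingNumber == 0) {
            return noSuffix ? object : object + suffix;
        }
        return noSuffix ? object + "_" + rollingNumber
                : object + "_" + rollingNumber + suffix;
    }

    /** Returns object name -> sizes (in chars) of the parts that would be uploaded. */
    public static Map<String, List<Integer>> simulate(List<String> lines, String object,
            String suffix, int partSize, long maxPartNumber) {
        Map<String, List<Integer>> objects = new LinkedHashMap<>();
        StringBuilder sb = new StringBuilder();
        List<Integer> parts = new ArrayList<>();
        int rollingNumber = 0;
        boolean needInit = true;
        for (String line : lines) {
            if (needInit) { // begin a new "multipart upload"
                parts = new ArrayList<>();
                objects.put(objectName(object, rollingNumber++, suffix), parts);
                needInit = false;
            }
            sb.append(line).append('\n');
            if (sb.length() >= partSize) { // cut one part and reset the buffer
                parts.add(sb.length());
                sb.setLength(0);
            }
            if (parts.size() >= maxPartNumber) { // same as currentPartNumber > maxPartNumber
                needInit = true; // current object is "completed"
            }
        }
        if (sb.length() > 0) { // data left in the buffer becomes the last part
            parts.add(sb.length());
        }
        return objects;
    }

    public static void main(String[] args) {
        System.out.println(maxPartNumber(100, 10L * 1024 * 1024)); // 10
        System.out.println(simulate(Arrays.asList("aaaa", "bbbb", "cccc", "dddd", "eeee"),
                "myfile", ".csv", 10, 2)); // {myfile.csv=[10, 10], myfile_1.csv=[5]}
    }
}
```

With partSize = 10 and maxPartNumber = 2, the five 5-character lines in main() end up as myfile.csv with two 10-character parts and myfile_1.csv with one 5-character part. In the real plugin partSize is 10 MB, and the object name already carries the __<uuid> suffix added in split().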

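The Javadoc of uploadOnePart describes how part numbers behave. As an illustration only, the in-memory model below (MultipartModelSketch is a made-up class, not the AWS SDK) captures the S3 rules the writer relies on: part numbers run from 1 to 10,000, re-uploading a number replaces that part, and completion joins the parts in ascending part-number order. The model also refuses to complete an upload that has no parts, mirroring the fact that S3 rejects a CompleteMultipartUpload request without any part; this matters when startWrite() receives no data and no header is configured.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of one S3 multipart upload (one uploadId).
 * It is not the AWS SDK; it only mimics the part-number rules.
 */
public class MultipartModelSketch {
    // TreeMap keeps parts sorted by part number, the order S3 assembles them in
    private final Map<Integer, String> parts = new TreeMap<>();

    /** Uploads one part; an existing part with the same number is overwritten. */
    public void uploadPart(int partNumber, String data) {
        if (partNumber < 1 || partNumber > 10000) {
            throw new IllegalArgumentException("part number must be 1..10000: " + partNumber);
        }
        parts.put(partNumber, data);
    }

    /** Joins all parts in ascending part-number order; at least one part is required. */
    public String complete() {
        if (parts.isEmpty()) {
            throw new IllegalStateException("cannot complete an upload without parts");
        }
        StringBuilder object = new StringBuilder();
        for (String data : parts.values()) {
            object.append(data);
        }
        return object.toString();
    }

    public static void main(String[] args) {
        MultipartModelSketch upload = new MultipartModelSketch();
        upload.uploadPart(2, "world");
        upload.uploadPart(1, "hello ");
        upload.uploadPart(2, "S3"); // same number: replaces "world"
        System.out.println(upload.complete()); // hello S3
    }
}
```

Because startWrite() starts a fresh multipart upload (a new uploadId) for every rolled object and resets currentPartNumber to 1, reusing part numbers across objects never overwrites anything.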
S3WriterErrorCode.java

package com.alibaba.datax.plugin.writer.s3writer;

import com.alibaba.datax.common.spi.ErrorCode;

public enum S3WriterErrorCode implements ErrorCode {

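    CONFIG_INVALID_EXCEPTION("S3Writer-00", "Your parameter configuration is invalid."),
    REQUIRED_VALUE("S3Writer-01", "A required parameter value is missing."),
    ILLEGAL_VALUE("S3Writer-02", "A parameter value you provided is invalid."),
    Write_OBJECT_ERROR("S3Writer-03", "Writing to the configured target object failed."),
    S3_COMM_ERROR("S3Writer-05", "The S3 operation failed."),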
    ;

    private final String code;
    private final String description;

    private S3WriterErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s].", this.code,
                this.description);
    }

}
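Before a failure surfaces as Write_OBJECT_ERROR, uploadOnePart retries each part through RetryUtil.executeWithRetry(callable, 3, 1000L, false). The helper below is a hypothetical stand-in written for illustration, not DataX's RetryUtil; it assumes the three arguments mean total attempts, the pause between attempts in milliseconds, and whether the pause doubles after each failure. Under that reading, a part is tried at most three times with one-second pauses before the last exception is rethrown.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical retry helper (NOT DataX's RetryUtil).
 * Assumed semantics: retryTimes = total attempts, sleepMillis = pause between
 * attempts, exponential = double the pause after every failed attempt.
 */
public final class RetrySketch {
    private RetrySketch() {
    }

    public static <T> T executeWithRetry(Callable<T> callable, int retryTimes,
            long sleepMillis, boolean exponential) throws Exception {
        if (retryTimes < 1) {
            throw new IllegalArgumentException("retryTimes must be >= 1");
        }
        Exception lastError = null;
        long pause = sleepMillis;
        for (int attempt = 1; attempt <= retryTimes; attempt++) {
            try {
                return callable.call();
            } catch (Exception e) {
                lastError = e;
                if (attempt < retryTimes && pause > 0) {
                    Thread.sleep(pause); // wait before the next attempt
                    if (exponential) {
                        pause *= 2;
                    }
                }
            }
        }
        throw lastError; // every attempt failed
    }
}
```

Because the anonymous Callable in uploadOnePart rebuilds the byte array and the input stream on every call, each retry re-sends the same buffered content under the same part number, which S3 simply treats as an overwrite of that part.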

plugin.json

{
    "name": "s3writer",
    "class": "com.alibaba.datax.plugin.writer.s3writer.S3Writer",
    "description": "",
    "developer": "chenweifeng"
}

3 Debugging, Running, and Performance Testing

s3reader and s3writer templates

s3reader and s3writer run results
