DATAX自定义KafkaWriter

news2024/11/25 20:15:51

因为datax目前不支持写入数据到kafka中,因此本文主要介绍如何基于DataX自定义KafkaWriter,用来同步数据到kafka中。本文偏向实战,datax插件开发理论宝典请参考官方文档:

https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md。

首先我们需要克隆datax源码到本地,此处不再赘述。然后使用开发工具打开datax项目,项目结构如下:

可以看到不论是reader还是writer都是独立的项目,所以我们也需要新建kafkawriter的项目。可以直接新建,也可以复制一个现有的writer项目然后进行修改。注意包结构需要与现有的writer保持一致,如下所示:

基本上所有插件的目录结构都是这样,main文件夹下面分为三个目录,分别为assembly、java、resources。其中assembly下面主要用来存放打包文件,java则是存放我们的代码,resources中存放插件相关的配置信息。

创建完kafkawriter项目之后,需要在datax的pom文件中添加上我们的插件,如下图所示:然后我们需要在datax的打包文件中加入我们的kafkawriter插件,如下图所示:接下来从上到下依次介绍各文件的内容。

<assembly

	xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"

	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">

	<id></id>

	<formats>

		<format>dir</format>

	</formats>

	<includeBaseDirectory>false</includeBaseDirectory>

	<fileSets>

		<fileSet>

			<directory>src/main/resources</directory>

			<includes>

				<include>plugin.json</include>

				<include>plugin_job_template.json</include>

			</includes>

			<outputDirectory>plugin/writer/kafkawriter</outputDirectory>

		</fileSet>

		<fileSet>

			<directory>target/</directory>

			<includes>

				<include>kafkawriter-0.0.1-SNAPSHOT.jar</include>

			</includes>

			<outputDirectory>plugin/writer/kafkawriter</outputDirectory>

		</fileSet>

	</fileSets>



	<dependencySets>

		<dependencySet>

			<useProjectArtifact>false</useProjectArtifact>

			<outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>

			<scope>runtime</scope>

		</dependencySet>

	</dependencySets>

</assembly>

该文件主要配置插件的打包信息,基本上所有的插件内容也都是类似的,主要修改的地方如图所示:

如果我们想自定义s3的writer,也只需要修改图中所标识的几个地方。

  • KafkaWriter

该类为我们实现写入数据到kafka的主要逻辑实现类,其主要结构可以参照上文中提到的datax官方文档,代码示例如下,每个地方的处理逻辑可以参考代码中的注释。

package com.alibaba.datax.plugin.writer.kafkawriter;



import com.alibaba.datax.common.element.Column;

import com.alibaba.datax.common.element.Record;

import com.alibaba.datax.common.exception.DataXException;

import com.alibaba.datax.common.plugin.RecordReceiver;

import com.alibaba.datax.common.spi.Writer;

import com.alibaba.datax.common.util.Configuration;

import com.alibaba.datax.plugin.writer.kafkawriter.entity.*;

import com.alibaba.fastjson2.JSON;

import com.alibaba.fastjson2.JSONObject;

import com.alibaba.fastjson2.JSONWriter;

import org.apache.commons.lang3.ObjectUtils;

import org.apache.commons.lang3.StringUtils;

import org.apache.kafka.clients.admin.AdminClient;

import org.apache.kafka.clients.admin.ListTopicsResult;

import org.apache.kafka.clients.admin.NewTopic;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.header.internals.RecordHeader;

import org.apache.kafka.common.header.internals.RecordHeaders;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import java.io.IOException;

import java.nio.charset.StandardCharsets;

import java.util.*;

//首先自定义的writer需要继承Writer类

public class KafkaWriter extends Writer {

	//创建Job类继承Writer.Job

    public static class Job extends Writer.Job {



        private static final Logger logger = LoggerFactory.getLogger(Job.class);

        //存放同步任务的配置信息

        private Configuration conf = null;



        //重写split方法,任务切分逻辑,writer切分主要依据上游reader任务的切分数

        @Override

        public List<Configuration> split(int mandatoryNumber) {

            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);

            for (int i = 0; i < mandatoryNumber; i++) {

                configurations.add(conf);

            }

            return configurations;

        }



        //重写init方法,进行相关的初始化操作

        @Override

        public void init() {

            //获取同步任务相关的配置

            this.conf = super.getPluginJobConf();

            logger.info("kafka writer params:{}", conf.toJSON());

            //校验配置中的必要参数

            this.validateParameter();

        }



        private void validateParameter() {

            this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);

            this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);

            this.conf.getNecessaryValue(Key.COLUMN, KafkaWriterErrorCode.REQUIRED_VALUE);

        }



        //重写destroy方法,主要用来处理数据同步任务结束之后需要收尾的事情,比如删除同步过程中可能产生的临时文件,关闭连接等

        @Override

        public void destroy() {



        }

    }



    public static class Task extends Writer.Task {



        private static final Logger logger = LoggerFactory.getLogger(Task.class);

		//存放同步任务的配置信息

        private Configuration conf;

		//存放同步的数据表列信息

        private List<String> columns;

		//保存kafka的相关配置

        private Properties properties;

		//定义kafka生产者

        private Producer<String, String> producer;

		//数据同步的目标topic

        private String topic;

		//kafka消息header(如果单纯同步数据,不需要定义消息头,可以删除该字段)

        private RecordHeaders recordHeaders = new RecordHeaders();

        //控制消息是否格式化为cdc格式(如果单纯同步数据,不需要定义特殊的消息格式,可以删除该字段及下面代码的相关逻辑)

        private Boolean cdcValue = false;

        //源表主键信息(如果单纯同步数据,不需要定义特殊的消息格式,可以删除该字段及下面代码的相关逻辑)

        private List<String> primaryKeys;



        //init()方法主要对任务配置的解析赋值以及一些认证处理逻辑

        @Override

        public void init() {

            this.conf = super.getPluginJobConf();

            //kerberos逻辑处理,如果开启kerberos认证可以参考,如果没有可以删除

            String haveKerberos = conf.getUnnecessaryValue(Key.HAVE_KERBEROS, "false", null);

            if (StringUtils.isNotBlank(haveKerberos) && haveKerberos.equalsIgnoreCase("true")) {

                String kerberosPrincipal = conf.getString(Key.KERBEROS_PRINCIPAL);

                String kerberosKrb5ConfigPath = conf.getString(Key.KERBEROS_KRB5CONFIG_PATH);

                String kerberosKeytabFilePath = conf.getString(Key.KERBEROS_KEYTABFILE_PATH);

                if (StringUtils.isBlank(kerberosPrincipal) || StringUtils.isBlank(kerberosKrb5ConfigPath) || StringUtils.isBlank(kerberosKeytabFilePath)) {

                    throw new DataXException(KafkaWriterErrorCode.KERBEROS_VALUE, KafkaWriterErrorCode.KERBEROS_VALUE.getDescription());

                }

                try {

                    LoginUtil.securityPrepare(kerberosPrincipal, kerberosKrb5ConfigPath, kerberosKeytabFilePath);

                } catch (IOException e) {

                    throw new DataXException(KafkaWriterErrorCode.KERBEROS_AUTH, KafkaWriterErrorCode.KERBEROS_AUTH.getDescription());

                }

            }



            //初始化message header,此处是业务特定需求需要给同步到kafka中的message加上header,如果有(特殊的处理逻辑,如果不需要可以删除)

            if (StringUtils.isNotBlank(conf.getString(Key.KAFKAHEADER))) {

                JSONObject jsonObject = JSONObject.parseObject(conf.getString(Key.KAFKAHEADER));

                jsonObject.forEach((key, value) -> {

                    if (StringUtils.isBlank(String.valueOf(value)) || String.valueOf(value).equals("null")) {

                        RecordHeader recordHeader = new RecordHeader(key, null);

                        recordHeaders.add(recordHeader);

                    } else {

                        RecordHeader recordHeader = new RecordHeader(key, String.valueOf(value).getBytes(StandardCharsets.UTF_8));

                        recordHeaders.add(recordHeader);

                    }

                });

            }

			//控制message格式的开关

            if (conf.getBool(Key.CDCVALUE) != null) {

                this.cdcValue = conf.getBool(Key.CDCVALUE);

            }



            //获取列属性并保存

            this.columns = conf.getList(Key.COLUMN, String.class);

            //获取源表主键信息并保存

            this.primaryKeys = conf.getList(Key.PRIMARYKEYS, String.class);

            //获取配置的目标topic

            this.topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);

			//设置kafka配置信息

            properties = new Properties();

            properties.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));

            properties.put("key.serializer", conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));

            properties.put("value.serializer", conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));

            properties.put("acks", conf.getUnnecessaryValue(Key.ACK, "0", null));

            properties.put("retries", conf.getUnnecessaryValue(Key.RETRIES, "0", null));

            properties.put("batch.size", conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));

            producer = new KafkaProducer<String, String>(properties);

        }



        //prepare() 方法主要用来进行一些同步前的准备工作,比如创建目标topic

        @Override

        public void prepare() {

            //创建目标topic

            if (Boolean.valueOf(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {

                ListTopicsResult topicsResult = AdminClient.create(properties).listTopics();

                try {

                    if (!topicsResult.names().get().contains(this.topic)) {

                        NewTopic newTopic = new NewTopic(

                                this.topic,

                                Integer.valueOf(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),

                                Short.valueOf(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))

                        );

               

                        AdminClient.create(properties).createTopics(Arrays.asList(newTopic));

                    }

                } catch (Exception e) {

                    throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());

                }

            }

            //特殊处理,数据开始同步前,首先发送一条特定格式的消息(特殊处理逻辑,可以参考,不需要可以删除)

            if (cdcValue) {

                FirstCdcValueTemplate firstCdcValueTemplate = new FirstCdcValueTemplate();

                JSONObject tableId = new JSONObject();

                //tableId的值为topic名称

                tableId.put("tableName", this.conf.getString(Key.TOPIC));

                firstCdcValueTemplate.setTableId(tableId);

                SchemaTemplate schemaTemplate = new SchemaTemplate();

                schemaTemplate.setColumns(buildFirstMessageColumns());

                if (this.primaryKeys != null && this.primaryKeys.size() > 0) {

                    schemaTemplate.setPrimaryKeys(this.primaryKeys);

                } else {

                    schemaTemplate.setPrimaryKeys(Arrays.asList());

                }

                schemaTemplate.setPartitionKeys(Arrays.asList());

                schemaTemplate.setOptions(new JSONObject());

                firstCdcValueTemplate.setSchema(schemaTemplate);

                producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),

                        null, null, null, JSON.toJSONString(firstCdcValueTemplate, JSONWriter.Feature.WriteMapNullValue), this.recordHeaders)

                );

            }

        }



        //依次写入数据到kafka

        @Override

        public void startWrite(RecordReceiver lineReceiver) {

            Record record = null;

            while ((record = lineReceiver.getFromReader()) != null) {

                producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),

                        null, null, null, buildMessage(record), this.recordHeaders)

                );

            }

        }



        //进行收尾工作,关闭生产者客户端

        @Override

        public void destroy() {

            if (ObjectUtils.isNotEmpty(this.producer)) {

                this.producer.close();

            }

        }



        //组装特殊message格式

        private List<ColumnTemplate> buildFirstMessageColumns() {

            List<ColumnTemplate> columnTemplates = new ArrayList<>();

            JSONObject jsonObject = JSONObject.parseObject(conf.getString(Key.KAFKAHEADER));

            List<HeaderSchemaTemplate> schemaJson = jsonObject.getList("schemaJson", HeaderSchemaTemplate.class);

            for (HeaderSchemaTemplate temp : schemaJson) {

                ColumnTemplate columnTemplate = new ColumnTemplate();

                columnTemplate.setName(temp.getName());

                columnTemplate.setType(temp.getType());

                columnTemplates.add(columnTemplate);

            }

            return columnTemplates;

        }



        //组装message格式

        private String buildMessage(Record record) {

            JSONObject jo = new JSONObject();

            for (int i = 0; i < columns.size(); i++) {

                String columnName = columns.get(i);

                Column columnValue = record.getColumn(i);

                if (!Objects.isNull(columnValue)) {

                    if (Objects.isNull(columnValue.getRawData())) {

                        jo.put(columnName, null);

                    } else {

                        switch (columnValue.getType()) {

                            case INT:

                                jo.put(columnName, columnValue.asBigInteger());

                                break;

                            case BOOL:

                                jo.put(columnName, columnValue.asBoolean());

                                break;

                            case LONG:

                                jo.put(columnName, columnValue.asLong());

                                break;

                            case DOUBLE:

                                jo.put(columnName, columnValue.asDouble());

                                break;

                            default:

                                jo.put(columnName, columnValue.asString());

                        }

                    }

                } else {

                    jo.put(columnName, null);

                }

            }

            if (cdcValue) {

                ValueTemplate valueTemplate = new ValueTemplate();

                valueTemplate.setBefore(null);

                valueTemplate.setAfter(jo);

                valueTemplate.setOp("c");

                return JSON.toJSONString(valueTemplate, JSONWriter.Feature.WriteMapNullValue);

            } else {

                return jo.toJSONString();

            }

        }



    }



}

可以注意到上文中的Task内部类中定义了几个特殊的变量:recordHeaders、cdcValue、primaryKeys,这几个变量主要是用来定义特殊的kafka消息格式,比如当前代码的逻辑是要将消息转换为CDC相关的格式,所以做了额外处理。可以参考该思路,如果有其他的类似的需求,也可以通过任务配置传递进来,然后构建消息的时候进行处理。

  • KafkaWriterErrorCode

定义错误码,抛出异常的时候用来提示。

package com.alibaba.datax.plugin.writer.kafkawriter;



import com.alibaba.datax.common.spi.ErrorCode;



public enum KafkaWriterErrorCode implements ErrorCode {



    REQUIRED_VALUE("KafkaWriter-00", "您缺失了必须填写的参数值."),



    KERBEROS_VALUE("KafkaWriter-02", "您缺失了必须填写的kerberos参数值."),

    KERBEROS_AUTH("KafkaWriter-03", "kerberos认证失败"),



    CREATE_TOPIC("KafkaWriter-01", "写入数据前检查topic或是创建topic失败.");



    private final String code;

    private final String description;



    private KafkaWriterErrorCode(String code, String description) {

        this.code = code;

        this.description = description;

    }



    @Override

    public String getCode() {

        return this.code;

    }



    @Override

    public String getDescription() {

        return this.description;

    }



    @Override

    public String toString() {

        return String.format("Code:[%s], Description:[%s].", this.code,

                this.description);

    }



}
  • Key

该类定义数据同步任务配置文件中的Key

package com.alibaba.datax.plugin.writer.kafkawriter;



public class Key {



    public static final String BOOTSTRAP_SERVERS = "bootstrapServers";



    public static final String TOPIC = "topic";



    public static final String KEY_SERIALIZER = "keySerializer";



    public static final String VALUE_SERIALIZER = "valueSerializer";



    public static final String COLUMN = "column";



    public static final String ACK = "ack";



    public static final String BATCH_SIZE = "batchSize";



    public static final String RETRIES = "retries";



    public static final String NO_TOPIC_CREATE = "noTopicCreate";



    public static final String TOPIC_NUM_PARTITION = "topicNumPartition";



    public static final String TOPIC_REPLICATION_FACTOR = "topicReplicationFactor";



    //是否开启kerberos认证

    public static final String HAVE_KERBEROS = "haveKerberos";



    public static final String KERBEROS_KEYTABFILE_PATH = "kerberosKeytabFilePath";



    public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";



    public static final String KERBEROS_KRB5CONFIG_PATH = "kerberosKrb5ConfigPath";



    public static final String KAFKAHEADER = "kafkaHeader";



    public static final String CDCVALUE = "cdcValue";



    public static final String PRIMARYKEYS = "primarykeys";



}
  • LoginUtil

该类主要是定义了kerberos认证的相关逻辑,可以参考

package com.alibaba.datax.plugin.writer.kafkawriter;



import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import java.io.File;

import java.io.FileWriter;

import java.io.IOException;



public class LoginUtil {

    private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);



    public enum Module {

        KAFKA("KafkaClient"), ZOOKEEPER("Client");



        private String name;



        private Module(String name) {

            this.name = name;

        }



        public String getName() {

            return name;

        }

    }



    /**

     * line operator string

     */

    private static final String LINE_SEPARATOR = System.getProperty("line.separator");



    /**

     * jaas file postfix

     */

    private static final String JAAS_POSTFIX = ".jaas.conf";



    /**

     * is IBM jdk or not

     */

    private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");



    /**

     * IBM jdk login module

     */

    private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";



    /**

     * oracle jdk login module

     */

    private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";



    /**

     * Zookeeper quorum principal.

     */

    public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";



    /**

     * java security krb5 file path

     */

    public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";



    /**

     * java security login file path

     */

    public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";



    /**

     * 设置jaas.conf文件

     *

     * @param principal

     * @param keytabPath

     * @throws IOException

     */

    public static void setJaasFile(String principal, String keytabPath)

            throws IOException {

        String jaasPath =

                new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")

                        + JAAS_POSTFIX;

        // windows路径下分隔符替换

        jaasPath = jaasPath.replace("\\", "\\\\");

        // 删除jaas文件

        deleteJaasFile(jaasPath);

        writeJaasFile(jaasPath, principal, keytabPath);

        System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);

    }



    /**

     * 设置zookeeper服务端principal

     *

     * @param zkServerPrincipal

     * @throws IOException

     */

    public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException {

        System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);

        String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);

        if (ret == null) {

            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");

        }

        if (!ret.equals(zkServerPrincipal)) {

            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");

        }

    }



    /**

     * 设置krb5文件

     *

     * @param krb5ConfFile

     * @throws IOException

     */

    public static void setKrb5Config(String krb5ConfFile) throws IOException {

        System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);

        String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);

        if (ret == null) {

            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");

        }

        if (!ret.equals(krb5ConfFile)) {

            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");

        }

    }



    /**

     * 写入jaas文件

     *

     * @throws IOException 写文件异常

     */

    private static void writeJaasFile(String jaasPath, String principal, String keytabPath)

            throws IOException {

        FileWriter writer = new FileWriter(new File(jaasPath));

        try {

            writer.write(getJaasConfContext(principal, keytabPath));

            writer.flush();

        } catch (IOException e) {

            throw new IOException("Failed to create jaas.conf File");

        } finally {

            writer.close();

        }

    }



    private static void deleteJaasFile(String jaasPath) throws IOException {

        File jaasFile = new File(jaasPath);

        if (jaasFile.exists()) {

            if (!jaasFile.delete()) {

                throw new IOException("Failed to delete exists jaas file.");

            }

        }

    }



    private static String getJaasConfContext(String principal, String keytabPath) {

        Module[] allModule = Module.values();

        StringBuilder builder = new StringBuilder();

        for (Module modlue : allModule) {

            builder.append(getModuleContext(principal, keytabPath, modlue));

        }

        return builder.toString();

    }



    private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {

        StringBuilder builder = new StringBuilder();

        if (IS_IBM_JDK) {

            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);

            builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);

            builder.append("credsType=both").append(LINE_SEPARATOR);

            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);

            builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);

            builder.append("debug=true;").append(LINE_SEPARATOR);

            builder.append("};").append(LINE_SEPARATOR);

        } else {

            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);

            builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);

            builder.append("useKeyTab=true").append(LINE_SEPARATOR);

            builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);

            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);

            builder.append("useTicketCache=false").append(LINE_SEPARATOR);

            builder.append("storeKey=true").append(LINE_SEPARATOR);

            builder.append("debug=true;").append(LINE_SEPARATOR);

            builder.append("};").append(LINE_SEPARATOR);

        }

        return builder.toString();

    }



    public static void securityPrepare(String principal, String keyTabFilePath, String krb5ConfigPath) throws IOException {

        // windows路径下分隔符替换

        keyTabFilePath = keyTabFilePath.replace("\\", "\\\\");

        krb5ConfigPath = krb5ConfigPath.replace("\\", "\\\\");



        LoginUtil.setKrb5Config(krb5ConfigPath);

        LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");

        LoginUtil.setJaasFile(principal, keyTabFilePath);

    }



    /*

     * 判断文件是否存在

     */

    private static boolean isFileExists(String fileName) {

        File file = new File(fileName);

        return file.exists();

    }



}

上文中截图的java目录下有一个entity包,该包是上文提到过的特殊处理kafka message格式所用到的一些实体类,没什么借鉴意义,就不进行展示了。

  • plugin.json

该文件主要是用来定义插件的信息,方便datax加载到我们的插件。其中name与class属性需要跟实际名称及路径保持一致。

{

    "name": "kafkawriter",

    "class": "com.alibaba.datax.plugin.writer.kafkawriter.KafkaWriter",

    "description": "kafka writer",

    "developer": "alibaba"

}
  • plugin_job_template.json

该文件主要定义同步任务的配置示例。

{

    "name": "kafkawriter",

    "parameter": {

        "bootstrapServers": "11.1.1.111:9092",

        "topic": "test-topic",

        "ack": "all",

        "column": [

            "id",

            "name",

            "description",

            "weight"

        ],

        "batchSize": 1000,

        "retries": 0,

        "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",

        "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",

        "topicNumPartition": 1,

        "topicReplicationFactor": 1

    }

}

上文基本将kafka writer中的相关代码都展示完毕,至此也是基本完成了kafkawriter插件的开发。打包完成之后,就可以愉快的进行测试了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2035743.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024年TI杯E题-三子棋游戏装置方案分享-jdk123团队-第二弹 手搓机械臂

第一弹赛题的选择与前期方案的准备 opencv调用摄像头bug的解决 机械臂的组装 采用三个舵机&#xff0c;组成一个三自由度的机械臂。 并且利用电磁吸盘的方式&#xff0c;完成对棋子的抓取工作&#xff0c;后面的事实证明&#xff0c;在预算不足的情况下&#xff0c;队友手搓…

顺序表的实现——数据结构

线性表 文章目录 线性表线性表的定义和基本操作线性表的定义线性表的基本操作 线性表的顺序表示顺序表的定义顺序表的实现——静态分配顺序表的实现——动态分配顺序表的特点 线性表的定义和基本操作 线性表的定义 线性表&#xff08;Linear List&#xff09;的定义 ​ 线性…

多进程多线程

exec 系列函数 结束代码函数 atexec 函数 结束标志&#xff0c;从下往上 先出2再出1 void cleanup1() {printf("clean1"); }void cleanup2() {printf("clean2"); }atexec(cleanup1) atexec(cleanup2)

28. 找出字符串中第一个匹配项的下标【 力扣(LeetCode) 】

一、题目描述 给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 二、测试用例 示例 1&#xff1a; 输…

Cmake编译工程

Cmake目录结构&#xff1a;项目主目录中会放一个CmakeList.txt的文本文档&#xff0c;后期使用cmake指令时候&#xff0c;依赖的就是该文档 1.包含源文件的子文件夹包含Cmakelist.txt文件时&#xff0c;主目录的Cmakelists.txtx要通过add_subdirector添加子目录 2.包含源文件…

算法3:二分查找(下)

文章目录 寻找峰值寻找旋转数组最小值 寻找峰值 class Solution { public:int findPeakElement(vector<int>& nums) {int left 0, right nums.size() - 1;while(left < right){int mid left (right - left) / 2;if(nums[mid] < nums[mid 1])left mid 1;…

opencv-python图像增强三:图像清晰度增强

文章目录 一、简介&#xff1a;二、图像清晰度增强方案&#xff1a;三、算法实现步骤3.1高反差保留实现3.2. usm锐化3.3 Overlay叠加 四&#xff1a;整体代码实现五&#xff1a;效果 一、简介&#xff1a; 你是否有过这样的烦恼&#xff0c;拍出来的照片总是不够清晰&#xff…

视频汇聚/安防监控综合平台EasyCVR接入海康私有协议EHOME显示失败是什么原因?

安防监控/视频综合管理平台/视频集中存储/磁盘阵列EasyCVR视频汇聚平台&#xff0c;支持多种视频格式和编码方式&#xff08;H.264/H.265&#xff09;&#xff0c;能够轻松对接各类前端监控设备&#xff0c;实现视频流的统一接入与集中管理。安防监控EasyCVR平台支持多种流媒体…

Stable Diffusion【 ControlNet实战】OpenPose 轻松制作武侠动作

前言 hello&#xff0c;大家好** 好了&#xff0c;进入正题。如何通过Stable Diffusion ControlNet来进行姿态控制。来一起来看下老徐的简单示例&#xff1a; 老徐参数设置&#xff1a; **大模型&#xff1a;**墨幽人造人_v1080 所有的AI设计工具&#xff0c;模型和插件&…

25届秋招网络安全面试资料库

吉祥知识星球http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247485367&idx1&sn837891059c360ad60db7e9ac980a3321&chksmc0e47eebf793f7fdb8fcd7eed8ce29160cf79ba303b59858ba3a6660c6dac536774afb2a6330#rd 《网安面试指南》http://mp.weixin.qq.com/s?…

机器人中的wrench(力旋量)

在机器人领域&#xff0c;wrench&#xff08;力旋量&#xff09;是一个由力和力矩组成的6D空间向量&#xff0c;用于描述刚体在空间中受到的力和力矩的综合作用。具体来说&#xff0c;wrench可以看作是一个包含线性分量和旋转分量的矢量&#xff0c;其中线性分量代表作用在刚体…

时间序列+预训练大模型!最新开源成果性能暴涨42.8%

今天我们来聊一个新兴的、创新空间很大的方向&#xff1a;时间序列预训练大模型。 预训练大模型因为在大规模多领域的数据集上进行训练&#xff0c;能学习到丰富的、跨领域的时间序列表示&#xff0c;在面对新的、没见过的时间序列数据时&#xff0c;它能够表现出更强的泛化性…

快速排序效率

为了搞清楚快速排序的效率&#xff0c;我们先从分区开始。分解来看&#xff0c;你会发现它包含两种步骤。 ❏ 比较&#xff1a;每个值都要与轴做比较。 ❏ 交换&#xff1a;在适当时候将左右指针所指的两个值交换位置。 一次分区至少有N次比较&#xff0c;即数组的每个值都要…

打开第四十二天:买卖股票的最佳时机IV、最佳买卖股票时机含冷冻期、买卖股票的最佳时机含手续费

一、买卖股票的最佳时机IV&#xff08;困难&#xff09; 题目 文章 视频 这道题目可以说是上一题的进阶版&#xff0c;这里要求至多有k次交易。 确定dp数组以及下标的含义 在上一题中定义了一个二维dp数组&#xff0c;本题其实依然可以用一个二维dp数组。使用二维数组 dp…

wireshark使用介绍及案例分享

一、wireshark介绍 1、定义 wireshark是非常流行的网络封包分析软件,简称小鲨鱼,功能十分强大。可以截取各种网络封包,显示网络封包的详细信息。对应的,linux下的抓包工具是 tcpdump。 1.1、网络基础 参考TCP/IP五层模型,帧结构如下: 帧字段 帧字段含义 Frame 物理层的…

百元蓝牙耳机哪个牌子的比较好?四款百元必入热门机型盘点

一款优秀的蓝牙耳机都能极大提升我们的使用体验&#xff0c;然而&#xff0c;对于大多数消费者而言&#xff0c;高端蓝牙耳机昂贵的价格常常令人望而却步&#xff0c;幸运的是&#xff0c;市场上有很多性价比极高的品牌提供了百元左右的优质选择&#xff0c;那么百元蓝牙耳机哪…

基于STM32开发的智能电能监测系统

目录 引言环境准备工作 硬件准备软件安装与配置系统设计 系统架构硬件连接代码实现 初始化代码控制代码应用场景 家庭电能监测工业用电管理常见问题及解决方案 常见问题解决方案结论 1. 引言 智能电能监测系统通过实时采集电流、电压等电力参数&#xff0c;计算电能消耗&…

FPGA开发——UART串口通信的介绍和回环实验框架构建

一、简介 1、原理 UART&#xff08;Universal Asynchronous Receiver/Transmitter&#xff0c;通用异步收发器&#xff09;是一种广泛使用的串行通信协议&#xff0c;特别适用于微控制器、计算机和各种嵌入式设备之间的数据通信。 UART是一种异步串行通信方式&#xff0c;通过…

Prometheus+Grafana-1-基础介绍及安装

一、体系架构(了解) 数据采集流程 说白了就是采集数据->计算是否超过阈值->发起警告 Prometheus查询界面如下 1.报警简介 展现形式&#xff1a;短信&#xff0c;邮件&#xff0c;电话&#xff0c;通讯软件。 阈值(Trigger Value)&#xff0c;如达到阈值可以触发预警。…

巨详细的规则引擎 Drools——小白也可食用

巨详细的规则引擎 Drools——小白也可食用 一、问题1.1、传统做法1.2、存在的问题1.3、引入 二、规则引擎概述2.1、什么是规则引擎2.2、使用规则引擎的优势2.3、规则引擎应用场景2.4、Drools介绍 三、Drools入门案例3.1、创建Springboot项目3.2、引入依赖3.3、添加Drools配置类…