Because DataX does not currently support writing data to Kafka, this article describes how to build a custom KafkaWriter plugin on top of DataX to sync data into Kafka. The focus here is hands-on practice; for the theory of DataX plugin development, refer to the official documentation:
https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
First, clone the DataX source code locally (not covered here), then open the DataX project in your IDE. The project structure looks like this:
As you can see, every reader and writer is an independent module, so we need to create a new module for kafkawriter as well. You can create it from scratch or copy an existing writer module and modify it. Note that the package structure must stay consistent with the existing writers, as shown below:
Almost every plugin follows this directory layout: under src/main there are three directories, assembly, java, and resources. assembly holds the packaging descriptor, java holds the code, and resources holds the plugin's configuration files.
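For orientation, the new module ends up looking roughly like this (a sketch based on the files shown later in this article):

kafkawriter
└── src
    └── main
        ├── assembly
        │   └── package.xml
        ├── java
        │   └── com/alibaba/datax/plugin/writer/kafkawriter
        │       ├── KafkaWriter.java
        │       ├── KafkaWriterErrorCode.java
        │       ├── Key.java
        │       ├── LoginUtil.java
        │       └── entity/
        └── resources
            ├── plugin.json
            └── plugin_job_template.json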
After creating the kafkawriter module, we need to register the plugin in DataX's root pom.xml, and then add the kafkawriter plugin to DataX's packaging descriptor, as shown in the figures. The rest of this article walks through each file from top to bottom.
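For reference, the two changes amount to adding the module to the root pom.xml and adding a fileSet to the root package.xml, roughly like this (a sketch; the exact surrounding entries depend on your DataX version):

<!-- root pom.xml: add the new module to the <modules> section -->
<module>kafkawriter</module>

<!-- root package.xml: copy the packaged plugin into the final distribution -->
<fileSet>
    <directory>kafkawriter/target/datax/</directory>
    <includes>
        <include>**/*.*</include>
    </includes>
    <outputDirectory>datax</outputDirectory>
</fileSet>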
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/kafkawriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>kafkawriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/kafkawriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
This file configures how the plugin is packaged. It is almost identical for every plugin; the places that need to change are highlighted in the figure, essentially the plugin name in the output directories and the name of the jar to include. If we wanted to write a custom S3 writer, for example, we would only need to modify those same few places.
- KafkaWriter
This class implements the main logic for writing data to Kafka. Its overall structure follows the official DataX documentation mentioned above. The code is shown below; the handling at each step is explained in the inline comments.
package com.alibaba.datax.plugin.writer.kafkawriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.writer.kafkawriter.entity.*;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONWriter;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
// A custom writer must extend the Writer class
public class KafkaWriter extends Writer {
// The Job inner class extends Writer.Job
public static class Job extends Writer.Job {
private static final Logger logger = LoggerFactory.getLogger(Job.class);
// Holds the configuration of the sync job
private Configuration conf = null;
// Override split(): task-splitting logic; the writer is split according to the number of upstream reader splits
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(conf);
}
return configurations;
}
// Override init() to perform initialization
@Override
public void init() {
// Load the job configuration
this.conf = super.getPluginJobConf();
logger.info("kafka writer params:{}", conf.toJSON());
// Validate the required parameters
this.validateParameter();
}
private void validateParameter() {
this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
this.conf.getNecessaryValue(Key.COLUMN, KafkaWriterErrorCode.REQUIRED_VALUE);
}
// Override destroy() to clean up after the sync job finishes, e.g. removing temporary files created during the sync or closing connections
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private static final Logger logger = LoggerFactory.getLogger(Task.class);
// Holds the configuration of the sync task
private Configuration conf;
// Column names of the table being synced
private List<String> columns;
// Kafka-related configuration
private Properties properties;
// Kafka producer
private Producer<String, String> producer;
// Target topic of the sync
private String topic;
// Kafka message headers (if you only need to sync plain data without message headers, this field can be removed)
private RecordHeaders recordHeaders = new RecordHeaders();
// Whether to format messages in CDC style (if you do not need a special message format, this field and the related logic below can be removed)
private Boolean cdcValue = false;
// Primary keys of the source table (only needed for the special message format; remove otherwise)
private List<String> primaryKeys;
// init() parses and stores the task configuration and handles authentication
@Override
public void init() {
this.conf = super.getPluginJobConf();
// Kerberos handling; keep this as a reference if Kerberos authentication is enabled, otherwise it can be removed
String haveKerberos = conf.getUnnecessaryValue(Key.HAVE_KERBEROS, "false", null);
if (StringUtils.isNotBlank(haveKerberos) && haveKerberos.equalsIgnoreCase("true")) {
String kerberosPrincipal = conf.getString(Key.KERBEROS_PRINCIPAL);
String kerberosKrb5ConfigPath = conf.getString(Key.KERBEROS_KRB5CONFIG_PATH);
String kerberosKeytabFilePath = conf.getString(Key.KERBEROS_KEYTABFILE_PATH);
if (StringUtils.isBlank(kerberosPrincipal) || StringUtils.isBlank(kerberosKrb5ConfigPath) || StringUtils.isBlank(kerberosKeytabFilePath)) {
throw new DataXException(KafkaWriterErrorCode.KERBEROS_VALUE, KafkaWriterErrorCode.KERBEROS_VALUE.getDescription());
}
try {
LoginUtil.securityPrepare(kerberosPrincipal, kerberosKeytabFilePath, kerberosKrb5ConfigPath);
} catch (IOException e) {
throw new DataXException(KafkaWriterErrorCode.KERBEROS_AUTH, KafkaWriterErrorCode.KERBEROS_AUTH.getDescription());
}
}
// Initialize the message headers. This is a business-specific requirement that attaches headers to every message written to Kafka; remove this block if you do not need it
if (StringUtils.isNotBlank(conf.getString(Key.KAFKAHEADER))) {
JSONObject jsonObject = JSONObject.parseObject(conf.getString(Key.KAFKAHEADER));
jsonObject.forEach((key, value) -> {
if (StringUtils.isBlank(String.valueOf(value)) || String.valueOf(value).equals("null")) {
RecordHeader recordHeader = new RecordHeader(key, null);
recordHeaders.add(recordHeader);
} else {
RecordHeader recordHeader = new RecordHeader(key, String.valueOf(value).getBytes(StandardCharsets.UTF_8));
recordHeaders.add(recordHeader);
}
});
}
// Switch that controls the message format
if (conf.getBool(Key.CDCVALUE) != null) {
this.cdcValue = conf.getBool(Key.CDCVALUE);
}
// Read and store the column names
this.columns = conf.getList(Key.COLUMN, String.class);
// Read and store the primary keys of the source table
this.primaryKeys = conf.getList(Key.PRIMARYKEYS, String.class);
// Read the configured target topic
this.topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
// Build the Kafka producer configuration
properties = new Properties();
properties.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
properties.put("key.serializer", conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
properties.put("value.serializer", conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
properties.put("acks", conf.getUnnecessaryValue(Key.ACK, "0", null));
properties.put("retries", conf.getUnnecessaryValue(Key.RETRIES, "0", null));
properties.put("batch.size", conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
producer = new KafkaProducer<String, String>(properties);
}
// prepare() performs pre-sync work, such as creating the target topic
@Override
public void prepare() {
// Create the target topic before writing, if it does not already exist (gated by the noTopicCreate switch)
if (Boolean.valueOf(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {
// Use a single AdminClient and make sure it is closed when we are done
try (AdminClient adminClient = AdminClient.create(properties)) {
ListTopicsResult topicsResult = adminClient.listTopics();
if (!topicsResult.names().get().contains(this.topic)) {
NewTopic newTopic = new NewTopic(
this.topic,
Integer.valueOf(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
Short.valueOf(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
);
// Wait for the creation to complete so the first send does not race topic creation
adminClient.createTopics(Arrays.asList(newTopic)).all().get();
}
} catch (Exception e) {
throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.CREATE_TOPIC.getDescription());
}
}
// Special handling: before the data sync starts, first send one message in a specific (schema) format (reference logic; remove if not needed)
if (cdcValue) {
FirstCdcValueTemplate firstCdcValueTemplate = new FirstCdcValueTemplate();
JSONObject tableId = new JSONObject();
// tableId carries the topic name
tableId.put("tableName", this.conf.getString(Key.TOPIC));
firstCdcValueTemplate.setTableId(tableId);
SchemaTemplate schemaTemplate = new SchemaTemplate();
schemaTemplate.setColumns(buildFirstMessageColumns());
if (this.primaryKeys != null && this.primaryKeys.size() > 0) {
schemaTemplate.setPrimaryKeys(this.primaryKeys);
} else {
schemaTemplate.setPrimaryKeys(Arrays.asList());
}
schemaTemplate.setPartitionKeys(Arrays.asList());
schemaTemplate.setOptions(new JSONObject());
firstCdcValueTemplate.setSchema(schemaTemplate);
producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
null, null, null, JSON.toJSONString(firstCdcValueTemplate, JSONWriter.Feature.WriteMapNullValue), this.recordHeaders)
);
}
}
// Write records to Kafka one by one
@Override
public void startWrite(RecordReceiver lineReceiver) {
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
null, null, null, buildMessage(record), this.recordHeaders)
);
}
}
// Clean up: close the producer client
@Override
public void destroy() {
if (ObjectUtils.isNotEmpty(this.producer)) {
this.producer.close();
}
}
// Build the column list for the special first (schema) message
private List<ColumnTemplate> buildFirstMessageColumns() {
List<ColumnTemplate> columnTemplates = new ArrayList<>();
JSONObject jsonObject = JSONObject.parseObject(conf.getString(Key.KAFKAHEADER));
List<HeaderSchemaTemplate> schemaJson = jsonObject.getList("schemaJson", HeaderSchemaTemplate.class);
for (HeaderSchemaTemplate temp : schemaJson) {
ColumnTemplate columnTemplate = new ColumnTemplate();
columnTemplate.setName(temp.getName());
columnTemplate.setType(temp.getType());
columnTemplates.add(columnTemplate);
}
return columnTemplates;
}
// Build the message body
private String buildMessage(Record record) {
JSONObject jo = new JSONObject();
for (int i = 0; i < columns.size(); i++) {
String columnName = columns.get(i);
Column columnValue = record.getColumn(i);
if (!Objects.isNull(columnValue)) {
if (Objects.isNull(columnValue.getRawData())) {
jo.put(columnName, null);
} else {
switch (columnValue.getType()) {
case INT:
jo.put(columnName, columnValue.asBigInteger());
break;
case BOOL:
jo.put(columnName, columnValue.asBoolean());
break;
case LONG:
jo.put(columnName, columnValue.asLong());
break;
case DOUBLE:
jo.put(columnName, columnValue.asDouble());
break;
default:
jo.put(columnName, columnValue.asString());
}
}
} else {
jo.put(columnName, null);
}
}
if (cdcValue) {
ValueTemplate valueTemplate = new ValueTemplate();
valueTemplate.setBefore(null);
valueTemplate.setAfter(jo);
valueTemplate.setOp("c");
return JSON.toJSONString(valueTemplate, JSONWriter.Feature.WriteMapNullValue);
} else {
return jo.toJSONString();
}
}
}
}
Notice that the Task inner class above defines a few special fields: recordHeaders, cdcValue, and primaryKeys. They exist to produce a special Kafka message format; in this code the messages are wrapped in a CDC-style structure, which is why there is extra handling. The same approach works for similar requirements: pass the extra options in through the job configuration and apply them when building the message.
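For reference, with cdcValue enabled each data record is wrapped roughly like this before being sent (a sketch inferred from the ValueTemplate handling above; the exact field layout depends on your entity classes):

{
  "before": null,
  "after": {
    "id": 1,
    "name": "test",
    "description": "demo row",
    "weight": 3.14
  },
  "op": "c"
}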
- KafkaWriterErrorCode
Defines the error codes used when throwing exceptions.
package com.alibaba.datax.plugin.writer.kafkawriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum KafkaWriterErrorCode implements ErrorCode {
REQUIRED_VALUE("KafkaWriter-00", "您缺失了必须填写的参数值."),
KERBEROS_VALUE("KafkaWriter-02", "您缺失了必须填写的kerberos参数值."),
KERBEROS_AUTH("KafkaWriter-03", "kerberos认证失败"),
CREATE_TOPIC("KafkaWriter-01", "写入数据前检查topic或是创建topic失败.");
private final String code;
private final String description;
private KafkaWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code,
this.description);
}
}
- Key
This class defines the keys that appear in the sync job configuration file.
package com.alibaba.datax.plugin.writer.kafkawriter;
public class Key {
public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
public static final String TOPIC = "topic";
public static final String KEY_SERIALIZER = "keySerializer";
public static final String VALUE_SERIALIZER = "valueSerializer";
public static final String COLUMN = "column";
public static final String ACK = "ack";
public static final String BATCH_SIZE = "batchSize";
public static final String RETRIES = "retries";
public static final String NO_TOPIC_CREATE = "noTopicCreate";
public static final String TOPIC_NUM_PARTITION = "topicNumPartition";
public static final String TOPIC_REPLICATION_FACTOR = "topicReplicationFactor";
// Whether Kerberos authentication is enabled
public static final String HAVE_KERBEROS = "haveKerberos";
public static final String KERBEROS_KEYTABFILE_PATH = "kerberosKeytabFilePath";
public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
public static final String KERBEROS_KRB5CONFIG_PATH = "kerberosKrb5ConfigPath";
public static final String KAFKAHEADER = "kafkaHeader";
public static final String CDCVALUE = "cdcValue";
public static final String PRIMARYKEYS = "primarykeys";
}
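Putting these keys together, a writer parameter block that exercises the optional Kerberos and CDC features might look roughly like this (a sketch; the broker address, principal, and paths are placeholders; note that every top-level key of kafkaHeader is attached to each message as a header, while its schemaJson list feeds the first schema message):

"writer": {
  "name": "kafkawriter",
  "parameter": {
    "bootstrapServers": "11.1.1.111:9092",
    "topic": "test-topic",
    "column": ["id", "name"],
    "haveKerberos": "true",
    "kerberosPrincipal": "kafka/example@EXAMPLE.COM",
    "kerberosKrb5ConfigPath": "/etc/krb5.conf",
    "kerberosKeytabFilePath": "/etc/security/keytabs/kafka.keytab",
    "cdcValue": true,
    "primarykeys": ["id"],
    "kafkaHeader": "{\"sourceSystem\":\"demo\",\"schemaJson\":[{\"name\":\"id\",\"type\":\"BIGINT\"},{\"name\":\"name\",\"type\":\"STRING\"}]}"
  }
}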
- LoginUtil
This class implements the Kerberos authentication logic and is provided for reference.
package com.alibaba.datax.plugin.writer.kafkawriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
public class LoginUtil {
private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);
public enum Module {
KAFKA("KafkaClient"), ZOOKEEPER("Client");
private String name;
private Module(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
/**
* line separator string
*/
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
/**
* jaas file postfix
*/
private static final String JAAS_POSTFIX = ".jaas.conf";
/**
* is IBM jdk or not
*/
private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
/**
* IBM jdk login module
*/
private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
/**
* oracle jdk login module
*/
private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
/**
* Zookeeper quorum principal.
*/
public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
/**
* java security krb5 file path
*/
public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
/**
* java security login file path
*/
public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
/**
* Write the jaas.conf file
*
* @param principal
* @param keytabPath
* @throws IOException
*/
public static void setJaasFile(String principal, String keytabPath)
throws IOException {
String jaasPath =
new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
+ JAAS_POSTFIX;
// Escape path separators on Windows
jaasPath = jaasPath.replace("\\", "\\\\");
// Delete any existing jaas file
deleteJaasFile(jaasPath);
writeJaasFile(jaasPath, principal, keytabPath);
System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
}
/**
* Set the ZooKeeper server principal
*
* @param zkServerPrincipal
* @throws IOException
*/
public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException {
System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
if (ret == null) {
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
}
if (!ret.equals(zkServerPrincipal)) {
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");
}
}
/**
* Set the krb5 config file
*
* @param krb5ConfFile
* @throws IOException
*/
public static void setKrb5Config(String krb5ConfFile) throws IOException {
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
if (ret == null) {
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
}
if (!ret.equals(krb5ConfFile)) {
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
}
}
/**
* Write the jaas file
*
* @throws IOException if writing the file fails
*/
private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
throws IOException {
FileWriter writer = new FileWriter(new File(jaasPath));
try {
writer.write(getJaasConfContext(principal, keytabPath));
writer.flush();
} catch (IOException e) {
throw new IOException("Failed to create jaas.conf File");
} finally {
writer.close();
}
}
private static void deleteJaasFile(String jaasPath) throws IOException {
File jaasFile = new File(jaasPath);
if (jaasFile.exists()) {
if (!jaasFile.delete()) {
throw new IOException("Failed to delete exists jaas file.");
}
}
}
private static String getJaasConfContext(String principal, String keytabPath) {
Module[] allModule = Module.values();
StringBuilder builder = new StringBuilder();
for (Module module : allModule) {
builder.append(getModuleContext(principal, keytabPath, module));
}
return builder.toString();
}
private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
StringBuilder builder = new StringBuilder();
if (IS_IBM_JDK) {
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("credsType=both").append(LINE_SEPARATOR);
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
} else {
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("useKeyTab=true").append(LINE_SEPARATOR);
builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
builder.append("useTicketCache=false").append(LINE_SEPARATOR);
builder.append("storeKey=true").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
}
return builder.toString();
}
public static void securityPrepare(String principal, String keyTabFilePath, String krb5ConfigPath) throws IOException {
// Escape path separators on Windows
keyTabFilePath = keyTabFilePath.replace("\\", "\\\\");
krb5ConfigPath = krb5ConfigPath.replace("\\", "\\\\");
LoginUtil.setKrb5Config(krb5ConfigPath);
LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
LoginUtil.setJaasFile(principal, keyTabFilePath);
}
/*
* Check whether a file exists
*/
private static boolean isFileExists(String fileName) {
File file = new File(fileName);
return file.exists();
}
}
The java directory in the screenshot above also contains an entity package, which holds the entity classes used by the special Kafka message format described earlier. They are plain POJOs, so they are not covered in detail; a rough sketch of their shape follows below.
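The sketch below is inferred purely from how the classes are used in KafkaWriter (each public class would live in its own file under the entity package); treat it as a starting point rather than the original implementation:

package com.alibaba.datax.plugin.writer.kafkawriter.entity;

import com.alibaba.fastjson2.JSONObject;
import java.util.List;

// ColumnTemplate.java: one column entry in the first (schema) message
public class ColumnTemplate {
    private String name;
    private String type;
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
}

// HeaderSchemaTemplate.java: one entry of the schemaJson list carried in kafkaHeader
public class HeaderSchemaTemplate {
    private String name;
    private String type;
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
}

// SchemaTemplate.java: the schema section of the first message
public class SchemaTemplate {
    private List<ColumnTemplate> columns;
    private List<String> primaryKeys;
    private List<String> partitionKeys;
    private JSONObject options;
    public List<ColumnTemplate> getColumns() { return columns; }
    public void setColumns(List<ColumnTemplate> columns) { this.columns = columns; }
    public List<String> getPrimaryKeys() { return primaryKeys; }
    public void setPrimaryKeys(List<String> primaryKeys) { this.primaryKeys = primaryKeys; }
    public List<String> getPartitionKeys() { return partitionKeys; }
    public void setPartitionKeys(List<String> partitionKeys) { this.partitionKeys = partitionKeys; }
    public JSONObject getOptions() { return options; }
    public void setOptions(JSONObject options) { this.options = options; }
}

// FirstCdcValueTemplate.java: the schema message sent before data starts flowing
public class FirstCdcValueTemplate {
    private JSONObject tableId;
    private SchemaTemplate schema;
    public JSONObject getTableId() { return tableId; }
    public void setTableId(JSONObject tableId) { this.tableId = tableId; }
    public SchemaTemplate getSchema() { return schema; }
    public void setSchema(SchemaTemplate schema) { this.schema = schema; }
}

// ValueTemplate.java: the CDC-style wrapper around each data record
public class ValueTemplate {
    private JSONObject before;
    private JSONObject after;
    private String op;
    public JSONObject getBefore() { return before; }
    public void setBefore(JSONObject before) { this.before = before; }
    public JSONObject getAfter() { return after; }
    public void setAfter(JSONObject after) { this.after = after; }
    public String getOp() { return op; }
    public void setOp(String op) { this.op = op; }
}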
- plugin.json
This file defines the plugin metadata so that DataX can discover and load our plugin. The name and class attributes must match the actual plugin name and class path.
{
"name": "kafkawriter",
"class": "com.alibaba.datax.plugin.writer.kafkawriter.KafkaWriter",
"description": "kafka writer",
"developer": "alibaba"
}
- plugin_job_template.json
This file provides a configuration template for sync jobs.
{
"name": "kafkawriter",
"parameter": {
"bootstrapServers": "11.1.1.111:9092",
"topic": "test-topic",
"ack": "all",
"column": [
"id",
"name",
"description",
"weight"
],
"batchSize": 1000,
"retries": 0,
"keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
"valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
"topicNumPartition": 1,
"topicReplicationFactor": 1
}
}
That covers essentially all of the code in the kafka writer, and with it the kafkawriter plugin is basically complete. Once it is packaged, you can happily start testing.
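One straightforward way to smoke-test the plugin, assuming a reachable Kafka broker, is to rebuild the DataX distribution and run a small streamreader-to-kafkawriter job. The job file name stream2kafka.json and the broker address below are placeholders:

# Build the DataX distribution from the project root
mvn -U clean package assembly:assembly -Dmaven.test.skip=true

# Run the test job from the packaged distribution
cd target/datax/datax
python bin/datax.py stream2kafka.json

stream2kafka.json:

{
  "job": {
    "setting": {
      "speed": { "channel": 1 }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              { "value": 1, "type": "long" },
              { "value": "hello kafka", "type": "string" }
            ],
            "sliceRecordCount": 10
          }
        },
        "writer": {
          "name": "kafkawriter",
          "parameter": {
            "bootstrapServers": "127.0.0.1:9092",
            "topic": "test-topic",
            "column": ["id", "name"]
          }
        }
      }
    ]
  }
}

After the job finishes, the records should be visible in test-topic, for example via kafka-console-consumer.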