SeaTunnel扩展Transform插件,自定义转换插件

news2025/1/11 11:05:47

代码结构

在seatunnel-transforms-v2中新建数据包名,新建XXXTransform,XXXTransformConfig,XXXTransformFactory三个类

自定义转换插件功能说明

这是个适配KafkaSource的转换插件,接收到的原文格式为:

{"path":"xxx.log.gz","code":"011","cont":"{\"ID\":\"1\",\"NAME\":\"zhangsan\",\"TABLE\":\"USER\",\"create_time\":\"20230904\"}","timestamp":"20230823160246"}

需要转换为只保留cont里面的数据

{"create_time":"20230904","NAME":"zhangsan","TABLE":"USER","ID":"999"}

任务配置文件

env {

  # You can set engine configuration here STREAMING BATCH

  execution.parallelism = 1

  job.mode = "STREAMING"



  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"

}



source {

  # This is a example source plugin **only for test and demonstrate the feature source plugin**

   Kafka {

            bootstrap.servers = "xxxxx:9092"

            topic = "test_in2"

            consumer.group = "167321237613

            format="text"

            result_table_name="kafka"

        }

}



transform {

    ExtractFromCJ {

    source_table_name="kafka"

    result_table_name="kafka1"

    schema = {

        fields {

                NAME = "string"

                TABLE = "string"

                create_time = "string"

                ID="string"

            }

        }

    }

}



sink {

  kafka {

      source_table_name="kafka1"

      topic = "test_out2"

      bootstrap.servers = "xxxx:9092"

      kafka.request.timeout.ms = 60000

      semantics = EXACTLY_ONCE

  }

}

代码说明

XXXConfig代码,这个类主要用来保存transform的配置项

package org.apache.seatunnel.transform.extract;



import lombok.Getter;

import lombok.Setter;

import org.apache.seatunnel.api.configuration.Option;

import org.apache.seatunnel.api.configuration.Options;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;



import java.io.Serializable;

import java.util.Map;



@Getter

@Setter

public class ExtractFromCJTransformConfig implements Serializable {



    public static final Option<Map<String, String>> SCHEMA =

            Options.key("schema.fields")

                    .mapType()

                    .noDefaultValue()

                    .withDescription(

                            "Specify the field mapping relationship between input and output");



    private Map<String, String> fieldColumns;

    public static ExtractFromCJTransformConfig of(ReadonlyConfig config) {

        ExtractFromCJTransformConfig extractFromCJTransformConfig = new ExtractFromCJTransformConfig();

        Map<String, String> fieldColumns = config.get(SCHEMA);

        extractFromCJTransformConfig.setFieldColumns(fieldColumns);

        return extractFromCJTransformConfig;

    }

}

XXXTransformFactory说明,工厂类,主要用来初始化具体的转换类

package org.apache.seatunnel.transform.extract;



import com.google.auto.service.AutoService;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import org.apache.seatunnel.api.configuration.util.OptionRule;

import org.apache.seatunnel.api.table.catalog.CatalogTable;

import org.apache.seatunnel.api.table.connector.TableTransform;

import org.apache.seatunnel.api.table.factory.Factory;

import org.apache.seatunnel.api.table.factory.TableFactoryContext;

import org.apache.seatunnel.api.table.factory.TableTransformFactory;



@AutoService(Factory.class)

public class ExtractFromCJTransformFactory implements TableTransformFactory {

    @Override

    public String factoryIdentifier() {

        return  "ExtractFromCJ";

    }



    @Override

    public OptionRule optionRule() {

        return OptionRule.builder().optional(ExtractFromCJTransformConfig.SCHEMA).build();

    }



    @Override

    public TableTransform createTransform(TableFactoryContext context) {

        CatalogTable catalogTable = context.getCatalogTable();

        ReadonlyConfig options = context.getOptions();

        ExtractFromCJTransformConfig extractFromCJTransformConfig =

                ExtractFromCJTransformConfig.of(options);

        return () -> new ExtractFromCJTransform(extractFromCJTransformConfig, catalogTable);

    }

}

XXXXTransform,具体的转换类,主要用于对source数据的处理,还有数据结构类型的保存

package org.apache.seatunnel.transform.extract;



import cn.hutool.core.collection.CollUtil;

import cn.hutool.json.JSONObject;

import cn.hutool.json.JSONUtil;

import com.google.auto.service.AutoService;

import lombok.NoArgsConstructor;

import lombok.NonNull;

import lombok.extern.slf4j.Slf4j;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import org.apache.seatunnel.api.configuration.util.ConfigValidator;

import org.apache.seatunnel.api.table.catalog.CatalogTable;

import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;

import org.apache.seatunnel.api.table.catalog.Column;

import org.apache.seatunnel.api.table.catalog.ConstraintKey;

import org.apache.seatunnel.api.table.catalog.PhysicalColumn;

import org.apache.seatunnel.api.table.catalog.PrimaryKey;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;

import org.apache.seatunnel.api.table.catalog.TableSchema;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.apache.seatunnel.api.transform.SeaTunnelTransform;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;



import java.util.ArrayList;

import java.util.List;

import java.util.stream.Collectors;



@AutoService(SeaTunnelTransform.class)

@NoArgsConstructor

@Slf4j

public class ExtractFromCJTransform extends AbstractCatalogSupportTransform {



    private ExtractFromCJTransformConfig config;

    protected SeaTunnelRowType inputRowType;

    @Override

    public String getPluginName() {

        return "ExtractFromCJ";

    }



    public ExtractFromCJTransform(

            @NonNull ExtractFromCJTransformConfig config, @NonNull CatalogTable catalogTable) {

        super(catalogTable);

        this.config = config;

    }

    @Override

    protected void setConfig(Config pluginConfig) {

        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))

                .validate(new ExtractFromCJTransformFactory().optionRule());

        this.config = ExtractFromCJTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));

    }



    @Override

    protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {

        return inputRowType;

    }



    @Override

    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {

        Object content = inputRow.getFields()[0];

        String data = content.toString();

        Object[] outputDataArray = new Object[0];

        if (JSONUtil.isJson(data)) {

            JSONObject cont = JSONUtil.parseObj(data).getJSONObject("cont");

            if (!cont.isEmpty()) {

                if (!CollUtil.isEmpty(this.config.getFieldColumns())) {

                    outputDataArray = new Object[this.config.getFieldColumns().size()];

                    int t = 0;

                    for (String key : this.config.getFieldColumns().keySet()) {

                        String value = cont.getStr(key);

                        outputDataArray[t] = value;

                        t++;

                    }

                } else {

                    outputDataArray = new Object[1];

                    outputDataArray[0] = JSONUtil.toJsonStr(cont);

                }

            }

        }

        SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);

        outputRow.setRowKind(inputRow.getRowKind());

        outputRow.setTableId(inputRow.getTableId());

        return outputRow;

    }



    @Override

    protected TableSchema transformTableSchema() {

        List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();

        List<ConstraintKey> outputConstraintKeys =

                inputCatalogTable.getTableSchema().getConstraintKeys().stream()

                        .map(ConstraintKey::copy)

                        .collect(Collectors.toList());

        PrimaryKey copiedPrimaryKey =

                inputCatalogTable.getTableSchema().getPrimaryKey() == null

                        ? null

                        : inputCatalogTable.getTableSchema().getPrimaryKey().copy();



        if (CollUtil.isEmpty(this.config.getFieldColumns())) {

            return TableSchema.builder()

                    .primaryKey(copiedPrimaryKey)

                    .columns(inputColumns)

                    .constraintKey(outputConstraintKeys)

                    .build();

        } else {

            List<Column> transformColumns = new ArrayList<>();

            for (String key : this.config.getFieldColumns().keySet()) {

                SeaTunnelDataType<?> dataType = CatalogTableUtil.parseDataType(this.config.getFieldColumns().get(key));

                transformColumns.add(PhysicalColumn.of(key, dataType, 0, true, null, null));

            }

            return TableSchema.builder()

                    .primaryKey(copiedPrimaryKey)

                    .columns(transformColumns)

                    .constraintKey(outputConstraintKeys)

                    .build();

        }

    }



    @Override

    protected TableIdentifier transformTableIdentifier() {

        return inputCatalogTable.getTableId().copy();

    }

}

文中的转换实现的是AbstractCatalogSupportTransform类,Seatunel还提供SingleFieldOutputTransform和MultipleFieldOutputTransform,分别对应单字段和多字段的数据处理,具体扩展可根据需求来实现对应的类

执行结果

来源消息

结果消息

以上就是对转换插件的扩展分享,有需求的小伙伴可以参考,也欢迎大家一起评论沟通~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/971986.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Akka的Actor模拟Spark的Master和Worker工作机制

使用Akka的Actor模拟Spark的Master和Worker工作机制 Spark的Master和Worker协调工作原理 在 Apache Spark 中&#xff0c;Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程&#xff1a; Worker 启动后&#xff0c…

【Unity】VS Code 没有自动补全 MonoBehaviour 的方法

正常来说&#xff0c;在VS Code 输入类似 OnTriggerEnter2D等方法名时&#xff0c;VS Code会根据已经输入的前缀自动提示相关方法。 在不正常的情况下&#xff0c;根据StackOverFlow上面的回答&#xff0c;依次试过了 安装 .NET SDK安装 .NET Framework Dev PackVS Code安装 …

【ES6】js中的__proto__和prototype

在JavaScript中&#xff0c;__proto__和prototype都是用于实现对象继承的关键概念。 1、proto __proto__是一个非标准的属性&#xff0c;用于设置或获取一个对象的原型。这个属性提供了直接访问对象内部原型对象的途径。对于浏览器中的宿主对象和大多数对象来说&#xff0c;可…

【STM32】学习笔记-PWR(Power Control)电源控制

PWR&#xff08;Power Control&#xff09;电源控制 PWR&#xff08;Power Control&#xff09;电源控制是一种技术或设备&#xff0c;用于控制电源的开关和输出。它通常用于电源管理和节能&#xff0c;可以通过控制电源的工作状态来延长电子设备的使用寿命&#xff0c;减少能…

SQL注入-盲注 Burp盲注方法

文章目录 判断库名位数Burp 抓取数据包设置payload位置设置payload 1设置payload 2点击开始攻击 判断库名下表名的位数Burp 抓取数据包点击开始攻击 判断库名下第二张表名判断表名下的字段名判断表中具体数据 什么是盲注&#xff1f; 有时目标存在注入&#xff0c;但在页面上没…

Unity中Shader的UV扭曲效果的实现

文章目录 前言一、实现的思路1、在属性面板暴露一个 扭曲贴图的属性2、在片元结构体中&#xff0c;新增一个float2类型的变量&#xff0c;用于独立存储将用于扭曲的纹理的信息3、在顶点着色器中&#xff0c;根据需要使用TRANSFORM_TEX对Tilling 和 Offset 插值&#xff1b;以及…

使用QT操作Excel 表格的常用方法

VBA 简介 Microsoft Office软件通常使用VBA来扩展Windows的应用程序功能&#xff0c;Visual Basic for Applications&#xff08;VBA&#xff09;是一种Visual Basic的一种宏语言。 在VBA的参考手册中就可以看到具体函数、属性的用法&#xff0c;Qt操作Excel主要通过 QAxObj…

Redis 缓存预热+缓存雪崩+缓存击穿+缓存穿透

面试题&#xff1a; 缓存预热、雪萌、穿透、击穿分别是什么&#xff1f;你遇到过那几个情况&#xff1f;缓存预热你是怎么做的&#xff1f;如何造免或者减少缓存雪崩&#xff1f;穿透和击穿有什么区别&#xff1f;他两是一个意思还是载然不同&#xff1f;穿适和击穿你有什么解…

【每日一题】1041. 困于环中的机器人

1041. 困于环中的机器人 - 力扣&#xff08;LeetCode&#xff09; 在无限的平面上&#xff0c;机器人最初位于 (0, 0) 处&#xff0c;面朝北方。注意: 北方向 是y轴的正方向。南方向 是y轴的负方向。东方向 是x轴的正方向。西方向 是x轴的负方向。 机器人可以接受下列三条指令之…

入门力扣自学笔记277 C++ (题目编号:42)(动态规划)

42. 接雨水 题目&#xff1a; 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 示例 1&#xff1a; 输入&#xff1a;height [0,1,0,2,1,0,1,3,2,1,2,1] 输出&#xff1a;6 解释&#xff1a;上面是由数组…

时序预测 | MATLAB实现CNN-BiLSTM卷积双向长短期记忆神经网络时间序列预测(风电功率预测)

时序预测 | MATLAB实现CNN-BiLSTM卷积双向长短期记忆神经网络时间序列预测&#xff08;风电功率预测&#xff09; 目录 时序预测 | MATLAB实现CNN-BiLSTM卷积双向长短期记忆神经网络时间序列预测&#xff08;风电功率预测&#xff09;预测效果基本介绍程序设计参考资料 预测效果…

小白学go基础03-了解Go项目的项目结构

我们先来看看第一个Go项目——Go语言自身——的项目结构是什么样的。Go项目的项目结构自1.0版本发布以来一直十分稳定&#xff0c;直到现在Go项目的顶层结构基本没有大的改变。 截至Go项目commit 1e3ffb0c&#xff08;2019.5.14&#xff09;&#xff0c;Go1.0 项目结构如下&am…

js+vue,前端关于页面滚动让头部菜单淡入淡出实现原理

今天遇到个需求&#xff1a;我这里借用小米商城的详情页做个比喻吧。 刚开始其商品详情页是这样的&#xff1a; 当滚动到一定高度时&#xff0c;是这样的&#xff1a; 可以看到当滚动到轮播图底下的时候&#xff0c;详情页的菜单完全显现出来。 以下上代码&#xff1a; HTML…

uniApp webview 中调用底座蓝牙打印功能异常

背景: 使用uniApp, 安卓底座 webView 方式开发; 调用方式采用H5 向 底座发送消息, 底座判断消息类型, 然后连接打印机进行打印; 内容通过指令集方式传递给打印机; 过程当中发现部分标签可以正常打印, 但又有部分不行,打印机没反应, 也没有报错; 原因分析: 对比标签内容…

医院安全(不良)事件上报系统源码 不良事件报告平台源码 前后端分离,支持二开

医院安全&#xff08;不良&#xff09;事件上报系统源码 系统定义&#xff1a; 规范医院安全&#xff08;不良&#xff09;事件的主动报告&#xff0c;增强风险防范意识&#xff0c;及时发现医院不良事件和安全隐患&#xff0c;将获取的医院安全信息进行分析反馈&#xff0c;…

“金融级”数字底座:从时代的“源启”,到“源启”的时代

今年初《数字中国建设整体布局规划》正式发布&#xff0c;这代表着数字中国建设迈向了实质的落地阶段&#xff0c;其背后的驱动就是遍及各行各业的数字化转型。 千姿百态、复杂多样的应用场景&#xff0c;可以看作是遍布数字中国的“点”&#xff1b;千行百业、各种类型的行业…

阿里云2核4G服务器5M带宽5年费用价格明细表

阿里云2核4G服务器5M带宽可以选择轻量应用服务器或云服务器ECS&#xff0c;轻量2核4G4M带宽服务器297元一年&#xff0c;2核4G云服务器ECS可以选择计算型c7、c6或通用算力型u1实例等&#xff0c;买5年可以享受3折优惠&#xff0c;阿腾云分享阿里云服务器2核4G5M带宽五年费用表&…

【系统编程】线程池以及API接口简介

(꒪ꇴ꒪ )&#xff0c;Hello我是祐言QAQ我的博客主页&#xff1a;C/C语言&#xff0c;数据结构&#xff0c;Linux基础&#xff0c;ARM开发板&#xff0c;网络编程等领域UP&#x1f30d;快上&#x1f698;&#xff0c;一起学习&#xff0c;让我们成为一个强大的攻城狮&#xff0…

非比较排序——计数排序

本章gitee代码&#xff1a;计数排序 文章目录 &#x1f347;0. 前言&#x1f348;1. 思路&#x1f349;2. 代码实现&#x1f34a;3. 优势与缺陷&#x1f34b;4. 其他的非比较排序&#x1faf4;桶排序&#x1faf4;基数排序 &#x1f347;0. 前言 传统的排序方法通常需要逐个比…

嵌入式Linux开发实操(十五):nand flash接口开发(2)

通用NAND驱动程序支持几乎所有基于NAND的芯片,并将它们连接到Linux内核的内存技术设备(MTD)子系统。这个接口走的是nand的并口,可以在shell的/dev中看到设备,比如/mtd0、/mtd0ro…,mtdblock0、mtdblock1… sysfs在设备层次结构中提供了几个视角。设备必须挂在某条总线bus…