使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

news2025/1/11 6:05:10

file

版本说明:

SeaTunnel:apache-seatunnel-2.3.2-SNAPHOT

引擎说明:

Flink:1.16.2

Zeta:官方自带

前言

近些时间,我们正好接手一个数据集成项目,数据上游方是给我们投递到Kafka,我们一开始的技术选型是SpringBoot+Flink对上游数据进行加工处理(下文简称:方案一),由于测试不到位,后来到线上,发现数据写入效率完全不符合预期。后来将目光转到开源项目SeaTunnel上面,发现Source支持Kafka,于是开始研究测试,开发环境测试了500w+数据,发现效率在10000/s左右。果断放弃方案一,采取SeaTunnel对数据进行集成加工(下文简称:方案二)。在SeaTunnel研究的过程中,总结了两种方法,方法二相较于方法一,可以实现全场景使用,无需担心字段值里面各种意想不到的字符对数据落地造成错位现象的发生。

对比

file

在方案二的基础上又衍生出两种方法 file

所以,在经过长时间的探索和我们线上验证得出结论,建议使用方案二的方法二。

好了,我们进入正文,主篇幅主要介绍方案二中的两种方法,让大家主观的感受SeaTunnel的神奇。

方案一 Springboot+Flink实现Kafka 复杂JSON的解析

网上案例很多,在此不做过多介绍。

方案二 SeaTunnel实现Kafka复杂JSON的解析

在开始介绍之前,我们看一下我们上游方投递Kafka的Json样例数据(对部分数据进行了敏感处理),如下:

    "magic": "a***G",
    "type": "D***",
    "headers": null,
    "messageSchemaId": null,
    "messageSchema": null,
    "message": {
        "data": {
            "LSH": "187eb13****l0214723",
            "NSRSBH": "9134****XERG56",
            "QMYC": "01*****135468",
            "QMZ": "1Zr*****UYGy%2F5bOWtrh",
            "QM_SJ": "2023-05-05 16:42:10.000000",
            "YX_BZ": "Y",
            "ZGHQ_BZ": "Y",
            "ZGHQ_SJ": "2023-06-26 16:57:17.000000",
            "SKSSQ": 202304,
            "SWJG_DM": "",
            "SWRY_DM": "00",
            "CZSJ": "2023-05-05 16:42:10.000000",
            "YNSRSBH": "9134****XERG56",
            "SJTBSJ": "2023-06-26 19:29:59.000",
            "SJCZBS": "I"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "12440977",
            "timestamp": "2023-06-26T19:29:59.673000",
            "streamPosition": "00****3.16",
            "transactionId": "000***0006B0002",
            "changeMask": "0FFF***FF",
            "columnMask": "0F***FFFF",
            "transactionEventCounter": 1,
            "transactionLastEvent": false
        }
    }
}

方法一、不通过UDF函数实现

存在问题:字段值存在分隔符,例如‘,’ 则数据在落地的时候会发生错位现象。

该方法主要使用官网 transform-v2的各种转换插件进行实现,主要用到的插件有 ReplaceSplit以及Sql实现

ST脚本:(ybjc_qrqm.conf)

env {
  execution.parallelism = 100
  job.mode = "STREAMING"
  job.name = "kafka2mysql_ybjc"
  execution.checkpoint.interval = 60000
}

source {
  Kafka {
    result_table_name = "DZFP_***_QRQM1"
    topic = "DZFP_***_QRQM"
    bootstrap.servers = "centos1:19092,centos2:19092,centos3:19092"
    schema = {
      fields {
        message =  {
            data = {
                LSH = "string",
                NSRSBH =  "string",
                QMYC =  "string",
                QMZ =  "string",
                QM_SJ =  "string",
                YX_BZ =  "string",
                ZGHQ_BZ =  "string",
                ZGHQ_SJ =  "string",
                SKSSQ =  "string",
                SWJG_DM = "string",
                SWRY_DM = "string",
                CZSJ = "string",
                YNSRSBH = "string",
                SJTBSJ = "string",
                SJCZBS = "string"
            }
        }
      }  
    }
      start_mode = "earliest"
    #start_mode.offsets = {
    #     0 = 0
    #     1 = 0
    #     2 = 0
    #}    
    kafka.config = {
      auto.offset.reset = "earliest"
      enable.auto.commit = "true"
        # max.poll.interval.ms = 30000000
        max.partition.fetch.bytes = "5242880"
        session.timeout.ms = "30000"
      max.poll.records = "100000"
    }
  }
}

transform {
 Replace {
    source_table_name = "DZFP_***_QRQM1"
    result_table_name = "DZFP_***_QRQM2"
    replace_field = "message"
    pattern = "[["
    replacement = ""
    #is_regex = true
      #replace_first = true
  }
  Replace {
    source_table_name = "DZFP_***_QRQM2"
    result_table_name = "DZFP_***_QRQM3"
    replace_field = "message"
    pattern = "]]"
    replacement = ""
    #is_regex = true
      #replace_first = true
  }

  Split {
    source_table_name = "DZFP_***_QRQM3"
    result_table_name = "DZFP_***_QRQM4"
    # 存在问题: 如果字段值存在分隔符 separator,则数据会错位
    separator = ","
    split_field = "message"
    # 你的第一个字段包含在zwf5里面,,前五个占位符是固定的。
    output_fields = [zwf1,zwf2,zwf3,zwf4,zwf5,nsrsbh,qmyc,qmz,qm_sj,yx_bz,zghq_bz,zghq_sj,skssq,swjg_dm ,swry_dm ,czsj ,ynsrsbh ,sjtbsj ,sjczbs]
  }

  sql{
   source_table_name = "DZFP_***_QRQM4"
   query = "select replace(zwf5 ,'fields=[','') as lsh,nsrsbh,trim(qmyc) as qmyc,qmz,qm_sj,yx_bz, zghq_bz,zghq_sj,skssq,swjg_dm ,swry_dm ,czsj ,ynsrsbh ,sjtbsj ,replace(sjczbs,']}]}','') as sjczbs  from DZFP_DZDZ_QRPT_YWRZ_QRQM4 where skssq <> ' null'"
   result_table_name = "DZFP_***_QRQM5"
  }

}

sink {
    Console {
        source_table_name = "DZFP_***_QRQM5"
    }

    jdbc {
        source_table_name = "DZFP_***_QRQM5"
        url = "jdbc:mysql://localhost:3306/dbname?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "pwd"
        batch_size = 200000
        database = "dbname"
        table = "tablename"
                generate_sink_sql = true
        primary_keys = ["nsrsbh","skssq"]
    } 
}

正常写入数据是可以写入了。

写入成功如下:

● kafka源数据: file ● tidb目标数据: file 现在我们模拟给kafka发送一条数据,其中,SJTBSJ字段我在中间设置一个, 是逗号。

原始值:2023-06-26 19:29:59.000 更改之后的值2023-06-26 19:29:59.0,00

往topic生产一条数据命令
kafka-console-producer.sh --topic DZFP_***_QRQM --broker-list centos1:19092,centos2:19092,centos3:19092

发送如下:

    "magic": "a***G",
    "type": "D***",
    "headers": null,
    "messageSchemaId": null,
    "messageSchema": null,
    "message": {
        "data": {
            "LSH": "187eb13****l0214723",
            "NSRSBH": "9134****XERG56",
            "QMYC": "01*****135468",
            "QMZ": "1Zr*****UYGy%2F5bOWtrh",
            "QM_SJ": "2023-05-05 16:42:10.000000",
            "YX_BZ": "Y",
            "ZGHQ_BZ": "Y",
            "ZGHQ_SJ": "2023-06-26 16:57:17.000000",
            "SKSSQ": 202304,
            "SWJG_DM": "",
            "SWRY_DM": "00",
            "CZSJ": "2023-05-05 16:42:10.000000",
            "YNSRSBH": "9134****XERG56",
            "SJTBSJ": "2023-06-26 19:29:59.0,00",
            "SJCZBS": "I"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "12440977",
            "timestamp": "2023-06-26T19:29:59.673000",
            "streamPosition": "00****3.16",
            "transactionId": "000***0006B0002",
            "changeMask": "0FFF***FF",
            "columnMask": "0F***FFFF",
            "transactionEventCounter": 1,
            "transactionLastEvent": false
        }
    }
}

写入之后,发现数据错位了。 file

结论:其实这个问题线上还是能遇到的,比如地址字段里面含有逗号,备注信息里面含有逗号等等,这种现象是不可避免的,所以此种方案直接pass。对数据危害性极大!可以处理简单的数据,当做一种思路。

方法二:通过UDF函数实现

该方法通过UDF函数扩展(https://seatunnel.apache.org/docs/2.3.2/transform-v2/sql-udf)的方式,实现嵌套kafka source json源数据的解析。可以大大简化ST脚本的配置

ST脚本:(ybjc_qrqm_yh.conf)

env {
    execution.parallelism = 5
    job.mode = "STREAMING"
    job.name = "kafka2mysql_ybjc_yh"
    execution.checkpoint.interval = 60000
}

source {
    Kafka {
        result_table_name = "DZFP_***_QRQM1"
        topic = "DZFP_***_QRQM"
        bootstrap.servers = "centos1:19092,centos2:19092,centos3:19092"
        schema = {
        fields {
             message =  {
                data = "map<string,string>"
             }
        }  
        }
        start_mode = "earliest"
        #start_mode.offsets = {
        #     0 = 0
        #     1 = 0
        #     2 = 0
        #}    
        kafka.config = {
        auto.offset.reset = "earliest"
        enable.auto.commit = "true"
        # max.poll.interval.ms = 30000000
        max.partition.fetch.bytes = "5242880"
        session.timeout.ms = "30000"
        max.poll.records = "100000"
        }
    }
}
transform {
    sql{
        source_table_name = "DZFP_***_QRQM1"
        result_table_name = "DZFP_***_QRQM2"
        # 这里的qdmx就是我自定义的UDF函数,具体实现下文详细讲解。。。
        query = "select qdmx(message,'lsh') as lsh,qdmx(message,'nsrsbh') as nsrsbh,qdmx(message,'qmyc') as qmyc,qdmx(message,'qmz') as qmz,qdmx(message,'qm_sj') as qm_sj,qdmx(message,'yx_bz') as yx_bz,qdmx(message,'zghq_bz') as zghq_bz,qdmx(message,'zghq_sj') as zghq_sj,qdmx(message,'skssq') as skssq,qdmx(message,'swjg_dm') as swjg_dm,qdmx(message,'swry_dm') as swry_dm,qdmx(message,'czsj') as czsj,qdmx(message,'ynsrsbh') as ynsrsbh, qdmx(message,'sjtbsj') as sjtbsj,qdmx(message,'sjczbs') as sjczbs  from  DZFP_DZDZ_QRPT_YWRZ_QRQM1"
    }
}


sink {
    Console {
        source_table_name = "DZFP_***_QRQM2"
    }
    jdbc {
          source_table_name = "DZFP_***_QRQM2"
          url = "jdbc:mysql://localhost:3306/dbname?serverTimezone=GMT%2b8"
          driver = "com.mysql.cj.jdbc.Driver"
          user = "user"
          password = "pwd"
          batch_size = 200000
          database = "dbname"
          table = "tablename"
          generate_sink_sql = true
          primary_keys = ["nsrsbh","skssq"]
    } 
}

执行脚本:查看结果,发现并没有错位,还在原来的字段(sjtbsj)上面。

这种方法,是通过key获取value值。不会出现方法一中的按照逗号分割出现数据错位现象。 file

具体UDF函数编写如下。

maven引入如下:

<dependencies>
  <dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-transforms-v2</artifactId>
    <version>2.3.2</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-api</artifactId>
    <version>2.3.2</version>
  </dependency>

  <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.20</version>
  </dependency>

  <dependency>
    <groupId>com.google.auto.service</groupId>
    <artifactId>auto-service-annotations</artifactId>
    <version>1.1.1</version>
    <optional>true</optional>
    <scope>compile</scope>
  </dependency>

  <dependency>
    <groupId>com.google.auto.service</groupId>
    <artifactId>auto-service</artifactId>
    <version>1.1.1</version>
    <optional>true</optional>
    <scope>compile</scope>
  </dependency>
</dependencies>  

UDF具体实现java代码如下:

package org.seatunnel.sqlUDF;

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.transform.sql.zeta.ZetaUDF;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@AutoService(ZetaUDF.class)
public class QdmxUDF implements ZetaUDF {

    @Override
    public String functionName() {
        return "QDMX";
    }

    @Override
    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> list) {
        return BasicType.STRING_TYPE;
    }

    // list 参数实例:(也就是kafka 解析过来的数据)
    //SeaTunnelRow{tableId=, kind=+I, fields=[{key1=value1,key2=value2,.....}]}
    @Override
    public Object evaluate(List<Object> list) {
        String str = list.get(0).toString();
        //1 Remove the prefix
        str = StrUtil.replace(str, "SeaTunnelRow{tableId=, kind=+I, fields=[{", "");
        //2 Remove the suffix
        str = StrUtil.sub(str, -3, 0);
        // 3 build Map key value
        Map<String, String> map = parseToMap(str);
        if ("null".equals(map.get(list.get(1).toString())))
            return "";
        // 4 return the value of the key
        return map.get(list.get(1).toString());
    }

    public static Map<String, String> parseToMap(String input) {
        Map<String, String> map = new HashMap<>();
        // 去除大括号 在字符串阶段去除
        // input = input.replaceAll("[{}]", "");
        // 拆分键值对
        String[] pairs = input.split(", ");

        for (String pair : pairs) {
            String[] keyValue = pair.split("=");
            if (keyValue.length == 2) {
                String key = keyValue[0].trim().toLowerCase();
                String value = keyValue[1].trim();
                map.put(key, value);
            }
        }
        return map;
    }
}

然后打包,打包命令如下:

mvn -T 8 clean install -DskipTests -Dcheckstyle.skip -Dmaven.javadoc.skip=true

查看META-INF/services, 看注解@AutoService 是否生成对应的spi接口:

如下:则打包成功! file 如果没有,则打包失败,UDF函数无法使用.

可以参考我的打包插件:

  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-site-plugin</artifactId>
      <version>3.7</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.maven.doxia</groupId>
          <artifactId>doxia-site-renderer</artifactId>
          <version>1.8</version>
        </dependency>
      </dependencies>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
        <annotationProcessorPaths>
          <path>
            <groupId>com.google.auto.service</groupId>
            <artifactId>auto-service</artifactId>
            <version>1.1.1</version>
          </path>
        </annotationProcessorPaths>
      </configuration>
    </plugin>
  </plugins>
</build>

最终打成的jar包放到 ${SEATUNNEL_HOME}/lib目录下,由于我的UDF函数引入了第三方jar包,也需要一并上传。如果是Zeta集群,需要重启Zeta集群才能生效。其他引擎实时生效。 最终上传成功如下: file 说明:这个hutool-all的jar包可以含在java_study这个项目里面,我图方便,上传了两个。

综上,推荐使用通过UDF函数扩展的方式,实现嵌套kafka source json源数据的解析。

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/770527.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker容器化部署Rancher2.x实战

关闭swap分区 swapoff -avi /etc/fstab 注释掉 /dev/mapper/centos-swap swap swap default 0 0 关闭防火墙 systemctl stop firewalld systemctl disable firewalld 设置主机名称 hostnamectl set-hostname k8s-master hostnamectl set-hostname k8s-node01 加入dns解析…

Tomcat总结

文章目录 1. Tomca简介2. Maven Web 项目结构3. HTTP数据传输格式4. IDEA集成本地Tomcat5. Maven配置Tomcat 1. Tomca简介 简介:   Web服务器的作用&#xff1a; 分析: Tomcat: 1. Tomcat是一个Web服务器, 可以负责解析和处理服务器和浏览器之间传输的HTTP协议 2. Tomcat默…

jupyter定制数学函数

from math import * #导入绘图模块 import numpy as np #导入数值计算模块 import matplotlib.pyplot as plt #导入绘图模块 plt.rcParams[font.sans-serif][SimHei] #绘图中文 plt.rcParams[axes.unicode_minus]False #绘图负号 import mpl_toolkits.axisartist as axisartist…

wampserver的mysql8.0版本在my.ini文件中加入skip_grant_tables无效等一系列问题。

背景&#xff1a;安装了新的wampserver之后&#xff0c;php版本mysql8.0.31&#xff0c;想打开phpadmin可视化管理页面&#xff0c;后来忘记密码了&#xff0c;报错&#xff1a;ERROR 1045 (28000): Access denied for user rootlocalhost (using password: No)&#xff0c;只能…

Apikit 自学日记:如何测试多个关联的 API

肯定会有人好奇&#xff0c;如果有多个关联的 API 如何做测试呢&#xff1f;很简单&#xff01;在 APIkit 中也有测试多个关联 API 的功能。 1、在流程测试用例详情页中&#xff0c;点击“ 添加测试步骤”&#xff0c;选择“从API文档添加API请求” 2、在对应的项目下选择关联的…

使用bat处理批量下载表情包图片

需求&#xff1a; 一共有98个表情包需要下载到本地电脑&#xff0c;表情包png图片的远程URL地址如下&#xff1a; http://bbs.296o.com/emoj/1.png 至 http://bbs.296o.com/emoj/98.png 如果是手工下载的话&#xff0c;要浪费很多时间&#xff0c;我们直接有windows系统上使…

目标检测算法:YOLOv2-v4简单解读

目标检测算法&#xff1a;YOLOv2-v4简单解读 说明 ​ YOLO系列算法是目标检测领域比较突出的算法之一&#xff0c;网上关于每个版本都有非常多的解读&#xff0c;这里我只是简单梳理一下我自己的观点&#xff0c;主要目的是帮助自己复习和梳理知识。 ​ 本博客属于论文解读系列…

nginx基础3——配置文件详解(实用功能篇)

文章目录 一、平滑升级二、修饰符2.1 无修饰符效果2.2 精准匹配&#xff08;&#xff09;2.3 区分大小写匹配&#xff08;~&#xff09;2.4 不区分大小写匹配&#xff08;~*&#xff09;2.5 匹配优先级 三、访问控制四、用户认证五、配置https六、开启状态界面七、rewrite重写u…

文件加密软件哪个好?文件加密方法介绍

为了避免数据泄露事件的发生&#xff0c;电脑中的重要文件需要采用加密的方法进行保护。那么&#xff0c;文件加密软件哪个好呢&#xff1f;下面我们就一起来了解一下。 EFS加密 除了Windows的家庭版系统外&#xff0c;其他版本的系统均拥有EFS文件加密功能&#xff0c;它可以…

Linux 学习记录53(ARM篇)

Linux 学习记录53(ARM篇) 本文目录 Linux 学习记录53(ARM篇)一、内存读写指令1. 在C语言中读取内存2. 指令码及功能3. 格式4. 使用示例5. 寻址方式(1. 前索引方式(2. 后索引方式(3. 自动索引 6.批量寄存器操作指令(1. 操作码(2. 格式(3. 使用示例(4. 地址增长方式>1 ia后缀&…

【论文学习】Distortion Agnostic Deep Watermarking

一、前言 论文链接&#xff1a;Distortion Agnostic Deep Watermarking 论文主要内容&#xff1a; 该文献提出了一种失真不可知的鲁棒水印模型&#xff0c;以解决现有DNN鲁棒水印方法的局限性。现有的DNN鲁棒水印方法&#xff0c;通常是在训练阶段将各类失真&#xff08;例如…

Databend 开源周报第 102 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.cn 。 Whats On In Databend 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 为指定列创建 B…

DataTable数据对比

DataTable数据对比 文章目录 DataTable数据对比前言一、计算DataTable差集结构不同的情况结构相同的情况 二、计算DataTable交集结构不同的情况结构相同的情况 三、计算DataTable的并集合两个DaTable结构相同的情况计算并集 前言 开发中我们经常会出现查询数据库后返回DataTab…

Spring Data JPA使用规则和审计的学习

一、引入依赖 完整的pom文件如下所示: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http…

idea 常用快捷键总结

IDEA常用快捷键总结 很多新手小白在使用IDEA进行代码编写的时候 对快捷键很感兴趣 这里泡泡给大家总结了一些常用的快捷键 希望能帮助到你 记得要收藏下来时常观看并且练习&#xff0c;才能熟练哦~ 1. 根据psvm或者main快速生成主函数 我们可以在类中输入psvm 或者main 然后I…

数据结构day4(2023.7.18)

一、Xmind整理&#xff1a; 链表的插入和删除&#xff1a; 二、课上练习&#xff1a; 练习1&#xff1a;顺序表去重 33 22 22 11 11 i jfor(int i0;i<list->len-1;i){for(int ji1;j<len;j){if(list->data[i]list->data[j]){delete_by_sub(j,list); …

springmvc @RequestMapping注解中produces以及consumes属性的含义(转载请删除括号里的内容)

http协议基础知识 首先需要了解什么叫MediaType&#xff08;媒体类型&#xff09;&#xff1f; 通俗来说&#xff0c;在http协议中&#xff0c;用来表示传输内容的格式就是MediaType&#xff0c;比如text/html&#xff0c;application/json等&#xff0c;其中text代表介质&am…

win7系统电脑怎么在桌面上悬挂工作日程安排清单显示呢?

在现代快节奏的工作环境中&#xff0c;合理安排和管理工作日程是非常重要的。而在电脑桌面上悬挂工作日程安排清单显示&#xff0c;可以让我们随时了解自己的任务和工作进度&#xff0c;提高工作效率。那么&#xff0c;如何在Win7系统电脑上实现这一功能呢&#xff1f; 今天我…

第六章内存保护单元(Cortex-M7 Processor)

目录 第六章内存保护单元 6.1关于MPU 6.2MPU功能描述 6.3MPU编程器模型 第六章内存保护单元 介绍MPU (Memory Protection Unit)。它包含以下部分: 关于第6-2页的MPU。MPU功能描述见第6-3页。MPU程序员模型在第6-4页。 6.1关于MPU MPU是内存保护的可选组件。处理器支持标准…

【算法基础:数据结构】2.3 并查集

文章目录 并查集算法原理&#xff08;重要&#xff01;⭐&#xff09; 经典例题836. 合并集合&#xff08;重要&#xff01;模板&#xff01;⭐&#xff09;837. 连通块中点的数量&#xff08;维护连通块大小的并查集&#xff09;240. 食物链&#xff08;维护额外信息的并查集&…