ChunJun: 自定义插件

news2024/7/4 9:38:44

序言

Chunjun的版本兼容可能会有问题,在我们了解了自定义插件后,在修改源码以应对不同的场景就会得心应手了,针对Chunjun1.12.Release版本说明cuiyaonan2000@163.com 

自定义插件整体流程

从数据流的角度来看ChunJun,可以理解为不同数据源的数据流通过对应的ChunJun插件处理,变成符合ChunJun数据规范的数据流;脏数据的处理可以理解为脏水流通过污水处理厂,变成符合标准,可以使用的水流,而对不能处理的水流收集起来。----总的来说跟Flink的数据处理一样,只是增加了一个插件的概念用于处理不同的数据源,并生成对应的Flink任务cuiyaonan2000@163.com

插件开发不需要关注任务具体如何调度,只需要关注关键问题:

  1. 数据源本身读写数据的正确性;
  2. 如何合理且正确地使用框架;
  3. 配置文件的规范,每个插件都应有对应的配置文件;

每个插件应当有以下目录:

  1. conf:存放插件配置类的包。
  2. converter:存放插件数据类型转换规则类的包。
  3. source:存放插件数据源读取逻辑有关类的包。
  4. sink:存放插件数据源写入逻辑有关类的包。
  5. table:存放插件数据源sql模式有关类的包。  -----这个应该不是我们的重点,flink的sql并不好cuiyaonan2000@163.com
  6. util:存放插件工具类的包,chunjun已经封装了一些常用工具类在chunjun-core模块中,如果还需编写插件工具类的请放在该插件目录中的util包

以Stream插件为例子,他的插件结构如下图所示:

调试

Debug调试

(1)本地调试

在chunjun-local-test模块中,官方已经写好了本地测试的LocalTest类,只需更改脚本文件路径,在代码处打上断点即可调试。

(2)远程调试

如果需要远程调试,那么需要在 flink-conf.yaml 中增加 Flink 的远程调试配置,然后在 idea 中配置”JVM Remote“,在代码块中打断点(这种方法还能调试 Flink 本身的代码)

 

env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006

只需要修改标记的这两个地方,如果是 HA 集群,需要根据日志修改怎么看日志,怎么修改,自行查资料

至此,任务 idea 调试流程就这些内容。

任务类型

从Chunjun的配置文件Json中可以看到任务的分类

  • sync:同步任务,同理有同步任务的读插件和写插件,即sync(reader),sync(writer)
  • sql:计算任务,,同理有计算任务的读插件和写插件,即sync(reader),sync(writer)

reader

开发流程

以Stream插件为例

插件数据源读取逻辑需要继承BaseRichInputFormat类,BaseRichInputFormat是具体的输入数据的操作,包括open、nextRecord、close,每个插件具体操作自己的数据,InputFormat公共内容都在BaseRichInputFormat,不要随意修改。

创建StreamInputFormat类继承BaseRichInputFormat类,重写其中的必要方法。

public class StreamInputFormat extends BaseRichInputFormat {
    //创建数据分片
	@Override
    public InputSplit[] createInputSplitsInternal(int minNumSplits) {......}
    //打开数据连接
	@Override
    public void openInternal(InputSplit inputSplit) {......}
    //读取一条数据
 	@Override
    public RowData nextRecordInternal(RowData rowData) throws ReadRecordException {......}
    //判断数据是否读取完毕
	@Override
    public boolean reachedEnd() {......}
    //关闭数据连接
	@Override
    protected void closeInternal() {......}
}

由此可见StreamInputFormat 具体的实施类,但是在调用实现类的方法前还有引导类的创建,具体流程是:StreamSourceFactory-->StreamInputFormatBuilder-->StreamInputFormat 中间会引用StreamConf和StreamColumnConverter  至此一个source就完成了cuiyaonan2000@163.com

业务流程

1 com.dtstack.chunjun.Main是启动类,首先判断是计算任务,还是同步任务

2 以exeSyncJob为例进入可以看到,这里就是根据我们传入的Json文件内容生成环境变量

3 .将上面解析生成的SyncConf,然后通过反射加载具体的插件调用createSource方法生成DataStream,  注意这里就是重点了根据 我们的json文件的内容,来获取StreamSourceFactory ,然后创建的数据内容是DataStream----从这里开始就是重点了

4 createSource方法中会构建inputformat对象,然后调用createInput方法,将inputformat对象封装至DtInputFormatSourceFunction中。

未完待续~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1040252.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一文了解数据治理全知识体系!

在业界,大家都为如何做好数据治理而感到困惑。数据治理工作一定要先摸清楚数据的家底,规划好路线图,再进行决策。 本文从数据治理的误区、元数据管理、数据质量管理、数据资产管理等4个方面整理出数据治理的一套经验总结,给予数据…

python算法部署(通信篇)

1.dockerflask方式 # YOLOv5 🚀 by Ultralytics, AGPL-3.0 license """ Run a Flask REST API exposing one or more YOLOv5s models """import argparse import io import jsonimport torch from flask import Flask, jsonify, req…

公司新来的实习生问我SpringBoot多个环境的配置方式

这是一篇写给新手的文章,老手可以绕行了。 起因是一个同学让我帮他看个问题,他说有两个环境,一个环境有问题,另一个环境没问题,但是一直找不到原因,假设一个环境是 dev,另一个环境是 test。 于…

2023 版 QQ 机器人运行部署文档

文章目录 1.前置说明2.机器人框架的下载与运行2.1 下载机器人框架2.2 下载可操作框架的静态页面2.3 运行机器人框架 3.登录QQ机器人3.1 前提说明3.2 扫码登录3.3 注意事项 4.后端程序处理消息4.1 下载 stater4.2 stater 基本说明4.3 运行后端程序4.4 测试消息处理4.5 关于扩展消…

ISE_ChipScope Pro的使用

1.ChipScope Pro Core Inserter 使用流程 在之前以及编译好的流水灯实验上进行学习 ChipScope的使用。 一、新建一个ChipScope 核 点击Next,然后在下一个框中选择 Finish,你就会在项目菜单中看到有XX.cdc核文件。 二、对核文件进行设置 右键“Synthesize – XST” …

计算机丢失ac1st16.dll怎么解决?教你简单的修复ac1st16.dl文件

在使用Windows操作系统时,有时会遇到一些DLL文件丢失的问题,其中之一就是AC1ST16.DLL。AC1ST16.DLL是用于AutoCAD相关软件的动态链接库文件,当该文件丢失时,可能会导致软件无法正常运行。本文将详细介绍多种解决AC1ST16.DLL丢失问…

OpenGL超级宝典(第五版)第8章fbo_drawbuffers例子分析

目录 1. 概述 2. 疑难点剖析 2.1 SetupRC函数分析 2.2 multibuffer.vs分析 2.3 RenderScene分析 3. 其它 1. 概述 《OpenGL超级宝典(第五版)》如下: 该书第8版的 fbo_drawbuffers工程展示了如下技术点: 什么是帧缓冲区对象&#xff08…

【Linux】进程间通信方式②——文件共享映射(附图解与代码实现)

我们来简单了解下文件共享映射的定义:通过映射文件,使用映射机制,实现资源共享,完成进程通信 具体是如何实现的呢?跟随着这篇博客,我们来看一看 进程通过文件共享映射实现通信的具体步骤 由某一进程创建映…

【湖科大教书匠】计算机网络随堂笔记第2章(计算机网络物理层)

目录 2.1、物理层的基本概念 2.2、物理层下面的传输媒体 导引型传输媒体 同轴电缆 双绞线 光纤 电力线 非导引型传输媒体 无线电波 微波 红外线 可见光 2.3、传输方式 串行传输和并行传输 同步传输和异步传输 单向通信(单工)、双向交替通信&#xf…

【LeetCode-简单题】110. 平衡二叉树

文章目录 题目方法一:后序递归 题目 方法一:后序递归 递归遍历的同时判断是否是平衡二叉树,如果不是,就置为-1,如果是 就正常做递归求最大深度 参考图解网址 判断平衡二叉树 class Solution {public boolean isBalanc…

誉天在线项目-放大招-Vue3集成RichText富文本客户端组件QuillEditor

背景 开发中我们需要填写图文内容,就是含有图片和文字,html标准组件中是没有的。都是第三方来实现,就需要我们去集成。 有早期的fckEditor、ckEditor等,新的我们使用了vue框架,市场又推出了quillEditor。下面我们就在…

【【萌新的SOC学习之绪论】】

萌新的SOC学习之绪论 Vitis 统一软件平台的前身为 Xilinx SDK,从 Vivado 2019.2 版本开始,Xilinx SDK 开发环境已统一整合 到全功能一体化的 Vitis 中。Vitis 开发平台除了启动方式、软件界面、使用方法与 SDK 开发平台略有区别, 其他操作几…

使用acme.sh申请免费ssl证书(Cloudflare方式API自动验证增加DNS Record到期证书到期自动重新申请)

下载acme.sh curl https://get.acme.sh | sh -s emailmyexample.comcd ~/.acme.sh/获取Cloudflare密钥 Preferences | Cloudflare 登录选择账户详情选择API Token选择创建令牌选择区域DNS模板,并设置编辑写入权限生成并复制令牌备用回到首页概览界面下部获取账号…

【RabbitMQ实战】3分钟在Linux上安装RabbitMQ

本节采用docker安装RabbitMQ。采用的是bitnami的镜像。Bitnami是一个提供各种流行应用的Docker镜像和软件包的公司。采用docker的方式3分钟就可以把我们想安装的程序运行起来,不得不说真的很方便啊,好了,开搞。使用前提:Linux虚拟…

3D设计软件Rhinoceros 6 mac 犀牛6中文版功能特征

Rhinoceros Mac中文版是一款3D设计软件“犀牛”,在众多三维建模软件中,Rhinoceros mac因为其体积小、功能强大、对硬件要求低而广受欢迎,对于专业的3D设计人员来说它是一款非常不错的3D建模软件,Rhinoceros Mac中文版能轻易整合3D…

苹果手机短信删除了怎么恢复?3种有效方法介绍

手机短信是一种即时通信方式,人们可以使用短信来达到快速传递信息的目的。在没有网络或者网络不稳定的时候,短信仍然可以做到发送和接收,这弥补了其他网络通信软件的缺点。 所以说,手机短信仍然是我们生活中不可缺少的一部分。当…

【rtp】mid 扩展: RtpMid 字符串扩展的解析和写入

mid 是uint8_t 类型? 扩展填写的是字符串,读取字符串后atoi 转 uint8_t : webrtc 看起来是个字符串:写入 扩展的值是改变了: 这里是更新扩展的长度: 新的大小小于原来的,没有缩减内存,而是对于多余的置位0了:if (len < current_len) {memset(

基于SSM的高校图书馆个性化服务的设计与实现(有报告)。Javaee项目。

演示视频&#xff1a; 基于SSM的高校图书馆个性化服务的设计与实现&#xff08;有报告&#xff09;。Javaee项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过S…

如何在低代码平台中应用可视化编程

可视化编程&#xff0c;亦即可视化程序设计&#xff1a;以“所见即所得”的编程思想为原则&#xff0c;力图实现编程工作的可视化&#xff0c;即随时可以看到结果&#xff0c;程序与结果的调整同步。可视化编程的理念来源于可视化技术&#xff0c;它指的是一种把计算机程序中的…

深度学习在视频直播美颜sdk中的应用

视频直播美颜SDK是一类用于实时视频美颜处理的工具包&#xff0c;它们利用深度学习算法来提高视频直播中的主播和观众的外观吸引力。本文将深入探讨深度学习在视频直播美颜sdk中的应用&#xff0c;以及这些应用对直播行业的重要性。 一、人脸检测与关键点定位 通过卷积神经网…