PiflowX如何快速开发flink程序

news2025/2/24 8:11:13

PiflowX如何快速开发flink程序

参考资料

Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码-腾讯云开发者社区-腾讯云 (tencent.com)

Flink SQL 背景

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。

Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。

在这个背景下,毫无疑问,SQL 就成了我们最佳选择,之所以选择将 SQL 作为核心 API,是因为其具有几个非常重要的特点:

  • SQL 属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法;
  • SQL 可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;
  • SQL 易于理解,不同行业和领域的人都懂,学习成本较低;
  • SQL 非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;
  • 流与批的统一,Flink 底层 Runtime 本身就是一个流与批统一的引擎,而 SQL 可以做到 API 层的流与批统一。
Flink SQL 常规实战应用

案例来自(Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码-腾讯云开发者社区-腾讯云 (tencent.com))!详细流程有兴趣可以参考原文示例。(如有侵犯,请请联系!)。

在此,简单总结一下flink sql的开发流程:

1.首先需要创建maven工程,确认需要的各种依赖,运气好的话,还需要花费大量的精力和时间去排查依赖冲突的问题(oh God bless me!);

2.开始balabala编写模板代码,如:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

3.数据准备和预处理;

 DataSet<String> input = env.readTextFile("score.csv");
        DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
            @Override
            public PlayerData map(String s) throws Exception {
                String[] split = s.split(",");
                return new PlayerData(String.valueOf(split[0]),
                        String.valueOf(split[1]),
                        String.valueOf(split[2]),
                        Integer.valueOf(split[3]),
                        Double.valueOf(split[4]),
                        Double.valueOf(split[5]),
                        Double.valueOf(split[6]),
                        Double.valueOf(split[7]),
                        Double.valueOf(split[8])
                );
            }
        });
其中的PlayerData类为自定义类:
public static class PlayerData {
        /**
         * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
         */
        public String season;
        public String player;
        public String play_num;
        public Integer first_court;
        public Double time;
        public Double assists;
        public Double steals;
        public Double blocks;
        public Double scores;

        public PlayerData() {
            super();
        }

        public PlayerData(String season,
                          String player,
                          String play_num,
                          Integer first_court,
                          Double time,
                          Double assists,
                          Double steals,
                          Double blocks,
                          Double scores
                          ) {
            this.season = season;
            this.player = player;
            this.play_num = play_num;
            this.first_court = first_court;
            this.time = time;
            this.assists = assists;
            this.steals = steals;
            this.blocks = blocks;
            this.scores = scores;
        }
    }

4.终于到了真正的业务处理了,有了flink sql的强大和方便,倒是省了不少代码;

Table queryResult = tableEnv.sqlQuery("
select player, 
       count(season) as num 
    FROM score 
    GROUP BY player 
    ORDER BY num desc 
    LIMIT 3
");

5.ok,到此,数据处理和计算逻辑完毕,处理结果写入到sink,可以完结散花咯,哈哈;

DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
result.print();

6.哦!好像还需要调试运行,好吧,再辛苦一会,便可大功告成!
在这里插入图片描述

7.完美,上线。。。。。。
在这里插入图片描述

(以上,纯属娱乐,如有不当,敬请谅解!)

可见,在平日开发一个flink任务虽已尽可能简单,但开发周期也得1-2个工作日,甚至更长,有没有简单粗暴的,让我分分钟领盒饭,不,让我分分钟高效完成任务的!

当然有啦!!!接下来让我隆重的介绍一下今天的主角—PilfowX—大数据流水线系统。有兴趣可以查看之前的文章(StreamPark + PiflowX 打造新一代大数据计算处理平台-CSDN博客)。

PiflowX是基于Piflow和StreamPark二开实现的,在其基础上,实现了图像化拖拉拽的方式开发spark或flink作业,这里我将介绍flink任务的开发流程,以及如何零代码实现flink sql的开发。

PiflowX的flink组件算子基本都是基于flink table和sql实现的,我们只需在UI界面填写组件相关参数,之后的工作交给底层框架即可。

在这里插入图片描述

我们回顾一下flink sql语法定义。

Flink SQL 的语法和算子

Flink SQL 核心算子的语义设计参考了 1992、2011 等 ANSI-SQL 标准,Flink 使用 Apache Calcite 解析 SQL ,Calcite 支持标准的 ANSI SQL。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] | AS select_query ]

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]
PiflowX组件flink table实现

在了解了flink sql的定义后,一切便简单多了,那么,我们只需要根据业务需要,设计出一个表单输入,填写我们的业务参数,然后,由框架自动生成sql不就可以了么。

以下介绍如何配置一个mysqlcdc组件:

1.首先从组件列表中拖入一个MysqlCdc组件到画布中,点击节点,右侧会显示出节点参数表单区域和参数说明和示例。参数解释可以查看之前的文章(PiflowX-MysqlCdc组件-CSDN博客)。

在这里插入图片描述

在这里插入图片描述

2.填写相关参数,其实就是在定义flink table中的with属性。

在属性输入框中,点击预览可以实时查看生成的flink sql。

在这里插入图片描述
在这里插入图片描述

生成的flink sql 语句仅供参考,最终执行的语句会在引擎执行侧生成。
在这里插入图片描述

3.接下来我们可以根据需要来定义flink table结构,此步骤和其他步骤没有先后顺序。点击表单属性tableDefinition,在此表单中我们可以输入flink table中的结构属性定义。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

可以看到,我们可以在此定义flink table中的表基本信息,物理列,元数据列,计算列,水印等,具体说明在此就不赘述了,以后会有具体文章来说明。看看最终的效果:

在这里插入图片描述

至此,我们通过简单的表单填写,便可开发一个flink任务,最后,点击运行,系统便可自动提交到flink环境,并可实时查看运行日志,是不是很方便快捷!

当然,目前系统处于初期研发阶段,还有很多不完善的地方,敬请谅解。最后,我们来看一个简单的实例,如果通过PiflowX开发一个mysql cdc实时同步和flink读取doris的任务。

PiflowX-Droris读写组件

PiflowX-MysqlCdc组件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1383873.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GPT5会是什么样的?奥特曼在YC W24会上演讲要点

“YC启动活动上&#xff0c;Sam Altman表示&#xff1a;以GPT-5和AGI将在’相对不久的将来’实现的心态来构建。” 在Y Combinator的一个启动活动中&#xff0c;Sam Altman表示&#xff0c;人工通用智能&#xff08;AGI&#xff09;的发展即将到来&#xff0c;并建议在构建产品…

【Fiddler抓包】微信扫码访问链接打不开网页

又来每天进步一点点~~~ 背景&#xff1a;某天发版的时候&#xff0c;手机连接电脑抓包查看用户登录之前的sessionID&#xff0c;由于业务需要&#xff0c;是需要用户登录微信扫码跳转至某一页面的&#xff0c;微信&#xff08;分身&#xff09;扫码成功&#xff0c;跳转时打不…

dcm数据格式转nrrd数据格式(2维转3维)

目的 将dcm数据格式&#xff08;2D&#xff09;转成nrrd数据格式&#xff08;3D&#xff09; 将一个文件夹下的dcm数据转成一个nrrd数据 代码 1. 安装必要包 pip install SimpleITK2. 上代码 Descripttion: Result: Author: Philo Date: 2024-01-10 14:25:49 LastEditors: …

WorkPlus企业打破信息孤岛,构建统一工作平台的首选之一

在当今数字化时代&#xff0c;企业内部存在着繁多的工作应用和系统。要实现高效的工作协作&#xff0c;企业需要一个统一的工作平台来打破信息孤岛&#xff0c;提升协作效率。作为一家领先的企业统一工作平台&#xff0c;WorkPlus以其卓越的性能和专业的功能&#xff0c;助力企…

【算法】Java-二叉树的右视图(BFS、DFS两种解法)

题目要求&#xff1a; 给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所能看到的节点值。 示例 1: 输入: [1,2,3,null,5,null,4] 输出: [1,3,4]示例 2: 输入: [1,null,3] 输出: [1,3]示例 3: 输入…

python基础-base64编码理解

目录 1、base64是什么 2、base64有什么用 3、base64如何用 4、理解base64 5、扩展 1、base64是什么 base64 就是包括字母a-z,A-Z,数字0-9&#xff0c;符号“”&#xff0c;“/”一共64个字符的字符集&#xff1b;还有一个‘’ 字符&#xff0c;占位补充&#xff1b; …

WorkPlus助力企业高效协作的企业级内网即时通讯解决方案

在企业内部&#xff0c;高效沟通和协作是推动工作顺利进行的关键。而企业级内网即时通讯成为了提升内部沟通效率的重要工具。作为一家领先的企业级内网即时通讯解决方案&#xff0c;WorkPlus以其卓越的性能和高安全性&#xff0c;打造了高效沟通协作的新标杆。 为什么选择WorkP…

嘘……快进来!这儿有最新版Microsoft照片程序的安装秘籍!(附安装引导程序下载)

网管小贾 / sysadm.cc 最近啊有不少小伙伴向我反馈&#xff0c;自个的 Windows 10 系统里边居然没有 Microsoft 照片 程序。 我觉得有点不可思议&#xff0c;为啥呢&#xff0c;因为他们的电脑是新买的&#xff01; 你看哈&#xff0c;系统是 22H2 最新版&#xff0c;安装日期…

陀螺仪LSM6DSV16X与AI集成(6)----检测自由落体

陀螺仪LSM6DSV16X与AI集成.6--检测自由落体 概述视频教学样品申请源码下载生成STM32CUBEMX串口配置IIC配置CS和SA0设置串口重定向参考程序初始换管脚获取ID复位操作BDU设置 概述 本文介绍如何初始化传感器并配置其参数&#xff0c;以便在检测到自由落体事件时发送通知。 最近…

STM32H5 Nucleo-144 board开箱

文章目录 开发板资料下载 【目标】 点亮LD1&#xff08;绿&#xff09;、LD2&#xff08;黄&#xff09;和LD3&#xff08;红&#xff09;三个LED灯 【开箱过程】 博主使用的是STM32CubeMX配置生成代码&#xff0c;具体操作如下&#xff1a; 打开STM32CubeMX&#xff0c;File-…

620基于51单片机的密码锁设计[Proteus仿真]

620基于51单片机的密码锁设计[proteus仿真] 密码锁设计这个题目算是课 程设计和毕业设计中常见的题目了&#xff0c;本期是一个基于51单片机的密码锁设计 需要的源文件和程序的小伙伴可以关注公众号【阿目分享嵌入式】&#xff0c;赞赏任意文章 2&#xffe5;&#xff0c;私信…

第 11 章 树结构实际应用

文章目录 11.1 堆排序11.1.1 堆排序基本介绍11.1.2 堆排序基本思想11.1.3 堆排序步骤图解说明11.1.4 堆排序代码实现 11.2 赫夫曼树11.2.1 基本介绍11.2.2 赫夫曼树几个重要概念和举例说明11.2.3 赫夫曼树创建思路图解11.2.4 赫夫曼树的代码实现 11.3 赫夫曼编码11.3.1 基本介绍…

关于java类与对象的创建

关于java类与对象的创建 我们在前面的文章中回顾了方法的定义和方法的调用&#xff0c;以及了解了面向对象的初步认识&#xff0c;我们本篇文章来了解一下类和对象的关系&#xff0c;还是遵循结合现实的方式去理解&#xff0c;不是死记硬背&#x1f600;。 1、类 类是一种抽…

第 3 场 蓝桥杯小白入门赛 解题报告 | 珂学家 | 单调队列优化的DP + 三指针滑窗

前言 整体评价 T5, T6有点意思&#xff0c;这场小白入门场&#xff0c;好像没真正意义上的签到&#xff0c;整体感觉是这样。 A. 召唤神坤 思路: 前后缀拆解 #include <iostream> #include <algorithm> #include <vector> using namespace std;int main()…

03.neuvector之组的划分逻辑

neuvector之组的划分逻辑 原文链接,欢迎大家关注我的github账号 一、组的定义 NeuVector 会自动从正在运行的应用程序中创建组。这些组以前缀‘nv‘开头。您也可以使用 CRD 或 REST API 手动添加它们&#xff0c;并且可以在任何模式下创建、发现、监视或保护。网络和响应规则需…

Pandas:Python可视化神器

大家好&#xff0c;数据可视化可以让我们很直观的发现数据中隐藏的规律&#xff0c;察觉到变量之间的互动关系&#xff0c;可以帮助我们更好的给他人解释现象&#xff0c;做到一图胜千文的说明效果。 常见的数据可视化库有: matplotlib 是最常见的2维库&#xff0c;可以算作可…

如何基于 Gin 封装出属于自己 Web 框架?

思路 在基于 Gin 封装出属于自己的 Web 框架前&#xff0c;你需要先了解 Gin 的基本用法和设计理念。 然后&#xff0c;你可以通过以下步骤来封装自己的 Web 框架&#xff1a; 封装路由&#xff1a;Gin 的路由是通过 HTTP 方法和 URL 路径进行匹配的&#xff0c;你可以根据自己…

MySQl Mybatis

一、MySQL 1.1 概述 1.1.1 MySQL安装 1.1.2 数据模型 1.1.3 SQL简介 1.2 DDL 1.2.1 数据库操作 1.2.2 图形化工具 1.2.3 表结构操作 &#xff08;一&#xff09;创建 &#xff08;二&#xff09;数据类型 &#xff08;1&#xff09;数值类型 age tinyint unsigned——加上…

分布式链路追踪专栏,分布式链路追踪:Skywalking集群管理设计

SkyWalking 是一个开源 APM 系统&#xff0c;包括针对 Cloud Native 体系结构中的分布式系统的监视&#xff0c;跟踪&#xff0c;诊断功能。核心功能如下&#xff1a; 服务、服务实例、端点指标分析&#xff1b; 根本原因分析&#xff0c;在运行时分析代码&#xff1b; 服务拓…

从零开始搭建ubuntu 16.04 pwndocker环境

1.安装VMware-tools 1.1遇到问题 在使用 VMware Workstation时遇到了VMware Tools不能安装的问题&#xff0c;具体表现为&#xff1a;在要安装VMware Tools的虚拟机上右键 ----》安装VMware Tools(T)… 为灰色&#xff0c;不能够点击。 1.2解决方案    1. 关闭虚拟机&…