Flink测试利器之DataGen初探 | 京东云技术团队

news2025/2/26 1:53:22

什么是 Flinksql

Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用 Flink SQL,可以执行各种数据操作,如过滤、聚合、连接和转换等。它还提供了窗口操作、时间处理和复杂事件处理等功能,以满足流式数据处理的需求。

Flink SQL 提供了许多扩展功能和语法,以适应 Flink 的流式和批处理引擎的特性。他是Flink最高级别的抽象,可以与 DataStream API 和 DataSet API 无缝集成,利用 Flink 的分布式计算能力和容错机制。

使用 Flink SQL处理数据的基本步骤:

  1. 定义输入表:使用 CREATE TABLE 语句定义输入表,指定表的模式(字段和类型)和数据源(如 Kafka、文件等)。

  2. 执行 SQL 查询:使用 SELECT、INSERT INTO 等 SQL 语句来执行数据查询和操作。您可以在 SQL 查询中使用各种内置函数、聚合操作、窗口操作和时间属性等。

  3. 定义输出表:使用 CREATE TABLE 语句定义输出表,指定表的模式和目标数据存储(如 Kafka、文件等)。

  4. 提交作业:将 Flink SQL 查询作为 Flink 作业提交到 Flink 集群中执行。Flink会根据查询的逻辑和配置自动构建执行计划,并将数据处理任务分发到集群中的任务管理器进行执行。

总而言之,我们可以通过Flink SQL 查询和操作来处理流式和批处理数据。它提供了一种简化和加速数据处理开发的方式,尤其适用于熟悉 SQL 的开发人员和数据工程师。

什么是 connector

Flink Connector 是指用于连接外部系统和数据源的组件。它允许 Flink 通过特定的连接器与不同的数据源进行交互,例如数据库、消息队列、文件系统等。它负责处理与外部系统的通信、数据格式转换、数据读取和写入等任务。无论是作为输入数据表还是输出数据表,通过使用适当的连接器,可以在 Flink SQL 中访问和操作外部系统中的数据。目前实时平台提供了很多常用的连接器:

例如:

  1. JDBC :用于与关系型数据库(如 MySQL、PostgreSQL)建立连接,并支持在 Flink SQL 中读取和写入数据库表的数据。

  2. JDQ :用于与 JDQ 集成,可以读取和写入 JDQ 主题中的数据。

  3. Elasticsearch :用于与 Elasticsearch 集成,可以将数据写入 Elasticsearch 索引或从索引中读取数据。

  4. File Connector:用于读取和写入各种文件格式(如 CSV、JSON、Parquet)的数据。

还有如HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive等,用于与不同的数据源进行集成。通过使用 Flink SQL Connector,我们可以轻松地与外部系统进行数据交互,将数据导入到 Flink 进行处理,或将处理结果导出到外部系统。

DataGen Connector

DataGen 是 Flink SQL 提供的一个内置连接器,用于生成模拟的测试数据,以便在开发和测试过程中使用。

使用 DataGen,可以生成具有不同数据类型和分布的数据,例如整数、字符串、日期等。这样可以模拟真实的数据场景,并帮助验证和调试 Flink SQL 查询和操作。

demo

以下是一个使用 DataGen 函数的简单示例:

-- 创建输入表
CREATE TABLE input_table (
 order_number BIGINT,
 price DECIMAL(32,2),
 buyer ROW<first_name STRING, last_name STRING>,
 order_time TIMESTAMP(3)
) WITH (
 'connector' = 'datagen',
);

在上面的示例中,我们使用 DataGen 连接器创建了一个名为 `input_table` 的输入表。该表包含了 `order_number`、`price` 和 `buyer` ,`order_time`四个字段。默认是random随机生成对应类型的数据,生产速率是10000条/秒,只要任务不停,就会源源不断的生产数据。当然也可以指定一些参数来定义生成数据的规则,例如每秒生成的行数、字段的数据类型和分布。

生成的数据样例:

{"order_number":-6353089831284155505,"price":253422671148527900374700392448,"buyer":{"first_name":"6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2","last_name":"d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"},"order_time":"2023-09-21 06:22:29.618"}
{"order_number":1102733628546646982,"price":628524591222898424803263250432,"buyer":{"first_name":"4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c","last_name":"7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"},"order_time":"2023-09-21 06:23:01.69"}

支持的类型

字段类型数据生成方式
BOOLEANrandom
CHARrandom / sequence
VARCHARrandom / sequence
STRINGrandom / sequence
DECIMALrandom / sequence
TINYINTrandom / sequence
SMALLINTrandom / sequence
INTrandom / sequence
BIGINTrandom / sequence
FLOATrandom / sequence
DOUBLErandom / sequence
DATErandom
TIMErandom
TIMESTAMPrandom
TIMESTAMP_LTZrandom
INTERVAL YEAR TO MONTHrandom
INTERVAL DAY TO MONTHrandom
ROWrandom
ARRAYrandom
MAPrandom
MULTISETrandom

连接器属性

属性是否必填默认值类型描述
connectorrequired(none)String‘datagen’.
rows-per-secondoptional10000Long数据生产速率
number-of-rowsoptional(none)Long指定生产的数据条数,默认是不限制。
fields.#.kindoptionalrandomString指定字段的生产数据的方式 random还是sequence
fields.#.minoptional(Minimum value of type)(Type of field)random生成器 指定字段 # 最小值, 支持数字类型
fields.#.maxoptional(Maximum value of type)(Type of field)random生成器的指定字段 # 最大值, 支持数字类型
fields.#.lengthoptional100Integerchar/varchar/string/array/map/multiset 类型的长度.
fields.#.startoptional(none)(Type of field)sequence生成器的开始值
fields.#.endoptional(none)(Type of field)sequence生成器的结束值

DataGen使用

了解了dategen的基本使用方法,那么下面来结合其他类型的连接器实践一下吧。

场景1 生成一亿条数据到hive表

CREATE TABLE dataGenSourceTable
 (
 order_number BIGINT,
 price DECIMAL(10, 2),
 buyer STRING,
 order_time TIMESTAMP(3)
 )
WITH
 ( 'connector'='datagen', 
 'number-of-rows'='100000000',
 'rows-per-second' = '100000'
 ) ;


CREATECATALOG myhive
WITH (
 'type'='hive',
 'default-database'='default'
);
USECATALOG myhive;
USE dev;
SETtable.sql-dialect=hive;
CREATETABLEifnotexists shipu3_test_0932 (
 order_number BIGINT,
 price DECIMAL(10, 2),
 buyer STRING,
 order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
 'partition.time-extractor.timestamp-pattern'='$dt',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 h',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);
SETtable.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number,price,buyer,order_time, cast( CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;

当每秒生产10万条数据的时候,17分钟左右就可以完成,当然我们可以通过增加Flink任务的计算节点、并行度、提高生产速率’rows-per-second’的值等来更快速的完成大数据量的生产。

场景2 持续每秒生产10万条数到消息队列

CREATE TABLE dataGenSourceTable (
 order_number BIGINT,
 price INT,
 buyer ROW< first_name STRING, last_name STRING >,
 order_time TIMESTAMP(3),
 col_array ARRAY < STRING >,
 col_map map < STRING, STRING >
 )
WITH
 ( 'connector'='datagen', --连接器类型
 'rows-per-second'='100000', --生产速率
 'fields.order_number.kind'='random', --字段order_number的生产方式
 'fields.order_number.min'='1', --字段order_number最小值
 'fields.order_number.max'='1000', --字段order_number最大值
 'fields.price.kind'='sequence', --字段price的生产方式
 'fields.price.start'='1', --字段price开始值
 'fields.price.end'='1000', --字段price最大值
 'fields.col_array.element.length'='5', --每个元素的长度
 'fields.col_map.key.length'='5', --map key的长度
 'fields.col_map.value.length'='5' --map value的长度
 ) ;
CREATE TABLE jdqsink1
 (
 order_number BIGINT,
 price DECIMAL(32, 2),
 buyer ROW< first_name STRING, last_name STRING >,
 order_time TIMESTAMP(3),
 col_ARRAY ARRAY < STRING >,
 col_map map < STRING, STRING >
 )
WITH
 (
 'connector'='jdq',
 'topic'='jrdw-fk-area_info__1',
 'jdq.client.id'='xxxxx',
 'jdq.password'='xxxxxxx',
 'jdq.domain'='db.test.group.com',
 'format'='json'
 ) ;
INSERTINTO jdqsink1
SELECT*FROM dataGenSourceTable;

思考

通过以上案例可以看到,通过Datagen结合其他连接器可以模拟各种场景的数据

  • 性能测试:我们可以利用Flink的高处理性能,来调试任务的外部依赖的阈值(超时,限流等)到一个合适的水位,避免自己的任务有过多的外部依赖出现木桶效应;
  • 边界条件测试:我们通过使用 Flink DataGen 生成特殊的测试数据,如最小值、最大值、空值、重复值等来验证 Flink 任务在边界条件下的正确性和鲁棒性;
  • 数据完整性测试:我们通过Flink DataGen 可以生成包含错误或异常数据的数据集,如无效的数据格式、缺失的字段、重复的数据等。从而可以测试 Flink 任务对异常情况的处理能力,验证 Flink任务在处理数据时是否能够正确地保持数据的完整性。

总之,Flink DataGen 是一个强大的工具,可以帮助测试人员构造各种类型的测试数据。通过合理的使用 ,测试人员可以更有效地进行测试,并发现潜在的问题和缺陷。

作者:京东零售 石朴

来源:京东云开发者社区 转载请注明来源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1087701.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【2023研电赛】商业计划书赛道上海市一等奖:基于双矢量优化谐波预测控制的MMC-PET光伏储能系统

该作品参与极术社区组织的2023研电赛作品征集活动&#xff0c;欢迎同学们投稿&#xff0c;获取作品传播推广&#xff0c;并有丰富礼品哦~ 团队介绍 参赛单位&#xff1a;上海理工大学 参赛队伍&#xff1a;Dream explorers 参赛队员&#xff1a;吕哲 李天皓 赵安杰 项目意义…

基于Springboot实现商务安全邮箱邮件收发系统项目【项目源码+论文说明】计算机毕业设计

基于Springboot实现商务安全邮箱邮件收发系统演示 摘要 随着社会的发展&#xff0c;社会的方方面面都在利用信息化时代的优势。计算机的优势和普及使得商务安全邮箱的开发成为必需。 本文以实际运用为开发背景&#xff0c;运用软件工程原理和开发方法&#xff0c;采用jsp技术…

Allegro芯片引脚如何散出?

在日常进行PCB设计时,遇到有BGA芯片的项目,引脚就需要散出。那么如何散出呢? 需要对引脚进行散出的BGA芯片 下面详细介绍散出方法。 (1)选择菜单Route(布线) (2)选择Create Fanout (3)在选择散出命令后,在Find选项卡,可以选择Symbol(器件)或Pins(引脚)

vscode 右侧滚动条标记不提示,问题解决纪录

问题描述 用vscode看代码时&#xff0c;我希望在右侧提示一个变量在文件下都在那里使用&#xff0c;在那里赋值&#xff0c;之前该功能是存在的&#xff0c;当我打开一个新的文件夹时这个功能消失了。 解决办法 在setting.json文件下输入 "C_Cpp.intelliSenseEngine&…

0基础学习VR全景平台篇 第108篇:全景图细节处理(下,航拍)

上课&#xff01;全体起立~ 大家好&#xff0c;欢迎观看蛙色官方系列全景摄影课程&#xff01; &#xff08;调色前图库&#xff09; &#xff08;原图-大图&#xff09; 一、导入文件 单击右下角导入按钮&#xff0c;选择航拍图片所在文件夹&#xff0c;选择图片&#xff0…

土壤水分烘干法流程

土壤水分烘干法流程 叠小盒子装土 对折 得到一个正方形&#xff0c;裁掉多余的。然后将正方形按如下形式折 再次对折 然后再展开&#xff0c;对着折痕&#xff0c;竖立起盒子边缘 把上面的尖角翻下来 最后将多余的长条裁出一个盒子底部大小的小方块&#xff0c;放入盒子…

【教学类-35-04】学号+姓名+班级(中3班)学号字帖(A4竖版2份 竖版长条)

图片展示: 背景需求: 2022年9-2023年1月我去过小3班带班&#xff0c;但是没有在这个班级投放过学具&#xff0c;本周五是我在本学期第一次带中3班&#xff0c;所以提供了一套学号描字帖。先让我把孩子的名字和脸混个眼熟。 之前试过一页两套名字的纸张切割方法有&#xff1a;…

2023年中国缝纫机针行业分类、市场规模及存在问题分析[图]

缝纫机针是指用于缝纫机上的零件&#xff0c;用于穿透和连接不同种类的织物&#xff0c;从而完成缝纫工作。这些针通常由金属制成&#xff0c;具有不同的形状、尺寸和用途&#xff0c;以适应各种缝纫需求&#xff0c;如直线缝、锁边、装饰等。 缝纫机针行业分类 资料来源&…

新风机缺点有哪些?

虽然新风机在提供新鲜空气和改善室内空气质量方面有很多优点&#xff0c;但它也存在一些缺点。下面列举几个常见的新风机缺点&#xff1a; 安装成本较高&#xff1a;新风机需要通过管道连接室内和室外&#xff0c;需要对房屋进行改造和安装。这可能会增加一些额外的安装成本&am…

Dhgate敦煌手机端下单支付测试

1. 打开敦煌首页&#xff0c;点击左上角的图标打开菜单 2. 点击Join Free注册账号。 3. 点击SIGN UP注册账号&#xff0c;填写账号的信息。 4. 填写收货地址后注册完成。 5. 找到自己需要购买的商品&#xff0c;点击进去。 6. 点击Buy it now 7. 选择完颜色 类型后进入下一步。…

缓存设计的创新之旅:架构的灵魂之一

缓存在架构设计中占有重要地位。缓存在提升性能中也扮演重要的角色。常见的有对资源的缓存&#xff0c;比如数据库连接池、http连接池&#xff0c;还有对数据的缓存等。缓存的设计可复杂也可简单&#xff0c;但是需要考虑的点却很多。 缓存对象 设计缓存的时候一定要考虑的是&…

将vue项目打包成安卓app

目标&#xff1a;将vue项目打包成安卓app 工具&#xff1a;HbuilderX 1.在HbuilderX中创建一个 5App 项目 创建好的app项目目录 2.将vue项目打包 2.1 在 vue.config.js 中添加公共路径&#xff08;解决打包后的app图片不显示问题&#xff09; module.exports defineConfig(…

Java进阶 之 再论面向对象(1)——面向对象的编程思想 Java中的类和对象 深入认识对象,内存图解+变量作用域+参数传递

前言 我们都知道Java是一门面向对象object-oriented&#xff08;OOP&#xff09;的编程语言&#xff0c;但究竟什么是面向对象&#xff0c;为什么要用面向对象&#xff0c;往往会语焉不详。 本篇博客从面向过程和面向对象的编程思想谈起&#xff0c;阐述了类和对象在Java中的…

044:mapboxGL利用fill-extrusion自定义挤出状建筑体

第044个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+mapbox中利用fill-extrusion自定义挤出状建筑体。填充挤压样式图层在地图上渲染一个或多个填充(以及可选的描边)挤压(3D)多边形。 您可以使用填充拉伸图层来配置多边形或多多边形要素的拉伸和视觉外观。 直接复…

JMeter定时器

一. 同步定时器&#xff08;Synchronizing Timer) &#xff08;在Loadrunner中叫做集合点&#xff09; 思考&#xff1a; 如何模拟多个用户同时抢一个红包&#xff1f;如何测试电商网站中抢购活动、秒杀活动&#xff1f; 1.1 介绍 Sync Timer的目的是阻塞线程&#xff0c;直…

【算法-动态规划】完全背包问题

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

计算机毕业设计选什么题目好?springboot 儿童福利院管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

【Java 定时任务】crontab定时任务配置(139)

背景 在日常的开发工作中我们经常会遇到定时任务的相关问题&#xff0c;比如&#xff1a; 信用卡定时每月给用户推送账单数据&#xff1b; 轮训更新某个任务的状态是否完成&#xff1b; 设置一个定时提醒&#xff1b; 邮件或消息设置定时发送&#xff1b; 定时统计某个时间段的…

高压放大器在软体机器人领域的应用

软体机器人是一种新型机器人技术&#xff0c;与传统的硬体机器人有着很大的不同。软体机器人通常由柔软的材料制成&#xff0c;具有高度的柔韧性和灵活性&#xff0c;并且可以实现多种形状和动作。但是&#xff0c;软体机器人的发展面临很多技术挑战&#xff0c;其中之一就是控…

信息与网络安全基础知识汇总

需要面试真题和考证资料的 面试真题考证资料点我领取 面试真题考证资料点我领取 面试真题考证资料点我领取 一、概述 1.网络信息安全基本概念 信息安全&#xff1a;是指信息网络中的硬件、软件及其系统中的数据受到保护&#xff0c;不受偶然的或者恶意的原因而遭到破…