【大数据】Flink 测试利器:DataGen

news2025/1/23 17:44:19

Flink 测试利器:DataGen

  • 1.什么是 FlinkSQL ?
  • 2.什么是 Connector ?
  • 3.DataGen Connector
    • 3.1 Demo
    • 3.2 支持的类型
    • 3.3 连接器属性
  • 4.DataGen 使用案例
    • 4.1 场景一:生成一亿条数据到 Hive 表
    • 4.2 场景二:持续每秒生产 10 万条数到消息队列
  • 5.思考

1.什么是 FlinkSQL ?

Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持 ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用 Flink SQL,可以执行各种数据操作,如 过滤聚合连接转换 等。它还提供了 窗口操作时间处理复杂事件处理 等功能,以满足流式数据处理的需求。

Flink SQL 提供了许多扩展功能和语法,以适应 Flink 的流式和批处理引擎的特性。它是 Flink 最高级别的抽象,可以与 DataStream API 和 DataSet API 无缝集成,利用 Flink 的分布式计算能力和容错机制。

在这里插入图片描述
使用 Flink SQL 处理数据的基本步骤:

  • 定义输入表:使用 CREATE TABLE 语句定义输入表,指定表的模式(字段和类型)和数据源(如 Kafka、文件等)。
  • 执行 SQL 查询:使用 SELECT、INSERT INTO 等 SQL 语句来执行数据查询和操作。您可以在 SQL 查询中使用各种内置函数、聚合操作、窗口操作和时间属性等。
  • 定义输出表:使用 CREATE TABLE 语句定义输出表,指定表的模式和目标数据存储(如 Kafka、文件等)。
  • 提交作业:将 Flink SQL 查询作为 Flink 作业提交到 Flink 集群中执行。Flink 会根据查询的逻辑和配置自动构建执行计划,并将数据处理任务分发到集群中的任务管理器进行执行。

总而言之,我们可以通过 Flink SQL 查询和操作来处理流式和批处理数据。它提供了一种简化和加速数据处理开发的方式,尤其适用于熟悉 SQL 的开发人员和数据工程师。

2.什么是 Connector ?

Flink Connector 是指 用于连接外部系统和数据源的组件。它允许 Flink 通过特定的连接器与不同的数据源进行交互,例如数据库、消息队列、文件系统等。它负责处理与外部系统的通信、数据格式转换、数据读取和写入等任务。无论是作为输入数据表还是输出数据表,通过使用适当的连接器,可以在 Flink SQL 中访问和操作外部系统中的数据。目前实时平台提供了很多常用的连接器:

例如:

  • JDBC:用于与关系型数据库(如 MySQL、PostgreSQL)建立连接,并支持在 Flink SQL 中读取和写入数据库表的数据。
  • JDQ:用于与 JDQ 集成,可以读取和写入 JDQ 主题中的数据。
  • Elasticsearch:用于与 Elasticsearch 集成,可以将数据写入 Elasticsearch 索引或从索引中读取数据。
  • File Connector:用于读取和写入各种文件格式(如 CSV、JSON、Parquet)的数据。
  • ……

还有如 HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive 等,用于与不同的数据源进行集成。通过使用 Flink SQL Connector,我们可以轻松地与外部系统进行数据交互,将数据导入到 Flink 进行处理,或 将处理结果导出到外部系统

在这里插入图片描述

3.DataGen Connector

DataGen 是 Flink SQL 提供的一个内置连接器,用于生成模拟的测试数据,以便在开发和测试过程中使用。

使用 DataGen,可以生成具有不同数据类型和分布的数据,例如整数、字符串、日期等。这样可以模拟真实的数据场景,并帮助验证和调试 Flink SQL 查询和操作。

3.1 Demo

以下是一个使用 DataGen 函数的简单示例:

-- 创建输入表
CREATE TABLE input_table (
	order_number BIGINT,
	price DECIMAL(32,2),
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3)
) WITH (
	'connector' = 'datagen',
);

在上面的示例中,我们使用 DataGen 连接器创建了一个名为 input_table 的输入表。该表包含了 order_numberpricebuyerorder_time 四个字段。默认是 Random 随机生成对应类型的数据,生产速率是 10000 10000 10000 条/秒,只要任务不停,就会源源不断的生产数据。当然也可以指定一些参数来定义生成数据的规则,例如每秒生成的行数、字段的数据类型和分布。

生成的数据样例:

{
    "order_number": -6353089831284155505,
    "price": 253422671148527900374700392448,
    "buyer": {
        "first_name": "6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2",
        "last_name": "d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"
    },
    "order_time": "2023-09-21 06:22:29.618"
}
{
    "order_number": 1102733628546646982,
    "price": 628524591222898424803263250432,
    "buyer": {
        "first_name": "4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c",
        "last_name": "7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"
    },
    "order_time": "2023-09-21 06:23:01.69"
}

3.2 支持的类型

字段类型数据生成方式
BOOLEANrandom
CHARrandom / sequence
VARCHARrandom / sequence
STRINGrandom / sequence
DECIMALrandom / sequence
TINYINTrandom / sequence
SMALLINTrandom / sequence
INTrandom / sequence
BIGINTrandom / sequence
FLOATrandom / sequence
DOUBLErandom / sequence
DATErandom
TIMErandom
TIMESTAMPrandom
TIMESTAMP_LTZrandom
INTERVAL YEAR TO MONTHrandom
INTERVAL DAY TO MONTHrandom
ROWrandom
ARRAYrandom
MAPrandom
MULTISETrandom

3.3 连接器属性

属性是否必填默认值类型
描述
connectorrequired(none)String‘datagen’
rows-per-secondoptional 10000 10000 10000Long数据生产速率
number-of-rowsoptional(none)Long指定生产的数据条数,默认是不限制
fields.#.kindoptionalrandomString指定字段的生产数据的方式 random 还是 sequence
fields.#.minoptional(Minimum value of type)(Type of field)random 生成器的指定字段 # 最小值,支持数字类型
fields.#.maxoptional(Maximum value of type)(Type of field)random 生成器的指定字段 # 最大值,支持数字类型
fields.#.lengthoptional 100 100 100Integerchar / varchar / string / array / map / multiset 类型的长度
fields.#.startoptional(none)(Type of field)sequence 生成器的开始值
fields.#.endoptional(none)(Type of field)sequence 生成器的结束值

4.DataGen 使用案例

4.1 场景一:生成一亿条数据到 Hive 表

CREATE TABLE dataGenSourceTable (
	order_number BIGINT,
	price DECIMAL(10, 2),
	buyer STRING,
	order_time TIMESTAMP(3)
) WITH ( 
	'connector'='datagen', 
	'number-of-rows'='100000000',
	'rows-per-second' = '100000'
);


CREATE CATALOG myhive
WITH (
	'type'='hive',
	'default-database'='default'
);
USE CATALOG myhive;
USE dev;
SET table.sql-dialect=hive;
CREATE TABLE if not exists shipu3_test_0932 (
	order_number BIGINT,
	price DECIMAL(10, 2),
	buyer STRING,
	order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
	'partition.time-extractor.timestamp-pattern'='$dt',
	'sink.partition-commit.trigger'='partition-time',
	'sink.partition-commit.delay'='1 h',
	'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number, price, buyer, order_time, cast(CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;

当每秒生产 10 万条数据的时候,17 分钟左右就可以完成,当然我们可以通过增加 Flink 任务的计算节点、并行度、提高生产速率 rows-per-second 的值等来更快速的完成大数据量的生产。

4.2 场景二:持续每秒生产 10 万条数到消息队列

CREATE TABLE dataGenSourceTable (
	order_number BIGINT,
	price INT,
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3),
	col_array ARRAY <STRING>,
	col_map map <STRING,STRING>
) WITH ( 
	'connector'='datagen', --连接器类型
	'rows-per-second'='100000', --生产速率
	'fields.order_number.kind'='random', --字段order_number的生产方式
	'fields.order_number.min'='1', --字段order_number最小值
	'fields.order_number.max'='1000', --字段order_number最大值
	'fields.price.kind'='sequence', --字段price的生产方式
	'fields.price.start'='1', --字段price开始值
	'fields.price.end'='1000', --字段price最大值
	'fields.col_array.element.length'='5', --每个元素的长度
	'fields.col_map.key.length'='5', --map key的长度
	'fields.col_map.value.length'='5' --map value的长度
);

CREATE TABLE jdqsink1 (
	order_number BIGINT,
	price DECIMAL(32, 2),
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3),
	col_ARRAY ARRAY <STRING>,
	col_map map <STRING,STRING>
) WITH (
	'connector'='jdq',
	'topic'='jrdw-fk-area_info__1',
	'jdq.client.id'='xxxxx',
	'jdq.password'='xxxxxxx',
	'jdq.domain'='db.test.group.com',
	'format'='json'
);

INSERTINTO jdqsink1
SELECT * FROM dataGenSourceTable;

5.思考

通过以上案例可以看到,通过 Datagen 结合其他连接器可以模拟各种场景的数据。

  • 性能测试:我们可以利用 Flink 的高处理性能,来调试任务的外部依赖的阈值(超时,限流等)到一个合适的水位,避免自己的任务有过多的外部依赖出现木桶效应。
  • 边界条件测试:我们通过使用 Flink DataGen 生成特殊的测试数据,如最小值、最大值、空值、重复值等来验证 Flink 任务在边界条件下的正确性和鲁棒性。
  • 数据完整性测试:我们通过 Flink DataGen 可以生成包含错误或异常数据的数据集,如无效的数据格式、缺失的字段、重复的数据等。从而可以测试 Flink 任务对异常情况的处理能力,验证 Flink 任务在处理数据时是否能够正确地保持数据的完整性。

总之,Flink DataGen 是一个强大的工具,可以帮助测试人员构造各种类型的测试数据。通过合理的使用 ,测试人员可以更有效地进行测试,并发现潜在的问题和缺陷。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1395009.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

进程间通信之匿名管道通信

每一次的努力都是自我成长的一步&#xff0c;坚持不懈的付出会铺就通向成功的道路。文章目录 进程间通信的介绍进程间通信的发展进程间通信的分类进程间通讯的本质资源&#xff1f;这个资源谁提供的&#xff1f; 管道什么是管道匿名管道管道小总结现在我给大家看一下管道通信的…

SCDN高防如何保护你的服务器

随着互联网的发展&#xff0c;如今的网络世界&#xff0c;虽说给我们的衣食住行带来了非常大的便利&#xff0c;但同时它存在着各种各样的威胁。比如我们的网站&#xff0c;如果不做任何保护措施的话&#xff0c;就很容易被DDoS、CC等攻击堵塞网络、窃取目标系统的信息&#xf…

这种网页要小心!注意你的账号密码泄露!

目录 H5是泄露账号和数据的重要渠道 代码混淆是最佳的安全保护手段 基于AI的自适应代码混淆 我们经常见到各类H5海报&#xff0c;产品展示、活动促销、招聘启事等。H5不仅能够无缝地嵌入App、小程序&#xff0c;还可以作为一个拥有独立链接地址的页面&#xff0c;直接在PC端打开…

AIOps案例 | 携手擎创,中邮信科成功打造新一代IT智能运维平台,收益明显!

为推动邮政信息科技体制改革、提升信息科技自主供给能力&#xff0c;在原信息技术局、数据中心和软开中心基础上&#xff0c;中邮信息科技&#xff08;北京&#xff09;有限公司(简称“中邮信科公司”)经中国邮政集团有限公司于2019年5月被批准成立。 公司主要负责邮政各类信息…

[论文阅读]DeepFusion

DeepFusion Lidar-Camera Deep Fusion for Multi-Modal 3D Object Detection 用于多模态 3D 物体检测的激光雷达相机深度融合 论文网址&#xff1a;DeepFusion 论文代码&#xff1a;DeepFusion 摘要 激光雷达和摄像头是关键传感器&#xff0c;可为自动驾驶中的 3D 检测提供补…

通过OpenIddict设计一个授权服务器03-客户凭证流程

在本部分中&#xff0c;我们将把 OpenIddict 添加到项目中&#xff0c;并实施第一个授权流程&#xff1a;客户端凭证流。 添加 OpenIddict 软件包 首先&#xff0c;我们需要安装 OpenIddict NuGet 软件包 dotnet add package OpenIddict dotnet add package OpenIddict.AspN…

Android CarService源码分析

文章目录 一、CarService的基本架构1.1、Android Automative整体框架1.2、Framework CarService1.3、目录结构1.3.1、CarService1.3.2、Car APP 二、CarService的启动流程2.1、系统启动后在SystemServer进程中启动CarServiceHelperService2.2、CarService启动 三、CarService源…

浅聊雷池社区版(WAF)的tengine

雷池社区版是一个开源的免费Web应用防火墙&#xff08;WAF&#xff09;&#xff0c;专为保护Web应用免受各种网络攻击而设计。基于强大的Tengine&#xff0c;雷池社区版提供了一系列先进的安全功能&#xff0c;适用于中小企业和个人用户。 Tengine的故事始于2011年&#xff0c;…

Android-三方框架的源码

ARouter Arouter的整体思路是moduelA通过中间人ARouter把路由信息的存到仓库WareHouse&#xff1b;moduleB发起路由时&#xff0c;再通过中间人ARouter从仓库WareHouse取出路由信息&#xff0c;这要就实现了没有依赖的两者之间的跳转与通信。其中涉及Activity的跳转、服务prov…

微信原生小程序上传与识别以及监听多个checkbox事件打开pdf

1.点击上传并识别 组件样式<van-field border"{{ false }}" placeholder"请输入银行卡卡号" model:value"{{bankNo}}" label"卡号"><van-icon bindtap"handleChooseImg" slot"right-icon" name"sca…

网工内推 | 运维工程师,最高10K*15薪,思科认证优先

01 乐歌股份 招聘岗位&#xff1a;服务器运维工程师 职责描述&#xff1a; 1、负责公司云上云下所有服务器的日常运维工作&#xff0c;包括应用部署、巡检、备份、日志、监控&#xff0c;故障处理&#xff0c;性能优化等&#xff0c;保障公司相关系统稳定运行。 2、为开发、测…

【linux】粘滞位.yum

粘滞位 1.为什么我们普通用户可以删掉别人的文件&#xff08;包括root&#xff09;?合理吗&#xff1f; 2.删除一个文件和目标文件有关系吗&#xff1f; 没关系&#xff0c;和所处的目录有关系。 1.我们先以root身份创建一个目录&#xff0c;接着在这个目录下创建一个文件 2…

LLM之幻觉(二):大语言模型LLM幻觉缓减技术综述

LLM幻觉缓减技术分为两大主流&#xff0c;梯度方法和非梯度方法。梯度方法是指对基本LLM进行微调&#xff1b;而非梯度方法主要是在推理时使用Prompt工程技术。LLM幻觉缓减技术&#xff0c;如下图所示&#xff1a; LLM幻觉缓减技术值得注意的是&#xff1a; 检索增强生成&…

【开发篇】五、文章内容审核接口的内存问题优化

文章目录 1、初始实现思路&#xff1a;Async注解新开一个线程去审核2、改进思路一&#xff1a;加LinkedBlockingQueue阻塞队列3、改进思路二&#xff1a;RabbitMQ4、总结 背景&#xff1a;文章微服务中有一个文章审核接口&#xff0c;接口内又调用阿里云的内容安全接口进行文字…

【运维】WSL1如何升级到WSL2

升级WSL1到WSL2&#xff1a;简便快捷版 在这篇博客中&#xff0c;我们将研究如何通过一种更简便的方式&#xff0c;将WSL1迅速升级到WSL2&#xff0c;避免官方文档的繁冗步骤。如果你觉得官方方法太过冗长&#xff0c;那么这里提供的步骤可能更适合你。 官网的办法是&#xf…

Cloudflare cdn 基本使用

个人版免费试用&#xff0c;一个邮箱账号只能缓存一个网站cdn。 地址&#xff1a;cloudflare.com 创建站点 在网站创建站点&#xff0c;填上你的域名 点击进入网站 缓存全局配置 可清除缓存&#xff0c;设置浏览器缓存时间 我设置了always online,防止服务器经常不稳定 缓…

Git学习笔记(第1章):Git概述

Git是一个免费的、开源的分布式版本控制系统&#xff0c;可以快速高效地处理从小型到大型的各种项目。 Git易于学习&#xff0c;占地面积小&#xff0c;性能极快。它具有廉价的本地库&#xff0c;方便的暂存区域和多个工作流分支等特性。其性能优于Subversion、CVS、Perforce 和…

Docker本地私有仓库搭建配置指导

一、说明 因内网主机需要拉取镜像进行Docker应用&#xff0c;因此需要一台带外主机作为内网私有仓库来提供内外其他docker业务主机使用。参考架构如下&#xff1a; 相关资源&#xff1a;加密、Distribution registry、Create and Configure Docker Registry、Registry部署、D…

Ivanti Connect Secure 曝两大零日漏洞,已被大规模利用

威胁情报公司Volexity发现&#xff0c;影响 Ivanti 的 Connect Secure VPN 和 Policy Secure 网络访问控制 (NAC) 设备的两个零日漏洞正在被大规模利用。自1月11日开始&#xff0c;多个威胁组织在大范围攻击中利用CVE-2023-46805身份验证绕过和CVE-2024-21887命令注入漏洞。 V…

Joern环境的安装(Windows版)

Joern环境的安装(Windows版) 网上很少有关于Windows下安装Joern的教程&#xff0c;而我最初使用也是装在Ubuntu虚拟机中&#xff0c;这样使用很占内存&#xff0c;影响体验感。在Windows下使用源码安装Joern也是非常简单的过程&#xff1a; 提前需要的本地环境&#xff1a; …