Flink 源算子之 DataGeneratorSource DataGenerator

news2025/1/11 19:59:23

目录

1、功能说明

2、API使用说明

3、代码示例


1、功能说明

        从Flink1.1开始提供了DataGen连接器,它提供了Source类的实现(可并行的源算子),用来生成测试数据,在本地开发或者无法访问外部系统(如kafka)时,它就会非常有用。

        DataGen连接器是内置的,不需要额外的依赖项。


2、API使用说明

方法定义:
public DataGeneratorSource(
        DataGenerator<T> generator, long rowsPerSecond, @Nullable Long numberOfRows)

参数说明:
        DataGenerator<T> generator     :  指定数据生成器对象
        long rowsPerSecond             :  指定数据发射速率(每秒发射的记录数),默认值为Long.MAX_VALUE
        @Nullable Long numberOfRows    :  指定指定输出数据的总行数(为null时,表示一直输出)

关于DataGenerator类
public interface DataGenerator<T> extends Serializable, Iterator<T>

功能说明:
   继承了Iterator,利用迭代器来构造测试数据 

3、代码示例

Flink版本说明:flink_1.13.0、scala_2.12

定义User类:

package com.baidu.bean

case class User(id: Long, name: String)

测试代码:

  test("DataGen 连接器") {

    // 1. 获取流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    // 自定义 DataGenerator实现类(用来随机生成User对象)
    val userGenerator = new DataGenerator[User]() {
      // 定义随机数数据生成器
      var generator: RandomDataGenerator = _

      // 初始化数据生成器
      override def open(name: String, context: FunctionInitializationContext, runtimeContext: RuntimeContext): Unit = {
        generator = new RandomDataGenerator
      }

      // 判断迭代器是否有值
      override def hasNext: Boolean = true

      // 生成随机字符串,并返回
      override def next(): User = {
        User(generator.nextLong(1, 99) // 生成1~99区间的随机整数
          , generator.nextHexString(4) // 生成4位字符串
        )
      }
    }

    // 自定义字符串数据生成器
    val stringGenerator = new DataGenerator[String]() {
      // 定义随机数数据生成器
      var generator: RandomDataGenerator = _

      // 初始化数据生成器
      override def open(name: String, context: FunctionInitializationContext, runtimeContext: RuntimeContext): Unit = {
        generator = new RandomDataGenerator
      }

      // 是否有下一个值
      override def hasNext: Boolean = true

      // 生成随机字符串,并返回
      override def next(): String = generator.nextHexString(3)
    }

    val dataGenSource = new DataGeneratorSource(
      userGenerator // 指定数据生成器
      , 2L // 指定发射速率(每秒发射的记录数)
      , null // 指定输出数据的总行数(为null时,表示一直输出)
    )

    // 将DataGeneratorSource做为数据源
    val ds = env.addSource(dataGenSource)

    println(s"并行度: ${ds.parallelism}")

    // 打印DataStream
    ds.print()

    // 出发程序执行
    env.execute()
  }

执行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/701241.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java连接数据库的5种方式

方式一直接导入第三方库驱动类 这种加载方式在jdbc入门时已经用过&#xff0c;这个driver属于第三方库&#xff0c;。为静态加载&#xff0c;灵活性差&#xff0c;依赖性抢 方式二使用反射机制获取 方式一和方式二代码 package com.hsp.edu;import com.mysql.cj.jdbc.Driver;i…

【FFmpeg实战】音频重采样

转载自原文地址&#xff1a; https://www.cnblogs.com/zjacky/p/16529648.html 重采样&#xff1a; 将音频进行SDL播放的时候&#xff0c;因为当前的SDL2.0不支持plannar格式&#xff0c;也不支持浮点型的&#xff0c;而最新的FFpemg会将音频解码为AV_SAMPLE_FMT_FLTP&#xf…

LORA模型原理详解+分层控制使用

一、前言 LoRA模型全称是&#xff1a;Low-Rank Adaptation of Large Language Models&#xff0c;可以理解为Stable-Diffusion中的一个插件&#xff0c;仅需要少量的数据就可以进行训练的一种模型。在生成图片时&#xff0c;LoRA模型会与大模型结合使用&#xff0c;从而实现对…

一张SSL证书支持绑定多个域名吗?

一张SSL证书可支持绑定多个不同类型的域名&#xff0c;选择多域名SSL证书&#xff08;SAN SSL&#xff09;或通配符SSL证书&#xff08;Wildcard SSL&#xff09;类型&#xff0c;就可以实现一张SSL证书绑定多个域名&#xff0c;但绑定的域名类型有些不同。 1、多域名SSL证书&a…

Golang语言介绍、环境搭建以及编译工具( CDN 加速代理)

Go 语言是非常有潜力的语言&#xff0c;是因为它的应用场景是目前互联网非常热门的几个领域&#xff0c;比如 WEB 开发、区块链开发、大型游戏服务端开发、分布式/云计算开发。国内比较知名的B 站就是用 Go 语言开发的&#xff0c;像 Goggle、阿里、京东、百度、腾讯、小米、36…

人工智能开发人员工作流程、看法、工具统计数据

人工智能开发人员工作流程、看法、工具统计数据 本文目录&#xff1a; 一、人工智能开发所需要的技能和知识 二、开发人工智能需要以下工具 2.1、开发过程中的人工智能工具调查 2.2、AI 工具情绪调查 2.3、AI 工具的优势调查 2.4、人工智能工具的准确性调查 2.5、开发工…

rails console (SQL)查找语句

find可以查找一个 where可以查找多个

Flink DataStream之从集合/文件读数据

从集合中读数据方式一 package test01;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.…

牛客·轻拍牛头

b[i]:输入 a[i]:计数 c[i]:答案void solve() {cin>>n;for(int i0;i<n;i){cin>>b[i];a[b[i]];}for(int i1;i<NN;i){if(a[i]){for(int ji;j<NN;ji)c[j]a[i];}}for(int i0;i<n;i){cout<<c[b[i]]-1<<\n;} }

【深度学习】contrastive loss与triplet loss

自己总结一下&#xff0c; 三元组如果正负样本足够开&#xff0c;距离足够远&#xff0c;loss为0&#xff0c;因为模型已经学的不错了&#xff0c;不需要继续学习。 最好的负样本是&#xff0c;model预测负样本的把握不太大的。 如果负样本是很难分的&#xff0c;例如d(a,p)>…

数据结构:Mysql索引原理(通俗易懂)

目录 前言正文索引结构-数组无序数组有序数组 索引数据结构-HashHash冲突-链式寻址法Hash冲突-再哈希法Hash冲突-开放地址法 索引数据结构-树二叉树平衡二叉树红黑树红黑树的特性红黑树如何减少旋转 B树B树 Mysql的索引一级索引二级索引 总结 前言 在工作中如果经常写业务代码…

信号链噪声分析4

目录 概要 整体架构流程 技术名词解释 技术细节 5.计算信号链的噪声 噪声谱密度 重点注意事项 有源滤波器配置 小结 概要 提示&#xff1a;这里可以添加技术概要 本文介绍对高速宽带宽信号链进行噪声性能理论分析的各个步骤。尽管选择了一个特 定信号链&#xff0c;…

传统商超巨头折戟即时零售

今年的618&#xff0c;包含沃尔玛、永辉、大润发、华润、物美、麦德龙、盒马、联华、家家悦、中百、华联在内10大商超巨头都集体“失声”了。 值得注意的是&#xff0c;今年618不仅各大商超没有公布相关数据&#xff0c;且在所有即时零售平台中&#xff0c;仅有商超到家的合作…

怎么用postman进行自动化接口测试,终于学到了

​ 目录 背景描述 创建一个GET请求 在pre-request scripts构建签名 脚本写在环境变量中 postman console的用法 Collection Runner 自动化API测试 创建接口的测试用例 选择并运行自动化接口测试 测试结果 总结&#xff1a; 背景描述 有一个项目要使用postman进行接…

元宇宙虚拟人物3d建模贯穿产业链数字化转型全链条

随着5G、AI、VR/AR、区块链、云计算、脑机等技术的发展&#xff0c;给构建这个虚拟的宇宙提供了技术支撑。今年以来&#xff0c;因为疫情等原因&#xff0c;大众在虚拟空间停留的时间变得更长&#xff0c;元宇宙概念重新火热。 元宇宙虚拟人的基本特征包括&#xff1a; 沉浸式体…

Android hook、检测及对抗相关

frida——hook 内存访问断点 环境&#xff1a;app&#xff1a;arm64 python 3.10 frida 15.2.2 简单的内存访问断点代码&#xff0c;可能还有些bug&#xff0c;根据apk需要自己改&#xff0c;下文为在apk中指定的地址调用函数时内存断点才被激活&#xff0c;以下需要…

【GESP】2023年03月图形化二级 -- 快乐时光

文章目录 快乐时光1. 准备工作2. 功能实现3. 设计思路与实现&#xff08;1&#xff09;角色、舞台背景设置a. 角色设置b. 舞台背景设置 &#xff08;2&#xff09;脚本编写a. 角色&#xff1a;小猫b. 角色&#xff1a;小猴 4. 评分标准 快乐时光 1. 准备工作 &#xff08;1&am…

CSS知识点汇总(十)--移动端适配

文章目录 怎么做移动端的样式适配&#xff1f;1、方案选择2. iPhoneX 适配方案 怎么做移动端的样式适配&#xff1f; 在移动端虽然整体来说大部分浏览器内核都是 webkit&#xff0c;而且大部分都支持 css3 的所有语法。但手机屏幕尺寸不一样&#xff0c;分辨率不一样&#xff0…

jenkins共享库配置及设计

jenkins共享库做模块封装时遇到的问题总结&#xff1a; 背景描述:使用jenkins共享库对SCM subversion操作进行封装时&#xff0c;使用了Checkout插件&#xff0c;生成的检出脚本代码为 checkout([$class: SubversionSCM, additionalCredentials: [], excludedCommitMessages: …

【Dashy安装使用】本地Linux 部署 Dashy 并远程访问

文章目录 简介1. 安装Dashy2. 安装cpolar3.配置公网访问地址4. 固定域名访问 转载自cpolar极点云文章&#xff1a;本地Linux 部署 Dashy 并远程访问 简介 Dashy 是一个开源的自托管的导航页配置服务&#xff0c;具有易于使用的可视化编辑器、状态检查、小工具和主题等功能。你…