3. 自定义datasource

news2024/11/28 18:59:01

在这里插入图片描述

一、自定义DataSource

​ 自定义DataSource有两大类:单线程的DataSource和多线程的DataSource

  • 单线程:继承 SourceFunction

  • 多线程:继承 ParallelSourceFunction,继承 RichParallelSourceFunction(可以有其他的很多操作)

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, RichParallelSourceFunction, SourceFunction}
    
    //1. 单线程
    class MyNoParallelSource1 extends SourceFunction[Long] {
    
      var count = 1L;
      var isRunning = true
    
      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while(isRunning) {
          ctx.collect(count)
          count += 1
          Thread.sleep(1000)
        }
      }
    
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    
    //2. 多线程
    class MyNoParallelSource2 extends ParallelSourceFunction[Long] {
    
      var count = 1L
      var isRunning = true
    
      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while(isRunning) {
          ctx.collect(count)
          count += 1
          Thread.sleep(1000)
        }
      }
    
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    
    /**3. 多线程使用RichFunction的方式
     * 提供了open和close方法,可以用于打开和释放资源
     */
    class MyNoParallelSource3 extends RichParallelSourceFunction[Long] {
    
      var count = 1
      var isRunning = true
    
      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while (isRunning) {
          ctx.collect(count)
          count += 1
          Thread.sleep(1000)
        }
      }
    
      override def cancel(): Unit = {
        isRunning = false
      }
    
      override def open(parameters: Configuration): Unit = super.open(parameters)
    
      override def close(): Unit = super.close()
      
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/992289.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

origin中optimal cluster安装报错解决

1.在安装之后程序运行出错,报错信息为缺少numpy包。解决办法:打开窗口-脚本窗口,用pip安装numpy,其他缺少的包可用同样方法解决。 2.有的包在外部python中才有,通过origin无法下载。解决办法:在连接-python…

WIFI版本云音响设置教程阿里云平台版本

文章目录 WIFI本云音响设置教程介绍一、申请设备三元素1.登录阿里云物联网平台2.创建产品3.设置产品参数4.添加设备5.获取三元素 二、设置设备三元素1.打开MQTTConfigTools2.计算MQTT参数3.使用windows电脑的WIFI连接到设备热点4.设置参数5.配置设备连接路由器 三、阿里云物联网…

有始有终!

作者 | 磊哥 来源 | Java中文社群(ID:javacn666) 转载请联系授权(微信ID:GG_Stone) 开始是立秋之日(8.8 号),结束是白露之时(9.8 号)。 为期一月&…

Day60|leetcode 84.柱状图中最大的矩形

leetcode 84.柱状图中最大的矩形 题目链接:84. 柱状图中最大的矩形 - 力扣(LeetCode) 视频链接:单调栈,又一次经典来袭! LeetCode:84.柱状图中最大的矩形_哔哩哔哩_bilibili 题目概述 给定 n 个…

CSS3技巧36:backdrop-filter 背景滤镜

CSS3 有 filter 滤镜属性,能给内容,尤其是图片,添加各种滤镜效果。 filter 滤镜详见博文:CSS3中强大的filter(滤镜)属性_css3滤镜_stones4zd的博客-CSDN博客 后续,CSS3 又新增了 backdrop-filter 背景滤镜。 backdr…

卷积概念理解

卷积(convolution)最容易理解的解释_一点一点的进步的博客-CSDN博客 图像处理之卷积模式及C实现_利用卷积模型分类图片 c_扫地工的博客-CSDN博客 卷积的重要的物理意义是:一个函数(如:单位响应)在另一个函数(如&…

进程

目录 进程定义 进程与程序对比 进程分类 系统进程 用户进程 交互进程 批处理进程 守护进程 进程状态 进程组成 ​编辑正文段(text)和用户数据段 用户数据段 正文段 PCB进程控制块 进程标识信息 处理机状态 进程调度信息 进程控制信息 …

通达信自定义副图行业指标K线指标 HYZS_QD

行业指数:HY_INDEXC,NODRAW; DRAWKLINE(HY_INDEXH,HY_INDEXO,HY_INDEXL,HY_INDEXC); MA5:MA(HY_INDEXC,5),COLORWHITE; {MA10:MA(HY_INDEXC,10),COLORYELLOW,LINETHICK2}; DRAWTEXT_FIX(1,1,1,1,STRCAT(STRCAT(CON2STR(HY_INDEXADV,0),/),STRCAT(CON2STR(HY_INDEXDEC,0), ))),…

06_瑞萨GUI(LVGL)移植实战教程之驱动EC11旋转编码器(GPIO)

本系列教程配套出有视频教程,观看地址:https://www.bilibili.com/video/BV1gV4y1e7Sg 6. 驱动EC11旋转编码器(GPIO) 本次实验我们驱动EC11旋转编码器。 6.1 复制工程 上次实验得出的工程我们可以通过复制在原有的基础上得到一个新的工程。 如果你不清…

XCE18T4K1P40-FJJP40、F4Z1P40规格书(泰兴创航)

关于XCE18T4K1P40-FJJP40、F4Z1P40电连接器规格书 主要性能指标 工作温度:-55℃~200℃相对湿度:温度40℃2℃时达98%振动:频率10-2000Hz,加速度196m/s2冲击:加速度980m/s2机械寿命:5000次壳体材料&#xff1…

05_瑞萨GUI(LVGL)移植实战教程之添加LVGL库,对接显示和触摸驱动

本系列教程配套出有视频教程,观看地址:https://www.bilibili.com/video/BV1gV4y1e7Sg 5. 添加LVGL库,对接显示和触摸驱动 本次实验我们会融合前面实验的成果,添加LVGL库,对接显示和触摸驱动,让屏幕能显示…

金蝶云星空与泛微OA集成的方案落地与实践

打破信息孤岛,泛微OA集成的方案落地与实践 在现代企业内部,不同类型的业务系统和泛微OA平台层出不穷。企业需要找到一种高效的方法来整合和协同这些多样化的系统,同时将它们与泛微OA平台融合,以实现资源整合和高度协同的办公环境…

Win10下python的命令行启动和调用问题

Win10下python的命令行启动和调用问题 Win10下Python的启动问题解决办法 Win10下Python的启动问题 Win10下安装了python,但是命令行启动仍然显示Windows商店界面 同时在C:\Users\用户名\AppData\Local\Microsoft\WindowsApps目录下发现空的python3.exe文件 即便在…

【Java】基于SSM的单位人事管理系统

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用JSP技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…

向量数据库,能让AI再次起飞吗?

9月7-8日,深圳国际会展中心18号馆 来了,来了,腾讯面向产业互联网领域规格最高、规模最大、覆盖最广的年度科技盛会 -——- 腾讯全球数字生态大会。 9 月 7 日,我们将聚焦产业未来发展新趋势,针对云计算、大数据、人工…

Kafka3.0.0版本——消费者(分区的分配以及再平衡)

目录 一、分区的分配以及再平衡1.1、消费者分区及消费者组的概述1.2、如何确定哪个consumer来消费哪个partition的数据1.3、消费者分区分配策略 一、分区的分配以及再平衡 1.1、消费者分区及消费者组的概述 一个consumer group中有多个consumer组成,一个 topic有多…

单片机代码不变,hex却变了?

关注星标公众号,不错过精彩内容 作者 | strongerHuang 微信公众号 | strongerHuang 今天在技术交流群看到这么一个问题,大概意思就是:同一个代码工程(源码不变),因Keil版本不同,程序&#xff08…

更快更强更稳定:腾讯向量数据库测评

向量数据库:AI时代的新基座 人工智能在无处不在影响着我们的生活,而人工智能飞速发展的背后是需要对越来越多的海量数据处理,传统数据库已经难以支撑大规模的复杂数据处理。特别是大模型的出现,向量数据库横空出世。NVIDIA CEO黄…

sqlserver2012 bat脚本实现最大使用内存设置

前言 安装完成sqlserver之后,在运行过程中会无限制的占用电脑的内存,会影响到其他软甲的使用。 bat脚本 准备好bat脚本和sql文件之后,配置好数据库信息 直接双击即可 ECHO OFF REM 自动判断权限问题,主动获取管理员权限>…

中国有多少个省?【最新】

2023.09.09 中华人民共和国省级行政区是指中国现行的34个一级行政区, 包括23个省(河北、山西、黑龙江、吉林、辽宁、江苏、浙江、安徽、福建、江西、山东、河南、湖北、湖南、广东、海南、四川、贵州、云南、陕西、甘肃、青海、台湾)、 5个…