大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction

news2025/1/10 8:03:53

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节完成了如下的内容:

  • FlinkDataStreamAPI 自定义输入源
  • 非并行源介绍与代码
  • 并行源介绍与代码

在这里插入图片描述

Rich并行源

基本介绍

在 Apache Flink 中,RichSourceFunction 是一种增强的源函数(Source Function),它允许开发者在定义源操作时,能够访问 Flink 的生命周期方法、状态管理、配置访问等更多功能。RichSourceFunction 是并行源的一个扩展,它继承自 RichFunction 接口,而 RichFunction 提供了更丰富的功能,比如访问运行时上下文、管理状态、以及在作业开始和结束时执行初始化或清理操作。

主要特点

  • 生命周期方法:RichSourceFunction 提供了 open() 和 close() 方法,分别在作业开始时和结束时调用。这允许你在数据读取前进行初始化操作(如打开连接、加载配置),以及在作业结束时进行清理工作(如关闭连接、释放资源)。
  • 访问运行时上下文:通过 getRuntimeContext() 方法,RichSourceFunction 可以访问 Flink 的运行时上下文,获取并行度信息、任务名称、指标管理器,以及与状态相关的操作。
  • 状态管理:RichSourceFunction 可以结合 Flink 的状态管理机制,保存和恢复状态。这对于需要在流处理中维护中间状态的源函数非常有用,尤其是在故障恢复时,状态可以帮助恢复到故障前的状态。
  • 并行执行:与普通的 SourceFunction 类似,RichSourceFunction 也可以通过设置并行度来并行执行,这使得它可以处理大规模的数据
    源。

状态管理

RichFunction 与 Flink 的状态管理系统高度集成,允许你在分布式环境中维护和管理操作符的中间状态。Flink 支持两种主要类型的状态:ValueState 和 ListState,以及更复杂的 MapState 和 ReducingState。

  • ValueState: 适用于需要保存单个值的场景,如计数器、标志位等。
  • ListState: 适用于需要保存多个值的场景,如窗口计算中的中间结果。
  • MapState: 适用于需要维护键值对的场景,特别是在进行复杂的数据关联或聚合时。
  • ReducingState: 适用于需要持续聚合数据的场景,比如计数、求和等。

示例代码

以下是一个使用 RichParallelSourceFunction 的简单示例,展示了如何在 Flink 中实现一个并行的、具有生命周期管理的源函数:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class MyRichParallelSource extends RichParallelSourceFunction<String> {

    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 在任务开始时执行初始化操作
        System.out.println("Task " + getRuntimeContext().getTaskName() + " is starting.");
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // 模拟数据流的产生
        while (isRunning) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("Data from task " + getRuntimeContext().getIndexOfThisSubtask());
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        // 在任务结束时执行清理操作
        System.out.println("Task " + getRuntimeContext().getTaskName() + " is closing.");
    }
}

代码解析

  • open() 方法:在任务开始时调用,适用于进行连接初始化、参数设置等操作。在这个方法中,你可以访问 Flink 的配置和运行时上下文。
  • run() 方法:实现数据源的核心逻辑,这个方法会在源函数启动后被调用。可以使用 ctx.collect() 方法将生成的数据发送到下游处理。
  • cancel() 方法:用于取消任务。当作业被取消或停止时,Flink 会调用这个方法,可以在这里做一些清理工作或者安全地停止数据生成。
  • close() 方法:在任务结束时调用,用于释放资源和进行清理操作。

注意事项

  • 状态一致性:在并行源中,如果需要维护状态,一定要注意状态的一致性和恢复机制,确保在作业恢复时可以正确地恢复数据源的状态。
  • 并行度设置:RichParallelSourceFunction 作为并行源,可以通过 setParallelism 方法设置并行度,确保根据任务的需求合理分配并行实例的数量。

RichParallelSource

package icu.wzk;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RichParallelSourceRich extends RichParallelSourceFunction<String> {

    private long count = 1L;
    private boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            count ++;
            ctx.collect(String.valueOf(count));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

RichParallelSourceTest

package icu.wzk;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;

public class RichParallelSourceRichTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.getJavaEnv().addSource(new RichParallelSourceRich());
        data.print();
        env.execute("RichParallelSourceRichTest");
    }

}

运行结果

3> 10
5> 10
8> 10
6> 10
2> 10
4> 10
7> 10
1> 10
6> 11
5> 11
8> 11
2> 11
3> 11
4> 11
7> 11
1> 11
2> 12
3> 12
...

控制台输出结果如下所示:
在这里插入图片描述

为什么 Rich 类使用广泛

  • 生命周期管理:Rich 类提供了 open() 和 close() 方法,允许开发者在任务开始和结束时执行初始化和清理操作。这对于需要设置资源(如数据库连接、文件读写、外部服务连接)的操作非常有用。
  • 运行时上下文访问:通过 getRuntimeContext(),Rich 类可以访问任务的并行度信息、任务名称、子任务索引、状态管理等。对于需要根据任务上下文调整行为或需要跨并行实例共享状态的场景,这些信息是至关重要的。
  • 状态管理:RichFunction 可以方便地与 Flink 的状态管理结合使用。在状态丰富的应用场景(如需要维护中间计算结果、计数器、缓存等)的流处理中,Rich 类显得非常有用。
  • 性能监控:Rich 类允许开发者在 open() 方法中注册 Flink 的度量指标(Metrics),帮助监控和优化作业的性能。

什么时候不用 Rich 类

  • 简单操作:如果你只是需要进行简单的转换或过滤操作,没有复杂的初始化、状态管理或清理需求,那么 Rich 类的额外功能可能并不必要。
  • 高性能要求的场景:在一些对性能要求极高的场景中,尽量减少复杂的操作和额外的上下文访问,直接使用轻量级的 MapFunction、FilterFunction 等可能会有更好的性能表现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2097981.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(五)vForm 动态表单文件上传、下载

系列文章目录 (一)vForm 动态表单设计器之使用 (二)vForm 动态表单设计器之下拉、选择 (三)vForm 动态表单解决下拉框无数据显示id问题 (四)vForm 动态表单自定义组件、属性 目录 系列文章目录 前言 一、文件上传 1.前端 2.后端 二、文件下载 1.前端 2.后端 总结 …

你可能被 Vue 中的 v-show 骗了,它并没有像你想的那样切换 display 的属性

你好同学&#xff0c;我是沐爸&#xff0c;欢迎点赞、收藏、评论和关注&#xff01; 在 Vue 中 v-show 是如何条件性地渲染内容的&#xff0c;你可能知道是通过切换 display 属性实现&#xff0c;但真的跟你想的一样吗&#xff1f; 一、你被骗了吗&#xff1f; v-show 和 v-…

JAVAEE初阶第三节——多线程进阶

系列文章目录 JAVAEE初阶第三节——多线程进阶 文章目录 系列文章目录JAVAEE初阶第三节——多线程进阶 一. 常见的锁策略1.乐观锁和悲观锁2. 轻量级锁和重量级锁3.自旋锁和挂起等待锁4. 普通互斥锁和读写锁5. 公平锁和非公平锁6.可重入锁和不可重入锁 二. synchronized的优化手…

GEE数据集:城市热岛强度 (UHII)

简介 城市热岛强度 (UHII) 数据集说明 城市热岛效应&#xff08;UHI&#xff09;的特点是城市地区局部变暖&#xff0c;是城市化对气候造成的一个重要后果。 传统的估算 UHI 强度&#xff08;UHII&#xff09;的方法受到一些限制&#xff0c;例如只关注晴空表面 UHII&#x…

windows11 任务栏 默认打开显示所有其他系统托盘图标

任务栏所有图标显示&#xff1a; ①&#xff0c;WINR键 ②&#xff0c;输入&#xff1a;shell:::{05d7b0f4-2121-4eff-bf6b-ed3f69b894d9} ③&#xff0c;设置&#xff1a;始终在任务栏显示所有图标和通知

【负载均衡】

一、生产者负载均衡 可解决以下问题 消息发送的容灾策略:您可以根据生产者负载均衡策略,明确当局部节点出现故障时,消息发送如何进行容灾切换。 消息发送的顺序性机制:通过生产者负载均衡策略,您可以进一步了解顺序消息发送时,如何保证相同消息组内消息的先后顺序。 消息…

《JavaEE进阶》----8.<SpringMVC实践项目:【简易对话留言板(数据存在内存中)】>

本篇博客讲解设计的一个网页版简易对话留言板。这个是将数据存在内存中。 创建了一个集合 List<MessageInfo> messageInfos new ArrayList<>(); 在这里面存入的数据。当服务器重新加载的时候&#xff0c;数据就消失了&#xff0c;下一个版本&#xff0c;是将数据存…

StateThreads 库使用

文章目录 需求介绍协程注意调度 部署环境搭建测试测试库是否正常测试TCP服务器 需求 最近在对网关模型的并发性能进行验证&#xff0c; 核心目标不仅是让服务器能够承载更多的请求&#xff0c; 还力求在协议栈的解析阶段实现极致的轻量化和无锁操作&#xff0c;从而大幅提升处理…

笔记:如何使用Process Explorer分析句柄泄露溢出问题

一、目的&#xff1a;如何使用Process Explorer分析句柄泄露溢出问题 使用 Process Explorer 分析句柄泄漏问题是一个非常有效的方法。句柄泄漏通常是由于应用程序在创建系统资源&#xff08;如文件、注册表项、GDI 对象等&#xff09;后没有正确释放这些资源。以下是使用 二、…

智尊助手V1.0.0定位打卡 定位摸鱼免root

去除了卡密验证&#xff0c;部分软件改不了&#xff0c;非常好用的虚拟定位软件&#xff0c;重点是不需要root权限就可以使用&#xff0c;操作也非常简单。 链接&#xff1a;https://pan.quark.cn/s/c92084a6cd84 &#x1f4c1;大小&#xff1a;37M &#x1f3f7;标签&#…

Linux如何ping整个网段

一、fping 命令详解 yum provides fping # 查找包名 yum -y install fping # 安装包二、fping 批量筛选存活IP脚本 vim /tmp/ip.sh !/bin/bash fping -g 10.121.52.1/24 >/tmp/ip.txt #输出 cd /tmp/ && cat ip.txt | grep "is alive&quo…

CC工具箱使用指南:【整库计算YSDM】

一、简介 这是一个批量计算【YSDM】的小工具。 一般的数据库要素或表格都有一个【YSDM】字段&#xff0c;用来标识要素类或表格。 【YSDM】的值通常是固定的&#xff0c;入库标准都会给定一个YSDM表&#xff0c;如下&#xff1a; 我们需要将表的内容保存或转换为excel格式&a…

惠中科技光伏清洗剂:绿色清洁,引领光伏行业新潮流

在当今全球能源转型的大潮中&#xff0c;光伏产业作为绿色能源的重要组成部分&#xff0c;正以前所未有的速度蓬勃发展。然而&#xff0c;随着光伏板在户外环境的长时间暴露&#xff0c;其表面不可避免地会积累灰尘、鸟粪、油污等污染物&#xff0c;严重影响光伏板的透光率和发…

如何为你的大模型应用选择最佳架构?六大模式全面解读

随着大模型&#xff08;如 GPT-4、BERT、GPT-3.5 等&#xff09;在自然语言处理、图像识别、医疗诊断等领域的广泛应用&#xff0c;如何构建高效、灵活的架构来支持大模型在复杂场景下的应用变得至关重要。本文将详细介绍几种常见的大模型应用架构设计模式&#xff0c;包括路由…

如何选择适合海外直播的网络?

随着全球化的推进&#xff0c;海外直播正成为企业、个人和机构日益关注的热点。无论是用于营销、推广还是与观众互动&#xff0c;海外直播为各种组织提供了更广泛的机会。然而&#xff0c;要确保直播的质量和用户体验&#xff0c;必须满足一系列网络要求。 1. 网络速度 要保证直…

C 语言基础 -- 函数/指针/结构体

本文介绍指针、函数和结构体 粉丝福利&#xff0c; 免费领取C/C 开发学习资料包、技术视频/项目代码&#xff0c;1000道大厂面试题&#xff0c;内容包括&#xff08;C基础&#xff0c;网络编程&#xff0c;数据库&#xff0c;中间件&#xff0c;后端开发/音视频开发/Qt开发/游戏…

html+css+js网页设计 翘珠宝微商城移动端20个页面

htmlcssjs网页设计 翘珠宝微商城移动端20个页面 网页作品代码简单&#xff0c;可使用任意HTML编辑软件&#xff08;如&#xff1a;Dreamweaver、HBuilder、Vscode 、Sublime 、Webstorm、Text 、Notepad 等任意html编辑软件进行运行及修改编辑等操作&#xff09;。 获取源码 …

ET6框架(十二)ET-EUI基本使用

文章目录 一、下载插件&#xff1a;二、使用插件例子三、使用规则四、公共UI组件五、脚本生成缝隙 ET-EUI是基于ET6.0版本拓展出来的一个套UI框架 一、下载插件&#xff1a; 首先我们需要下载&#xff0c;地址&#xff1a; GitHub - zzjfengqing/ET-EUI: 基于ET框架的UI模块 …

ESRI ArcGIS Pro 3.1.5新功能及安装教程和下载

ESRI ArcGIS Pro 3.1.5 主要新功能包括&#xff1a; 改进的数据编辑和管理&#xff1a;支持更多数据格式和更精细的属性表操作。增强的空间分析工具&#xff1a;新增和优化空间分析工具&#xff0c;提高数据分析效率。更好的3D可视化&#xff1a;改进3D渲染性能&#xff0c;支…

中国艺术孙溟㠭凿篆《无用之用》

孙溟㠭凿篆作品《无用之用》 这方作品是孙溟㠭先生用凿木的方式凿刻出来的&#xff0c;呈现出了凿痕的效果&#xff0c;与众不同。 孙溟㠭凿篆《无用之用》 孙溟㠭凿篆《无用之用》 万般皆有所用&#xff0c;取其长补余短&#xff0c;无用之用是为大用&#xff0…