大数据(9f)Flink富函数RichFunction

news2024/11/26 22:40:51

文章目录

  • 1、概述
  • 2、示例
    • 2.1、普通函数
    • 2.2、富函数
      • 2.2.1、获取富函数的运行时上下文
  • 3、源码截取
    • 3.1、RichFunction
    • 3.2、RuntimeContext

1、概述

Rich Function,译名富函数,和普通函数相比,多了:
生命周期( openclose方法)
获取函数的运行时上下文( getRuntimeContext方法)
本文版本
Flink:1.14.6
Java:1.8
Scala:2.12

2、示例

2.1、普通函数

MapFunction接口 继承了 Function接口

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class H1 {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(1);
        //获取数据源
        DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
        //普通函数
        dss.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        //执行
        env.execute();
    }
}

测试结果

2.2、富函数

RichMapFunction抽象类 继承了 AbstractRichFunction抽象类
AbstractRichFunction抽象类 实现了 RichFunction接口

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class H1 {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(1);
        //获取数据源
        DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
        //普通函数
        dss.map(new RichMapFunction<Integer, Integer>() {
            @Override
            public void open(Configuration parameters) {
                System.out.println("生命周期开始");
            }

            @Override
            public void close() {
                System.out.println("生命周期结束");
            }

            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        //执行
        env.execute();
    }
}

测试结果

2.2.1、获取富函数的运行时上下文

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class H1 {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(2);
        //获取数据源
        DataStreamSource<Integer> dss = env.fromElements(1, 2, 3);
        //普通函数
        dss.map(new RichMapFunction<Integer, Integer>() {
            @Override
            public void open(Configuration parameters) {
                System.out.println("生命周期开始");
                //获取运行时上下文
                RuntimeContext context = getRuntimeContext();
                System.out.println("子任务索引:" + context.getIndexOfThisSubtask());
            }

            @Override
            public void close() {
                System.out.println("生命周期结束");
            }

            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        //执行
        env.execute();
    }
}

并行度设置为2,测试结果

3、源码截取

3.1、RichFunction

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;

@Public
public interface RichFunction extends Function {
    /** 函数的生命周期 */
    void open(Configuration parameters) throws Exception;

    void close() throws Exception;

    /** 获取函数运行时上下文对象,对象信息包含:并行度、作业ID、任务名、子任务索引… */
    RuntimeContext getRuntimeContext();

    /** 设置函数的运行时上下文。在创建函数的并行实例时,此方法被框架调用 */
    void setRuntimeContext(RuntimeContext t);
}

3.2、RuntimeContext

/**
 * RuntimeContext 包含 函数的运行时上下文信息
 * 函数的每个并行实例都有1个context对象,通过访问对象,可获取 静态信息、累加器、广播变量、状态
 */
@Public
public interface RuntimeContext {

    JobID getJobId();

    String getTaskName();

    int getIndexOfThisSubtask();

    int getAttemptNumber();

    String getTaskNameWithSubtasks();

    // ------------------------------------ 累加器 -------------------------------------------

    <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator);

    <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name);

    @PublicEvolving
    IntCounter getIntCounter(String name);

    @PublicEvolving
    LongCounter getLongCounter(String name);

    @PublicEvolving
    DoubleCounter getDoubleCounter(String name);

    @PublicEvolving
    Histogram getHistogram(String name);

    // ---------------------------------- 广播变量 -------------------------------------------

    @PublicEvolving
    boolean hasBroadcastVariable(String name);

    <RT> List<RT> getBroadcastVariable(String name);

    <T, C> C getBroadcastVariableWithInitializer(
            String name, BroadcastVariableInitializer<T, C> initializer);

    // -------------------------- 访问【状态】的方法 --------------------------------

    @PublicEvolving
    <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);

    @PublicEvolving
    <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);

    @PublicEvolving
    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);

    @PublicEvolving
    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
            AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

    @PublicEvolving
    <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/27336.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DGL学习笔记——第二章 消息传递范式

提示&#xff1a;DGL用户指南学习中 文章目录一、内置函数和消息传递API二、编写高效的消息传递代码总结消息传递是实现GNN的一种通用框架和编程范式。它从聚合与更新的角度归纳总结了多种GNN模型的实现。 假设节点 &#x1d463; 上的的特征为 &#x1d465;&#x1d463;∈ℝ…

Java(八)----多线程

1. 线程的基本概念 1.1 进程 任何的软件存储在磁盘&#xff08;硬盘&#xff09;中,运行软件的时候,OS&#xff08;操作系统&#xff09;使用IO技术,将磁盘中的软件的文件加载到内存,程序才能运行。 &#xff08;进程是从硬盘到内存&#xff09; 进程的概念 &#xff1a; 应…

Marked.js让您的文档编辑更加轻松自如!

低代码应用平台——kintone既可以保留更改记录&#xff0c;也有流程管理的功能&#xff0c;在公司内部分享会议记录啊、wiki等文档或学习资料等时非常的便利。 kintone还有丰富的文本编辑框&#xff0c;可以对内容进行编辑提高易读性。但是还是有不少人觉得如果能够使用Markdo…

19.[Python GUI] PyQt5中的模型与视图框架-基本原理

PyQt中的模型与视图框架 一、Qt中模型与视图相关的类 二、模型与视图的基本原理 MVC把图形界面分为三个部分&#xff1a;模型&#xff08;Model&#xff09;&#xff0c;视图&#xff08;View&#xff09;和控制器&#xff08;Controller&#xff09;&#xff0c; 模型&#x…

Git大型文件存储

什么是 Git LFS&#xff1f; Git 是跟踪代码库演变和与同行高效协作的绝佳选择。但是&#xff0c;当您要跟踪的存储库非常大时会发生什么&#xff1f; 如果您考虑一下&#xff0c;存储库变得庞大的主要原因大致有两个&#xff1a; 他们积累了非常非常长的历史&#xff08;项目…

【C】文件操作fopen与fclose

目录 函数 1.fopen 2.fclose 3.freopen 函数 头文件 #include<stdio.h> 1.fopen FILE *fopen(const char *restrict dilename,const char* restrict mode); 作用&#xff1a;打开文件参数&#xff1a; 第一个是含有要打开文件名的字符串&#xff08;"文件名…

肝了一周的八万字Redis实战篇

Redis实战篇 文章目录Redis实战篇开篇导读1. 短信登录2. 商户查询缓存3. 优惠卷秒杀4. 附近的商户5. UV统计6. 用户签到7. 好友关注8. 达人探店一、短信登录1. 导入黑马点评项目1.1 导入SQL1.2 有关当前模型1.3 导入后端项目1.4 导入前端工程1.5 运行前端项目2. 基于Session实现…

【杂谈】快来看看如何使用LGMT这样的蜜汁缩写来进行CodeReview吧!

文章目录一、先从一个梗开始说起吧&#xff01;二、什么是LGTM&#xff1f;2.1 LGTM 是什么意思&#xff1f;2.2 蹭梗品牌故事2.3 虚假的CodeReview三、Code Review中的蜜汁缩写四、参考链接一、先从一个梗开始说起吧&#xff01; 公司最近在如火如荼的开展CodeReview活动&…

Reinforcement learning from demonstration through shaping(Wiewiora 2003)

摘要 强化学习中的一个重要问题是如何以有原则的方式整合专家知识&#xff0c;尤其是当我们扩展到现实世界的任务时。在本文中&#xff0c;我们提出了一种在不改变最优策略的情况下将任意建议纳入强化学习agent的奖励结构的方法。 该方法将 Ng 等人 (1999) 提出的基于势能的塑…

1530_AURIX_TriCore内核架构_通用寄存器以及系统寄存器

全部学习汇总&#xff1a; GreyZhang/g_tricore_architecture: some learning note about tricore architecture. (github.com) 继续看一下内核手册&#xff0c;这次了解一下通用寄存器以及系统寄存器。最近一段时间最复位以及trap困扰了许久&#xff0c;看看这里面是否能够获取…

黄河水稻山东智慧 国稻种芯·中国水稻节:济南泉城米袋子

黄河水稻山东智慧 国稻种芯中国水稻节&#xff1a;济南泉城米袋子 新闻中国采编网 中国新闻采编网 谋定研究中国智库网 中国农民丰收节国际贸易促进会 国稻种芯中国水稻节 中国三农智库网-功能性农业农业大健康大会报道&#xff1a;又是一年春天。济南黄河流域吴家堡水稻田旁的…

数据挖掘与机器学习:数据挖掘算法原理与实践:数据预处理

目录 第一关&#xff1a;标准化 任务描述&#xff1a; 相关知识&#xff1a; 一、为什么要进行标准化 二、Z-score标准化 三、Min-max标准化 四、MaxAbs标准化 编程要求&#xff1a; 测试说明&#xff1a; 第二关&#xff1a;非线性转换 任务描述&#xff1a; 相关知…

【LeetCode】878. 第 N 个神奇数字

题目描述 一个正整数如果能被 a 或 b 整除&#xff0c;那么它是神奇的。 给定三个整数 n , a , b &#xff0c;返回第 n 个神奇的数字。因为答案可能很大&#xff0c;所以返回答案 对 109 7 取模 后的值。 示例 1&#xff1a; 输入&#xff1a;n 1, a 2, b 3 输出&#xff…

stm32cubemx hal学习记录:FreeRTOS互斥量

一、互斥量 1、互斥量用于互锁&#xff0c;可以充当资源保护的令牌&#xff0c;当一个任务希望访问某个资源时&#xff0c;它必须先获取令牌&#xff0c;当任务使用完资源后&#xff0c;必须返还令牌&#xff0c;以便其他任务可以访问该资源。 2、互斥量一般用于临界资源保护…

[附源码]计算机毕业设计JAVA教师档案管理系统

[附源码]计算机毕业设计JAVA教师档案管理系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybati…

LabVIEW通信-CAN

文章目录CANcan总线特点位定位与同步标识符检验滤波报文传输类型CAN CAN属于OSI的物理层与数据链路层 can总线特点 网络各节点之间的数据通信实时性强 开发周期短 结构简单&#xff08;只有两根线与外部相连&#xff0c;内部继承了错误探测和管理模块&#xff09; 数据通信没…

Spring IOC

一、为什么要使用Spring&#xff1f; Spring 是个java企业级应用的开源开发框架。Spring主要用来开发Java应用&#xff0c;但是有些扩展是针对构建J2EE平台的web应用。Spring 框架目标是简化Java企业级应用开发&#xff0c;并通过POJO为基础的编程模型促进良好的编程习惯。 为…

RPA案例|云扩助力保险行业开启超自动化运营新阶段

近五年&#xff0c;全球平均保险深度总体呈小幅平稳下降趋势&#xff0c;2021年中国保险深度为4.15%&#xff0c;全球平均保险深度为5.96%&#xff0c;而美国、英国等发达国家的保险深度则保持在10%以上&#xff0c;中国保险深度仍然具有很大的上升空间。 为进一步拓展增量空间…

软考是什么?---2023年软考最全解析

软考是什么&#xff1f; 软考全称为&#xff1a;计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试&#xff0c;发证机构为国家人力资源和社会保障部、工业和信息化部&#xff0c;简称为&#xff1a;人社部和工信部。中日韩三国互认&#xff0c;证书的含金量很高…

Navicat操作数据库与Mysql常见命令操作实战

一&#xff1a;Navicat下载与安装 官网下载链接&#xff1a;Navicat 下载完后直接安装即可 二&#xff1a;数据库的连接 1.打开Navicat软件&#xff0c;点击左上角连接按钮&#xff0c;选择mysql数据库 输入完成后双击连接名&#xff0c;连接成功后&#xff0c;小海豚变绿色 …