flink的带状态的RichFlatMapFunction函数使用

news2024/11/24 16:02:55

背景

使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换,而且这种用法非常常见,根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换,本文就来简单举个例子说明下RichFlatMapFunction的使用方法

RichFlatMapFunction使用示例

下面的例子的输入是不用name下的count数量值,当本次name的数量和前一次name的数量相差超过配置的阈值100时,打印出来一条告警日志,详细代码如下:

package wikiedits.func.state;

import java.util.Objects;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Tuple2<String, Integer> 是输入的数据类型 String 是监控到异常值后的输出数据类型
 */
public class MyRichFlatMapFunction extends RichFlatMapFunction<Tuple2<String, Integer>, String> {

    // 键值分区状态,对应每个name一个值
    ValueState<StateEntity> nameState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 创建一个键值分区状态
        ValueStateDescriptor<StateEntity> state = new ValueStateDescriptor<>("nameState", StateEntity.class);
        nameState = getRuntimeContext().getState(state);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> input, Collector<String> collector) throws Exception {
        // 判断状态值是否为空(状态默认值是空)
        if (Objects.isNull(nameState.value())) {
            StateEntity sFalg = new StateEntity(input.f0, input.f1);
            nameState.update(sFalg);
            return;
        }
        // 和上一次的状态值比较
        StateEntity value = nameState.value();
        if (Math.abs(value.count - input.f1) > 100) {
            collector.collect(new String("监控到异常值,名称: " + input.f0 + " 上次的值:" + value + " 本次的值:" + input));
        }
        value.setName(input.f0);
        value.setCount(input.f1);
        // 更新状态值
        nameState.update(value);
    }


}

package wikiedits.func.state;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class RichFlatMapFunctionTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置数据源,一共三个元素
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // 只有XXX,YYY,ZZZ三种name
                    String name = (0 == i % 3) ? "XXX" : ((i % 3 == 1) ? "YYY" : "ZZZ");
                    int count = RandomUtils.nextInt(0, 1000);
                    // 使用当前时间作为时间戳
                    long timeStamp = System.currentTimeMillis();
                    // 发射一个元素,并且戴上了时间戳
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, count), timeStamp);
                    // 每发射一次就延时1秒
                    Thread.sleep(5000);
                }
            }

            @Override
            public void cancel() {}
        });
        dataStream.keyBy((f) -> {
            return f.f0;
        }).flatMap(new MyRichFlatMapFunction()).print();

        env.execute();
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
    }

}

结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1185654.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一台电脑生成两个ssh,绑定两个GitHub账号

背景 一般一台电脑账号生成一个ssh绑定一个GitHub&#xff0c;即一一对应的关系&#xff01;我之前有一个账号也配置了ssh&#xff0c;但是我想经营两个GitHub账号&#xff0c;当我用https url clone新账号的仓库时&#xff0c;直接超时。所以想起了配置ssh。于是有了今天这篇…

UG画弹簧模型教程

我们通常做的弹簧大多数都圆柱形的&#xff0c;如果要创建弹簧弯曲的形状也是可以的&#xff0c;这里介绍怎样通过样条曲线做弯曲样式来生成弹簧的技巧。 UG怎么画已经折弯的弹簧模型? 1、先新建一个模型文件&#xff0c;进入草图&#xff0c;绘制一条样条曲线&#xff0c;样…

深入理解指针:【探索指针的高级概念和应用二】

目录 一&#xff0c;数组参数、指针参数 1.一维数组传参 2.二维数组传参 3.一级指针传参 4.二级指针传参 二&#xff0c;函数指针 三&#xff0c;函数指针数组 &#x1f342;函数指针数组的用途&#xff08;转移表&#xff09;&#xff1a; 四&#xff0c;指向函数指针…

Git 代码库 gogs 部署私服及 https 配置手册

背景 玩了一下 Git &#xff0c;想到一个问题&#xff1a;企业内部怎么用 Git 呢&#xff1f;仓库哪里来呢&#xff1f; 理一理 Git 及其相关产品的区别&#xff1a; Git 分布式版本管理工具。GitHub 和 Gitee &#xff0c;基于 Git 的互联网代码托管平台&#xff0c;一个是…

【小技巧】WPS统计纯汉字(不计标点符号)

【小技巧】WPS统计纯汉字&#xff08;不计标点符号&#xff09; 首先&#xff0c;CtrlF打开查找页面&#xff1a; 选择“高级搜索”&#xff0c;然后勾选“使用通配符”&#xff0c;然后在“查找内容”后面输入&#xff1a;[一-﨩]。注意&#xff1a;一定要带“[]”和“-”且…

FreeRTOS_空闲任务

目录 1. 空闲任务详解 1.1 空闲任务简介 1.2 空闲任务的创建 1.3 空闲任务函数 2. 空闲任务钩子函数详解 2.1 钩子函数 2.2 空闲任务钩子函数 3. 空闲任务钩子函数实验 3.1 main.c 空闲任务是 FreeRTOS 必不可少的一个任务&#xff0c;其他 RTOS 类系统也有空闲任务&a…

Android MotionLayout

MotionLayout exends ConstraintLayout(动画框架 过渡) View动画 API1 属性动画API11 过渡动画API18 root.width RootViewWidth TransitionManager.beginDelayedTransition(view) 过渡动画 可以改变其大小和流畅性 Fade 可以改变透明度 通过TrasitinManager管理 Go:动态替…

adb and 软件架构笔记

Native Service&#xff0c;这是Android系统里的一种特色&#xff0c;就是通过C或是C代码写出来的&#xff0c;供Java进行远程调用的Remote Service&#xff0c;因为C/C代码生成的是Native代码&#xff08;机器代码&#xff09;&#xff0c;于是叫Native Service。 native服务…

Linux C语言进阶-D14指针函数

指针函数&#xff1a;指一个函数的返回值为地址量的函数 <数据类型>* <函数名称>&#xff08;<参数说明>&#xff09; { 语句序列; } 返回值&#xff1a;全部变量的地址、静态变量的地址、字符串常量的地址、堆上的地址 注意&#xff1a;不可返回局部变量…

Redis Java 开发简单示例

文章目录 一、概述二、Jedis 开发示例2.1 导入 maven 依赖2.2 使用连接池读写2.3 使用集群读写2.4 完整示例代码2.5 测试集群的搭建 三、Lettuce 开发示例3.1 导入 maven 依赖3.2 读写数据 四、Spring Boot Redis 开发示例4.1 导入 maven 依赖4.2 配置Redis服务地址4.3 基于 Re…

52基于MATLAB的希尔伯特Hilbert变换求包络谱

基于MATLAB的希尔伯特Hilbert变换求包络谱&#xff0c;对原始信号进行初步滤波&#xff0c;之后进行包络谱分析。可替换自己的数据进行优化。程序已调通&#xff0c;可直接运行。 52的尔伯特Hilbert变换包络谱 (xiaohongshu.com)

混沌系统在图像加密中的应用(基于哈密顿能量函数的混沌系统构造1.1)

混沌系统在图像加密中的应用&#xff08;基于哈密顿能量函数的混沌系统构造1.1&#xff09; 前言一、基于广义哈密顿系统的一类混沌系统构造1.基本动力学特性分析2.数值分析 待续 前言 本文的主题是“基于哈密顿能量函数的混沌系统构造”&#xff0c;哈密顿能量函数是是全文研…

【Git】快速入门安装及使用git与svn的区别常用命令

一、导言 1、什么是svn&#xff1f; SVN是Subversion的简称&#xff0c;是一个集中式版本控制系统。与Git不同&#xff0c;SVN没有分布式的特性。在SVN中&#xff0c;项目的代码仓库位于服务器上&#xff0c;团队成员通过向服务器提交和获取代码来实现版本控制。SVN记录了每个文…

Go 接口:nil接口为什么不等于nil?

本文主要内容:深入了解接口类型的运行时表示层。 文章目录 一、Go 接口的地位二、接口的静态特性与动态特性2.1 接口的静态特性与动态特性介绍2.2 “动静皆备”的特性的好处 三、nil error 值 ! nil四、接口类型变量的内部表示第一种&#xff1a;nil 接口变量第二种&#xff1a…

JDBC(二)

第4章 操作BLOB类型字段 4.1 MySQL BLOB类型 MySQL中&#xff0c;BLOB是一个二进制大型对象&#xff0c;是一个可以存储大量数据的容器&#xff0c;它能容纳不同大小的数据。 插入BLOB类型的数据必须使用PreparedStatement&#xff0c;因为BLOB类型的数据无法使用字符串拼接写…

Hundred Finance 攻击事件分析

背景知识 Hundred Finance 是 fork Compound 的一个借贷项目&#xff0c;在2023/04/15遭受了黑客攻击。攻击者在发起攻击交易之前执行了两笔准备交易占据了池子&#xff0c;因为发起攻击的前提是池子处于 empty 的状态&#xff08;发行的 hToken 数量为 0&#xff09;。 准备交…

​软考-高级-信息系统项目管理师教程 第四版【第23章-组织通用管理-思维导图】​

软考-高级-信息系统项目管理师教程 第四版【第23章-组织通用管理-思维导图】 课本里章节里所有蓝色字体的思维导图

CAN 协议常见面试题总结

0.讲一下CAN通讯的过程 第一段&#xff1a;需要发送的通讯设备&#xff0c;先发送一个显性电平0&#xff0c;告诉其他通讯设备&#xff0c;需要开始通讯。 第二段&#xff1a;就是发送仲裁段&#xff0c;其中包括ID帧和数据帧类型&#xff0c;告诉其他通讯设备&#xff0c;需…

记录一次校园CTF--wp

一.第一题简单nc 这题直接nc 地址端口即可得到flags没有套路 二.第二题pwn:ezstack 这是一题栈溢出题目&#xff0c;查看保护&#xff1a; 没有开启PIE&#xff0c;运行下查看效果&#xff1a; 题目是一个文字购物游戏。 接着扔进IDA中分析&#xff1a; 在主函数中我们找到…

C++ string赋值和添加值

在MFC中使用C的string&#xff0c;要先#include <string>&#xff0c;然后&#xff0c;std::string s2("") 这样就可以了&#xff1b; void CStrnewView::OnDraw(CDC* pDC) {CStrnewDoc* pDoc GetDocument();ASSERT_VALID(pDoc);// TODO: add draw code for n…