大数据-玩转数据-Flink状态编程(上)

news2024/11/27 4:15:22

一、Flink状态编程

有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。
SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。
Flink的状态管理是它的优势之一。

二、什么是状态

在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作)。

流式计算分为无状态计算和有状态计算两种情况。
无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。

在简单聚合、窗口聚合、处理函数的应用,都会有状态的身影出现。在Flink这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时能正确地恢复,这就需要一套完整的管理机制来处理所有状态。

三、为什么需要管理状态

下面的几个场景都需要使用流处理的状态功能:
去重: 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
检测: 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
聚合: 对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况
更新机器学习模型: 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

四、Flink中的状态分类

Managed State
状态管理方式 Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩
状态数据结构 Flink提供多种常用数据结构, 例如:ListState, MapState等
使用场景 绝大数Flink算子。

Raw State
状态管理方式 用户自己管理
状态数据结构 字节数组: byte[]
使用场景 所有算子

从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State一般是在已有算子和Managed State不够用时,用户自定义算子时使用。
在我们平时的使用中Managed State已经足够我们使用。

在这里插入图片描述
对Managed State继续细分,它又有2种类型
Operator State(算子状态)
Keyed State(键控状态)

Operator State
适用用算子类型: 可用于所有算子: 常用于source, sink,
例如:FlinkKafkaConsumer
状态分配:一个算子的子任务对应一个状态
创建和访问方式: 实现CheckpointedFunction或ListCheckpointed(已经过时)接口
横向扩展 :并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量
支持的数据结构: ListState,UnionListStste和BroadCastState

Keyed State
适用用算子类型: 只能用于用于KeyedStream上的算子
状态分配 :一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State
创建和访问方式:重写RichFunction, 通过里面的RuntimeContext访问w
横向扩展 :并发改变, State随着Key在实例间迁移
支持的数据结构:ValueState, ListState,MapState ReduceState, AggregatingState

五、算子状态的使用

Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。

注意: 算子子任务之间的状态不能互相访问
Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。

Flink为算子状态提供三种基本数据结构:
列表状态(List state),将状态表示为一组数据的列表

联合列表状态(Union list state),也是将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。

广播状态(Broadcast state)
是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

三种状态的实现
在这里插入图片描述

六、键控状态的使用

键控状态是根据输入数据流中定义的键(key)来维护和访问的,只能用于KeyedStream(keyBy算子处理之后)。相同key的所有数据都会访问相同的状态。
键控状态支持的数据类型
在这里插入图片描述
注意:
a)所有的类型都有clear(), 清空当前key的状态
b)这些状态对象仅用于用户与状态进行交互.
c)状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方
d)从状态获取的值与输入元素的key相关

七、状态后端

状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端。
状态后端主要负责两件事:
本地(taskmanager)的状态管理
将检查点(checkpoint)状态写入远程存储

状态后端的分类及配置
状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适的状态后端。
在这里插入图片描述
MemoryStateBackend
内存级别的状态后端(默认),
存储方式:本地状态存储在TaskManager的内存中, checkpoint 存储在JobManager的内存中.
特点:快速, 低延迟, 但不稳定
使用场景: 1. 本地测试 2. 几乎无状态的作业(ETL) 3. JobManager不容易挂, 或者挂了影响不大. 4. 不推荐在生产环境下使用
FsStateBackend
存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中
特点: 拥有内存级别的本地访问速度, 和更好的容错保证
使用场景: 1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等 2. 需要开启HA的作业 3. 可以应用在生产环境中
RocksDBStateBackend
将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储)
存储方式: 1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘) 2. Checkpoint在外部文件系统(hdfs)中.
使用场景: 1. 超大状态的作业, 例如天级的窗口聚合 2. 需要开启HA的作业 3. 对读写状态性能要求不高的作业 4. 可以使用在生产环境

八、案例列表状态

package com.lyh.flink09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

public class state_programe1_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.socketTextStream("hadoop100",9999)
                .map(new MyMapFunctin())
                .print();
        env.execute();

    }
    public static class MyMapFunctin implements MapFunction<String,Long>, CheckpointedFunction {
        private Long count = 0L;
        private ListState<Long> state;
        @Override
        public Long map(String value) throws Exception {
            count++;
            return count;
        }
        // 初始化时会调用这个方法,向本地状态中填充数据. 每个子任务调用一次
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initialize.....");
            state = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("state",Long.class));

                for (Long c : state.get()) {
                    count += c;
                }
            }

        // Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshot.....");
            state.clear();
            state.add(count);
        }
    }
}

九、案例广播状态

package com.lyh.flink09;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class state_broad1_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment()
                .setParallelism(3);
        DataStreamSource<String> dataStream = env.socketTextStream("hadoop100", 9999);
        DataStreamSource<String> controlStream = env.socketTextStream("hadoop100", 8888);


        MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class);
        // 广播流
        BroadcastStream<String> broadcastStream = controlStream.broadcast(stateDescriptor);
        dataStream
                .connect(broadcastStream)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // 从广播状态中取值, 不同的值做不同的业务
                        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
                        if ("1".equals(state.get("switch"))) {
                            out.collect("切换到1号配置....");
                        } else if ("0".equals(state.get("switch"))) {
                            out.collect("切换到0号配置....");
                        } else {
                            out.collect("切换到其他配置....");
                        }
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                        BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
                        // 把值放入广播状态
                        state.put("switch", value);
                    }
                })
                .print();

        env.execute();
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/969181.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

顺式元件热图+柱状图

写在前面 本教程来自粉丝投稿&#xff0c;主要做得是顺式元件的预测和热图绘制。类似的教程&#xff0c;在我们基于TBtools做基因家族分析也做过&#xff0c;流程基本一致。我们前期的教程&#xff0c;主要是基于TBtools&#xff0c;本教程主要是基于纯代码,也是值得学习收藏的…

【人工智能】—_线性分类器、感知机、损失函数的选取、最小二乘法分类、模型复杂性和过度拟合、规范化

文章目录 Linear predictions 线性预测分类线性分类器感知机感知机学习策略损失函数的选取距离的计算 最小二乘法分类求解最小二乘分类矩阵解法一般线性分类模型复杂性和过度拟合训练误差测试误差泛化误差复杂度与过拟合规范化 Linear predictions 线性预测 分类 从具有有限离…

2022年03月 C/C++(七级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C编程&#xff08;1~8级&#xff09;全部真题・点这里 第1题&#xff1a;红与黑 有一间长方形的房子&#xff0c; 地上铺了红色、 黑色两种颜色的正方形瓷砖。你站在其中一块黑色的瓷砖上&#xff0c; 只能向相邻的黑色瓷砖移动。 请写一个程序&#xff0c; 计算你总共能够到…

Aqs的CyclicBarrier。

今天我们来学习AQS家族的“外门弟子”&#xff1a;CyclicBarrier。 为什么说CyclicBarrier是AQS家族的“外门弟子”呢&#xff1f;那是因为CyclicBarrier自身和内部类Generation并没有继承AQS&#xff0c;但在源码的实现中却深度依赖AQS家族的成员ReentrantLock。就像修仙小说…

Java 复习笔记 - 学生管理系统篇

文章目录 学生管理系统一&#xff0c;需求部分需求分析初始菜单学生类添加功能删除功能修改功能查询功能 二&#xff0c;实现部分&#xff08;一&#xff09;初始化主界面&#xff08;二&#xff09;编写学生类&#xff08;三&#xff09;编写添加学生方法&#xff08;四&#…

ref 操作 React 定时器

秒表 需要将 interval ID 保存在 ref 中&#xff0c;以便在需要时能够清除计时器。 import { useRef, useState } from "react";const SecondWatch () > {const [startTime, setStartTime] useState<any>(null);const [now, setNow] useState<any>…

Elasticsearch中RestClient使用

&#x1f353; 简介&#xff1a;java系列技术分享(&#x1f449;持续更新中…&#x1f525;) &#x1f353; 初衷:一起学习、一起进步、坚持不懈 &#x1f353; 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正&#x1f64f; &#x1f353; 希望这篇文章对你有所帮助,欢…

如何将自己的镜像使用 helm 部署

本文分别从如下几个方面来分享一波 如何将自己的镜像使用 helm 部署 简单介绍一下 helm 使用自己写 yaml 文件的方式在 k8s 中部署应用 使用 helm 的方式在 k8s 中部署应用 简单介绍一下 helm Helm 是 Kubernetes 的包管理器&#xff0c;在云原生领域用于应用打包和分发 Hel…

12. 微积分 - 梯度积分

Hi,大家好。我是茶桁。 上一节课,我们讲了方向导数,并且在最后留了个小尾巴,是什么呢?就是梯度。 我们再来回看一下但是的这个式子: [ f x f y

信息系统项目管理师(第四版)教材精读思维导图-第八章项目整合管理

请参阅我的另一篇文章&#xff0c;综合介绍软考高项&#xff1a; 信息系统项目管理师&#xff08;软考高项&#xff09;备考总结_计算机技术与软件专业技术_铭记北宸的博客-CSDN博客 本章思维导图PDF格式 本章思维导图XMind源文件 目录 8.1 管理基础 8.2 管理过程 8.3 制定项…

LRU算法 vs Redis近似LRU算法

LRU(Least Recently Use)算法&#xff0c;是用来判断一批数据中&#xff0c;最近最少使用算法。它底层数据结构由Hash和链表结合实现&#xff0c;使用Hash是为了保障查询效率为O(1)&#xff0c;使用链表保障删除元素效率为O(1)。 LRU算法是用来判断最近最少使用到元素&#xf…

最短路Dijkstra,spfa,图论二分图算法AYIT---ACM训练(模板版)

文章目录 前言A - Dijkstra Algorithm0x00 算法题目0x01 算法思路0x02 代码实现 B - 最长路0x00 算法题目0x01 算法思路0x02 代码实现 C - 二分图最大匹配0x00 算法题目0x01 算法思路0x02 代码实现 D - 搭配飞行员0x00 算法题目0x01 算法思路0x02 代码实现 E - The Perfect Sta…

企业架构LNMP学习笔记11

Nginx配置文件的介绍&#xff1a; #nginx子进程启动用户 #user nobody; #子进程数量 一般调整为cpu核数或者倍数 worker_processes 1; #错误日志定义 #error_log logs/error.log; #error_log logs/error.log notice; #error_log logs/error.log info;#进程pid 存储文件…

ISO/IEC/ITU标准如何快速查找(三十九)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只有行动才是治疗恐惧和懒惰的唯一良药. 更多原创,欢迎关注:Android…

C++中的语法知识虚继承和虚基类

多继承(Multiple Inheritance)是指从多个直接基类中产生派生类的能力,多继承的派生类继承了所有父类的成员。尽管概念上非常简单,但是多个基类的相互交织可能会带来错综复杂的设计问题,命名冲突就是不可回避的一个。 多继承时很容易产生命名冲突,即使我们很小心地将所有类…

UDP和TCP协议报文格式详解

在初识网络原理(初识网络原理_蜡笔小心眼子&#xff01;的博客-CSDN博客)这篇博客中,我们简单的了解了一下TCP/IP五层网络模型,这篇博客将详细的学习一下五层网络模型中传输层的两个著名协议:UDP和TCP 目录 一, 传输层的作用 二, UDP 1,UDP协议的特点 2,UDP报文格式 三, TC…

【数据结构】如何设计循环队列?图文解析(LeetCode)

LeetCode链接&#xff1a;622. 设计循环队列 - 力扣&#xff08;LeetCode&#xff09; 目录 做题思路 只开辟 k 个空间 多开一个空间 代码实现 1. 循环队列的结构 2. 开辟空间 3. 判断空 4. 判断满 5. 队尾插入数据 6. 队头删除数据 7. 获取队头元素 8. 获取队尾元…

ElasticSearch第二讲:ES详解 - ElasticSearch基础概念

ElasticSearch第二讲&#xff1a;ES详解 - ElasticSearch基础概念 在学习ElasticSearch之前&#xff0c;先简单了解下ES流行度&#xff0c;使用背景&#xff0c;以及相关概念等。本文是ElasticSearch第二讲&#xff0c;ElasticSearch的基础概念。 文章目录 ElasticSearch第二讲…

【GoldenDict】win11牛津高阶英汉双解词典安装使用方法

【词典资源】 1&#xff08;本文章使用的版本&#xff09;牛津高阶&#xff08;第10版 英汉双解&#xff09; V11.8&#xff1a; https://pan.baidu.com/s/11272Cldde_2UttQkWS2MlQ 提取码&#xff1a;0p3j 2&#xff08;另一版本&#xff09;第十版 v13.2&#xff1a; ht…

信息系统项目管理师(第四版)教材精读思维导图-第九章项目范围管理

请参阅我的另一篇文章&#xff0c;综合介绍软考高项&#xff1a; 信息系统项目管理师&#xff08;软考高项&#xff09;备考总结_计算机技术与软件专业技术_铭记北宸的博客-CSDN博客 本章思维导图PDF格式 本章思维导图XMind源文件 目录 9.1 管理基础 9.2 管理过程 9.3 规划范…