Flink 有状态流式处理

news2024/12/22 15:05:45

传统批次处理方法

在这里插入图片描述
【1】持续收取数据(kafka等),以window时间作为划分,划分一个一个的批次档案(按照时间或者大小等);
【2】周期性执行批次运算(Spark/Stom等);

传统批次处理方法存在的问题:
在这里插入图片描述
【1】假设计算每小时出现特定事件的转换次数(例如:1、2…),但某个事件正好处于1到2之间就尴尬了。需要将1点处理一半的结果带到2点这个批次中进行运算。而这个划分跟我们事件发生的时间也是有误差的。
【2】在分布式多线程的情况下,如果接收到事件的顺序颠倒了,又该如何处理?

理想方法

累积状态:表示过去历史接收过的所有事件。可以是计数或者机器模型等等。
在这里插入图片描述我们要处理一个持续维护的状态时,最适合的方式就是状态流处理(累积状态和维护状态+时间,是不是该收的结果都收到了)
在这里插入图片描述
【1】有状态流处理作为一种新的持续过程范式,处理连续的数据;
【2】产生准确的结果;
【3】实时可用的结果仅为模型的自然结果;

流式处理

流处理系统或者流处理引擎都是数据驱动的,而不是定期或者人为的去触发。数据也没有物理边界。
在这里插入图片描述
一般系统都会把操作符放上去,等待数据的到来进行计算。如下是一个逻辑模型(DAG
在这里插入图片描述

分散式流式处理

在这里插入图片描述
【1】从数据中选择一个属性作为key对输入流进行分区;
【2】使用多个实例,每个实例负责部分key的存储,根据Hash值,相同的key一定落在相同的分区进行处理;
【3】根据流式数据处理的DAG模型,有对应如下的分布式流处理的实例模型。例如A算子拥有两个实例,上游的实例节点可能同时与下游的一个或多个节点进行传输。这些实例根据系统或者人为的因素分配在不同的节点之上。节点与节点之间数据传输也会涉及网络之间的占用。本地的传输就不需要走网络
在这里插入图片描述

有状态分散式流式处理

定义一个变量X,输出结果依据这个X,这个X就是一个状态。有状态分散的流失处理引擎,当状态可能会累计非常大。当key比较多的时候就会超出单台节点的负荷量。这个x就应该有状态后台使用memory去维护它。【数据倾斜】
在这里插入图片描述

状态容错(State Fault Tolerance)

状态挂了,如何确保状态拥有精确一次(exactly-onceguarantee)的容错保证?就是通过定期的快照+事件日志位置。我们先假设一个简单的场景,如下,一个队列在不断的传输数据。单一的process在处理数据。这个process没处理一个数据都会累计一个状态。如何为这个process做一个容错。做法就是没处理完一笔,更改完状态之后,就做一次快照(包含它处理的数据在队列中的位置和它处理到的位置以及当时的状态进行对比)
在这里插入图片描述
举个例子:如下我处理到第二笔数据,我就会记录下第二个位置在进入process之前的信息(位置X+状态@X
在这里插入图片描述
当进入process处理的时候出现了fail时,Flink就会根据上一次的位置+状态进行恢复。

如何在分散式场景下替多个拥有本地状态的运算子产生一个全域一致的快照(global consistent snapshot)?
方式一:更改该任务流过的所有运算子的状态。比较笨,有一个副作用,就是我处理完这笔数据,它应该就到了一个process,我本应该做其他数据的处理了,可是为了全局一致性快照就会停止前面和当前的process的运算来保证全局一致性。
在这里插入图片描述

分散式状态容错

通过checkpoint实现分散式状态容错
在这里插入图片描述
每一个运算子它本地都有一个维护一个状态,当要产生一个检查点(checkpoit)的时候,都会将这个检查点存储在一个更小的分布式文件系统DFS中。当出现某个算子fail之后,就会从所有的checkpoint中获取所有算子的上一个状态进行恢复。把消息队列的位置也进行恢复。也就是多线程工作,每一个任务在DFS中就可以看作一个线程,它们数据存储的key就是这个任务,每一个算子的处理状态都会按照处理顺序添加进去。

分布式快照(Distributed Snapshots)

更重要是时如何在不中断运算的前提下生成快照?其实就是给每一个任务标记一个checkpoint n不同的任务这个n是不同的,相同的任务在不同的算子里面它是相同的。具体我们把这个分解后看看。
在这里插入图片描述
【1】如下图,当我们从数据源获取数据的时候,其实我们已经开始有状态了,这个时候我们可以把任务处理的整个过程抽象成如下图中的一张表。
在这里插入图片描述
【2】首先是数据源的状态,就是数据在操作前的一个位置offset进行快照存储,如下图所示:
在这里插入图片描述
【3】当获取到数据源之后,就进入算子中进行处理,此时就会对数据进入之前的状态进行checkpoint。记录一个savepoint
在这里插入图片描述
【4】在最后一次操作前(输出)也会记录checkpoint。在这个过程中,其实前面的算子也在产生不同的 checkpoint n-1 等。如果要进行恢复使用的话,必须是一个complete完整的Checkpoint。只有部分数据的Checkpoint是不能使用的。
在这里插入图片描述

状态维护(State Management)

本地维护的这个状态可能非常非常大。后端的管理系统一般使用内存维护这些状态。
在这里插入图片描述
Flink提供了两种状态后端:JVM Heap状态后端,适合比较小的状态,量不要很大。当运算子action要读取状态的时候,都是一个Java对象的read或者write。当要产生一个检查点的时候,需要将每个运算子的本地状态数据通过序列化存储在DFS中,
在这里插入图片描述
当状态非常大的时候就不能使用JVM Heap的时候,就需要用到RocksDB。当算子需要读取的时候本地state的时候需要进行序列化操作从而节省内存,同时,当需要进行checkpointDFS时,也少了序列化的步骤。它也会给本地存储一份,当fail的时候就可以很快恢复,提高效率。
在这里插入图片描述

Event-time 处理

EventTime是事件产生的时间。
在这里插入图片描述
下面是一张,程序处理时间与事件发生时间的时间差的一张对比图来更好的理解EventTime
在这里插入图片描述

Event-Time 处理

也就是说我们要统计的3-4点之间的数据,程序4点结束这个执行不是根据window时间,而是根据event-Time
在这里插入图片描述

Watermarks

Flinkwatermarks实现Event-Time功能的。在Flink里面也属于一个特殊事件,精髓是当某个运算子收到一个带有时间戳twatermark后就不会再收到任何小于该时间戳的事件了。也就是当window需要统计4点的数据时,例如我们每5分钟发一次watermark,那么当window收到4.05watermark的时候才会去统计4点之前的数据(下一次)。如果4.05收到了4点之前的数据的话,Flink1.5会把这个事件输出到旁路输出(side output),你可以获取出来,进行处理。目前有一个问题就是:如果某个Stream Partition 没有输入了,也就没有Watermarks。那么window就没办法进行处理了。当多个数据流的watermarks不相同的时候,Flink会取最小的watermarks进行运算。可以在接收到资源的时候通过代码设置watermarks

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

在这里插入图片描述

状态保存与迁移(Savenpoints and Job Migration)

可以想成:一个手动产生的检查点(CheckPoint):保存点记录某一个流失应用中的所有运算中的状态。当触发SavePoint之后,Flink提供了两种选择停止消费或者继续运算,根据场景定义。
在这里插入图片描述
执行停止之前,产生一个保存点。就可以解决上面提到的3个问题。
在这里插入图片描述
从保存点恢复新的执行,这个时候,例如我们重启花了30分钟,这段事件kafka还在不断的接收新的数据。恢复之后,Flink就需要从当时记录的kafka位置赶上最新的位置。这个时候利用Event-Time处理新的数据都是事件发生时的数据,这个时候再跟程序执行的时间比较就更能体现Event-time的价值。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1305813.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue3-04-reactive() 响应式失效的三种情况

1.替换对象后导致失效 简单理解 &#xff1a; 变量指向的对象换了&#xff0c;对原来的对象&#xff0c;当然没有了响应式的效果了。// 声明 变量let obj1 reactive({name:"第一个对象"})// 改变 变量的指向obj1 reactive({name:"第二个对象的属性"})co…

三天精通Selenium Web 自动化 - Selenium(Java)环境搭建 (new)

0 背景 开发工具idea代码管理mavenjdk1.8webdriver chrome 1 chromedriver & chrome chromedriver和chrome要对应上&#xff1a; chomedriver下载地址&#xff1a;淘宝镜像 这里用的是 chromedriver88-0-4324-96.zipchrome下载地址&#xff1a;如何降级和安装旧版本的C…

【计算机网络基础3】ARP/RARP协议、路由选择协议和TCP/IP协议

1、ARP/RARP协议 地址解析协议&#xff0c;即ARP&#xff08;Address Resolution Protocol&#xff09;&#xff0c;是根据IP地址获取物理地址的一个TCP/IP协议。主机发送信息时将包含目标IP地址的ARP请求广播到网络上的所有主机&#xff0c;并接收返回消息&#xff0c;以此确…

HTTP深度解析:构建高效与安全网络的关键知识

1. HTTP基础及其组件 我首先想和大家分享的是HTTP的基础知识。HTTP&#xff0c;即超文本传输协议&#xff0c;是互联网上最常用的协议之一。它定义了浏览器和服务器之间数据交换的规则&#xff0c;使得网页内容可以从服务器传输到我们的浏览器上。想象一下&#xff0c;每当你点…

kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

目录 1- 单播模式&#xff0c;只有一个消费者组2- 广播模式&#xff0c;多个消费者组3- Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统&#xff0c;它可以处理消费者在网站中的所有动作流数据。 kafka中partition…

Linux实用操作(超级实用)

Linux实用操作篇-上篇&#xff1a;Linux实用操作-上篇-CSDN博客 Linux实用操作篇-下篇&#xff1a;Linux实用操作篇-下篇-CSDN博客 一、各类小技巧&#xff08;快捷键&#xff09; 1.1 ctrl c 强制停止 Linux某些程序的运行&#xff0c;如果想要强制停止它&#xff0c;可以…

Redis缓存异常问题,常用解决方案总结

前言 Redis缓存异常问题分别是&#xff1a;1.缓存雪崩。2.缓存预热。3.缓存穿透。4.缓存降级。5.缓存击穿&#xff0c;以 及对应Redis缓存异常问题解决方案。 1.缓存雪崩 1.1、什么是缓存雪崩 如果缓存集中在一段时间内失效&#xff0c;发生大量的缓存穿透&#xff0c;所有…

【MySQL】MySQL数据库基础--什么是数据库/基本使用/MySQL架构/存储引擎

文章目录 1.什么是数据库2.主流数据库3.基本使用3.1MySQL安装3.2连接服务器3.3服务器管理3.4服务器&#xff0c;数据库&#xff0c;表关系3.5使用案例3.6数据逻辑存储 4.MySQL架构5.SQL分类6.存储引擎6.1什么是存储引擎6.2查看存储引擎6.3存储引擎对比 1.什么是数据库 对于回答…

MySQL笔记-第18章_MySQL8其它新特性

视频链接&#xff1a;【MySQL数据库入门到大牛&#xff0c;mysql安装到优化&#xff0c;百科全书级&#xff0c;全网天花板】 文章目录 第18章_MySQL8其它新特性1. MySQL8新特性概述1.1 MySQL8.0 新增特性1.2 MySQL8.0移除的旧特性 2. 新特性1&#xff1a;窗口函数2.1 使用窗口…

【LeetCode刷题-树】--144.二叉树的前序遍历

144.二叉树的前序遍历 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeNode left, TreeNode right) …

『npm』一条命令快速配置npm淘宝国内镜像

&#x1f4e3;读完这篇文章里你能收获到 一条命令快速切换至淘宝镜像恢复官方镜像 文章目录 一、设置淘宝镜像源二、恢复官方镜像源三、查看当前使用的镜像 一、设置淘宝镜像源 npm config set registry https://registry.npm.taobao.org服务器建议全局设置 sudo npm config…

HarmonyOS使用Tabs组件实现页面切换

Tabs组件的使用 概述 在我们常用的应用中&#xff0c;经常会有视图内容切换的场景&#xff0c;来展示更加丰富的内容。比如下面这个页面&#xff0c;点击底部的页签的选项&#xff0c;可以实现“首页”和“我的” 两个内容视图的切换。 ArkUI开发框架提供了一种页签容器组件…

Pytorch中Group Normalization的具体实现

Group Normalization (GN) 是一种用于深度神经网络中的归一化方法&#xff0c;它将每个样本划分为小组&#xff0c;并在每个小组内进行标准化。与批归一化&#xff08;Batch Normalization&#xff09;不同&#xff0c;Group Normalization 不依赖于小批量数据&#xff0c;因此…

【Hadoop_04】HDFS的API操作与读写流程

1、HDFS的API操作1.1 客户端环境准备1.2 API创建文件夹1.3 API上传1.4 API参数的优先级1.5 API文件夹下载1.6 API文件删除1.7 API文件更名和移动1.8 API文件详情和查看1.9 API文件和文件夹判断 2、HDFS的读写流程&#xff08;面试重点&#xff09;2.1 HDFS写数据流程2.2 网络拓…

[Angular] 笔记1:开发设置 , 双向绑定

1 设置开发环境 1.1 安装 node 下载 node&#xff0c;因为要使用 npm 工具&#xff0c;教程中使用 Angualr 14, 最新版 node 20 用不了&#xff0c;安装 node 16 就可以。 1.2 安装 Angular CLI Angular CLI 是用于创建 Angular 工程的工具集&#xff0c;使用如下命令&…

HTML实现页面

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>工商银行电子汇款单</title> </head> &…

数据挖掘目标(客户价值分析)

import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as snsIn [2]: datapd.read_csv(r../教师文件/air_data.csv)In [3]: data.head()Out[3]: Start_timeEnd_timeFareCityAgeFlight_countAvg_discountFlight_mileage02011/08/182014/0…

网络基础(八):路由器的基本原理及配置

目录 1、路由概述 2、路由器 2.1路由器的工作原理 2.2路由器的转发原理 3、路由表 3.1路由表的概述 3.2路由表的形成 4、静态路由配置过程&#xff08;使用eNSP软件配置&#xff09; 4.1两个静态路由器配置过程 4.2三个静态路由器配置过程 5、默认路由配置过程 5.…

得帆云为玉柴打造CRM售后服务管理系统,实现服务全过程管理|基于得帆云低代码的CRM案例系列

广西玉柴机器股份有限公司 广西玉柴机器股份有限公司始建于1992年&#xff0c;是国内行业首家赴境外上市的中外合资企业&#xff0c;产品远销亚欧美非等180多个国家和地区。公司总部设在广西玉林市&#xff0c;下辖11家子公司&#xff0c;生产基地布局广西、江苏、安徽、山东等…

收发货拥抱新技术,纵行科技推ZETag方案实现更精准的自动识别

对于制造及物流企业来说&#xff0c;收发货是影响其运营效率和成本控制的关键因素。然而传统的收发货管理高度依赖人工核对&#xff0c;比如目前国内汽车工厂零件到货验收主要采用人工方式&#xff0c;验收人员需根据送货看板进行数量清点&#xff0c;确认无误后用手持终端扫描…