Flink(java版)

news2025/4/18 21:26:31

watermark

时间语义和 watermark

注意:数据进入flink的时间:如果用这个作为时间语义就不存在问题,但是开发中往往会用处理时间
作为时间语义这里就需要考虑延时的问题。
如上图,数据从kafka中获取出来,从多个分区中获取,这时候时间肯定有乱序,这时候就需要使用事
件时间。

场景:游戏连续过五关,给予奖励
地铁里面玩游戏,连过三关断网了,二分钟过了八关。这时候是用处理时间还是事件时间呢?
处理时间的优势:牺牲一定的数据准确性,没有延迟

package com.atguigu.apitest.window;/**

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class WindowTest3_EventTimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //默认为当前机器的cpu的最大核数
        //env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(100);

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型,分配时间戳和watermark
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        })
                // 乱序数据设置时间戳和watermark
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
        };

        // 基于事件时间的开窗聚合,统计15秒内温度的最小值
        SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(outputTag)
                .minBy("temperature");

        minTempStream.print("minTemp");
        minTempStream.getSideOutput(outputTag).print("late");

        env.execute();
    }
}
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718211,32.8
sensor_1,1547718212,37.1

注意:第一个窗口是[1547718195,1547718210);

sensor_1,1547718213,33
sensor_1,1547718224,32.1
sensor_1,1547718225,31.6
sensor_1,1547718226,21.2
sensor_1,1547718227,33.6

第二个窗口大小:第一个窗口是[1547718210,1547718225);

1.理想状态:
     来一条数据处理一条,每条数据代表对时间推进;如图到5之后就将【0,5)的窗口关闭并输出;

2.乱序状态:
     原因:网络延迟、分布式、分区导致乱序数据产生;
     网络延迟和分布式处理造成的乱序都是几十毫秒和几百毫秒的范围的差距;这将回造成大多数延迟
  数据集中在几十毫秒和几百毫秒的范围内;

3.解决方案:将时间事件放慢

flink的三重保证:
 1.设置watermaker将几百毫秒的数据全部输出;
 2.先输出一个近似的结果,但是不要关闭窗口后面延迟的时间还需要更新;
 3.当延时时间到了,窗口就关闭了;兜底方案使用侧输出流保证数据不丢失;

注意:数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据都已经到
达了,因此,window 的执行也是由 Watermark 触发的。
6 3 2 5 4 1 
比如设置3秒的watermaker:
到达5:说明2秒之前的数据都到齐了,后面2,3都可以输出
到达6:说明3秒之前的数据都到齐了,大于等于3秒的数据才能输出

意义:watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太
小数据就不准确,需要通过具体的业务场景去平衡这个值;

watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太小,乱序数据
没有搞定,数据就不准确,需要通过具体的业务场景去平衡这个值;

如何找到watermaker:首先要了解乱序程度;
解决方案:通过机器学习构建一个模型,构建当前业务模型中的延迟状态的分布情况;

如图:大部分的延时数据都20ms和80ms之间的范围中,这时候设置80ms就搞定大部分乱序数据;
这时候还有很少的数据,如果对数据准确性要求比较高,这时候就需要设置窗口迟到机制去保证
数据的准备性;最后还有网络延迟的数据还是没有输出这时候就需要添加侧输出流作为兜底方案。

 watermark 生成问题

默认:来一条生产一条watermaker,如果短时间数据量比较大,会造成watermaker都一样造成资
源浪费;周期性添加watermaker:每隔一段时间更新一下watermaker 
周期性时间缺点:实时性不好;数据过于分散会造成资源浪费;

如何选择:看数据的分布,过于集中使用周期性生成模式,数据稀疏,使用默认的模型;

状态编程 

需求:我们可以利用 Keyed state,实现这样一个需求: 检测传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警

package com.atguigu.apitest.state;/**
 * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * Package: com.atguigu.apitest.state
 * Version: 1.0
 * <p>
 * Created by wushengran on 2020/11/10 16:33
 */

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @ClassName: StateTest3_KeyedStateApplicationCase
 * @Description:
 * @Author: wushengran on 2020/11/10 16:33
 * @Version: 1.0
 */
public class StateTest3_KeyedStateApplicationCase {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 定义一个flatmap操作,检测温度跳变,输出报警
        SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream = dataStream.keyBy("id")
                .flatMap(new TempChangeWarning(10.0));

        resultStream.print();

        env.execute();
    }

    // 实现自定义函数类
    public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>>{
        // 私有属性,温度跳变阈值
        private Double threshold;

        public TempChangeWarning(Double threshold) {
            this.threshold = threshold;
        }

        // 定义状态,保存上一次的温度值
        private ValueState<Double> lastTempState;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));
        }

        @Override
        public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
            // 获取状态
            Double lastTemp = lastTempState.value();

            // 如果状态不为null,那么就判断两次温度差值
            if( lastTemp != null ){
                Double diff = Math.abs( value.getTemperature() - lastTemp );
                if( diff >= threshold )
                    out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));
            }

            // 更新状态
            lastTempState.update(value.getTemperature());
        }

        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}
sensor_1,1547718206,36.3
sensor_1,1547718206,37.9
sensor_1,1547718206,48
sensor_6,1547718201,15.4
sensor_6,1547718201,35
sensor_1,1547718226,36

 状态后端

状态后端: 1.本地的状态管理(如何存,上下文配置,怎么存,怎么写)  
         2.做快照容错,如何恢复数据

1. 测试环境:MemoryStateBackend
2. 生产环境:FsStateBackend 
3. 数据非常大时候:RocksDBStateBackend
state.backend: filesystem //默认使用FsStateBackend 
tate.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints 
//配置一个checkpoint的hdfs的存储路径

jobmanager.execution.failover-strategy: region //区域化重启

state.backend.incremental: false  //增量添加checkpoint

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/970349.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DES和3DES等常见加解密的关键要素---安全行业基础篇3

DES和3DES DES和3DES是对称加密算法&#xff0c;其加密和解密的关键要素包括&#xff1a; 密钥&#xff1a;DES和3DES使用相同长度的密钥进行加密和解密。DES使用56位密钥&#xff0c;而3DES可以使用112位或168位密钥。密钥是保护数据安全的关键&#xff0c;必须保持机密并只…

港联证券:为什么人们买涨不买跌?

在股票市场中&#xff0c;有一个普遍的现象是人们倾向于买涨不买跌。即使在市场出现明显下跌趋势时&#xff0c;大部分投资者也会选择继续持有股票或者进行买入操作&#xff0c;而在股票呈现明显上涨趋势时&#xff0c;却有更多的人涌入市场追涨杀跌。究其原因&#xff0c;可能…

微信小程序+Springboot实现订阅消息推送 (demo)

1. 开通订阅消息 2. 选用模板 订阅消息 - 公共模板库 - 选用 3. 选择关键词 勾选关键词 - 最多勾选5个 - 显示例子 4. 我的模板 5. 订阅号开发者-文档: 发送订阅消息 | 微信开放文档 6. 依赖 <dependency><groupId>com.alibaba.fastjson2</groupId><art…

淘宝API接口解析,实现获得淘宝商品快递费用

要获取淘宝商品快递费用&#xff0c;需要使用淘宝的开放平台API接口。以下是一个基本的示例&#xff0c;解析并实现获取淘宝商品快递费用的API接口。 首先&#xff0c;你需要访问淘宝开放平台并注册一个开发者账号。注册完成后&#xff0c;你需要创建一个应用并获取到API的权限…

滑动窗口实例7(串联所有单词的子串)

题目&#xff1a; 给定一个字符串 s 和一个字符串数组 words。 words 中所有字符串 长度相同。 s 中的 串联子串 是指一个包含 words 中所有字符串以任意顺序排列连接起来的子串。 例如&#xff0c;如果 words ["ab","cd","ef"]&#xff0c;…

iPhone勿扰模式如何设置?1分钟学会!

上班的时候手机信息一直“噔、噔、噔”的响&#xff0c;开了静音模式也没用&#xff0c;信息一来手机还是会一直震动&#xff0c;搞得我无法安心工作&#xff0c;还有什么方法可以将这些信息免打扰吗&#xff1f; iPhone手机有一个功能叫做【勿扰模式】&#xff0c;它能够帮助用…

多numa设备,如何看网卡插在哪个numa上

1.在Linux系统中&#xff0c;可以通过lstopo命令来查看系统的NUMA拓扑结构。这个命令通常随着hwloc包一起提供。 安装hwloc包&#xff1a;yum install hwloc 使用lstopo命令来显示系统的NUMA拓扑结构。 &#xff08;在bclinux安装后&#xff0c;只有lstopo-no-graphics命令&…

TSN协议解读系列 | (2) Qbv:车间里的求知路

时春季夜间&#xff0c;风清月朗&#xff0c;周期仿佛又轮转到了从前。一位不修篇幅的老头&#xff0c;牙还没刷&#xff0c;却已然歪头斜躺在床上&#xff0c;床边微亮的显示屏上隐约可见Victory的字样&#xff0c;不一会儿就熄了屏。 这正是我们今天的主人公&#xff0c;Qbv…

微服务-gateway跨域配置

文章目录 一、前言二、gateway跨域配置1、问题描述1.1、什么是跨域请求&#xff1f;1.1.1、同源策略1.1.2. 安全性考虑1.1.3. 跨域攻击 1.2、问题产生原因 2、解决方法2.1、修改配置文件2.2、配置类统一配置2.3、全局跨域拦截器 三、总结 一、前言 在SpringCloud项目中&#x…

C++ - 继承 一些 细节 - 组合 和 继承的区别

前言 本篇博客基于 C - 继承_chihiro1122的博客-CSDN博客 之上列出一些例子&#xff0c;如果有需要请看以上博客。 继承的例子 例1 上述例子应该选择 C。 首先不用说&#xff0c;p3肯定是指向 d 对象的开头的&#xff1b;p1 也是指向 d 对象的开头的&#xff1b;…

网站监控系统最佳实践之静态资源采样上报

作者 观测云 产品服务部门 深圳团队 朱端畅 背景说明 通过 RUM 采集前端数据时&#xff0c;若采集的数据过多&#xff0c;可能会导致占用过多的网络带宽以及其他资源。特别是刚进入首页加载数据时&#xff0c;可能会调用几十次甚至更多次 v1/write/rum?precisionms数据采集接…

spacy安装旧版本en_core_web_sm的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

Navicat Premium 16.2.7 数据库管理教程

Navicat Premium 16.2.7是一款功能强大的数据库管理工具&#xff0c;支持多种数据库类型&#xff0c;包括MySQL、Oracle、SQL Server等。以下是使用Navicat Premium的基本步骤&#xff1a; 安装Navicat Premium软件。打开Navicat Premium&#xff0c;在主界面上选择“新建连接…

实现高效数据存储:OpenStack Swift与本地文件系统的完美对接

文章目录 Swift对接本地文件系统前言控制节点新增20G磁盘针对磁盘做分区&#xff08;2个&#xff09;针对磁盘格式化卸载原有的 Swift 虚拟设备创建两个目录并挂载更改权限创建builder创建ring再平衡测试配合glance对接swiftglance对接swift测试 Swift对接本地文件系统 前言 实…

OpenCV实战(31)——基于级联Haar特征的目标检测

OpenCV实战&#xff08;31&#xff09;——基于级联Haar特征的目标检测 0. 前言1. Haar 特征图像表示2. 基于级联 Haar 特征的二分类分类器3. 级联分类器算法流程4. 使用 Haar 级联检测器进行人脸检测5. 完整代码小结系列链接 0. 前言 在机器学习基础一节中&#xff0c;我们介…

【Linux】文件缓冲区

目录 一、缓冲区图解二、自定义实现文件操作函数三、强制刷新内核缓冲区&#xff08;fsync&#xff09; 提到文件缓冲区这个概念我们好像并不陌生&#xff0c;但是我们对于这个概念好像又是模糊的存在脑海中&#xff0c;之间我们在介绍c语言文件操作已经简单的提过这个概念&…

NoSQL MongoDB Redis E-R图 UML类图概述

NoSQL NoSQL(Not only SQL)是对不同于传统的关系数据库的数据库管理系统的统称&#xff0c;即广义地来说可以把所有不是关系型数据库的数据库统称为NoSQL。 NoSQL 数据库专门构建用于特定的数据模型&#xff0c;并且具有灵活的架构来构建现代应用程序。NoSQL 数据库使用各种数…

CIM和websockt-实现实时消息通信:双人聊天和消息列表展示

欢迎大佬的来访&#xff0c;给大佬奉茶 一、文章背景 有一个业务需求是&#xff1a;实现一个聊天室&#xff0c;我和对方可以聊天&#xff1b;以及有一个消息列表展示我和对方&#xff08;多个人&#xff09;的聊天信息和及时接收到对方发来的消息并展示在列表上。 项目框架概…

SNP 分享:SAP S/4HANA Cloud 私有云版本及其独特优势

近几年来&#xff0c;SAP一直强调其愿景是帮助客户达成智慧型企业(Intelligent Enterprise)&#xff0c;为此其相关产品也在不断进行快速迭代&#xff0c;其核心就是S4HANA。同时SAP一直强调其要成为一家云计算公司&#xff0c;近些年也一直在推行云优先战略(Cloud First)。因此…

指针(通过指针间接访问内存)

#include <iostream> #include <algorithm> using namespace std; int main() { int a 2;//定义指针 &#xff1a; 数据类型 *指针变量名;int *p &a;cout << &a << " " << p << endl;//使用指针 &#xff1a; 可以通过…