大数据-玩转数据-Flink 水印

news2024/11/25 2:46:21

一、Flink 中的水印

在Flink的流式操作中, 会涉及不同的时间概念:

1.1 处理时间

是指的执行操作的各个设备的时间,对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据。 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据,然后下个窗口是10:00am-11:00am, 等等。处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调。他提供了最好的性能和最低的延迟。 但是, 在分布式和异步的环境下,处理时间没有办法保证确定性,容易受到数据传递速度的影响: 事件的延迟和乱序。在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器。

1.2 事件时间

是指的这个事件发生的时间。在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在。在事件时间体系中, 时间的进度依赖于数据本身,和任何设备的时间无关。事件时间程序必须制定如何产生Event Time Watermarks(水印) 。假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,这些记录落入该小时。在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器。从1.12开始, Flink内部已经把默认的语义改成了事件时间。

1.3 Flink中的WaterMark

支持event time的流式处理框架需要一种能够测量event time 进度的方式。 比如, 一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间,从而在程序中去关闭这个窗口。事件时间可以不依赖处理时间来表示时间的进度。例如,在程序中, 即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间。另外一方面使用事件时间可以在几秒内处理已经缓存在Kafka中多周的数据,这些数据可以照样被正确处理, 就像实时发生的一样能够进入正确的窗口。这种在Flink中去测量事件时间的进度的机制就是watermark(水印)。

1.4 Flink中如何产生水印

在这里插入图片描述

二、代码集成

package com.lyh.flink08;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.naming.Context;
import java.time.Duration;

public class WatorMark_01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0],
                                Long.valueOf(datas[1]),
                                Integer.valueOf(datas[2]))
                                ;
                    }
                });
        WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                return element.getTs() * 1000;
            }
        });

        stream
                .assignTimestampsAndWatermarks(wms) // 指定水印和时间戳
                .keyBy(WaterSensor::getId)
          .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                             @Override
                             public void process(String key,
                                                 Context ctx,
                                                 Iterable<WaterSensor> elements,
                                                 Collector<String> out) throws Exception {
                                 String msg = "当前key: " + key
                                         + "窗口: [" + ctx.window().getStart() / 1000 + "," + ctx.window().getEnd()/1000 + ") 一共有 "
                                         + elements.spliterator().estimateSize() + "条数据 ";
                                 out.collect(msg);
                             }
                         }
                ).print();
        env.execute();
    }
}

三、测试结果

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/941582.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

去掉vue项目运行时中出现的黄色警告

最近在写vue项目时发现想测试一个接口行不行的时候&#xff0c;在控制台输出的时候发现会有很多黄色警告&#xff0c;每次都要找很久才能找到自己想输出的内容&#xff0c;如下图&#xff1a; 去掉这些只需要一句话&#xff1a; const app createApp(App) app.config.warnHa…

数据库管理-第九十九期 OCM之路(20230828)

数据库管理-第九十九期 OCM之路&#xff08;20230828&#xff09; 本周五&#xff0c;我就要在上海Oracle University的考场进行19c OCM的升级考试了。关于之前版本的OCM&#xff0c;11g OCM我是在2016年9月拿下的&#xff0c;在一年后的即2017年的9月拿下了12c OCM。所以对于我…

Spring Cloud Alibaba-Sentinel规则

1 流控规则 流量控制&#xff0c;其原理是监控应用流量的QPS(每秒查询率) 或并发线程数等指标&#xff0c;当达到指定的阈值时 对流量进行控制&#xff0c;以避免被瞬时的流量高峰冲垮&#xff0c;从而保障应用的高可用性。 第1步: 点击簇点链路&#xff0c;我们就可以看到访…

QML Book 学习基础2(基本元素控件)

目录 矩形&#xff08;Rectangle&#xff09; 文本元素 鼠标键盘交互 布局元素 矩形&#xff08;Rectangle&#xff09; 矩形项用于用纯色或渐变填充区域&#xff0c;和/或提供矩形边框。需要注意如果长宽没有设置&#xff0c;是无法看到矩形的 Rectangle {id: rect1x: 12; …

上位机采集8通道模拟量模块数据

模拟量模块和上位机的配合使用可以实现对模拟量数据的采集、传输和处理。下面是它们配合使用的一般步骤&#xff1a;1. 连接模拟量模块&#xff1a;将模拟量模块与上位机进行连接。这通常涉及将模拟量模块的输入通道与被监测的模拟信号源连接起来&#xff0c;如传感器、变送器等…

css元素定位:通过元素的标签或者元素的id、class属性定位,还不明白的伙计,看这个就行了!

前言 大部分人在使用selenium定位元素时&#xff0c;用的是xpath元素定位方式&#xff0c;因为xpath元素定位方式基本能解决定位的需求。xpath元素定位方式更直观&#xff0c;更好理解一些。 css元素定位方式往往被忽略掉了&#xff0c;其实css元素定位方式也有它的价值&…

设计模式入门笔记

1 设计模式简介 在IT这个行业&#xff0c;技术日新月异&#xff0c;可能你今年刚弄懂一个编程框架&#xff0c;明年它就不流行了。 然而即使在易变的IT世界也有很多几乎不变的知识&#xff0c;他们晦涩而重要&#xff0c;默默的将程序员划分为卓越与平庸两类。比如说&#xff…

游戏测试工程师的职业发展前景怎么样?

软件测试是一个看似入门简单&#xff0c;门槛极低&#xff0c;薪资也还不错的的行业&#xff0c;但如果你开始向这个方向发展&#xff0c;你就发现并非如此。实际情况是初级功能岗位薪资偏低&#xff0c;而且以外包公司居多&#xff0c;高级岗位薪资非常可观&#xff0c;但对技…

震动分析国标GB/T 19873.3-2019/ISO 13373-3:2015笔记

1.国家标准 1.1震动测量 现行国家标准是&#xff1a;GB/T 19873.2-2009 机器状态监测与诊断 振动状态监测 第2部分&#xff1a;振动数据处理、分析与描述 它的起草人&#xff1a; 郑州机械研究所。西安热工研究院有限公司。东南大学。 主要起草人 韩国明 、张学延 、傅行…

2023-08-28 C语言函数一定要在.h文件中声明吗

老林的C语言新课, 想快速入门点此 <C 语言编程核心突破> C语言函数一定要在.h文件中声明吗 前言一、三种情况下的函数声明与定义策略单文件小练习多文件工程需要在多个文件调用的函数不需要跨文件调用的函数 二、示例需要在多个文件调用的函数:不需要跨文件调用的函数: …

一起参与开源,志愿者招募中!IT、翻译、新媒体、设计等

Hi 同学&#xff0c;你是不是专注某个领域&#xff0c;想找机会&#xff0c;积累开源软件方面的早期经验&#xff1f;你来对地方啦。请阅读本文&#xff0c;了解为什么要加入 ONLYOFFICE&#xff0c;做出贡献&#xff0c;以及如何做到。 贡献开源&#xff0c;福利多多 为开源项…

2023全国大学生数学建模竞赛B题思路模型代码

目录 一.思路模型见文末名片&#xff0c;比赛开始9.7晚上第一时间更新 二.国赛常用算法之随机森林 3.思路获取见此 一.思路模型见文末名片&#xff0c;比赛开始9.7晚上第一时间更新 二.国赛常用算法之随机森林 # -*- coding: utf-8 -*- """ author: Administ…

自动化运维工具-----Ansible入门详解

目录 一.Ansible简介 什么是Ansible&#xff1f; Ansible的特点 Ansible的架构 二.Ansible任务执行解析 ansible任务执行模式 ansible执行流程 ansible命令执行过程 三.Ansible配置解析 ansible的安装方式 ansible的程序结构&#xff08;yum安装为例&#xff09; ansibl…

11. 排兵布阵

目录 题目 思路&#xff08;贪心快排&#xff09; 注意事项 C代码 题目 排兵布阵 Description 总所周知&#xff0c;韩信是一位神勇的军事家。某日夜幕&#xff0c;敌方突然来袭&#xff0c;韩信作为塞外将帅吹响紧急的号角。各个帐内的士兵听见号角立即集合&#xff0c;站…

Python爬虫异常处理实践:处理被封禁和网站升级问题

在这篇文章中&#xff0c;我们将一起探讨Python爬虫异常处理实践&#xff0c;特别关注处理被封禁和网站升级问题。让我们一起来看看如何解决这些问题&#xff0c;提高我们爬虫程序的稳定性和可靠性。   首先&#xff0c;我们要了解为什么会遇到这些问题。网站封禁爬虫的原因主…

(六)k8s实战-存储管理

一、Volumes 1、HostPath 【使用场景&#xff1a;容器目录 挂载到 主机目录】 【可以持久化到主机上】 将节点上的文件或目录挂载到 Pod 上&#xff0c;此时该目录会变成持久化存储目录&#xff0c;即使 Pod 被删除后重启&#xff0c;也可以重新加载到该目录&#xff0c;该目…

最小化安装移动云大云操作系统--BCLinux-for-Euler-22.10-everything-x86_64-230316版

CentOS 结束技术支持&#xff0c;转为RHEL的前置stream版本后&#xff0c;国内开源Linux服务器OS生态转向了开源龙蜥和开源欧拉两大开源社区&#xff0c;对应衍生出了一系列商用Linux服务器系统。BCLinux-for-Euler-22.10是中国移动基于开源欧拉操作系统22.03社区版本深度定制的…

【halcon深度学习】图像分割数据集格式的转换

前言 目前用于**图像分割的**数据集&#xff0c;我目前接触到的用的比较多的有&#xff1a; 1 PASCAL VOC 2 COCO 3 YOLO 4 Halcon自己的格式&#xff08;其实就是Halcon字典类型&#xff09;当前我涉及到计算机视觉中的数据集格式有&#xff0c;PASCAL VOC、COCO 和 YOLO 用于…

天气插件和antv图表组件库的使用

目录 天气插件 antv组件库 特性 数据映射 data xField yField 图形样式 point state 图表组件 label tooltip 图表交互 添加交互 天气插件 网站:天气预报代码_天气预报插件_免费天气预报代码(插件)调用——天气网 (tianqi.com) 挑选想要的样式&#xff0c;点击 …

暴力递归转动态规划(一)

前两篇帖子介绍了暴力递归的过程&#xff0c;总的来说就是利用自然智慧不断的尝试。这篇文章则会介绍如何将暴力递归转成动态规划。 斐波那契数列 斐波那契数列一定都不陌生&#xff0c;规定第一列的值是1&#xff0c;第二列的值是2的话&#xff0c;那第七列的值就是13&#x…