大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动基于事件驱动

news2024/12/22 22:07:51

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink Window 背景总览
  • Flink Window 滚动时间窗口
  • 基于时间驱动
  • 基于事件驱动

在这里插入图片描述

滑动时间窗口

在这里插入图片描述

滑动窗口是固定窗口更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。Flink 的滑动时间窗口(Sliding Window)是一种常用的窗口机制,适用于处理流式数据时需要在时间范围内定期计算的场景。滑动窗口会按照指定的窗口大小(window size)和滑动步长(slide interval)不断地划分数据,并对每个窗口内的数据进行聚合计算。

类型特点

窗口长度固定,可以有重叠。

  • 滑动窗口会有重叠部分,因此每个事件可能会被包含在多个窗口中。
  • 滑动窗口更适合定期计算某个时间范围内的聚合值,像是移动平均值、最近一段时间的活跃用户等场景。

关键参数

  • 窗口大小(window size):每个窗口包含的时间范围,例如 10 秒。
  • 滑动步长(slide interval):窗口每次滑动的时间步长,例如 5 秒。这意味着每隔 5 秒就会创建一个新的窗口,每个窗口覆盖的时间范围是 10 秒。

基于时间驱动

场景:我们可以每30秒计算一次最近一分钟用户购买的商品数

package icu.wzk;


import org.apache.commons.math3.analysis.function.Sin;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;


import java.text.SimpleDateFormat;
import java.util.Random;

public class SlidingWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random +
                                ", timestamp: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
                .timeWindow(Time.seconds(10), Time.seconds(5));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        
        env.execute("SlidingWindow");
    }

}

基于事件驱动

package icu.wzk;


import org.apache.commons.math3.analysis.function.Sin;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;


import java.text.SimpleDateFormat;
import java.util.Random;

public class SlidingWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random +
                                ", timestamp: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
                .countWindow(3, 2);
        globalWindow.apply(new MyCountWindowFuntion()).print();
        
        env.execute("SlidingWindow");
    }

}

会话窗口

由一系列事件组合一个指定时间长度timeout间隙组成,类似于Web应用的Session,也就是一段时间没有接收到新数据会生成新的窗口。
Session窗口分配器通过Session活动来对元素进行分组,Session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况。
Session窗口在一个固定的时间周期内不再收到元素,即非活动间隔产生,那么这个窗口就会关闭。
一个Session窗口通过一个Session间隔来配置,这个Session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的Session将关闭并且后续的元素将被分配到新的Session窗口去。

类型特点

  • 会话窗口不重叠,没有固定的开始和结束时间
  • 于翻滚窗口和滑动窗口相反,当会话窗口在一段时间内没有接收到元素时会关闭会话窗口。
  • 后续的元素将会被分配到新的会话窗口

基于时间驱动

package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SessionWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {

                        return null;
                    }
                });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyedStream
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
        window.apply(new MyTimeWindowFunction()).print();
        env.execute("SessionWindow");
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2108755.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis实战宝典:开发规范与最佳实践

目录标题 Key命名设计&#xff1a;可读性、可管理性、简介性Value设计&#xff1a;拒绝大key控制Key的生命周期&#xff1a;设定过期时间时间复杂度为O(n)的命令需要注意N的数量禁用命令&#xff1a;KEYS、FLUSHDB、FLUSHALL等不推荐使用事务删除大key设置合理的内存淘汰策略使…

Java | Leetcode Java题解之第387题字符串中的第一个唯一字符

题目&#xff1a; 题解&#xff1a; class Solution {public int firstUniqChar(String s) {Map<Character, Integer> position new HashMap<Character, Integer>();Queue<Pair> queue new LinkedList<Pair>();int n s.length();for (int i 0; i …

【python因果推断库8】工具变量回归与使用 pymc 验证工具变量1

目录 工具变量回归与使用 pymc 验证工具变量 回归机制与局部平均处理效应 旁白&#xff1a;从多元正态分布中采样 import arviz as az import daft import matplotlib.pyplot as plt import numpy as np import pandas as pd import pymc as pm import scipy from matplotli…

如何阅读PyTorch文档及常见PyTorch错误

如何阅读PyTorch文档及常见PyTorch错误 文章目录 如何阅读PyTorch文档及常见PyTorch错误阅读PyTorch文档示例常见Pytorch错误Tensor在不同设备上维度不匹配cuda内存不足张量类型不匹配 参考 PyTorch文档查看https://pytorch.org/docs/stable/ torch.nn -> 定义神经网络 torc…

红队攻防 | 利用GitLab nday实现帐户接管

在一次红队任务中&#xff0c;目标是一家提供VoIP服务的公司。该目标拥有一些重要的客户&#xff0c;如政府组织&#xff0c;银行和电信提供商。该公司要求外部参与&#xff0c;资产测试范围几乎是公司拥有的每一项互联网资产。 第一天是对目标进行信息收集。这一次&#xff0…

结构开发笔记(七):solidworks软件(六):装配摄像头、摄像头座以及螺丝,完成摄像头结构示意图

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/141931518 长沙红胖子Qt&#xff08;长沙创微智科&#xff09;博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV…

成功之路:如何获得机器学习和数据科学实习机会

一年内获得两份实习机会的数据科学家的建议和技巧 欢迎来到雲闪世界。在当今竞争激烈的就业市场中&#xff0c;获得数据科学实习机会可以成为您在科技领域取得成功的门票。 但申请者如此之多&#xff0c;你该如何脱颖而出呢&#xff1f; 无论您是学生、应届毕业生还是想要转行…

IDEA2024.2最新工具下载

​软件使用 1、解压缩包 2、打开如图第三个 3、运行过十来秒等待提示以下信息即可

Ubuntu 无法全局安装 node 包

Anchor: $: cat /etc/lsb* DISTRIB_IDUbuntu DISTRIB_RELEASE22.04 DISTRIB_CODENAMEjammy DISTRIB_DESCRIPTION"Ubuntu 22.04.4 LTS" $: node -v v20.17.0 $: npm -v 10.8.2Question: $: npm install -g docsify-cli结果&#xff1a;超时或者如下图 Answer: 有…

【Python 千题 —— 算法篇】字符串替换

Python 千题持续更新中 …… 脑图地址 &#x1f449;&#xff1a;⭐https://twilight-fanyi.gitee.io/mind-map/Python千题.html⭐ 题目背景 在日常编程中&#xff0c;我们经常会遇到需要对字符串中的特定字符或子串进行替换的需求。比如&#xff0c;替换文本中的敏感词汇、…

html初体验标准标签

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Document</title> </head> <body><input type"text"> </body> </html> 内容展示

C和Java实现杨辉三角

C: #include <stdio.h> #define N 15 int main() {int arr[N][N] { 0 }; //初始化int i 0;//行数int j 0;//列数for (i 0; i < N; i){arr[i][0] 1; //每行首元素是 1for (j 0; j < i; j)//为啥j<i呢&#xff1f;因为每一行的个数&#xff08;每一列多少个…

爵士编曲:如何编写爵士钢琴

排列&#xff08;Voicing&#xff09; 由于爵士和声组成音较为复杂&#xff0c;故此衍声排列内容 密集排列&#xff1a;直接堆一起或者左手低音右手和弦音 。 开放排列&#xff1a;各个声部大于等于纯四度&#xff0c;小于八度&#xff0c;符合两只手能弹的情况 混合排列&a…

Python | Leetcode Python题解之第388题文件的最长绝对路径

题目&#xff1a; 题解&#xff1a; class Solution:def lengthLongestPath(self, input: str) -> int:ans, i, n 0, 0, len(input)level [0] * (n 1)while i < n:# 检测当前文件的深度depth 1while i < n and input[i] \t:depth 1i 1# 统计当前文件名的长度l…

深入CSS 布局——WEB开发系列29

CSS 页面布局技术允许我们拾取网页中的元素&#xff0c;并且控制它们相对正常布局流、周边元素、父容器或者主视口/窗口的位置。 一、正常布局流&#xff08;Normal Flow&#xff09; CSS的布局基础是“正常流”&#xff0c;也就是页面元素在没有特别指定布局方式时的默认排列…

文件操作详解:fgetc,fputc,fgets,fputs,fscanf,,fprintf,fread,fwrite的使用和例子 C语言

前言 在日常应用中&#xff0c;我们为了持续的使用一些数据&#xff0c;为了让数据可以在程序退出后可以保存并正常使用&#xff0c;引入了文件的概念和操作。本文分享了一些常用的文件操作函数的使用方法和各自的区别。 一、常用文件顺序读写函数 下面例程所使用的VS工程代码…

【全网最全】《2024高教社杯/国赛》 C题 思路+代码+文献 蒙特卡洛+遗传算法 第一问 农作物的种植策略

​ 领取压缩包 问题 1&#xff1a;建模思路与方法 问题描述 我们需要为某乡村在 2024-2030 年间制定最优的农作物种植方案。考虑的因素包括农作物的销售量、种植成本、亩产量、销售价格、以及不同土地的适宜种植条件等。该问题分为两种情况&#xff1a;(1) 超过部分滞销&#…

【redis】本地windows五分钟快速安装redis

用处&#xff1a;本地自测&#xff0c;有时候公司redis环境不稳定&#xff0c;用自己的 1.下载&#xff0c;github下载一个解压缩在自己想要的位置 选择版本&#xff1a;Redis-7.4.0-Windows-x64-msys2-with-Service&#xff0c;zip GitHub - redis-windows/redis-windows: …

django学习入门系列之第十点《案例 用户管理》

文章目录 展示用户列表添加用户删除用户url中&#xff1f;的作用 往期回顾 展示用户列表 方向 展示用户列表 url函数 获取用户所有的信息基于HTML给他个渲染 views.py from django.shortcuts import render, HttpResponse, redirect# Create your views here.from app01.…

c++162 类的封装和访问

怎么样管理类管理对象 类如何定义对象 #include<iostream> using namespace std;//求圆的面积 class MyCirecle { public:double m_r;//属性 成员变量double m_s; public :double getR(){return m_r;}void setR(double r)//成员函数{m_r r;}double getS(){m_s 3.14…