0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)

news2025/1/10 23:44:06

大纲

  • 滑动(Sliding)和滚动(Tumbling)的区别
  • 样例
    • 窗口为2,滑动距离为1
    • 窗口为3,滑动距离为1
    • 窗口为3,滑动距离为2
    • 窗口为3,滑动距离为3
  • 完整代码
  • 参考资料

在 《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们介绍了滚动窗口。本节我们要介绍滑动窗口。

滑动(Sliding)和滚动(Tumbling)的区别

正如其名,“滑动”是指这个窗口沿着一定的方向,按着一定的速度“滑行”。
在这里插入图片描述
而滚动窗口,则是一个个“衔接着”,而不是像上面那样交错着。
在这里插入图片描述
它们的相同之处就是:只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。

样例

我们只要对《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》中的代码做轻微的改动即可。为了简化样例,我们只看Key为E的元素的滑动。

word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 

窗口为2,滑动距离为1

count_window会根据传入的第二参数决定是构建滚动(CountTumblingWindowAssigner)窗口还是滑动(CountSlidingWindowAssigner)窗口。

    def count_window(self, size: int, slide: int = 0):
        """
        Windows this KeyedStream into tumbling or sliding count windows.

        :param size: The size of the windows in number of elements.
        :param slide: The slide interval in number of elements.

        .. versionadded:: 1.16.0
        """
        if slide == 0:
            return WindowedStream(self, CountTumblingWindowAssigner(size))
        else:
            return WindowedStream(self, CountSlidingWindowAssigner(size, slide))

我们只要给count_window第二个参数传递一个不为0的值,即可达到滑动效果。

    # reducing
    windows_size = 2
    sliding_size = 1
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

(E,2)
(E,2)
(E,2)
(E,2)
(E,2)

在这里插入图片描述

窗口为3,滑动距离为1

    # reducing
    windows_size = 3
    sliding_size = 1
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)
(E,3)
(E,3)

在这里插入图片描述

窗口为3,滑动距离为2

    # reducing
    windows_size = 3
    sliding_size = 2
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)

在这里插入图片描述

窗口为3,滑动距离为3

这个就等效于滚动窗口了,因为“滑”过了窗口大小。

    # reducing
    windows_size = 3
    sliding_size = 3
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)

在这里插入图片描述

完整代码

from typing import Iterable

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindow

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
    def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
        return [(key,  len([e for e in inputs]))]


word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 
    
    # reducing
    windows_size = 3
    sliding_size = 1
    reduced=keyed.count_window(windows_size, sliding_size) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1155560.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电源控制系统架构(PCSA)之电源控制框架

安全之安全(security)博客目录导读 PCSA的主要目的是描述SoC系统电源控制集成的标准方法。这种方法的一个关键组成部分是电源控制框架。下图显示了电源控制框架概念的高级说明。 电源控制框架是标准基础设施组件、接口和相关方法的集合,可用于构建SoC电源管理所需的…

【Java初阶练习题】-- 循环练习题

循环练习题 1. 根据年龄, 来打印出当前年龄的人是少年(低于18), 青年(19-28), 中年(29-55), 老年(56以上)2. 判定一个数字是否是素数3. 打印 1 - 100 之间所有的素数4. 输出 1000 - 2000 之间所有的闰年5. 输出乘法口诀表6. 求两个正整数的最大公约数7. 求出0~999之…

滚动条默认是隐藏的只有鼠标移上去才会显示

效果 在设置滚动条的类名中写 /* 滚动条样式 */.content-box::-webkit-scrollbar {width: 0px; /* 设置纵轴(y轴)轴滚动条 */height: 0px; /* 设置横轴(x轴)轴滚动条 */}/* 滚动条滑块(里面小方块) */.…

Bellman-ford 贝尔曼-福特算法

Bellman-ford算法可以解决负权图的单源最短路径问题 --- 它的优点是可以解决有负权边的单源最短路径问题,而且可以判断是否负权回路 它也有明显的缺点,它的时间复杂度O(N*E)(N是点数 , E是边数&#xff09…

实现多平台兼容性:开发同城外卖小程序的技术策略

在移动互联网时代,外卖行业的快速崛起改变了人们点餐的方式。随着小程序的兴起,开发同城外卖小程序成为了许多企业的重要战略。然而,小程序的多平台兼容性成为了一项关键挑战。本文将探讨如何实现多平台兼容性,以开发高效且用户友…

LeetCode 2402.会议室III ----堆+模拟

5e5 的st与ed 容易看出来是用堆来写的一道题目,一开始我只用了一个堆,出现了问题 问题就是当我们当前这个会议有多个可以选择的会议室可以选择的时候不一定选择那个最先结束的会议室而是应该选择可以选择的那些里面编号最小的那一个,因此我们…

ConfigurationProperties之宽松绑定

前面我们讲了ConfigurationProperties 但这个东西只能匹配小写 我这样写就正常 但当我们将配置文件和属性类都改成大写时 配置文件一切正常 但ConfigurationProperties就开始报错了 这涉及到一个知识点 宽松绑定 也叫做 松散绑定 其实 ConfigurationProperties提供给我们了…

Python 数学函数和 math 模块指南

Python 提供了一组内置的数学函数,包括一个广泛的数学模块,可以让您对数字执行数学任务。 内置数学函数。min() 和 max() 函数可用于在可迭代对象中查找最低或最高值: 示例:查找可迭代对象中的最低或最高值: x min…

深入理解udp

1.再谈端口号 1.1复习 我们上一篇谈了很久的应用层的http,并在此前我们使用socket编程写了一个能相互通信的客户端与服务端,但是我们也只是粗略的理解了一下tcp和udp在编程过程中所形成的差异性,并没有实质去了解一下其详细内容,…

KNN模型

使用K-Nearest Neighbors (KNN)算法进行分类。首先加载一个数据集,然后进行预处理,选择最佳的K值,并训练一个KNN模型。 # encodingutf-8 import numpy as np datas np.loadtxt(datingTestSet2.txt) # 加载数据集,返回一个numpy数…

68 内网安全-域横向PTHPTKPTT哈希票据传递

目录 演示案例:域横向移动PTH传递-Mimikatz域横向移动PTK传递-Mimikatz域横向移动PTT传递-MS14068&kekeo&local国产Ladon内网杀器测试验收-信息收集,连接等 涉及资源: PTH(pass the hash) #利用lm或ntlm的值进行的渗透测试 PTT(pass the ticket) #利用的票据凭证TGT进行…

餐饮连锁品牌2023:端起“外卖碗”,吃上“下沉饭”

作者 | 陈小江 文 | 螳螂观察 “没想到,蜜雪(蜜雪冰城)能到我们这乡镇来开,我觉得挺意外的。「柏记水饺」也算挺大一品牌,没想到也能来(我们)乡镇”。 谈起不断有连锁品牌进镇开店,黑龙江讷河…

算法笔记【3】-冒泡排序法

文章目录 一、原理二、代码实现三、算法特点 一、原理 冒泡排序是一种简单但有效的排序算法,它可以用于对数字进行升序排序。该算法通过多次比较相邻元素并交换位置来实现排序的目的。冒泡排序的基本思想是从第一个元素开始,依次比较相邻的两个元素&…

python 练习 在列表元素中合适的位置插入 输入值

目的: 有一列从小到大排好的数字元素列表, 现在想往其插入一个值,要求: 大于右边数字小于左边数字 列表元素: [1,4,6,13,16,19,28,40,100] # 方法: 往列表中添加一个数值,其目的方便元素位置往后…

ardupilot开发 --- 深度相机 篇

1. ZED 相机 1.1 规格 2. RealSense 需要机载计算机作为中介!!

【HarmonyOS】低代码平台组件拖拽使用技巧之网格布局

【关键字】 HarmonyOS、低代码平台、组件拖拽、网格布局 1、写在前面 前面分享了在低代码平台上使用堆叠容器和滚动容器的组件拖拽使用技巧,本篇我们继续来学习其它组件的使用,今天为大家介绍的是网格布局的使用,需要注意的是,网…

NLP之LSTM与BiLSTM

文章目录 代码展示代码解读双向LSTM介绍(BiLSTM) 代码展示 import pandas as pd import tensorflow as tf tf.random.set_seed(1) df pd.read_csv("../data/Clothing Reviews.csv") print(df.info())df[Review Text] df[Review Text].astyp…

【Linux进程控制】进程控制专篇

【Linux进程控制】进程控制专篇 目录 【Linux进程控制】进程控制专篇进程创建fork函数写实拷贝fork常规用法fork调用失败的原因 进程终止进程退出场景进程常见退出方法_exit函数return退出 进程等待进程等待必要性进程等待的方法获取子进程status 具体代码实现进程程序替换替换…

3.5 队列的表示和操作的实现

思维导图: 3.5.1 队列类型 3.5.1 队列的类型定义 1. 简介 队列是一种特殊的线性表,它的特性是只能在表的一端进行插入操作,而在另一端进行删除操作。通常将允许插入操作的一端称为队尾,允许删除操作的一端称为队头。 2. 抽象…