0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)

news2024/10/6 0:34:07

大纲

  • Tumbling Count Windows
    • map
    • reduce
      • Window Size为2
      • Window Size为3
      • Window Size为4
      • Window Size为5
      • Window Size为6
  • 完整代码
  • 参考资料

之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是数据会源源不断输入。
在这里插入图片描述
对于这种数据,我们称之为无界流,即没有“终止的界限”。但是程序在底层一定不能等着无止境的数据都传递结束再处理,因为“无止境”就意味着“终止的界限”触发计算的条件是不存在的。那么我们可以人为的给它设置一个“界”,这就是我们本节介绍的窗口。

Tumbling Count Windows

Tumbling Count Windows是指按元素个数计数的滚动窗口。
滚动窗口是指没有元素重叠的窗口,比如下面图是个数为2的窗口。(元素重叠的窗口我们会在《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》介绍)
在这里插入图片描述
个数为3的窗口
在这里插入图片描述
我们用代码探索下这个概念

map

word_count_data = [
    ("A",2),("A",1),
    ("B",3),("B",1),("B",2),
    ("C",3),("C",1),("C",4),("C",2),
    ("D",3),("D",1),("D",4),("D",2),("D",5),
    ("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 

这段代码构造了一个KeyedStream,用于存储word_count_data中的数据。
我们并没有让Source是流的形式,是因为为了降低例子复杂度。但是我们将runntime mode设置为流(STREAMING)模式。
在这里插入图片描述

reduce

我们需要定义一个Reduce类,用于对元组中的数据进行计算。这个类需要继承于WindowFunction,并实现相应方法(本例中是apply)。
apply会计算一个相同key的元素个数。比如key是“E”的元组个数是6。

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
    def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
        return [(key,  len([e for e in inputs]))]

Window Size为2

    # reducing
    reduced=keyed.count_window(2) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

(A,2)
(B,2)
(C,2)
(C,2)
(D,2)
(D,2)
(E,2)
(E,2)
(E,2)

  • A的个数是2是因为A的确只有两个元组,而一个Size为2的Window正好承载了这两个元素。于是有(A,2)这个结果;
  • B的个数是3。但是会产生两个窗口,第一个窗口承载了前两个元素,第二个窗口当前只有一个元素。于是第一个窗口进行了Reduce计算,得出一个(B,2);第二个窗口还没进行reduce计算,就没有展现出结果;
  • C有4个,正好可以被2个窗口承载。这样我们就看到2个(C,2)。
  • D有5个,情况和B类似。它被分成了3个窗口,只有2个窗口满足个数条件,于是就输出2个(D,2);最后一个窗口因为元素不够,就没尽兴reduce计算了。
  • E有6个,正好被3个窗口承载。我们就看到3个(E,2)。
    在这里插入图片描述

Window Size为3

    # reducing
    reduced=keyed.count_window(3) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(B,3)
(C,3)
(D,3)
(E,3)
(E,3)

在这里插入图片描述

Window Size为4

    # reducing
    reduced=keyed.count_window(4) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(C,4)
(D,4)
(E,4)

在这里插入图片描述

Window Size为5

    # reducing
    reduced=keyed.count_window(5) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(D,5)
(E,5)

在这里插入图片描述

Window Size为6

    # reducing
    reduced=keyed.count_window(6) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

(E,6)

在这里插入图片描述

完整代码

from typing import Iterable

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindow

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
    def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
        return [(key,  len([e for e in inputs]))]


word_count_data = [
    ("A",2),("A",1),
    ("B",3),("B",1),("B",2),
    ("C",3),("C",1),("C",4),("C",2),
    ("D",3),("D",1),("D",4),("D",2),("D",5),
    ("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 
    
    # reducing
    reduced=keyed.count_window(2) \
                .apply(SumWindowFunction(),
                       Types.TUPLE([Types.STRING(), Types.INT()]))

    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1156162.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

go中url.ParseRequestURI和url.Parse函数的踩坑记

使用url.Parse函数 package mainimport ("fmt""net/url" )func main() {attrRawUrl : "http://localhost?wifitrue&carrier#Staysafe AIS&osandroid"urlObj, _ : url.Parse(attrRawUrl)fmt.Printf("urlObj:%#v\n", *urlObj)…

python 安装 pyquicklz 库函数

问题描述: 安装 pyquicklz 库的时候 报错 error: subprocess-exited-with-error 解决方法: 安装 Cython 库: pip install Cython -i https://pypi.tuna.tsinghua.edu.cn/simple/ 安装 Microsoft C Build Tools Microsoft C Build Tools 的下…

Spring MVC 续

一、拦截器 1.介绍拦截器 1.拦截器(springmvc提供) 2.只拦截控制单元(生效必须配置拦截器) 3.会拦截静态资源,使用拦截器时,尽量使用局部配置,配置拦截的控制单元即可。 注意:只…

python多环境并存

1. 现况简介 1.1 本人windows所存Python版本 Python 2.7 Python 3.6 Python 3.7 1.2 Python 各版本路径如下 Python 2.7Python 3.6Python 3.7C:\Server\Python27C:\Server\Python36C:\Server\Python37 1.3 系统环境变量配置如下 2. 解决方案 2.1 进入目录 cd C:\Server…

【机器学习】三、特征选择与稀疏学习

特征选择和稀疏学习 子集搜索与评价 对象都有很多属性来描述,属性也称为特征(feature),用于刻画对象的某一个特性。对一个学习任务而言,有些属性是关键有用的,而有些属性则可能不必要纳入训练数据。对当前学…

win10 下编译ffmpeg3.36.tar.gz

所需工具: win10 ffmpeg3.36.tar.gz。 或其他版本,下载地址:Index of /releases msys2。 下载地址:http://www.msys2.org。 Visual Studio 2017。 1. 安装MSYS MSYS2像是windows下的一个子系统,…

`.NET Web`新人入门必学项目`EarthChat`

.NET Web新人入门必学项目EarthChat EarthChat是一个基于.NET 7的实战项目,EarthChat提供了很多的最佳实践,EarthChat的目标也是成为一个很多人都喜欢的大型聊天业务系统,并且将结合SKAI大模型进行打造智能业务系统,在EarthChat中…

05_es分布式搜索引擎1

一、初始elasticsearch 1.elasticsearch简单介绍 ①什么是elasticsearch? 开源的分布式搜索引擎,实现海量数据搜索,日志统计,分析,系统监控等功能 ②什么是elastic stack? 是以elasticsearch为核心的技…

【COMP304 LEC4 LEC5】

LEC 4 1. Truth-Functionality Propositional logic 的connectives(连接词)are truth-functional 但是,有时候的描述不是true-functional的,比如:"Knowing that", "It is necessary that",&quo…

微信小程序开发-微信支付退款功能【附有完整代码】

之前有写过详细的微信支付功能:微信支付 我们使用weixin-java-pay的jar包等,配置上的流程同微信支付,可以看上面的文章。 退款使用的WxPayService类的refundV3方法。使用该方法需要在微信支付配置的基础上加上:apiclient_key.pem…

Linux期末复习——多线程编程

线程概述 线程基本编程 函数说明 pthread_create(): 创建线程,成功返回0pthread_exit(): 主动退出线程,成功返回0pthread_join(): 挂起线程等待结束,成功返回0pthread_cancel在别的线程中终止另一个线程的执行,成功返回0 示例…

sql文件批处理程序-java桌面应用

项目效果: 支持sql文件夹批处理,选中文件夹或者sql文件 支持测试连接,可以校验数据库配置 支持报错回显,弹出报错文件名以及问题语句 支持在程序中修改错误语句,用户可以选择保存修改内容继续执行或不保存修改只执行…

线性代数 第四章 线性方程组

一、矩阵形式 经过初等行变换化为阶梯形矩阵。当,有解;当,有非零解。 有解,等价于 可由线性表示 克拉默法则:非齐次线性方程组中,系数行列式,则方程组有唯一解,且唯一解为 其中是…

TestCenter测试管理工具

estCenter(简称TC)一款广受好评的测试管理工具,让测试工作更规范、更有效率,实现测试流程无纸化,测试数据资产化。 产品概述 TC流程图 产品功能 一、案例库 案例库集中化管理,支持对测试用例集中管理&…

Gartner报告 | Mendix再次成为低代码领导者!Mendix在愿景完整性和执行能力领域的排名最为靠前。

本文作者: Hans de Visser, Mendix 首席产品官 在最新发布的2023 Gartner魔力象限图™ 中对于企业级低代码应用程序平台,Gartner指出,企业IT领导者需要满足“应用程序开发向更高抽象级别迈进的步伐,并采用低代码平台来加速应用程序开发。” 此…

【华为】路由器以PPPoE拨号接入广域网

组网需求 用户希望以PPPoE拨号方式接入广域网,如图1所示,Router作为PPPoE客户端,得到PPPoE服务器的认证后获得IP地址,实现用户接入互联网的需求。内网网关地址(即VLANIF1接口的IP地址)为10.137.32.1/24。 …

N-131基于jsp,ssm物流快递管理系统

开发工具:eclipse,jdk1.8 服务器:tomcat7.0 数据库:mysql5.7 技术: springspringMVCmybaitsEasyUI 项目包括用户前台和管理后台两部分,功能介绍如下: 一、用户(前台)功能: 用…

UE5 Android下载zip文件并解压缩到指定位置

一、下载是使用市场的免费插件 二、解压缩是使用市场的免费插件 三、Android路径问题 windows平台下使用该插件没有问题,只是在Android平台下,只有使用绝对路径才能进行解压缩,所以如何获得Android下的绝对路径?增加C文件获得And…

生态扩展:Flink Doris Connector

生态扩展:Flink Doris Connector 官网地址: https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector flink的安装: tar -zxvf flink-1.16.0-bin-scala_2.12.tgz mv flink-1.16.0-bin-scala_2.12.tgz /opt/flinkflink环境…