大数据(9e)图解Flink窗口

news2024/12/29 9:10:02

文章目录

  • 1、代码模板
    • 1.1、pom.xml
    • 1.2、log4j.properties
    • 1.3、Java模板
  • 2、按键分区(Keyed)、非按键分区(Non-Keyed)
    • 2.1、Keyed
    • 2.2、Non-Keyed
  • 3、窗口的分类
    • 3.1、基于时间的窗口
    • 3.2、基于事件个数的窗口
  • 4、窗口函数
  • 5、示例代码
    • 5.1、ReduceFunction
    • 5.2、AggregateFunction
    • 5.3、ProcessWindowFunction

1、代码模板

本地开发环境:WIN10+IDEA
只改##################### 业务逻辑 #####################之间的代码

1.1、pom.xml

<!-- 配置 -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>

1.2、log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

1.3、Java模板

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Scanner;

public class Hello {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //加入自定义数据源
        DataStreamSource<String> dss = env.addSource(new MySource());
        //################################### 业务逻辑 ########################################
        dss.print();
        //################################### 业务逻辑 ########################################
        env.execute();
    }

    public static class MySource implements SourceFunction<String> {
        public MySource() {}

        @Override
        public void run(SourceContext<String> sc) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine().trim();
                if (str.equals("STOP")) {break;}
                if (!str.equals("")) {sc.collect(str);}
            }
            scanner.close();
        }

        @Override
        public void cancel() {}
    }
}

2、按键分区(Keyed)、非按键分区(Non-Keyed)

Non-Keyed的窗口的流的并行度=1

2.1、Keyed

基于时间的窗口

.keyBy(...)
.window(...)

基于事件个数的窗口

.keyBy(...)
.countWindow(...)

2.2、Non-Keyed

基于时间的窗口

.windowAll(...)

基于事件个数的窗口

.countWindowAll(...)

3、窗口的分类

  • 将 无界限的 数据 切分为 有界限的 数据
  • https://yellow520.blog.csdn.net/article/details/121288240

3.1、基于时间的窗口

基于时间滑动窗口

.window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(3)))

基于时间滚动窗口

.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))

基于时间会话窗口

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))

基于时间的全局窗口

.window(GlobalWindows.create())

3.2、基于事件个数的窗口

基于事件个数滑动窗口

.countWindow(4,3)

基于事件个数滚动窗口

.countWindow(4)

4、窗口函数

窗口函数窗口关闭时,窗口函数就去处理窗口中的每个元素
ReduceFunction增量处理,高效
AggregateFunction增量处理,高效
ProcessWindowFunction函数执行前要在内部缓存窗口上所有的元素,低效

5、示例代码

修改代码模板中##################### 业务逻辑 #####################之间的代码

5.1、ReduceFunction

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
dss.keyBy(s -> s)
   .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
   .reduce((ReduceFunction<String>) (v1, v2) -> v1 + "," + v2)
   .print("输出");

基于时间滚动窗口

5.2、AggregateFunction

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
   //AggregateFunction<IN, ACC, OUT>
   .aggregate(new AggregateFunction<String, Long, Long>() {
       //创建累加器
       @Override
       public Long createAccumulator() {return 0L;}
       //累加
       @Override
       public Long add(String in, Long acc) {return acc + 1L;}
       //从累加器获取结果
       @Override
       public Long getResult(Long acc) {return acc;}
       //合并累加器
       @Override
       public Long merge(Long a1, Long a2) {return a1 + a2;}
   })
   .print("输出");

基于时间滑动窗口

5.3、ProcessWindowFunction

源码截取

abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> {
    abstract void process(
        ProcessAllWindowFunction<IN, OUT, W>.Context var1,  //上下文对象
        Iterable<IN> var2,                                  //窗口内的所有输入
        Collector<OUT> var3                                 //收集器
    );

代码

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
   .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
       @Override
       public void process(Context context, Iterable<String> in, Collector<String> out) {
           //打印窗口范围
           System.out.println(context.window().toString());
           //在窗口内,收集元素
           out.collect(String.valueOf(in));
       }
   })
   .print("输出");

测试运行截图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/21915.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TIA博途_水处理项目中开启累计运行时间最短的泵_程序示例

TIA博途_水处理项目中开启累计运行时间最短的泵_程序示例 需求: 有N台水泵,每个水泵统计累计运行时间。当满足条件时,根据设定开泵的数量,启动累计运行时间最短的对应数量的泵。故障切换时,也切换到运行时间最短的泵。 具体方法可参考以下内容: 如下图所示,打开TIA博途后…

【毕业设计】62-基于单片机的防酒驾\酒精浓度检测系统设计研究(原理图、源代码、仿真工程、低重复率参考设计、PPT)

【毕业设计】62-基于单片机的防酒驾\酒精浓度检测系统设计研究&#xff08;原理图、源代码、仿真工程、低重复率参考设计、PPT&#xff09;[toc] 资料下载链接 资料下载链接 资料链接&#xff1a;https://www.cirmall.com/circuit/33758/ 包含此题目毕业设计全套资料&#xf…

国科大课程自动评价脚本JS

国科大课程一键评估 操作流程&#xff1a; 方法 打开F12点击console/控制台复制粘贴下面代码回车 for(var i 0; i<1000; i) { if($("input[nameitem_"i"]").length) $("input[nameitem_"i"]").get(Math.round(Math.random()*2)…

C++11--lambda表达式--包装器--bind--1119

1.lambda表达式 lambda表达式书写格式&#xff1a;[捕捉列表] (参数列表) mutable -> 返回值类型 { 比较的方法 } int func() {int a, b, c, d, e;a b c d e 1;// 全部传值捕捉auto f1 []() {cout << a << b << c << d << e << …

BLE学习(3):ATT和GATT详解

本文章将介绍在面向连接的蓝牙模式中&#xff0c;ATT(attribute protocol,属性协议)和GATT(generic attribute profile,通用属性配置文件)这两个重要的协议层&#xff0c;它与蓝牙的数据传输密切相关。 1 设备之间如何建立连接(Gap层) 若BLE设备之间要进行数据传输&#xff0…

Qt5 QML TreeView currentIndex当前选中项的一些问题

0.前言 Qt5 QML Controls1.4 中的 TreeView 存在诸多问题&#xff0c;比如节点连接的虚线不好实现&#xff0c;currentIndex 的设置和 changed 信号的触发等。我想主要的原因在于 TreeView 是派生自 BasicTableView&#xff0c;而 TableView 内部又是由 ListView 实现的。 正…

二、openCV+TensorFlow入门

目录一、openCV入门1 - 简单图片操作2 - 像素操作二、TensorFlow入门1 - TensorFlow常量变量2 - TensorFlow运算本质3 - TensorFlow四则运算4 - tensorflow矩阵基础5 - numpy矩阵6 - matplotlib绘图三、神经网络逼近股票收盘均价&#xff08;案例&#xff09;1 - 绘制15天股票K…

编译原理 x - 练习题

简答题逆波兰后缀表达式和三元式序列源程序翻译成中间代码DAG优化正则文法 构造正则表达式正规式 改 上下文无关文法表示DFA有限状态机图移进-规约消除左递归文法-最左推导-短语LL(1)文法LR(0) | SLR(1)文法简答题 编译过程可分为前端和后端&#xff0c;描述一下前端和后端分别…

【设计模式】装饰者模式:以造梦西游的例子讲解一下装饰者模式,这也是你的童年吗?

文章目录1 概述1.1 问题1.2 定义1.3 结构1.4 类图2 例子2.1 代码2.2 效果图3 优点及适用场景3.1 优点3.2 适用场景1 概述 1.1 问题 众所周知&#xff0c;造梦西游3有四个角色&#xff0c;也就是师徒四人&#xff0c;这师徒四人每个人都有自己专属的武器和装备。假定我们以及设…

推荐10个Vue 3.0开发的开源前端项目

Vue 是一款用于构建用户界面的 JavaScript 框,它基于标准 的HTML、CSS 和 JavaScript 构建,并提供了一套声明式的、组件化的编程模型,用以帮助开发者高效地开发用户界面。目前,Vue 3.0正式版也发布了两年的时间,越累越多的开发者也用上了Vue 3.0。 对比Vue2.x,Vue 3.0在…

并发bug之源(二)-有序性

什么是有序性&#xff1f; 简单来说&#xff0c;假设你写了下面的程序&#xff1a; java int a 1; int b 2; System.out.println(a); System.out.println(b);但经过编译器/CPU优化&#xff08;指令重排序&#xff0c;和编程语言无关&#xff09;后可能就变成了这样&#x…

【DDR3 控制器设计】(7)DDR3 的用户端口读写模块设计

写在前面 本系列为 DDR3 控制器设计总结&#xff0c;此系列包含 DDR3 控制器相关设计&#xff1a;认识 MIG、初始化、读写操作、FIFO 接口等。通过此系列的学习可以加深对 DDR3 读写时序的理解以及 FIFO 接口设计等&#xff0c;附上汇总博客直达链接。 【DDR3 控制器设计】系列…

CSS---复合选择器

目录 一&#xff1a;复合选择器的介绍 二、复合选择器的讲解 &#xff08;1&#xff09;后代选择器 &#xff08;2&#xff09;子元素选择器 &#xff08;3&#xff09;并集选择器 &#xff08;4&#xff09;链接伪类选择器 &#xff08;5&#xff09;focus伪类选择器 一&…

基于SpringBoot的线上买菜系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SpringBoot 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目…

【Spring系列】- Spring事务底层原理

Spring事务底层原理 &#x1f604;生命不息&#xff0c;写作不止 &#x1f525; 继续踏上学习之路&#xff0c;学之分享笔记 &#x1f44a; 总有一天我也能像各位大佬一样 &#x1f3c6; 一个有梦有戏的人 怒放吧德德 &#x1f31d;分享学习心得&#xff0c;欢迎指正&#xff0…

Vue-CLI的安装、使用及环境配置(超详细)

Vue CLI 是一个基于 Vue 进行快速项目开发的工具。它可以提供可交互式的项目脚手架和运行时的服务依赖&#xff0c;帮助你快速完成一个风格统一、拓展性强的现代化 web 单页面应用。 Vue-CLI 所需环境 Vue-CLI 是一个需要全局安装的NPM包&#xff0c;安装需要在 Node.js 环境下…

一、openCV+TensorFlow环境搭建

目录一、anaconda安装二、tensorflow安装三、Opencv安装四、pycharm新建项目使用Anaconda的环境五、验证环境安装六、tensorflow安装jupyter notebook一、anaconda安装 anaconda官网&#xff1a;https://www.anaconda.com/anaconda下载&#xff1a;https://repo.anaconda.com/…

【k8s】10.网络插件

文章目录一、etcd详解1、etcd的特点2、准备签发证书的环境二、网络插件原理1、flannel1.1 UDP模式&#xff08;性能差&#xff09;1.2 VXLAN模式&#xff08;性能较好&#xff09;1.3 host-gw模式&#xff08;性能最高&#xff09;2、calico插件3、总结一、etcd详解 etcd是Cor…

Redis_第二章_实战篇_第一节_ 短信登录

Redis_第二章_实战篇_第一节_ 短信登录 文章目录Redis_第二章_实战篇_第一节_ 短信登录短信登录1.1、导入黑马点评项目1.1.1 、导入SQL1.1.2、有关当前模型1.1.3、导入后端项目1.1.4、导入前端工程1.1.5 运行前端项目:1.2 、基于Session实现登录流程1.3 、实现发送短信验证码功…

ANDI数据集介绍|补充信息|2022数维杯国际赛C题

目录 1.患者基本信息 2.生物标记物量化值 3.认知评估 4.解剖结构量化值 5.Other 6.上述各信息的bl值 1.患者基本信息 RID (Participant roster ID) ex. 2、PTID (Original study protocol) ex. 011_S_0002、VISCODE (Visit code) ex. bl、SITE ex. 11、COLPROT (Study p…