大数据-玩转数据-Flink窗口函数

news2025/1/15 7:49:48

一、Flink窗口函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。

二、ReduceFunction(增量聚合函数)

输入和输出必须一致

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Window_s_function {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1,
                                              WaterSensor value2) throws Exception {
                        System.out.println("Window_s_function.reduce");
                        value1.setVc ( value1.getVc() + value2.getVc());
                        return (value1);
                    }
                })
                .print();

        env.execute();
    }
}

运行结果
在这里插入图片描述
在这里插入图片描述

三、AggregateFunction(增量聚合函数)

输入和输出可以不一致

package com.lyh.flink07;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;

import java.util.List;

public class Window_s_function_2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("hadoop100",9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(
                        new AggregateFunction<WaterSensor, Avg, Double>() {

                            @Override
                            public Avg createAccumulator() {
                                return new Avg();
                            }

                            @Override
                            public Avg add(WaterSensor value, Avg acc) {
                                acc.sum += value.getVc();
                                acc.couunt++;
                                return acc;

                            }

                            @Override
                            public Double getResult(Avg acc) {
                                return acc.sum * 1.0 / acc.couunt;
                            }

                            @Override
                            public Avg merge(Avg avg, Avg acc1) {
                                return null;
                            }
                        },
                        new ProcessWindowFunction<Double, String, String, TimeWindow>() {
                            @Override
                            public void process(String key,
                                                Context ctx,
                                                Iterable<Double> elements,
                                                Collector<String> out) throws Exception {
                                Double result = elements.iterator().next();
                                long starttime = ctx.window().getStart();
                                long endtime = ctx.window().getEnd();

                                out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);

                            }
                        }
                )

                .print();


        env.execute();

    }
    public static class Avg {
        public Integer sum = 0;
        public Long couunt = 0L;
    };
}

运行结果
在这里插入图片描述
在这里插入图片描述

四、ProcessWindowFunction(全窗口函数)

上面例子里已经用到

new ProcessWindowFunction<Double, String, String, TimeWindow>() {
                            @Override
                            public void process(String key,
                                                Context ctx,
                                                Iterable<Double> elements,
                                                Collector<String> out) throws Exception {
                                Double result = elements.iterator().next();
                                long starttime = ctx.window().getStart();
                                long endtime = ctx.window().getEnd();

                                out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);

                            }
                        }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/935624.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Kafka为什么这么快?

Kafka 是一个基于发布-订阅模式的消息系统&#xff0c;它可以在多个生产者和消费者之间传递大量的数据。Kafka 的一个显著特点是它的高吞吐率&#xff0c;即每秒可以处理百万级别的消息。那么 Kafka 是如何实现这样高得性能呢&#xff1f;本文将从七个方面来分析 Kafka 的速度优…

【ARMv8 SIMD和浮点指令编程】NEON 乘法指令——乘法知多少?

NEON 乘法指令包括向量乘法、向量乘加和向量乘减,还有和饱和相关的指令。总之,乘法指令是必修课,在我们的实际开发中会经常遇到。 1 MUL (by element) 乘(向量,按元素)。该指令将第一个源 SIMD&FP 寄存器中的向量元素乘以第二个源 SIMD&FP 寄存器中的指定值,将…

机器学习策略——优化深度学习系统

正交化&#xff08;Orthogonalization&#xff09; 老式电视机&#xff0c;有很多旋钮可以用来调整图像的各种性质&#xff0c;对于这些旧式电视&#xff0c;可能有一个旋钮用来调图像垂直方向的高度&#xff0c;另外有一个旋钮用来调图像宽度&#xff0c;也许还有一个旋钮用来…

基于SpringBoot实现MySQL与Redis的数据最终一致性

问题场景 在并发场景下&#xff0c;MySQL和Redis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起&#xff0c;导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三…

《深入理解Java虚拟机》读书笔记:方法调用

方法调用并不等同于方法执行&#xff0c;方法调用阶段唯一的任务就是确定被调用方法的版本&#xff08;即调用哪一个方法&#xff09;&#xff0c;暂时还不涉及方法内部的具体运行过程。在程序运行时&#xff0c;进行方法调用是最普遍、最频繁的操作&#xff0c;但前面已经讲过…

Nginx详解 一:编译安装Nginx和Nginx模块

文章目录 1.HTTP 和 Nginx1.1 Socket套接字1.2 HTTP工作机制1.2.1一次http事务1.2.2 资源类型1.2.3提高HTTP连接性能 2. I/O模型2.1 I/O模型相关概念2.2 网络I/O模型2.2.1 **阻塞型** **I/O** 模型&#xff08;blocking IO&#xff09;2.2.2 **非阻塞型** **I/O** **模型** **(…

在React项目是如何捕获错误的?

文章目录 react中的错误介绍解决方案后言 react中的错误介绍 错误在我们日常编写代码是非常常见的 举个例子&#xff0c;在react项目中去编写组件内JavaScript代码错误会导致 React 的内部状态被破坏&#xff0c;导致整个应用崩溃&#xff0c;这是不应该出现的现象 作为一个框架…

Java基础 数据结构一【栈、队列】

什么是数据结构 数据结构是计算机科学中的一个重要概念&#xff0c;用于组织和存储数据以便有效地进行访问、操作和管理。它涉及了如何在计算机内存中组织数据&#xff0c;以便于在不同操作中进行查找、插入、删除等操作 数据结构可以看作是一种数据的组织方式&#xff0c;不…

[maven]关于pom文件中的<relativePath>标签

关于pom文件中的<relativePath>标签 为什么子工程要使用relativePath准确的找到父工程pom.xml.因为本质继承就是pom的继承。父工程pom文件被子工程复用了标签。&#xff08;可以说只要我在父工程定义了标签&#xff0c;子工程就可以没有&#xff0c;因为他继承过来了&…

Kotlin数据结构

数据结构基础 什么是数据结构 在计算机科学中&#xff0c;数据结构&#xff08;Data Structure&#xff09;是计算机中存储、组织数据的方式。数据结构是各种编程语言的基础。 一些使用场景 不同的数据结构适用于不同的应用场景。比如HashMap与ConcurrentHashMap&#xff0…

【太多网工对NAT还存在这4种误解!你是其中一个吗?】

NAT是解决公网地址不够用大家最熟悉的网络技术之一&#xff0c;而NAT最依赖的是NAT translation表项&#xff0c;至于NAT的概念和背景这里不再解释&#xff0c;网络上有很多关于此的类似介绍&#xff0c;自己搜索即可。下面主要是针对大家对NAT的一些误解进行分析。 1 误解一…

leetcode3. 无重复字符的最长子串(滑动窗口 - java)

滑动窗口 无重复字符的最长子串滑动窗口 上期经典 无重复字符的最长子串 难度 - 中等 3. 无重复字符的最长子串 给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。 示例 1: 输入: s “abcabcbb” 输出: 3 解释: 因为无重复字符的最长子串是 “abc…

0201hdfs集群部署-hadoop-大数据学习

文章目录 1 前言2 集群规划3 hadoop安装包上传与安装3.1 上传解压 4 hadoop配置5 从节点同步和环境变量配置6 创建用户7 集群启动8 问题集8.1 Invalid URI for NameNode address (check fs.defaultFS): file:/// has no authority. 结语 1 前言 下面我们配置下单namenode节点h…

基于Django的博客管理系统

1、克隆仓库https://gitee.com/lylinux/DjangoBlog.git 若失效&#xff1a;https://gitee.com/usutdzxy/DjangoBlog.git 2、环境安装 pip install -Ur requirements.txt3、修改djangoblog/setting.py 修改数据库配置&#xff0c;其他的步骤就按照官方文档。 DATABASES {def…

无涯教程-机器学习 - Jupyter Notebook函数

Jupyter笔记本基本上为开发基于Python的数据科学应用程序提供了一个交互式计算环境。它们以前称为ipython笔记本。以下是Jupyter笔记本的一些功能,使其成为Python ML生态系统的最佳组件之一- Jupyter笔记本可以逐步排列代码,图像,文本,输出等内容,从而逐步说明分析过程。 它有…

【js案例】滚动效果实现及简单动画函数抽离

目录 &#x1f31f;效果 &#x1f31f;实现思路 &#x1f31f;实现方法 HTML&CSS代码 初始化 滚动效果 完整JS代码 &#x1f31f;抽离动画函数 函数的简单使用 小案例一 小案例二 &#x1f31f;效果 &#x1f31f;实现思路 要实现自动滚动&#xff0c;无非就…

高等数学上册 第十章 重积分 第十一章 曲线积分与曲面积分 知识点总结

重积分 二重积分计算法&#xff1a; 直角坐标下&#xff1a;化为二次积分 { 如果图形是 X Y 型&#xff0c;则都可以&#xff0c;但要考虑哪个计算不定积分方便 如果图形既不是 X 也不是 Y 型&#xff0c;则要拆分 极坐标下&#xff1a; ∬ f ( x , y ) d x d y ∬ f ( ρ cos…

基于适应度相关算法优化的BP神经网络(预测应用) - 附代码

基于适应度相关算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码 文章目录 基于适应度相关算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码1.数据介绍2.适应度相关优化BP神经网络2.1 BP神经网络参数设置2.2 适应度相关算法应用 4.测试结果&…

Python学习之一 基于交互式解释器的简单Python编程

在很奇葩的Deepin下Miniconda安装之旅 中完成了Deepin系统下的Miniconda安装&#xff0c;在使用Miniconda 中完成了Miniconda的使用。今天&#xff0c;将开始学习Python编程。 (一) 为Python编程学习创建虚拟环境 首先创建虚拟环境&#xff0c;选择Python3.7。 conda create…