Flink05 Windows 操作轻松应对复杂的场景

news2025/1/16 21:15:40

Flink Windows 操作

上篇文章介绍了Flink 几种类型 Windows 本文介绍窗口操作相关API,以及各自使用场景 。

本期Flink Windows 相关操作apply/union/join/collect/CoMap/CoFlatMap

Windows apply

通过实现WindowFunction或AllWindowFunction接口来完成的,这些接口允许你定义如何对窗口内的数据进行处理。通过使用这些函数,你可以实现复杂的窗口计算逻辑,满足各种业务需求。

例如对窗口内数值进行求和:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class WindowDemo {
    public static void main(String[] args) throws Exception {


        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> input = ...;

        DataStream<Tuple2<String, Integer>> result = input
                .keyBy(f->f.f0) // 按第一个字段(假设是key)进行分组
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 定义一个10秒的时间窗口
                .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
                        int sum = 0;
                        for (Tuple2<String, Integer> value : values) {
                            sum += value.f1;
                        }
                        out.collect(new Tuple2<>(key, sum)); // 输出键和窗口内值的总和
                    }
                });

        result.print(); // 打印结果


        env.execute();
    }

   
}


Union

两个或多个数据流的联合会创建一个包含所有流中所有元素的新流。

注意事项 :使用Union操作符合并的数据流,其数据类型必须相同

Union 操作,很方便实现对"多源数据"进行分析, 在实际应用中,数据可能来自多个不同的源,不同的系统的,不同的数据库。那么Union 就有用武之地。

当然也有在一些复杂的流处理场景中,可能会先将一个大的数据流根据某些条件分流成多个小的数据流进行独立处理,然后再将这些处理后的数据流通过Union操作符合并成一个数据流,以便进行进一步的汇总或分析。

以下是一个Union 使用示例代码

package com.codetonight.datastream.operater;

import com.codetonight.Example;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();


        DataStream<Example.Person> flintstones1 = env.fromElements(
                new Example.Person("Fred", 35),
                new Example.Person("Wilma", 35),
                new Example.Person("Pebbles", 2));

        DataStream<Example.Person> flintstones2 = env.fromElements(
                new Example.Person("Tom", 35),
                new Example.Person("java", 35),
                new Example.Person("CPlus", 2));

        DataStream<Example.Person> flintstones = flintstones1.union(flintstones2);

        DataStream<Example.Person> adults = flintstones.filter(new FilterFunction<Example.Person>() {
            @Override
            public boolean filter(Example.Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;
        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}


Window Join

它允许开发者在两个或多个数据流之间基于时间和/或键值进行联接操作。Window Join作用于两个流中有相同key且处于相同窗口的元素上

使用方式

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

Interval Join

Interval Join允许数据流(我们称之为“左流”)去Join另一条数据流(我们称之为“右流”)中前后一段时间内的数据。Interval Join处理具有时间相关性的数据流时特别有用,比如在处理用户行为分析、交易日志等场景中。

下面代码展示  Interval Join 使用方式。

满足 ** key1 == key2 && leftTs - 2 < rightTs < leftTs + 2**,

Interval Join的使用场景包括但不限于:

用户行为分析:例如,分析用户浏览行为交易行为之间关系

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new ProcessJoinFunction{...});

coGroup

Window CoGroup是将两个数据流按照指定的key和窗口进行分组,并在每个窗口结束时将两个流中相同key的数据合并在一起。这种操作通常用于需要同时对两个流中的数据进行聚合、比较或关联的场景

Window CoGroup适用于需要对两个流中的数据进行复杂关联或聚合操作的场景,如:

实时用户行为分析:将用户的浏览行为和点击行为数据合并,分析用户的兴趣偏好。

用户可以在CoGroupFunction中实现自定义的处理逻辑,如聚合、过滤、比较等操作

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

    /**
     * This method must be implemented to provide a user implementation of a coGroup. It is called
     * for each pair of element groups where the elements share the same key.
     *
     * @param first The records from the first input.
     * @param second The records from the second.
     * @param out A collector to return elements.
     * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
     *     and may trigger the recovery logic.
     */
    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
  

Connect

它允许开发者将两个DataStream(数据流)连接起来,以便后续可以对这两个流中的数据进行统一处理。与Union操作不同,Connect操作不要求两个流的数据类型必须相同

两个流之间可以共享状态,这在处理复杂的数据流逻辑时非常有用

dataStream.connect 操作会返回一个 ConnectedStreams

   ConnectedStreams collectStream  = dataStream.connect(otherStream);

CoMap, CoFlatMap

CoMap 和 CoFlatMap 将不同类型的数据流进行组合和转换以生成新的数据流

ConnectedStreams → DataStream

下面例子CoMap使用例子。

  1. Interger数据流 与 String 数据流 进行 connect 操作 得到connectedStreams

  2. 将connectedStreams  通过CoMapFunction  转换为一个String 数据流

  3. CoMapFunction map1 对原Interger数据流中元素 乘2 后再转String

  4. CoMapFunction map2 对原String数据流中元素转为大写

public class CoMapExample {

    public static void main(String[] args) throws Exception {
        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建两个不同类型的数据流
        DataStream<Integer> stream1 = env.fromElements(1, 2, 3);
        DataStream<String> stream2 = env.fromElements("a", "b", "c");

        // 连接两个数据流
        ConnectedStreams<Integer, String> connectedStreams = stream1.connect(stream2);

        // 使用 CoMap 进行转换操作
        DataStream<String> resultStream = connectedStreams.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) {
                return String.valueOf(value * 2);
            }

            @Override
            public String map2(String value) {
                return value.toUpperCase();
            }
        });

        // 打印结果
        resultStream.print();

        // 执行任务
        env.execute();
    }
}

CoFlatMap 读者参考上篇文章FlatMap 类比学习。CoFlatMap 可以完成CoMap 功能,前面例子稍作修改。读者根据输出结果自行体会CoFlatMap 与 Map 区别。

总结

本期Flink Windows 相关操作apply/union/join/collect/CoMap/CoFlatMap 。介

绍了各自的使用场景,并给出简单的例子。关于Flink 操作还有很多,期待我们一起持续学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2214545.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

工具篇:(三)MacOS 两种方式下载 Node.js 并进行测试教程

MacOS 两种方式下载 Node.js 并进行测试教程 1.Node.js 官网 下载 步骤 1: 访问 Node.js 官网 打开浏览器&#xff0c;访问 Node.js 的官方网站&#xff1a;https://nodejs.org。 在首页&#xff0c;你会看到两个版本可供下载&#xff1a; LTS&#xff08;长期支持版本&…

从0到1封装一个image/pdf预览组件

目录结构 content.vue <template><div class"no-content-block"><i class"iconfont icondocument large-file" /><div class"text-wrapper">{{ t(__ui__.siPreview.previewSupported) }}</div><div class&quo…

【CSS in Depth 2 精译_049】7.2 CSS 响应式设计中的媒体查询原则(下):响应式列的添加

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 【第七章 响应式设计】&#xff08;概述&#xff09; 7.1 移动端优先设计原则&#xff08;上篇&#xff09; 7.1.1 创建移动端菜单&#xff08;下篇&#xff09;7.1.2 给视口添加 meta 标签&#xf…

RabbitMQ 核心功能详解

引言 在现代分布式系统中&#xff0c;消息队列已经成为一种不可或缺的组件。它不仅能够实现应用之间的解耦&#xff0c;还能提高系统的灵活性和可扩展性。RabbitMQ 是一款基于 AMQP&#xff08;Advanced Message Queuing Protocol&#xff09;协议的消息中间件&#xff0c;以其…

香港举办AIHCIR 2024国际学术会议,领先人工智能、人机交互和机器人技术

第三届人工智能、人机交互和机器人国际学术会议 &#xff08;AIHCIR 2024&#xff09;将于2024年11月在中国香港举行&#xff0c;聚焦AI、人机交互与机器人领域&#xff0c;邀请知名学者演讲&#xff0c;促进学术交流。论文经评审后提交EI检索&#xff0c;投稿需全英文&#xf…

图解Redis 04 | Set数据类型的原理及应用场景

介绍 Redis 的 Set 类型是一个不允许重复元素的集合&#xff0c;元素存储的顺序不按照插入的顺序&#xff0c;因此属于无序集合。一个 Set 最多可以存储 2^32 - 1 个元素&#xff0c;这与数学中的集合概念类似。Set 类型不仅支持增、删、改、查等操作&#xff0c;还支持多个Se…

微软的 Drasi:一种轻量级的事件驱动编程方法

微软的开源数据变化处理平台有望提供一种全新的方式来构建和管理可产生持续事件流的云应用程序。 Microsoft Azure 孵化团队是微软超大规模云中比较有趣的组成部分之一。它介于传统软件开发团队和研究组织之间&#xff0c;致力于构建大规模分布式系统问题的解决方案。 这些解决…

使用拖拽生成活动海报(vue项目)

<template><div class"poster-editor"><div class"toolbar" v-if"edit 1"><div style"text-align: center;font-size: 14px;font-weight: 700;margin-bottom: 10px;">工具栏</div><div class"…

WPF组件的自定义模板和触发器全面解析

Windows Presentation Foundation&#xff08;WPF&#xff09;是微软提供的一个用于构建桌面客户端应用程序的UI框架。其依赖于XAML&#xff08;Extensible Application Markup Language&#xff09;进行用户界面设计&#xff0c;提供了一套强大的控件和组件模型。在WPF开发中&…

C++ 的存储类型与新的 thread_local

1 C 的存储类型 1.1 存储周期&#xff08;Storage duration&#xff09; 存储周期表示一个变量的存储空间持续的时间&#xff0c;它应该与对象的语义生命周期一致&#xff08;或至少不小于对象的语义生命周期&#xff09;。C 98从 C 继承了三种存储周期&#xff0c;分别是静态…

【黑马点评】项目知识点及面经整理

【黑马点评】项目知识点及面经整理 1 短信登录&#xff08;Session&#xff0c;Redis&#xff0c;JWT验证&#xff09;1.1 JWT和Session的区别1.2 Session1.3 Redis1.3.1 基于Redis实现登录验证1.3.2 登录拦截 1.4 JWT1.4.1 JWT的有效验证1.4.2 JWT定义 2 热点数据缓存2.1 缓存…

区块链-智能合约Solidity编程

文章目录 一、ubuntu安装二、FISCO BCOS安装五、 WeBASE安装5.1 WeBASE简介5.2 节点前置服务搭建5.3 调用HelloWorld合约 七、Solidity极简入门7.1. 值类型7.2. 变量数据存储和作用域7.3. 函数7.4 控制流7.5 数组&映射7.6 结构体7.7 修饰符7.8 事件7.9 面向对象7.10 抽象合…

SoC芯片中Clock Gen和Reset Gen的时钟树综合

社区目前已经开设了下面列举的前四大数字后端实战课程&#xff0c;均为直播课&#xff0c;且均是小编本人亲自授课&#xff01;遇到项目问题&#xff0c;都可以远程一对一指导解决具体问题。小编本人是一线12年后端经验的数字后端工程师。想找一线IC后端技术专家亲自带你做后端…

Java Fork-Join框架学习

概述 Fork/Join是Java7提供的一个用于并行执行任务的框架&#xff0c;是一个把大任务分割成若干个小任务&#xff0c;最终汇总每个小任务结果后得到大任务结果的框架。Fork负责把一个大任务切分为若干并行执行的子任务&#xff0c;Join负责合并这些子任务的执行结果&#xff0…

Ubuntu系衍生版手动修改配置网卡的配置总结

一、Ubuntu系的IP地址配置文件的目录&#xff1a; sudo vim /etc/network/interfaces 二、以DHCP方式配置网卡&#xff1a; 在以上配置文件中添加以下两行&#xff1a; auto enp3s0 iface enp3s0 inet dhcp 三、为网卡配置静态IP地址&#xff1a; 在以上配置文件中添…

实验3,网络地址转换

实验3&#xff1a;网络地址转换 实验目的及要求&#xff1a; 通过实验&#xff0c;掌握NAT技术的工作原理&#xff0c;了解三种不同类型NAT技术的主要作用以及各自的主要应用环境。能够完成静态NAT和复用NAT技术的应用&#xff0c;并熟练掌握NAT技术相关的配置命令。 实验设…

el-date-picker选择时间后标准时间少1小时问题

问题 前端开发中发现Element的时间组件el-date-picker在选择选择部分时间后js对象的标准时间少1小时&#xff0c;如果选择的小时为0&#xff0c;会导致部分转换条件下结果少1天。 比如组件中选择的本地时间为&#xff1a; 1988-08-01 00:00:00 而js对象获取到是标准时间是&am…

ubuntu 安装kali命令补全功能

输入命令时&#xff0c;之前的命令会以阴影显示&#xff0c;按下右键或 Tab 键可以直接补全 安装zsh-autosuggestions sudo apt install zsh-autosuggestions编辑 ~/.zshrc环境变量 if [ -f /usr/share/zsh-autosuggestions/zsh-autosuggestions.zsh ]; then. /usr/share/zs…

【从零开始的LeetCode-算法】945. 使数组唯一的最小增量

给你一个整数数组 nums 。每次 move 操作将会选择任意一个满足 0 < i < nums.length 的下标 i&#xff0c;并将 nums[i] 递增 1。 返回使 nums 中的每个值都变成唯一的所需要的最少操作次数。 生成的测试用例保证答案在 32 位整数范围内。 示例 1&#xff1a; 输入&am…

【Hadoop】HDFS基本操作

参考&#xff1a;3.HDFS基本操作_哔哩哔哩_bilibili 创建目录 hadoop fs -mkdir -p /training/qiang查看当前根目录下文件 hadoop fs -ls /hadoop fs -ls /training/目录授权 hadoop fs -chmod -R 777 /training/qm777是最大权限&#xff0c;读写 4、2、1 上传文件 先创…