Flink数据流类型之间的转换(WindowedStream、DataStream、KeyedStream、AllWindowStream之间的转换)

news2025/1/16 20:51:20

       Flink提供了一些流API,其中包括WindowedStream、DataStream、KeyedStream和AllWindowStream。

🍊WindowedStream是一种特殊的流,其中数据已按时间或数据元素的键进行分组,并且每个分组的数据都在窗口中按时间划分。这意味着,如果你有一个WindowedStream,你可以对每个窗口执行转换,例如聚合或统计。

🍊DataStream是Flink中最基本的流类型,表示一个无界的、有序的数据流。它可以是任何类型的数据,例如数值、字符串或复杂的对象。

🍊KeyedStream是一种特殊的DataStream,其中数据已按照一个键(通常是一个数值或字符串)进行分组。这意味着你可以对每个键执行转换,例如聚合或计数。

🍊AllWindowStream是一种特殊的WindowedStream,其中数据流被分成固定大小的所有窗口。这意味着你可以对整个数据流执行转换,而无需将数据分组。

如下图所示,WindowedStream、DataStream、KeyedStream、AllWindowStream之间的转换

在这里插入图片描述

~下面使用代码做一些简单的转换示例,希望能对你有所帮助

       如,你可以使用keyBy()函数将DataStream转换为KeyedStream。( DataStream -> KeyedStream)

DataStream<String> dataStream = ...;
//DataStream -> KeyedStream
KeyedStream<String, String> keyedStream = dataStream.keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String value) throws Exception {
        return value;
    }
});

       要将KeyedStream转换为WindowedStream,你可以使用window()函数。例如,以下代码将每个数据元素的键的流分成5秒的滑动窗口( KeyedStream-> WindowedStream):

KeyedStream<String, String> keyedStream = ...;
//KeyedStream-> WindowedStream
WindowedStream<String, String, TimeWindow> windowedStream = keyedStream.window(SlidingTimeWindows.of(Time.seconds(5)));

       还可以使用windowAll()函数将DataStream转换为AllWindowStream(DataStream-> AllWindowStream)。例如,以下代码将数据流分成10秒的滑动窗口:

DataStream<String> dataStream = ...;
//DataStream-> AllWindowStream
AllWindowStream<String, TimeWindow> allWindowStream = dataStream.windowAll(SlidingTimeWindows.of(Time.seconds(10)));

       你可以使用以下代码将WindowedStream转换为DataStream(WindowedStream-> DataStream):

WindowedStream<T> windowedStream = ...;
DataStream<T> dataStream = windowedStream.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)));

       你可以使用reduce函数将KeyedStream转换为DataStream(KeyedStream-> DataStream)。例如,假设你有一个整数类型的KeyedStream,并希望将其转换为所有键的和的DataStream,你可以使用以下代码:

KeyedStream<Integer, String> keyedStream = ...;
DataStream<Integer> sumStream = keyedStream.reduce(new ReduceFunction<Integer>() {
  public Integer reduce(Integer value1, Integer value2) {
    return value1 + value2;
  }
});

       你可以使用以下代码将DataStream转换为WindowedStream(DataStream-> WindowedStream)。这段代码将DataStream转换为带有滑动窗口的KeyedStream,然后使用window函数将其转换为WindowedStream,最后使用WindowFunction将WindowedStream中的数据进行转换。

DataStream<T> dataStream = ...;
WindowedStream<T, K, TimeWindow> windowedStream = dataStream.keyBy(new KeySelector<T, K>() {
  public K getKey(T value) {
    // Return the key for the value
  }
}).window(SlidingEventTimeWindows.of(Time.milliseconds(10), Time.milliseconds(5)))
  .apply(new WindowFunction<T, T, K, TimeWindow>() {
    public void apply(K key, TimeWindow window, Iterable<T> values, Collector<T> out) {
      for (T value : values) {
        out.collect(value);
      }
    }
  });

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/142586.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年出入境政策-喜忧参半

2023年已经到来&#xff0c;随着卫健委公布中国防控新冠措施调整优化以后&#xff0c;出入境政策相应也有了很大变化&#xff0c;知识人网小编概括为喜忧参半。喜的是从国外入境中国不再需要集中隔离&#xff1b;忧的是有些国家对于中国人入境增加了核酸检测要求。下面我们就这…

第一章 Java入门开发

第一章 Java入门开发 目录一&#xff0e; 概述二&#xff0e; JDK1. 概述2. 安装3. JDK目录一&#xff0e; 概述 Java是一门高级程序设计语言&#xff0c;是支持跨平台和完成面向对象的程序设计语言。针对不同的开发市场&#xff0c;sun公司将Java分为Java SE&#xff08;标准版…

关于clip通信架构设计的调研

网络上大部分关于clip-as-service的描述都是关于它如何使用&#xff0c;基于它的编码功能上去计算文本相似度&#xff0c;根据文字推荐图片等等&#xff0c;只有作者的创作思路里面提及通信架构的设计。 作者博客&#xff1a; 链接: link 如何解决多个客户端同时请求服务端的场…

STS4中MVC项目中把log4j从1.x升级到2.x中遇到的两个问题

文章目录问题一 升级后看Maven Dependencies中还是有依赖1.x的log4j问题二 web.xml配置不对项目原来的log4j版本是1.2.14&#xff0c;有漏洞需要升级到2.18.0.问题一 升级后看Maven Dependencies中还是有依赖1.x的log4j 原因是有关联依赖&#xff0c; 项目中别的jar库有依赖低…

【算法笔记】【专题】RMQ 问题:ST表/树状数组/线段树

0. 前言 好久没更算法笔记专栏了&#xff0c;正好学了新算法来更新…… 这也是本专栏的第一个专题问题&#xff0c;涉及到三种数据结构&#xff0c;如果写得有问题请各位大佬多多指教&#xff0c;谢谢&#xff01; 1. 关于 RMQ 问题 RMQ 的全称是 Range Minimum/Maximum Que…

《Linux运维实战:Centos7.6基于docker-compose一键离线部署单节点redis6.2.8 》

一、部署背景 由于业务系统的特殊性&#xff0c;我们需要面向不通的客户安装我们的业务系统&#xff0c;而作为基础组件中的redis针对不同的客户环境需要多次部署&#xff0c;作为一个运维工程师&#xff0c;提升工作效率也是工作中的重要一环。所以我觉得有必要针对redis6.2.8…

使用 .NET 标记游戏地图关键坐标点

本文以天涯明月刀 OL 游戏的云上之城探索玩法为例&#xff0c;介绍如何使用 .NET 在游戏地图中标记大量关键坐标点。 1. 背景 大概很多程序员都是喜欢玩游戏的吧&#xff0c;我也不例外。我们经常会看到电视剧中的各路游戏大神&#xff0c;要么是有只有他一个人会的骚操作&…

Linux--信号--信号的产生方式--核心转储--0104

1. 什么是信号 生活中的信号&#xff1a;红绿灯&#xff0c;狼烟&#xff0c;撤退、集合...。 我们认识这些信号&#xff0c;首先是因为自己记住了对应场景下的信号后续需要执行的动作。如果信号没有产生&#xff0c;我们依旧知道如何处理这个信号。收到信号&#xff0c;我们…

springboot学习(七十八) springboot中通过自定义注解实现数据脱敏的功能

文章目录前言一、引入hutools工具类二、定义常用需要脱敏的数据类型的枚举三、定义脱敏方式枚举四、自定义脱敏的注解五、自定义Jackson的序列化方式六、使用七、脱敏效果前言 对于某些接口返回的信息&#xff0c;涉及到敏感数据的必须进行脱敏操作&#xff0c;例如银行卡号、…

带你了解ssh服务过程

远程连接服务 1、什么是远程连接服务器 远程连接服务器通过文字或图形接口方式来远程登录系统&#xff0c;让你在远程终端前登录linux主机以取得可操作主机接口&#xff08;shell&#xff09;&#xff0c;而登录后的操作感觉就像是坐在系统前面一样。 2、远程连接服务器的功…

【C++】函数重载的使用及原理

概述 在学校里&#xff0c;我们都会有班里同学被起外号的经历&#xff0c;而且同一个人可能还会有好几个外号。 在自然语言中&#xff0c;一个词可以有多重含义&#xff0c;人们可以通过上下文来判断该词真实的含义&#xff0c;即该词被重载了。 目录 概述 什么是函数重载 …

项目管理:如何制作项目进度计划表?

项目进度管理是根据项目目标&#xff0c;编制合理的进度计划&#xff0c;并在项目推进过程中随时检查项目执行情况。 项目进度管理的目的就是为了实现最优工期&#xff0c;多快好省地完成任务。 而甘特图&#xff0c;就是用表格图形的方式来展示项目的进展&#xff0c;是一个比…

赛狐ERP:优秀的亚马逊运营具备的五项能力!

我们都知道&#xff0c;亚马逊运营是整个店铺的主导&#xff0c;很大程度上会影响着一个店铺经营的好坏&#xff0c;那么一个好的亚马逊运营&#xff0c;应该具备哪些能力呢&#xff1f;今天赛狐ERP就来给和大家聊一聊&#xff0c;希望对各位亚马逊运营们会有启发&#xff01;1…

ORB-SLAM2 --- LocalMapping::Run 局部建图线程解析

目录 一、线程作用 二、局部建图线程主要流程 三、局部建图线程主函数 四、调用函数解析 4.1 设置"允许接受关键帧"的状态标志LocalMapping::SetAcceptKeyFrames函数解析 4.2 查看列表中是否有等待被插入的关键帧LocalMapping::CheckNewKeyFrames函数 4.3 …

十分钟学会在linux上部署chrony服务器(再见 NTP,是时候拥抱下一代时间同步服务 Chrony 了)

chrony服务器 Chrony 相较于 NTPD 服务的优势 安装与配置&#xff08;Chrony的配置文件是/etc/chrony.conf&#xff09; 同步网络时间服务器 设置开机启动&#xff0c;重启服务 chronyc sources 输出结果解析 练习 实验模型图如下 实验a如下 实验b如下 再见 NTP&#x…

中国手机市场全面衰退,连苹果也未能幸免,大跌近三成

CINNO公布了11月份国内手机市场的数据&#xff0c;数据显示2022年11月份中国市场的手机出货量同比下滑21.7%&#xff0c;在整体大环境出现销量下滑的情况下&#xff0c;此前曾持续逆势增长的苹果也顶不住了&#xff0c;苹果在中国市场的出货量也出现了下滑的势头。数据显示2022…

06-Alibaba Nacos注册中心源码剖析

Nacos&Ribbon&Feign核心微服务架构图 架构原理 1、微服务系统在启动时将自己注册到服务注册中心&#xff0c;同时外发布 Http 接口供其它系统调用(一般都是基于SpringMVC) 2、服务消费者基于 Feign 调用服务提供者对外发布的接口&#xff0c;先对调用的本地接口加上注…

JS继承有哪些,你能否手写其中一两种呢?

引言 JS系列暂定 27 篇&#xff0c;从基础&#xff0c;到原型&#xff0c;到异步&#xff0c;到设计模式&#xff0c;到架构模式等&#xff0c; 本篇是 JS系列中第 3 篇&#xff0c;文章主讲 JS 继承&#xff0c;包括原型链继承、构造函数继承、组合继承、寄生组合继承、原型…

前端vue项目发送请求不携带cookie(vue.config.js和nginx反向代理)

一、本地环境——使用vue.config.js配置了跨域代理本来发现问题&#xff0c;是因为后台记录到接收到的sessionId一直在变化&#xff0c;导致需要在同一个sessionId下处理的逻辑无法实现。一开始以为是前后端分离跨域导致的&#xff0c;网上给出了解决方案&#xff1a;main.js中…

线程同步的实现

线程同步 同步就是协同步调&#xff0c;按预定的先后次序进行运行。如:你说完&#xff0c;我再说。 "同"字从字面上容易理解为一起动作 其实不是&#xff0c;"同"字应是指协同、协助、互相配合。 如进程、线程同步&#xff0c;可理解为进程或线程A和B一…