FlinkSql开窗实例:消费kafka写入文本

news2025/1/16 14:06:12

前言

以前写Flink从kafka入hdfs因为业务需求和老版本缘故都是自定义BucketSink入动态目录中,对于简单的需求可以直接用Flink SQL API进行输出。Flink版本1.13.1。

Flink官网示例

准备

本地下载个kafka(单机即可),新建个桌面目录文件夹k2f。

输入输出源

按照建表有:

执行操作语句:

 String opSql ="insert into fileOut select id,name,age,sum(score) from kafkaInput group by id";

报错如下,原因是这样数据是增量(不支持),需要进行开窗:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.fileOut' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[id], select=[id, SUM(score) AS EXPR$1])

开窗需要指定水印字段(这里我们采用kafka自动生成的eventTine时间戳<kafka0.10.1.0后>,除此之前外我们还能获取offset和partition等元数据信息水印相关具体可见:Flink事件时间和水印详解),指定字段eventTime为kafka元数据的timestamp,以及生成水印时间1s

 		// 创建flink流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1.3 基于Blink的流处理
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                //.inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);

        String kafkaInput = "CREATE TABLE kafkaInput( \n" +
                "id string, \n" +
                "name string, \n" +
                "eventTime TIMESTAMP(3) METADATA FROM 'timestamp' ,\n" +
                "WATERMARK FOR eventTime AS eventTime - INTERVAL '1' SECOND,\n" +
                "score BIGINT \n" +
                ")WITH( \n" +
                "'connector' = 'kafka', \n" +
                "'topic' = 'k2f_topic_1', \n" +
                "'properties.bootstrap.servers' = '127.0.0.1:9092', \n" +
                "'properties.group.id' = 'testGroup', \n" +
                "'format' = 'csv' \n" +
                ")\n";

        String fileOut = "CREATE TABLE fileOut( \n" +
                "id string, \n" +
                "score BIGINT, \n" +
                "window_start TIMESTAMP(3),\n" +
                "window_end TIMESTAMP(3)\n" +
                ") WITH ( \n" +
                "'connector' = 'filesystem', \n" +
                "'format' = 'csv',"+
                "'path' = 'file:\\C:\\Users\\cbry\\Desktop\\k2f' \n" +
                ")";

user_action_time AS PROCTIME() – 声明一个额外的列作为处理时间属性,这个事件是系统计算的时间,不需要从我们的源头数据进行提供只需要声明

指定操作语句

前置数据准备好了,水印策略和字段也准备好了,开窗的窗口策略设置在操作语句,如下指定TUMBLE滚动窗口和

		String opSql ="insert into fileOut select id,sum(score),window_start, window_end from TABLE(" +
                "TUMBLE(TABLE kafkaInput, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS)) group by window_start, window_end,id";

        tableEnv.executeSql(kafkaInput);
        TableResult outTable = tableEnv.executeSql(fileOut);
        tableEnv.executeSql(opSql);
        outTable.print();
        env.execute("k2F");

若不指定水印报错:

在这里插入图片描述

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. The window function TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval) requires the timecol is a time attribute type, but is TIMESTAMP(3).

效果

kafka生产:

11020102,cbry,1
11020102,cbry,1
11020102,cbry,1
11020102,cbry,1
… 省略 …
11020102,cbry,100
11020102,cbry,1000
11020102,cbry,10000
11020102,cbry,1
11020102,cbry,1
11020102,cbry,1
11020102,cbry,1
… 省略 …

文件输出:

11020102,2,“2022-08-12 14:43:50”,“2022-08-12 14:44:00”
11020102,2,“2022-08-12 14:44:00”,“2022-08-12 14:44:10”
11020102,1,“2022-08-12 14:44:30”,“2022-08-12 14:44:40”
11020102,1,“2022-08-12 14:44:50”,“2022-08-12 14:45:00”
11020102,1,“2022-08-12 14:46:40”,“2022-08-12 14:46:50”
11020102,100,“2022-08-12 14:46:50”,“2022-08-12 14:47:00”
11020102,1000,“2022-08-12 14:47:10”,“2022-08-12 14:47:20”
11020102,10000,“2022-08-12 14:50:00”,“2022-08-12 14:50:10”
11020102,5,“2022-08-12 14:58:10”,“2022-08-12 14:58:20”
11020102,2,“2022-08-12 14:58:30”,“2022-08-12 14:58:40”
11020102,1,“2022-08-12 14:59:00”,“2022-08-12 14:59:10”
11020102,3,“2022-08-12 14:59:30”,“2022-08-12 14:59:40”
11020102,1,“2022-08-12 14:59:40”,“2022-08-12 14:59:50”

在这里插入图片描述

FlinkSQL的窗口类型

窗口函数TVF(table-valued functions)一共有三种:滚动(TUMBLE )、滑动(HOP)、累计(CUMULATE):Flink官网SQLAPI窗口函数TVF。

简单描述下:窗口函数除去原始source表的所有列外,额外有三个列:“window_start”、“window_end”、“window_time”。其中window_time指的是窗口生成的时间属性列(窗口计算时间),且window_time总是等于window_end - 1ms。所以使用FlinkSQL开窗必须要source表中有时间字段`。

加入一个window_time到最后:

11020102,2,“2022-08-12 16:46:20”,“2022-08-12 16:46:30”,“2022-08-12 16:46:29.999”
11020102,1,“2022-08-12 16:46:30”,“2022-08-12 16:46:40”,“2022-08-12 16:46:39.999”

目前不支持单窗口函数,必须跟

group by window_start, window_end, 。。。

一起使用聚合。

开窗规范(以滚动为例)

本质上上将窗口的结果封装称一张动态表。

TUMBLE(TABLE data, DESCRIPTOR(timecol), size , [(可选) offset ])
  • data: 数据原表;
  • timecol:作为窗口的常规时间字段纳入窗口表中。
  • size:开窗宽度(大小)
  • offset:窗口开始的偏移量

timecol不太好理解,简单的说就是:窗口采用的时间类型的时间字段,据此滚动。比如说采用source表中的eventTime,那么开窗时间就是第一条数据的eventTime的值。

当使用EventTime的时候必须指定水印。所以在Flink事件时间和水印详解中,水印的时间字段和窗口时间字段保持一致,因为创建水印的时候指定了EvenTime在元数据中的字段。

举个例子:当前是2022-08-12 19:15:20 ,将eventTime免去kafka的时间戳关联:

             "eventTime TIMESTAMP(3) ,\n" +

模拟数据:

在这里插入图片描述

关窗

在操作的过程中现象,要有新的数据才能刷新统计结果:新的数据:插入新的水印;

参考Flink窗口详解和各示例使用 触发窗口条件:

窗口Window会在以下的条件满足时被触发执行:

  • watermark时间 >= window_end_time(闭窗);
  • 在[window_start_time,window_end_time)中有数据存在(入窗);

所以说内存中始终有上一个窗口没关闭,而这个窗口需要等待新的数据关窗。

实际应用场景中:kafka写入生产多个分区的时候(kafka的topic多分区),如果写入对应的分区没有足够的数据量来触发窗口的闭合,会导致数据结果迟迟不出现和结果偏差(所以需要实时数据不断涌入),这个问题在测试的时候因为测试数据量不够导致查询了很久。。。mark一下

阿里的blink:DATE_FORMAT

问题

kafka只能增量

Exception in thread “main” org.apache.flink.table.api.TableException: Table sink ‘default_catalog.default_database.sink_LLRZ’ doesn’t support consuming update changes which is produced by node GroupAggregate(groupBy=[host, app, stream, dev_name, server_ip, $f5, batch_time, to_date_time, id, window_time], select=[host, app, stream, dev_name, server_ip, $f5, batch_time, to_date_time, id, window_time, SUM(cash_out_flow) AS EXPR$1, SUM(total_duration) AS EXPR$2, MIN(start_time) AS EXPR$8, MAX(end_time) AS EXPR$9])

原因:需要group by

目前不支持单窗口函数,必须跟

group by window_start, window_end, 。。。

一起使用聚合。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/117337.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unreal 读写自定义配置文件

基础 首先需要自定义一个继承自UObject的类&#xff0c;UCLASS加上config标志 UCLASS(config MyClass) class UMyClass: public UObject将想要和配置文件交互的属性&#xff0c;UFUNCTION加上Config标志 UPROPERTY(Config, EditAnywhere) float TestP;之后只要配置文件内存…

【日常系列】LeetCode《21·综合应用3》

数据规模->时间复杂度 <10^4 &#x1f62e;(n^2) <10^7:o(nlogn) <10^8:o(n) 10^8<:o(logn),o(1) 内容 lc 217 &#xff1a;存在重复元素 https://leetcode.cn/problems/contains-duplicate/ 提示&#xff1a; 1 < nums.length < 10^5 -10^9 < nums[…

Python基础教程(2)——列表、元组、字典、集合、斐波纳契数列、end 关键字、条件控制、循环语句

1.列表 &#xff08;1&#xff09;删除列表的元素 list [Google, Runoob, 1997, 2000] print ("原始列表 : ", list) del list[2] print ("删除第三个元素 : ", list)&#xff08;2&#xff09;Python列表脚本操作符 &#xff08;3&#xff09;嵌套列表…

Arco 属性

文章目录Arco介绍1. 简介1.1 背景1.2 运行环境1.3 浏览器兼容性2. 设计价值观2.1 清晰2.2 一致2.3 韵律2.4 开放3. 设计原则3.1 及时反馈3.2 贴近现实3.3 系统一致性3.4 防止错误发生3.5 遵从习惯3.6 突出重点3.7 错误帮助3.8 人性化帮助4. 界面总体风格4.1 页面风格4.1.1 主色…

知识答题小程序如何制作_分享微信答题抽奖小程序制作步骤

知识答题小程序如何制作&#xff1f;现在越来越多的企业和组织逐步进行各种获奖知识问答小程序。那么&#xff0c;如何制作一个答题小程序呢&#xff1f;今天&#xff0c;我们一起来看看~需要的老板不要走开哦~既然点进来了&#xff0c;那就请各位老板耐心看到最后吧~怎么做一个…

JDBC如何破坏双亲委派机制

JDBC的注册会涉及到java spi机制&#xff0c;即Service Provideer Interface&#xff0c;主要应用于厂商自定义组件或插件中&#xff1b;简单说就是java来定义接口规则和方法&#xff0c;厂商实现具体逻辑&#xff0c;每家厂商根据自己产品实现的逻辑肯定不相同&#xff0c;但上…

数据库查询语句-详细篇

今天来梳理一下数据库的一些查询语句&#xff0c;做软件/移动端/电脑端&#xff0c;开发程序时必然离不开数据库的设计以及查询&#xff1b; 一&#xff1a;具体的代码如下展示&#xff1a; 1.查询数据库指定表的所有信息 select * from uploadimagecode;2.查询当前数据表部…

说说PPT的“只读模式”和“限制编辑”有何区别

对PPT的内容进行保护&#xff0c;使其不能随意编辑&#xff0c;防止意外更改&#xff0c;我们可以将PPT设置成无法编辑、无法改动的“保护模式”。 设置“保护模式”&#xff0c;一般我们都会想到【限制编辑】模式&#xff0c;但在设置的时候&#xff0c;会发现PPT里&#xff…

毕业半年年终总结

毕业半年年终总结 如果说2021年主要的内容是求职和实习 那么2022年一年主要的内容便是毕业和工作 匆匆忙忙 本科毕业了 6月份的时候参加完毕业答辩&#xff0c;也就顺利的毕业了 实际上中途也有过一些插曲&#xff0c;比如毕业设计是制作某某管理系统&#xff0c;基本上所有…

【Java编程进阶】流程控制结构详解

推荐学习专栏&#xff1a;Java 编程进阶之路【从入门到精通】 文章目录1. 流程控制结构2. 顺序结构3. 分支结构3.1 单分支3.2 双分支3.3 多分支 (if-else)3.4 嵌套 if3.5 多分支结构 (switch)4. 循环结构4.1 for 循环4.2 while 循环4.3 do...while循环5. 流程跳转5.1 break5.2 …

【数据结构】优先级队列(堆)

成功就是失败到失败&#xff0c;也丝毫不减当初的热情 目录 1.理解优先级队列 2.优先级队列的底层 2.1 认识堆 2.1.1 堆的概念 2.2.2 堆的存储 2.2 堆的创建 2.2.1 向下调整算法 2.2.2 堆的创建 2.3 堆的插入 2.4 堆的删除 2.5 查看堆顶元素 2.6 堆的运用 3…

windows 11 安装jdk1.8

1.先去JDK官网下载 JDK1.8官网 2.进入到官网之后 3. 点击上图windows选项       按照你的电脑是32位还是64位按需下载(我电脑是64位) 4. 点击下载之后就会跳转到Oracle账号登录界面&#xff08;没有Oracle账号的注册一下这边我就省略了注册了&#xff09; 5.把下载好的…

商业智能BI财务分析,如何从财务指标定位到业务问题

商业智能BI开发人员都会思考如何从财务指标定位到业务问题&#xff0c;就是做了很多的商业智能BI开发&#xff0c;每次也都涉及到了财务分析&#xff0c;各种财务能力指标&#xff0c;各种可视化的分析图表。但是不知道这些财务指标到底能够反映出企业的什么问题&#xff0c;和…

蓝桥杯Python练习题3-十六进制转八进制

资源限制 内存限制&#xff1a;512.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 问题描述 给定n个十六进制正整数&#xff0c;输出它们对应的八进制数。 输入格式 输入的第一行为一个正整数n &#xff08;1<n<10&am…

Weston 纹理倒置(render-gl)

纹理倒置 背景 在 render-gl 接入 frame buffer object 实现 off-screen 渲染后,发现得到的渲染图发生了180的倒置. 查阅了有关资料后,在 eglspec.1.5 中的 2.2.2.1 Native Surface Coordinate Systems 找到了答案: The coordinate system for native windows and pixmaps …

2023届毕业生职场第一步:挡飞刀

这篇博客不会教你某一段代码怎么写&#xff0c;某一个知识点怎么入门&#xff0c;但却可以让你在2023年的职场上&#xff0c;躲避飞刀。 目录 1、啥是挡飞刀 2、其他知名大厂也好不到哪里去 3、 毕业生如何躲飞刀&#xff1f; 4、毕业生首选什么样的公司 5、不建议去这样的…

工具学习——ubuntu轻量桌面对比

因为最近要做一些ubuntu上的开发&#xff0c;然后使用ssh问题是经常会出现中断&#xff0c;虽然可以使用等tmux方法来挂起进程&#xff0c;但是感觉不如界面方便&#xff0c;然后现在问题来了&#xff0c;我的ubuntu服务器是一个双核的性能很差内存也少的机器&#xff0c;我怎么…

13-Golang中for循环的用法

Golang中for循环的用法for循环基本语法for循环流程图注意事项和使用细节for循环 就是让一段代码循环的执行。 基本语法 for循环变量初始化&#xff1b;循环条件&#xff1b;循环变量迭代{循环操作(语句)}package main import "fmt"func main(){for i : 1; i < …

C#,谷歌(Google)CityHash64与CityHash128散列哈希算法的C#源程序

1、CityHash简史 Google 2011年发布了 CityHash 系列字符串散列算法 。今天发布的有两种算法&#xff1a;CityHash64 与 CityHash128 。它们分别根据字串计算 64 和 128 位的散列值。这些算法不适用于加密&#xff0c;但适合用在散列表等处。 Google 一直在根据其数据中心常…

“刀片嗓”“水泥鼻”“咳出肺”可以这样缓解!

很多人感染新冠后&#xff0c;咽痛、鼻塞、干咳和其他不适&#xff0c;非常不舒服&#xff0c;在网上讨论也总结了“刀片嗓”、“水泥鼻”、“咳出肺”三个字生动地展现了他们的不适。今天&#xff0c;对于这三种症状&#xff0c;今天就为大家带来一些缓解的小方法。 病症一&am…