flinkSql 将流和表的互相转换

news2025/1/19 17:16:23

流——>表

方式一

方式二

方式一:写sql 
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// 表名,流,字段名称
tableEnv.createTemporaryView("t_1",source,$("word"));

方式二:使用dsl
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// 表名,流,字段名称
Table table = tableEnv.fromDataStream(source,$("word"));

 表——>流

Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");

// 方式一:toAppendStream
DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);

// 报错:toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])

// 这个不支持分组和聚合操作,若出现聚合操作使用方式二将表转为流

//方式二:toRetractStream
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);

 wordCount案例

方式一:使用sql

package com.bigdata.day07;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @基本功能:
 * @program:flinkProject
 * @author: 堇年
 * @create:2024-11-28 14:42:27
 **/
public class _06_flink_wordcounnt {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 获取tableEnv对象
        // 通过env 获取一个table 环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        //2. 创建表对象
        tEnv.createTemporaryView("t_1",flatMap,$("word"));
        //3. 编写sql语句
        Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");
        //4. 将Table变为stream流
        //使用toAppendStream时会报错 因为有聚合操作
        //DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);
        // toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])
        // 在这里可以映射为ROW对象,也可以映射为自己定义的实体类
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
        retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> value) throws Exception {
                return value.f0;
            }
        }).print();


        //5. execute-执行
        env.execute();
    }
}

方式二:使用dsl语句 

package com.bigdata.day07;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

public class _06_flink_wordcounnt_dsl {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 获取tableEnv对象
        // 通过env 获取一个table 环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        //2. 创建表对象
        Table table = tEnv.fromDataStream(flatMap,$("word"));

        //3. 编写sql语句
        Table rsTable = table.groupBy($("word")).select($("word"),$("word").count().as("wordcount"));
        rsTable.printSchema();

        //4. 将Table变为stream流

        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(rsTable, Row.class);
        retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> value) throws Exception {
                return value.f0;
            }
        }).print();


        //5. execute-执行
        env.execute();
    }
}

结果展示 

+I 表示有一条新数据进行了插入
+U 表示有一条已存在的数据有插入了一条,需要进行更新
-U 在+U前表示,先删除原本的,在update新的

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2255968.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linuxCNC(五)HAL驱动的指令介绍

HAL驱动的构成 指令举例详解 从终端进入到HAL命令行&#xff0c;执行halrun&#xff0c;即可进入halcmd命令行 # halrun指令描述oadrt加载comoonent&#xff0c;loadrt threads name1 period1创建新线程loadusr halmeter加载万用表UI界面loadusr halscope加载示波器UI界面sho…

SQL SERVER 2016 AlwaysOn 无域集群+负载均衡搭建与简测

之前和很多群友聊天发现对2016的无域和负载均衡满心期待&#xff0c;毕竟可以简单搭建而且可以不适用第三方负载均衡器&#xff0c;SQL自己可以负载了。windows2016已经可以下载使用了&#xff0c;那么这回终于可以揭开令人憧憬向往的AlwaysOn2016 负载均衡集群的神秘面纱了。 …

vue3+elementPlus封装的数据过滤区

目录结构 源码 index.vue <template><el-form class"mb-5" :rules"rules" :model"queryForm" ref"queryDOM" label-width"80"><el-row :gutter"20"><slot></slot><el-col cla…

iOS如何自定义一个类似UITextView的本文编辑View

对于IOS涉及文本输入常用的两个View是UITextView和UITextField&#xff0c;一个用于复杂文本输入&#xff0c;一个用于简单文本输入&#xff0c;在大多数开发中涉及文本输入的场景使用这两个View能够满足需求。但是对于富文本编辑相关的开发&#xff0c;这两个View就无法满足自…

《黑神话:悟空》闪退,提示D3D12崩溃,游戏崩溃无法启动是什么原因?要怎么解决?

《黑神话&#xff1a;悟空》闪退、D3D12崩溃及游戏无法启动&#xff1a;原因、解决方案与预防措施 作为一名软件开发从业者&#xff0c;我深知电脑游戏运行时可能遇到的各种问题&#xff0c;尤其是像《黑神话&#xff1a;悟空》这样的高品质游戏&#xff0c;其对硬件和系统配置…

JUC:Synchronized和锁升级

1. 面试题 谈谈你对Synchronized的理解Sychronized的锁升级你聊聊Synchronized实现原理&#xff0c;monitor对象什么时候生成的&#xff1f;知道monitor的monitorenter和monitorexit这两个是怎么保证同步的嘛&#xff1f;或者说这两个操作计算机底层是如何执行的偏向锁和轻量级…

SAP SD学习笔记19 - 形式发票(Proforma Invoice)

上面几章讲了投诉处理。 SAP SD学习笔记18 - 投诉处理4 - 请求书订正依赖&#xff0c;投诉处理流程的总结-CSDN博客 本章继续学习SD 模块的其他内容。 本章讲了形式发票&#xff08;Proforma Invoice&#xff09;的概要及系统操作。 形式发票是在出库确认之前&#xff0c;有…

M005 PHP+MYSQL+web编程课程网站的设计与实现 源码 配置 文档

web编程课程网站 1.摘要2.开发目的和意义3.系统功能设计4.系统界面截图5.源码获取 1.摘要 随着互联网的飞速发展&#xff0c;各行各业的信息化进程逐步加快。商业信息化、政务信息化、教育信息、服务信息化等等已遍布全国各地。信息化的服务平台能更加高效的为用户提供各种服务…

【力扣】13.罗马数字转整数

问题描述 思路解析 对于这种限制字符的问题&#xff0c;使用Map来对键值存储 对其进行判断&#xff0c;如果前面的数小于后面的数&#xff0c;那么结果相减 否则&#xff0c;正常相加。 代码 class Solution {Map<Character,Integer> mapnew HashMap<Character,In…

docker安装ddns-go(外网连接局域网)

docker先下载镜像&#xff0c;目前最新版是v6.7.6 也可以csdn资源下载 再导入dockers https://download.csdn.net/download/u014756339/90096748 docker load -i ddns-go.tar 启动 docker run -d --name ddns-go --restartalways --nethost -v /opt/ddns-go:/root jeessy/…

洛谷P4913 【深基16.例3】二叉树深度(c嘎嘎)

题目链接&#xff1a;P4913 【深基16.例3】二叉树深度 - 洛谷 | 计算机科学教育新生态 题目难度&#xff1a;普及 解题思路&#xff1a;本题要求树的深度&#xff0c;即求左右子树高度的最大值&#xff0c;首先我们用结构体存树左右节点&#xff0c;然后分别递归地去左右子树的…

Android -- [SelfView] 自定义多行歌词滚动显示器

Android – [SelfView] 自定义多行歌词滚动显示器 流畅、丝滑的滚动歌词控件* 1. 背景透明&#xff1b;* 2. 外部可控制进度变化&#xff1b;* 3. 支持屏幕拖动调节进度&#xff08;回调给外部&#xff09;&#xff1b;效果 歌词文件&#xff08;.lrc&#xff09; 一. 使用…

DNS/域名

概述 每个应用层协议都是为了解决某一类应用问题&#xff0c;而问题的解决又往往是通过位于不同主机中的多个应用进程之间的通信和协同工作来完成的。应用层的具体内容就是规定应用进程在通信时所遵循的协议。 应用层的许多协议都是基于客户服务器方式。客户(client)和服务器…

淘宝直播间智能化升级:基于LLM的学习与分析

自营直播应用技术团队负责的业务中&#xff0c;淘宝买菜的直播业务起步较晚&#xff0c;业务发展压力较大&#xff0c;业务上也就有了期望能够对一些二方的标杆直播间进行学习&#xff0c;并将其优点应用到自己直播间的需求。 最初 - 人海战术&#xff0c;学习PK 业务侧最直接的…

数学拯救世界(一)———寻“数”记

一、 很久很久以前&#xff0c;在一个只认识整数和小数的国度&#xff0c;有一个很残暴的国王提了一个要求&#xff1a;要是不能表示出把一段1米的绳子三等分后的大小&#xff0c;就要把所有的大臣杀掉。 1➗3 0.333&#xff0c;怎么办呀&#xff1f;怎么办呀&#xff1f; 袁q…

夏普MX-4608N复印机维修模式进入方法及载体初始化方法

夏普MX-4608N复印机载体型号&#xff08;图&#xff09;&#xff1a; 型 号&#xff1a;载体&#xff08;黑色&#xff09;MX-561CV 净含量&#xff1a;395克 下面图片中分别有载体、刮板、鼓芯、上纸盒搓纸轮一套&#xff0c;均原装正品&#xff1b; 保养周期将至的时候建…

FPGA Xilinx维特比译码器实现卷积码译码

FPGA Xilinx维特比译码器实现卷积码译码 文章目录 FPGA Xilinx维特比译码器实现卷积码译码1 Xilinx维特比译码器实现2 完整代码3 仿真结果 MATLAB &#xff08;n,k,m&#xff09;卷积码原理及仿真代码&#xff08;你值得拥有&#xff09;_matlab仿真后代码-CSDN博客 MATLAB 仿真…

java+ssm+mysql水产品商城

项目介绍&#xff1a; 使用javassmmysql开发的水产品商城&#xff0c;系统包含管理员、用户角色&#xff0c;功能如下&#xff1a; 管理员&#xff1a;用户管理&#xff1b;种类管理&#xff1b;商品管理&#xff1b;订单管理&#xff1b;评论管理&#xff1b;新闻管理&#…

【传感器技术】第5章 电容式传感器,变极距式电容传感器,变面积式电容传感器,变介质式电容传感器

关注作者了解更多 我的其他CSDN专栏 过程控制系统 工程测试技术 虚拟仪器技术 可编程控制器 工业现场总线 数字图像处理 智能控制 传感器技术 嵌入式系统 复变函数与积分变换 单片机原理 线性代数 大学物理 热工与工程流体力学 数字信号处理 光电融合集成电路…

二叉树前序遍历

什么是前序遍历&#xff1f; 一个二叉树的前序遍历就是对于树中的每一个节点而言&#xff0c;都是先遍历自己&#xff0c;再遍历左右。 如&#xff1a; 递归实现前序遍历 题目链接&#xff1a;144. 二叉树的前序遍历 - 力扣&#xff08;LeetCode&#xff09; 实现步骤&…