flink table view datastream互转

news2024/11/27 13:43:49

case class outer(f1:String,f2:Inner)
case class outerV1(f1:String,f2:Inner,f3:Int)
case class Inner(f3:String,f4:Int)

测试代码

package com.yy.table.convert

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.types.DataType


object streamPOJO2table {
  case class outer(f1:String,f2:Inner)
  case class outerV1(f1:String,f2:Inner,f3:Int)
  case class Inner(f3:String,f4:Int)

  def main(args: Array[String]): Unit = {
    // flink1.13 流处理环境初始化
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = StreamTableEnvironment.create(env)
    import org.apache.flink.streaming.api.scala._
    val ds1: DataStream[outer] = env.fromElements(
      outer("a",Inner("b",2))
      ,outer("d",Inner("e",4))
    )


    val table1: Table = tEnv.fromDataStream(ds1)
//    table1
//      .execute()
//      .print()
    /*
    +----+--------------------------------+--------------------------------+
| op |                             f1 |                             f2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                   (f3=b, f4=2) |
| +I |                              d |                   (f3=e, f4=4) |
+----+--------------------------------+--------------------------------+
     */



//    table1
//      .print()
    /*
    5> +I[d, Inner(e,4)]
4> +I[a, Inner(b,2)]
     */


    tEnv.createTemporaryView("view1", ds1)

    val tableResult1: TableResult = tEnv.executeSql("select f1,f2,(f2.f4 + 100) as f3 from view1")
    tableResult1.print()
    /*
    +----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+
     */


//
    val t1: Table = tEnv.sqlQuery("select f1,f2,(f2.f4 + 100) as f3 from view1")
//    t1.print()

//    println(t1.getResolvedSchema)
    /*
+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+
2 rows in set
(
  `f1` STRING,
  `f2` *com.yy.table.convert.streamPOJO2table$Inner<`f3` STRING, `f4` INT NOT NULL>* NOT NULL,
  `f3` INT NOT NULL
)
     */

    println("---- 1 -------")
    // tableResult转datastream
    val o1: DataStream[outerV1] = tEnv.toDataStream[outerV1](t1,classOf[outerV1])
//    o1.print()

    println("---- 2 -------")

    tEnv.executeSql(
      """
        |select
        |f1
        |,f2.f3
        |,f2.f4
        |from view1
        |""".stripMargin)
//      .print()
    /*
    +----+--------------------------------+--------------------------------+--------------------------------+
| op |                             f1 |                             f3 |                             f4 |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                              a |                              b |                              c |
| +I |                              d |                              e |                              f |
+----+--------------------------------+--------------------------------+--------------------------------+
     */

    tEnv.executeSql(
        """
          |select
          |f1
          |,(f2.f3,f2.f4)
          |from view1
          |""".stripMargin)
//      .print()


    env.execute("jobName1")
  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1357177.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

canvas绘制直角梯形(向左)

查看专栏目录 canvas示例教程100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…

visual studio 2022在查找和替换使用正则表达式查找if()

文件内容如下&#xff1a; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace ConsoleApp1 {internal class Program{static void Main(string[] args){TempFunction();}private static void T…

初识Web服务器

一、web服务器 1、什么是web服务器&#xff1f; web服务器就是web项目的容器&#xff0c;我们将开发好的web项目部署到web容器中&#xff0c;才能使用网络中的用户通过浏览器进行访问。 一张图带你了解web服务器有啥作用&#xff1a; 在我的电脑上有一个已经做好的项目&#…

【Python学习】Python学习1

目录 【Python学习】Python学习1 1.前言2.Python安装3.PyCharm安装4.PyCharm插件推荐5.参考 文章所属专区 Python学习 1.前言 Python 是一种解释型、面向对象、动态数据类型的高级程序设计语言。Python 由 Guido van Rossum 于 1989 年底发明&#xff0c;第一个公开发行版发…

如何借助于AI自研一款换脸app

文章目录 背景涉及的关键技术解析技术流程详解后续待补充 背景 在当今的数字时代&#xff0c;人工智能&#xff08;AI&#xff09;技术已经深入到各个领域&#xff0c;其中之一就是换脸技术。现在&#xff0c;有一个免费的AI换脸应用程序&#xff0c;可以让用户轻松地将自己的…

CTF数据分析题详解

目录 题目一(1.pcap) 题目二(2.pcap) 题目三(3.pcap) 题目四(4.pcap) CTF流量分析经典例题详解-CSDN博客 本文章涉及的所有题目附件下载地址&#xff1a; 链接&#xff1a; https://pan.baidu.com/s/18mWo5vn1zp_XbmcQrMOKRA 提取码&#xff1a;hrc4 声明&#xff1a;这里…

python统计分析——直方图(df.hist)

使用dataframe.hist()或series.hist()函数绘制直方图 import numpy as np import pandas as pd from matplotlib import pyplot as plt.dfpd.DataFrame(data{type:[A,A,A,A,A,A,A,A,A,A,B,B,B,B,B,B,B,B,B,B],value:[2,3,3,4,4,4,4,5,5,6,5,6,6,7,7,7,7,8,8,9] }) serpd.Serie…

Focal Loss

1、样本不均衡的 问题 与 方案 Focal loss 用于解决上述 样本不均衡的问题 : \quad 1、正负样本数量不均衡 \quad 2、易分类的样本和难分类的样本数量不均衡

4.快速实现增删改查,模糊查询功能

打开springboot项目&#xff0c;在com.example下建包common,在common下新建Result.java 4.1封装统一的返回数据结构 1.在Result.java中编写如下代码&#xff1a; private static final String *SUCCESS*"0"; private static final String *ERROR*"-1"; p…

使用Kafka与Spark Streaming进行流数据集成

在当今的大数据时代&#xff0c;实时数据处理和分析已经变得至关重要。为了实现实时数据集成和分析&#xff0c;组合使用Apache Kafka和Apache Spark Streaming是一种常见的做法。本文将深入探讨如何使用Kafka与Spark Streaming进行流数据集成&#xff0c;以及如何构建强大的实…

rime中州韵小狼毫 help lua Translator 帮助消息翻译器

lua 是 Rime中州韵/小狼毫输入法强大的武器&#xff0c;掌握如何在Rime中州韵/小狼毫中使用lua&#xff0c;你将体验到什么叫 随心所欲。 先看效果 在 rime中州韵 输入效果一览 中的 &#x1f447; help效果 一节中&#xff0c; 我们看到了在Rime中州韵/小狼毫输入法中输入 h…

Python(wordcloud):根据文本数据(.txt文件)绘制词云图

一、前言 本文将介绍如何利用python来根据文本数据&#xff08;.txt文件&#xff09;绘制词云图&#xff0c;除了绘制常规形状的词云图&#xff08;比如长方形&#xff09;&#xff0c;还可以指定词云图的形状。 二、相关库的介绍 1、安装相关的库 pip install jieba pip i…

how2heap-2.23-04-unsorted_bin_leak

#include<stdio.h> #include<malloc.h>int main() {char* a malloc(0x88);char* b malloc(0x8);free(a);long* c malloc(0x88);printf("%lx , %lx\n",c[0],c[1]);return 0; }unsorted bin leak原理&#xff1a;将chunk从unsorted bin申请回来时&#…

Transforer逐模块讲解

本文将按照transformer的结构图依次对各个模块进行讲解&#xff1a; 可以看一下模型的大致结构&#xff1a;主要有encode和decode两大部分组成&#xff0c;数据经过词embedding以及位置embedding得到encode的时输入数据 输入部分 embedding就是从原始数据中提取出单词或位置&…

matlab如何标定相机内外参和畸变参数

关于内外参矩阵和畸变矩阵可以学习 https://blog.csdn.net/qq_30815237/article/details/87530011?spm1001.2014.3001.5506 在APP中找到 camera Calibrator 点击 Add Images&#xff0c;导入拍照图片。标定20张左右就够了&#xff0c;然后角度变一下&#xff0c;但不需要变太…

微信小程序+前后端开发学习材料

目录结构 全局文件 1.app.json 文件 用来对微信小程序进行全局配置&#xff0c;决定页面文件的路径、窗口表现、设置网络超时时间、设置多 tab 等。文件内容为一个 JSON 对象。 1.1 page用于指定小程序由哪些页面组成&#xff0c;每一项都对应一个页面的 路径&#xff08;含文…

计算机基础面试题 |10.精选计算机基础面试题

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

UE相关杂项笔记

1.PAK包解析 UE4如何反向查找Pak里面包含哪些文件 - 哔哩哔哩 CMD控制台命令输入 D:&quot;Epic Games&quot;\UE_5.1\Engine\Binaries\Win64\UnrealPak.exe 包路径 -list *文件夹带空格时 添加“ ”包裹住文件夹名 解包工具路径 UE引擎安装路径\UE_5.1\Engine\Binarie…

sql:定时执行存储过程(嵌套存储过程、使用游标)

BEGINDeclare FormNo nvarchar(20) --单号Declare Type nvarchar(50) --类型Declare PickedQty float -Declare OutQty float Declare 生产量 floatDeclare 已装箱数量 float Declare 已入库数量 floatDeclare 损耗数量 float Declare 退货品出库数量 intdeclare k c…

Mac上修复Gitee报错 Oauth: Access token is expired

一. 背景&#xff1a; 最近在gitee上拉了两次项目&#xff0c;两次使用的邮箱密码不一致&#xff08;换绑邮箱&#xff09;&#xff0c;第一次在idea中拉取后端项目&#xff0c;第二次在webstorm中拉取前端项目&#xff0c;出现该异常&#xff0c;记录下解决方案 二. 错误回显…