0基础学习PyFlink——使用Table API实现SQL功能

news2024/11/18 12:47:35

在《0基础学习PyFlink——使用PyFlink的Sink将结果输出到Mysql》一文中,我们讲到如何通过定义Souce、Sink和Execute三个SQL,来实现数据读取、清洗、计算和入库。
如下图所示SQL是最高层级的抽象,在它之下是Table API。本文我们会将例子中的SQL翻译成Table API来实现等价的功能。
在这里插入图片描述

Souce

    # """create table source (
    #         word STRING
    #     ) with (
    #         'connector' = 'filesystem',
    #         'format' = 'csv',
    #         'path' = '{}'
    #     )
    # """.format(input_path)

下面的SQL分为两部分:

  • Table结构:该表只有一个名字为word,类型为string的字段。
  • 连接器:是“文件系统”(filesystem)类型,格式是csv的文件。这样输入就会按csv格式进行解析。

SQL中的Table对应于Table API中的schema。它用于定义表的结构,比如有哪些类型的字段和主键等。
上述整个SQL整体对应于descriptor。即我们可以认为descriptor是表结构+连接器。
我们可以让不同的表和不同的连接器结合,形成不同的descriptor。这是一个组合关系,我们将在下面看到它们的组合方式。

schema

    # define the source schema
    source_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING()) \
        .build()

new_builder()会返回一个Schema.Builder对象;
column(self, column_name: str, data_type: Union[str, DataType])方法用于声明该表存在哪些类型、哪些名字的字段,同时返回之前的Builder对象;
最后的build(self)方法返回Schema.Builder对象构造的Schema对象。

descriptor

    # Create a source descriptor
    source_descriptor= TableDescriptor.for_connector("filesystem") \
        .schema(source_schema) \
        .option('path', input_path) \
        .format("csv") \
        .build()

for_connector(connector: str)方法返回一个TableDescriptor.Builder对象;
schema(self, schema: Schema)将上面生成的source_schema 对象和descriptor关联;
option(self, key: Union[str, ConfigOption], value)用于指定一些参数,比如本例用于指定输入文件的路径;
format(self, format: Union[str, ‘FormatDescriptor’], format_option: ConfigOption[str] = None)用于指定内容的格式,这将指导怎么解析和入库;
build(self)方法返回TableDescriptor.Builder对象构造的TableDescriptor对象。

Sink

    # """CREATE TABLE WordsCountTableSink (
    #         `word` STRING,
    #         `count` BIGINT,
    #         PRIMARY KEY (`word`) NOT ENFORCED
    #     ) WITH (
    #         'connector' = 'jdbc',
    #         'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',
    #         'table-name' = 'WordsCountTable',
    #         'driver'='com.mysql.jdbc.Driver',
    #         'username'='admin',
    #         'password'='pwd123'
    #     );
    # """

schema

    sink_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING().not_null()) \
        .column("count", DataTypes.BIGINT()) \
        .primary_key("word") \
        .build()

大部分代码在之前已经解释过了。我们主要关注于区别点:

  • primary_key(self, *column_names: str) 用于指定表的主键。
  • 主键的类型需要使用调用not_null(),以表明其非空。

descriptor

    # Create a sink descriptor
    sink_descriptor = TableDescriptor.for_connector("jdbc") \
        .schema(sink_schema) \
        .option("url", "jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false") \
        .option("table-name", "WordsCountTable") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("username", "admin") \
        .option("password", "pwd123") \
        .build()

这块代码主要是通过option来设置一些连接器相关的设置。可以看到这是用KV形式设计的,这样就可以让option方法有很大的灵活性以应对不同连接器千奇百怪的设置。

Execute

使用下面的代码将表创建出来,以供后续使用。

t_env.create_table("source", source_descriptor)
tab = t_env.from_path('source')
t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)
    # execute insert
    # """insert into WordsCountTableSink
    #     select word, count(1) as `count`
    #     from source
    #     group by word
    # """
    tab.group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

这儿需要介绍的就是lit。它用于生成一个表达式,诸如sum、max、avg和count等。
execute_insert(self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool = False)用于将之前的计算结果插入到Sink表中

完整代码

import argparse
import logging
import sys

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
    
def word_count(input_path):
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)

    # """create table source (
    #         word STRING
    #     ) with (
    #         'connector' = 'filesystem',
    #         'format' = 'csv',
    #         'path' = '{}'
    #     )
    # """

    # define the source schema
    source_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING()) \
        .build()
        
    # Create a source descriptor
    source_descriptor = TableDescriptor.for_connector("filesystem") \
        .schema(source_schema) \
        .option('path', input_path) \
        .format("csv") \
        .build()
        
    t_env.create_table("source", source_descriptor)


    # """CREATE TABLE WordsCountTableSink (
    #         `word` STRING,
    #         `count` BIGINT,
    #         PRIMARY KEY (`word`) NOT ENFORCED
    #     ) WITH (
    #         'connector' = 'jdbc',
    #         'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',
    #         'table-name' = 'WordsCountTable',
    #         'driver'='com.mysql.jdbc.Driver',
    #         'username'='admin',
    #         'password'='pwd123'
    #     );
    # """

    # define the sink schema
    sink_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING().not_null()) \
        .column("count", DataTypes.BIGINT()) \
        .primary_key("word") \
        .build()
        
    # Create a sink descriptor
    sink_descriptor = TableDescriptor.for_connector("jdbc") \
        .schema(sink_schema) \
        .option("url", "jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false") \
        .option("table-name", "WordsCountTable") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("username", "admin") \
        .option("password", "pwd123") \
        .build()
    
    t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)
    
    
    # execute insert
    # """insert into WordsCountTableSink
    #     select word, count(1) as `count`
    #     from source
    #     group by word
    # """
    
    tab = t_env.from_path('source')
    tab.group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input)

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/overview/
  • https://nightlies.apache.org/flink/flink-docs-release-1.17/api/python//reference/pyflink.table/descriptors.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1129413.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习合集】深度学习模型优化方法最优化问题合集 ->(个人学习记录笔记)

文章目录 最优化1. 最优化目标1.1 凸函数&凹函数1.2 鞍点1.3 学习率 2. 常见的深度学习模型优化方法2.1 随机梯度下降法2.2 动量法(Momentum)2.3 Nesterov accelerated gradient法(NAG)2.4 Adagrad法2.5 Adadelta与Rmsprop法2.6 Adam法2.7 Adam算法的改进 3. SGD的改进算法…

LVS+keepalived高可用集群

1、定义 keepalived为lvs应运而生的高可用服务。lvs的调度器无法做高可用,keepalived实现的是调度器的高可用,但keepalived不只为lvs集群服务的,也可以做其他代理服务器的高可用,比如nginxkeepalived也可实现高可用(重…

解密Kubernetes:探索开源容器编排工具的内核

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…

zabbix6.0 部署配置

架构 先简单介绍zabbix监控的最主要的两个组件: zabbix server zabbix agent server 用来部署 web console以及相关的数据存储,所以需要配合一些数据库来保存数据,比如mysql,pgsql, 又有前端的页面所以还需要配置 nginx 和getway 所以 serve…

【makedown使用介绍】

如何使用makedown 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必…

计算机网络【CN】IPV4报文格式

版本(4bit):IPV4/IPV6首部长度(4bit):标识首部的长度 单位是4B最小为:20B最大为:60(15*4)B总长度(16bit):整个数据报&…

目录遍历漏洞

漏洞挖掘之目录遍历漏洞 (baidu.com) 从0到1完全掌握目录遍历漏洞 0x01 什么是目录遍历漏洞 目录遍历漏洞是由于网站存在配置缺陷,导致网站目录可以被任意浏览,这会导致网站很多隐私文件与目录泄露。 比如数据库备份文件、配置文件等,攻击…

Vue项目中使用require的方式导入图片资源,本地运行无法打开的问题

问题描述 项目经理说需快速要写一个大屏,然后拿给售前去给客户做个展示。其中有一块需要展示一个拓扑图,绘制拓扑图时用了定义了一个图片节点,然后图片的导入方式是 require的方式,然后本地npm run dev启动的时候可以正常显示&…

JVM进阶(1)

一)JVM是如何运行的? 1)在程序运行前先将JAVA代码转化成字节码文件也就是class文件,JVM需要通过类加载器将字节码以一定的方式加载到JVM的内存运行时数据区,将类的信息打包分块填充在运行时数据区; 2)但是字节码文件是JVM的一套指…

大数据技术学习笔记(二)—— Hadoop 运行环境的搭建

目录 1 准备模版虚拟机hadoop1001.1 修改主机名1.2 修改hosts文件1.3 修改IP地址1.3.1 查看网络IP和网关1.3.2 修改IP地址 1.4 关闭防火墙1.5 创建普通用户1.6 创建所需目录1.7 卸载虚拟机自带的open JDK1.8 重启虚拟机 2 克隆虚拟机3 在hadoop101上安装JDK3.1 传输安装包并解压…

likeadmin部署

以下内容写于2023年9月17日,likeadmin版本 1.登录页404,且无法登录 参照官方教程部署后,访问登录页,能打开但提示404,点登录也是404,在issues中搜到新搭建的环境,登录管理后台,报re…

系统设计 - 我们如何通俗的理解那些技术的运行原理 - 第八部分:Linux、安全

本心、输入输出、结果 文章目录 系统设计 - 我们如何通俗的理解那些技术的运行原理 - 第八部分:Linux、安全前言Linux 文件系统解释应该知道的 18 个最常用的 Linux 命令HTTPS如何工作?数据是如何加密和解密的?为什么HTTPS在数据传输过程中会…

java通过IO流下载保存文件

我们在开发过程中,可能会遇到需要到远程服务器上下载文件的需求,一般我们的文件可能会有一个url地址,我们拿到这个地址,可以构建URLConnection对象,之后可以根据这个URLConnection来获取InputStream,之后&a…

C++ list 的使用

目录 1. 构造函数 1.1 list () 1.2 list (size_t n, const T& val T()) 1.3 list (InputIterator first, InputIterator last) 2. bool empty() const 3. size_type size() const 4. T& front() 4. T& back() 5. void push_front (const T& val) 6.…

【Java系列】Java 基础

目录 基础1.JDK和JRE的区别2.Java为什么不直接实现lterator接口,而是实现lterable?3.简述什么是值传递和引用传递?4.概括的解释下Java线程的几种可用状态? 中级1.简述Java同步方法和同步代码块的区别 ?2.HashMap和Hashtable有什么区别?3.简述Java堆的结构? 什…

生命礼赞,带动世界第三次文化复兴——非洲回顾篇

一个民族的复兴需要强大的物质力量,也需要强大的精神力量。大型玉雕群组《生命礼赞》是对中华民族伟大生命的讴歌,是对百姓美好生活的赞美,完美诠释了中华民族的伟大图腾,它象征着中华民族在党的带领下艰苦奋斗,江山稳…

嵌入式软件工程师面试题——2025校招专题(二)

说明: 面试题来源于网络书籍,公司题目以及博主原创或修改(题目大部分来源于各种公司);文中很多题目,或许大家直接编译器写完,1分钟就出结果了。但在这里博主希望每一个题目,大家都要…

美团动态ThreadPoolExecutor底层实现源码实战

开篇:介绍springboot连接nacos实现动态线程池,同时得安装nacos,同时代码将有两个模块,dtp-spring-boot-starter 与 user 模块,前者将是独立的动态线程池,可以引入自己的项目中,后者模块主要用于…

面试官:听说你很了解Java8特性,给我优化一下这段代码吧?

文章目录 前言我的想法面试官 前言 在之前的一次面试过程中,我被问到了一道代码优化题:对于下面的代码,你有什么优化的思路呢? boolean handleStrList(String strList){for (String s :strList){if(s.length()%20){return true;…

测试用例的设计方法(全):等价类划分方法

一.方法简介 1.定义 是把所有可能的输入数据,即程序的输入域划分成若干部分(子集),然后从每一个子集中选取少数具有代表性的数据作为测试用例。该方法是一种重要的,常用的黑盒测试用例设计方法。 2.划分等价类: 等价类是指某个输入域的…