【Table/SQL Api】Flink Table/SQL Api表转流读取MySQL

news2024/11/27 4:33:03

引入依赖

jdbc依赖

flink-connector-jdbc + mysql-jdbc-driver 操作mysql数据库

        <!-- Flink-Connector-Jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        </dependency>

        <!-- mysql jdbc driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

Table/SQL Api依赖

  1. Table/SQL Api 扩展依赖
  2. Table/SQL Api 基础依赖
  3. Table/SQL Api 和 DataStream Api 交互的依赖 bridge
  4. Flink Planner 依赖
        <!-- Table/SQL Api 依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
        </dependency>
        <!-- Table/SQL Api 扩展依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
        </dependency>
        <!-- bridge桥接器,主要负责Table API和 DataStream API的连接支持 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        </dependency>
        <!-- Flink Planner 依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        </dependency>

对应版本在这 (项目Flink版本为1.14.5

image-20231210161727111

Flink读写MySQL工具类

Table Api 环境加载

Table API和SQL Api都是基于Table接口

Table Api上下文环境有3种类型

  1. TableEnvironment:只支持Batch作业
  2. BatchTableEnvironment:只支持Batch作业
  3. StreamTableEnvironment: 支持流计算【用这个】

Planner(查询处理器)

Planner(查询处理器):解析sql、优化sql和执行sql

Flink Planner的类型:

  1. Flink Planner (Old Planner)
  2. Blink Planner (Flink 1.14之前需要手动导入依赖)

Blink Planner从Flink 1.11版本开始为Flink-table的默认查询处理器

Blink Planner使得Table Api & SQL 层实现了流批统一

Catalog对象

Catalog对象是提供了元数据信息,数据源与数据表的信息则存储在Catalog中

// 创建Catalog对象
new JdbcCatalog(catalog_name, database, username, passwd, url);

Catalog对象是接口

Catalog接口的实现:(Flink 1.14版本之前)

  1. PG (PostgresSQL) Catalog
  2. HiveCatalog
  3. Mysql Catalog (Flink 1.15 才有)

DDL与数据库表结构必须一模一样,建立映射,这种方式数据库表结构如果变化,代码也必须随之变化重新打包,因此这种方式用的不多,一般catalog会用的比较多。

但由于项目Flink依赖用的是1.14.5,因此还是使用DDL语句实现。

代码实现

public class MysqlUtil {

    /**
     * 数据库连接对象
     */
    private static Connection connection = null;
    /**
     * SQL语句对象
     */
    private static PreparedStatement preparedStatement = null;
    /**
     * 结果集对象
     */
    private static ResultSet rs = null;


    /**
     * 使用 Flink Table/SQL Api 读取Mysql
     *
     * @param env:           流计算上下文环境
     * @param parameterTool: 参数工具
     * @param clazz:         流水线输出对象的类
     * @param tableName:     表名
     * @param ddlString:     DDL字符串
     * @param sql:           SQL查询语句
     * @return DataStream<T>:DataStream对象
     */
    public static <T> DataStream<T> readWithTableOrSQLApi(
            StreamExecutionEnvironment env,
            ParameterTool parameterTool,
            Class<T> clazz,
            String tableName,
            String ddlString,
            String sql

    ) throws Exception {

        // 创建TableApi运行环境
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance()
                        // Flink 1.14不需要再设置 Planner
                        //.useBlinkPlanner()
                        // 设置流计算模式
                        .inStreamingMode()
                        .build();

        // 创建StreamTableEnvironment实例
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // 指定方言 (选择使用SQL语法还是HQL语法)
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // 编写DDL ( 数据定义语言 )
        String ddl = buildMysqlDDL(parameterTool, tableName, ddlString);

        // StreamTableEnvironment注册虚拟表
        tableEnv.executeSql(ddl);
        // 查询结果是Table对象
        Table table = tableEnv.sqlQuery(sql);
        // 将Table对象转换为DataStream对象
        return tableEnv.toDataStream(table, clazz);
    }

    /**
     * 根据参数生成MySQL的DDL语句
     *
     * @param parameterTool  参数工具,用于获取MySQL连接信息
     * @param tableName      要创建的表名
     * @param ddlFieldString 表字段的DDL语句
     * @return 生成的完整的MySQL DDL语句
     */
    public static String buildMysqlDDL(
            ParameterTool parameterTool,
            String tableName,
            String ddlFieldString
    ) {

        // 从参数工具中获取mysql连接的url
        String url = parameterTool.get(ParameterConstants.Mysql_URL);
        // 从参数工具中获取mysql连接的用户名
        String username = parameterTool.get(ParameterConstants.Mysql_USERNAME);
        // 从参数工具中获取mysql连接的密码
        String passwd = parameterTool.get(ParameterConstants.Mysql_PASSWD);
        // 从参数工具中获取MySQL的驱动程序
        String driver = parameterTool.get(ParameterConstants.Mysql_DRIVER);

        // 返回完整的DDL语句
        return "CREATE TABLE IF NOT EXISTS " +
                tableName +
                " (\n" +
                ddlFieldString +
                ")" +
                " WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'driver' = '" + driver + "',\n" +
                "'url' = '" + url + "',\n" +
                "'username' = '" + username + "',\n" +
                "'password' = '" + passwd + "',\n" +
                "'table-name' = '" + tableName + "'\n" +
                ")";
    }

    /**
     * 初始化 jdbc Connection
     */
    public static Connection init(ParameterTool parameterTool) {

        String _url = parameterTool.get(ParameterConstants.Mysql_URL);
        String _username = parameterTool.get(ParameterConstants.Mysql_USERNAME);
        String _passwd = parameterTool.get(ParameterConstants.Mysql_PASSWD);

        try {
            connection = DriverManager.getConnection(_url, _username, _passwd);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return connection;
    }

    /**
     * 生成 PreparedStatement
     */
    public static PreparedStatement initPreparedStatement(String sql) {
        try {
            preparedStatement = connection.prepareStatement(sql);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        return preparedStatement;
    }

    /**
     * 关闭 jdbc Connection
     */
    public static void close() {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 关闭 PreparedStatement
     */
    public static void closePreparedStatement() {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 关闭 ResultSet
     */
    public static void closeResultSet() {
        try {
            if (rs != null) {
                rs.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 执行 sql 语句
     */
    public static ResultSet executeQuery(PreparedStatement ps) {
        preparedStatement = ps;
        try {
            rs = preparedStatement.executeQuery();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return rs;
    }

}

测试一下

测试库中有个tb_user表

image-20231210174346826

创建与表映射的实体类

@Data
public class UserPO {
    private Long id;
    private String name;
}
class MysqlUtilTest {

    @DisplayName("测试使用 Flink Table/SQL Api 读取Mysql")
    @Test
    public void testReadWithTableOrSQLApi() throws Exception {
        // 初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // 设置并行度1
        env.setParallelism(1);
        // 获取参数工具实例
        ParameterTool parameterTool = ParameterUtil.getParameters();

        /* **********************
         *
         * CREATE 语句用于向当前或指定的 Catalog 中注册表。
         * 注册后的表、视图和函数可以在 SQL 查询中使用
         *
         * *********************/
        // 表名
        String tableName = "tb_user";

        // 表字段ddl
        String ddlFieldString =
                "id BIGINT,\n" +
                        "name STRING \n";

        // 查询表的全部字段
        String sql = "SELECT * FROM " + tableName;

        DataStream<UserPO> rowDataStream =
                MysqlUtil.readWithTableOrSQLApi(
                        env,
                        parameterTool,
                        UserPO.class,
                        tableName,
                        ddlFieldString,
                        sql
                );

        rowDataStream.print("mysql");
        env.execute();
    }
}

image-20231210174720832

查询成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1300812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习——第4.1章 深度学习的数学基础

第4章 深度学习的数学基础 目录 4.1 向量 4.2 求和符号 4.3 累乘符号 4.4 导数 4.5 偏导数 4.6 矩阵 4.7 指数函数和对数函数 注意&#xff1a;4.6和4.7位于4.2章 第4章 深度学习的数学基础 本章总结一下机器学习所需的数学知识&#xff0c;同时介绍如何在Python中使用…

PolarCTF网络安全2023冬季个人挑战赛 WEB方向题解 WriteUp

完工&#xff0c;最后CB链没时间打了&#xff0c;估计也不怎么打得出来&#xff0c;今天一边在打polar一边弄服务外包赛&#xff0c;好累呜呜呜。 Polar 冬 干正则&#xff08;WEB&#xff09; 直接给了源码 parse_str()&#xff1a;将字符串解析成多个变量 payload&#xff…

基于JavaWeb+SSM+Vue马拉松报名系统微信小程序的设计和实现

基于JavaWebSSMVue马拉松报名系统微信小程序的设计和实现 源码获取入口Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 Lun文目录 1系统概述 1 1.1 研究背景 1 1.2研究目的 1 1.3系统设计思想 1 2相关技术 2 2.…

23 秋 计网真题回忆

总体 计网整体部分还是以全龙老师的PPT和王道的考研题和课后题考的部分很细 都是PPT上的内容 最好不要光做王道 PPT上很细的点也要看题量很大25个选择 5个判断 十个填空 5个大题好像是 不熟练的话可能写不完所有的配图都是网上找的资源 比如19-22的学校题和408真题 非本人所有…

Low Cost and High Performance FPGA with ARM and SDRAM inside

AG10KSDE176 AGM AG10KSDE176 是由 AGM FPGA AG10K 与 SDRAM 叠封集成的芯片&#xff0c;具有 AG10K FPGA 的可编程功能&#xff0c;提供更多可编程 IO&#xff0c;同时内部连接大容量 SDRAM。  FPGA 外部管脚输出 EQFP176 封装底部 Pad 为 GND&#xff0c;管脚说明请见下表&…

使用cmake构建的工程的编译方法

1、克隆项目工程 2、进入到工程目录 3、执行 mkdir build && cd build 4、执行 cmake .. 5、执行 make 执行以上步骤即可完成对cmake编写的工程进行编译 &#xff0c;后面只需执行你的编译结果即可 $ git clone 你想要克隆的代码路径 $ cd 代码文件夹 $ mkdir bu…

鸿蒙前端开发-构建第一个ArkTS应用(Stage模型)

创建ArkTS工程 若首次打开DevEco Studio&#xff0c;请点击Create Project创建工程。如果已经打开了一个工程&#xff0c;请在菜单栏选择File > New > Create Project来创建一个新工程。 选择Application应用开发&#xff08;本文以应用开发为例&#xff0c;Atomic Serv…

四. 基于环视Camera的BEV感知算法-环视背景介绍

目录 前言0. 简述1. 环视背景介绍2. 环视思路3. 主流基于环视Camera的算法详解总结下载链接参考 前言 自动驾驶之心推出的《国内首个BVE感知全栈系列学习教程》&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考 本次课程我们来学习下课程第四章——基于环视Camer…

Kafka Streams:深度探索实时流处理应用程序

Apache Kafka Streams 是一款强大的实时流处理库&#xff0c;为构建实时数据处理应用提供了灵活且高性能的解决方案。本文将深入探讨 Kafka Streams 的核心概念、详细原理&#xff0c;并提供更加丰富的示例代码&#xff0c;以帮助读者深入理解和应用这一流处理框架。 1. Kafka…

Redis,什么是缓存穿透?怎么解决?

Redis&#xff0c;什么是缓存穿透&#xff1f;怎么解决&#xff1f; 1、缓存穿透 一般的缓存系统&#xff0c;都是按照key去缓存查询&#xff0c;如果不存在对用的value&#xff0c;就应该去后端系统查找&#xff08;比如DB数据库&#xff09;。一些恶意的请求会故意查询不存在…

ELK简单介绍二

学习目标 能够部署kibana并连接elasticsearch集群能够通过kibana查看elasticsearch索引信息知道用filebeat收集日志相对于logstash的优点能够安装filebeat能够使用filebeat收集日志并传输给logstash kibana kibana介绍 Kibana是一个开源的可视化平台,可以为ElasticSearch集群…

【Spring教程25】Spring框架实战:从零开始学习SpringMVC 之 SpringMVC入门案例总结与SpringMVC工作流程分析

目录 1.入门案例总结2. 入门案例工作流程分析2.1 启动服务器初始化过程2.2 单次请求过程 欢迎大家回到《Java教程之Spring30天快速入门》&#xff0c;本教程所有示例均基于Maven实现&#xff0c;如果您对Maven还很陌生&#xff0c;请移步本人的博文《如何在windows11下安装Mave…

react.js源码二

三、调度Scheduler scheduling(调度)是fiber reconciliation的一个过程&#xff0c;主要决定应该在何时做什么?在stack reconciler中&#xff0c;reconciliation是“一气呵成”&#xff0c;对于函数来说&#xff0c;这没什么问题&#xff0c;因为我们只想要函数的运行结果&…

高云GW1NSR-4C开发板M3硬核应用

1.M3硬核IP下载&#xff1a;Embedded M3 Hard Core in GW1NS-4C - 科技 - 广东高云半导体科技股份有限公司 (gowinsemi.com.cn) 特别说明&#xff1a;IDE必须是1.9.9及以后版本&#xff0c;1.9.8会导致编译失败&#xff08;1.9.8下1.1.3版本IP核可用&#xff09; 以下根据官方…

【后端开发】Next.js 13.4:前端开发的游戏规则改变者!

自我介绍 做一个简单介绍&#xff0c;酒架年近48 &#xff0c;有20多年IT工作经历&#xff0c;目前在一家500强做企业架构&#xff0e;因为工作需要&#xff0c;另外也因为兴趣涉猎比较广&#xff0c;为了自己学习建立了三个博客&#xff0c;分别是【全球IT瞭望】&#xff0c;【…

云计算大屏,可视化云计算分析平台(云实时数据大屏PSD源文件)

大屏组件可以让UI设计师的工作更加便捷&#xff0c;使其更高效快速的完成设计任务。现分享可视化云分析系统、可视化云计算分析平台、云实时数据大屏的大屏Photoshop源文件&#xff0c;开箱即用&#xff01; 若需 更多行业 相关的大屏&#xff0c;请移步小7的另一篇文章&#…

浅析不同NAND架构的差异与影响

SSD的存储介质是什么&#xff0c;它就是NAND闪存。那你知道NAND闪存是怎么工作的吗&#xff1f;其实&#xff0c;它就是由很多个晶体管组成的。这些晶体管里面存储着电荷&#xff0c;代表着我们的二进制数据&#xff0c;要么是“0”&#xff0c;要么是“1”。NAND闪存原理上是一…

TCP为什么可靠之“重传机制”

TCP重传机制 TCP针对数据包丢失的情况&#xff0c;会通过重传机制解决&#xff0c;包括像超时重传、快速重传、选择确认SACK、D-SACK 超时重传 TCP会设置一个定时器&#xff0c;如果在发送数据之后的规定时间内&#xff0c;没有收到对方的ACK报文&#xff0c;就会触发重新发…

基于SpringBoot+JSP+Mysql宠物领养网站+协同过滤算法推荐宠物(Java毕业设计)

大家好&#xff0c;我是DeBug&#xff0c;很高兴你能来阅读&#xff01;作为一名热爱编程的程序员&#xff0c;我希望通过这些教学笔记与大家分享我的编程经验和知识。在这里&#xff0c;我将会结合实际项目经验&#xff0c;分享编程技巧、最佳实践以及解决问题的方法。无论你是…

基于JavaWeb+SSM+Vue微信小程序的科创微应用平台系统的设计和实现

基于JavaWebSSMVue微信小程序的科创微应用平台系统的设计和实现 源码获取入口Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 Lun文目录 1系统概述 1 1.1 研究背景 1 1.2研究目的 1 1.3系统设计思想 1 2相关技术…