流批一体计算引擎-8-[Flink]的Table API连接器

news2025/1/11 9:50:11

参考官方文档Table API连接器

1 Table API连接器概述

Flink的Table API和SQL程序可以连接到其他外部系统,用于读取和写入批处理表和流式表。source表提供对存储在外部系统(如数据库、键值存储、消息队列或文件系统)中的数据的访问。sink表将table发送到外部存储系统。根据source和sink的类型,它们支持不同的格式,如CSV、Avro、Parquet或ORC。

本页介绍如何使用本机支持的连接器在Flink中注册source表和sink表。注册source或sink后,可以通过Table API和SQL语句访问它。

Name			Source								Sink
Filesystem		Bounded and Unbounded Scan,Lookup	Streaming,Batch
Elasticsearch	Not supported						Streaming,Batch
Opensearch		Not supported						Streaming,Batch
Apache Kafka	Unbounded Scan						Streaming,Batch***
JDBC			Bounded Scan,Lookup					Streaming,Batch***
Apache HBase	Bounded Scan,Lookup					Streaming,Batch
Apache Hive		Unbounded Scan,Bounded Scan,Lookup	Streaming,Batch
Amazon DynamoDB	Not supported						Streaming,Batch
Amazon Kinesis Data Streams		Unbounded Scan		Streaming
Amazon Kinesis Data Firehose	Not supported		Streaming

Flink支持使用SQL CREATE TABLE语句来注册表。可以定义表名、表模式和用于连接到外部系统的表选项。

2 Print SQL连接器

Print连接器允许将每一行写入标准输出流或者标准错误流。
设计目的:简单的流作业测试,对生产调试带来极大便利。

创建一张基于Print的表

# -*- coding: UTF-8 -*-
from pyflink.table import EnvironmentSettings, TableEnvironment

# 1. 创建 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 2.创建source表
my_source_ddl = """
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '5'
    )
"""
# 注册source表
table_env.execute_sql(my_source_ddl)

# 3.创建sink表
my_sink_ddl = """
    CREATE TABLE print_table (
        id INT,
        data STRING
    ) WITH ('connector' = 'print')
"""
# 注册sink表
table_env.execute_sql(my_sink_ddl)

# 4.通过SQL查询语句来写入
table_env.execute_sql("INSERT INTO print_table SELECT * FROM datagen").wait()

在这里插入图片描述

3 JDBC SQL连接器

JDBC连接器允许使用JDBC驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立JDBC连接器来执行SQL查询。

3.1 依赖

flink-connector-jdbc-1.16.0.jar
mysql-connector-java-8.0.19.jar

3.2 创建表

DROP TABLE IF EXISTS `books`;
CREATE TABLE `books` (
  `id` int NOT NULL,
  `title` varchar(255) DEFAULT NULL,
  `authors` varchar(255) DEFAULT NULL,
  `years` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

一、连接器参数

connector	必填	(none)	String	指定使用什么类型的连接器,这里应该是'jdbc'。
url			必填	(none)	String	JDBC 数据库 url。
table-name	必填	(none)	String	连接到 JDBC 表的名称。
driver		可选	(none)	String	用于连接到此URL的JDBC驱动类名,如果不设置,将自动从URL中推导。
username	可选	(none)	String	JDBC用户名。如果指定了'username''password'中的任一参数,则两者必须都被指定。
password	可选	(none)	String	JDBC 密码。

二、创建JDBC表

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);

-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;

-- 查看 JDBC 表中的数据
SELECT id, name, age, status FROM MyUserTable;

3.3 代码示例

# -*- coding: UTF-8 -*-
from pyflink.table import EnvironmentSettings, TableEnvironment

# 1. 创建 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 2.创建source表
my_source_ddl = """
    CREATE TABLE books (
      id INT,
      title STRING,
      authors STRING,
      years INT
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC',
       'driver' = 'com.mysql.cj.jdbc.Driver',
       'table-name' = 'books',
       'username' = 'root',
       'password' = 'bigdata'
    )
"""
# 注册source表
table_env.execute_sql(my_source_ddl)

# 3.查看JDBC表中的数据
table = table_env.sql_query("select * from books")

table.execute().print()

在这里插入图片描述

4 Kafka SQL连接器

4.1 依赖

flink-connector-kafka-1.16.0.jar

4.2 示例代码

4.2.1 csv格式

# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect

# 1. 创建 StreamTableEnvironment
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)

st_env = StreamTableEnvironment.create(s_env)
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")

# 2.创建source表
my_source_ddl = """
    CREATE TABLE KafkaTable (
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING,
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test',
      'properties.bootstrap.servers' = '192.168.43.48:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv'
)
"""
# 注册source表
st_env.execute_sql(my_source_ddl)

# 3.(SQL表)KafkaTable转换成(Table API表)table
table = st_env.from_path("KafkaTable")

table.execute().print()

kafka中输入如下信息:

1,2,3
1,2,"good"

ts是自动生成的,系统时间。
在这里插入图片描述

4.2.2 json格式

# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect

# 1. 创建 StreamTableEnvironment
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)

st_env = StreamTableEnvironment.create(s_env)
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")

# 2.创建source表
my_source_ddl = """
    CREATE TABLE KafkaTable (
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING,
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test',
      'properties.bootstrap.servers' = '192.168.43.48:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
)
"""
# 注册source表
st_env.execute_sql(my_source_ddl)

# 3.(SQL表)KafkaTable转换成(Table API表)table
table = st_env.from_path("KafkaTable")

table.execute().print()

kafka中输入如下信息:

{"user_id":1,"item_id":2,"behavior":"good"}

ts是自动生成的,系统时间。
在这里插入图片描述

4.2.3 时间戳的处理

# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect

# 1. 创建 StreamTableEnvironment
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)

st_env = StreamTableEnvironment.create(s_env)
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")

# 2.创建source表
my_source_ddl = """
    CREATE TABLE KafkaTable (
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING,
      `ts` TIMESTAMP(3)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test',
      'properties.bootstrap.servers' = '192.168.43.48:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
)
"""
# 注册source表
st_env.execute_sql(my_source_ddl)

# 3.(SQL表)KafkaTable转换成(Table API表)table
table = st_env.from_path("KafkaTable")

table.execute().print()

kafka中输入如下信息:

{"user_id":1,"item_id":2,"behavior":"good","ts":"2023-01-26 12:30:30.000"}

ts是输入的时间戳。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/180771.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Sentienl学习笔记

PS:本文为作者学习黑马程序员Springcould视频笔记实际技术参考价值不大,文章将持续更新。 文章目录一. 什么是Sentienl1. 介绍2. 与Hystrix对比3. 主要特性二. Sentienl安装配置1. 下载安装包2. 启动三. Sentienl的使用1. Sentienl的整合2. 簇点链路四. …

每日学术速递1.27

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 前沿推介: ICLR 2023 ICLR 全称为国际学习表征会议(International Conference on Learning Representations),今年将举办的是第 11 届,预计将于 5 月 1 日至 5 …

Redis实现附近商铺 | 黑马点评

一、GEO数据结构 1、入门 GEO是Geolocation的缩写,代表地理坐标。redis3.2中加入对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。 常见命令: GEOADD:添加一个地理空间信息,包含&…

springcloud3 Sentinel的服务熔断操作

一 服务熔断 1.1 服务熔断 Sentinel熔断降级会在调用链路中某个资源出现不稳定状态时(调用超时或者异常比例升高),对这个资源的调用进行限制,让请求快速失败,避免影响到其他资源进而导致级联错误。 当资源被降级后&…

07_linux中断控制

裸机开发要点 通用中断控制器(GIC) 中断类型、硬件中断号、分发器和cpu接口单元 中断向量表 一级查表、二级查表 中断处理流程 进入irq模式、保护现场、获取硬件中断编号、执行中断处理函数、还原现场 设备树构造 分为 gic中断控制器设备树节点 其他外设中断控制器节点 需要…

大数据相关组件

一、 HDFSHDFS是hadoop的核心组件,HDFS上的文件被分成块进行存储,默认块的大小是64M,块是文件存储处理的逻辑单元。HDFS是Master和Slave的结构。分NameNode、SecondaryNameNode、DataNode这几个角色。NameNode:是Master节点&#…

springcloud3 Sentinel的搭建以及案例操作

一 sentinel的概念 1.1 sentinel Sentinel是分布式系统流量控制的哨兵,阿里开源的一套服务容错的综合性解决方案。 主要用来处理: 服务降级 服务熔断 超时处理 流量控制 sentinel 的使用可以分为两个部分: 核心库(Java 客户端&#…

基于nodejs+vue的留学服务管理平台的设计与开发

目 录 摘 要 I Abstract I 第一章 绪论 1 1.1系统开发的背景 1 1.2系统开发的意义 1 1.3本文研究内容 2 第二章 系统开发技术 3 第三章 系统分析 6 3.1用户需求分析 6 3.1.1 老师用户 6 3.1.2 学生用户 6 3.1.3 管理员用户 6 3.2 系统…

6--总线

文章目录一.总线概述(一)总线特性(二)总线分类1.按功能分/按连接的部件分(1)片内总线/CPU内部总线(2)系统总线(3)通信总线/外部总线2.按数据传输格式分&#…

7、关系运算符与关系表达式

目录 一、关系运算符 二、关系表达式 三、优先级与结合性 一、关系运算符 关系运算符包括大于、大于等于、小于、小于等于、等于和不等于 注意&#xff1a;符号“>”&#xff08;大于等于&#xff09;与“<”&#xff08;小于等于&#xff09;的意思分别是大于或等于…

向QAbstractItemView子类如:QTreeView、QTableView等子项单元格插入窗体小部件的功能实现(第3种方法)

1.前言工作中经常会遇到这样的需求&#xff1a;向QAbstractItemView子类如QTreeView、QTableView单元格插入窗体小部件&#xff0c;如&#xff1a;进度条、按钮、单行编辑框等。下面链接的系列博文就是讲解如何实现该功能的。《向QAbstractItemView子类如:QTreeView、QTableVie…

Java 23种设计模式(9.结构型模式-外观模式)

结构型模式-外观模式 代码详解 类图 代码 public class SubOne {public void method1(){System.out.println("method1");} }public class SubTwo {public void method2(){System.out.println("method2");} }public class SubThree {public void method3(…

VSCode配置C/C++环境

(1).配置编译器 接下来配置编译器路径&#xff0c;按快捷键CtrlShiftP调出命令面板&#xff0c;输入C/C&#xff0c;选择“Edit Configurations(UI)”进入配置。这里配置两个选项&#xff1a; - 编译器路径&#xff1a;D:/mingw-w64/x86_64-8.1.0-win32-seh-rt_v6-rev0/mingw64…

3D创作元素将入住下一代Windows 10和HoloLens中

新 Windows 10 将会带来崭新的 3D 特性&#xff0c;任何用户都可以通过内置的工具来制作发布有关「3D、增强现实 AR 和混合现实 (mixed reality) 的游戏和素材」。 北京时间 10 月 26 号晚 10 点&#xff0c;微软在纽约召开的新品发布会如期而至。会上微软发布了大家期待已久的…

【Linux】进程的概念 | 进程控制块 PCB | task_struct

&#x1f923; 爆笑教程 &#x1f449; 《看表情包学Linux》&#x1f448; 猛戳订阅 &#x1f525; &#x1f4ad; 写在前面&#xff1a;本章我们将带着大家深入理解 "进程" 的概念&#xff0c;"进程" 这个概念其实使我们一直在接触的东西&#xff0c;只不…

浅谈操作系统

操作系统是一组主管并控制计算机操作、运用和运行硬件、软件资源和提供公共服务来组织用户交互的相互关联的系统软件程序。根据运行的环境&#xff0c;操作系统可以分为桌面操作系统&#xff0c;手机操作系统&#xff0c;服务器操作系统&#xff0c;嵌入式操作系统等。 通俗来…

Three.js坐标系与变换矩阵快速入门

很多东西汇集在一起构成一个美丽的 3D 场景&#xff0c;例如光照、材质、模型、纹理、相机设置、后期处理、粒子效果、交互性等等&#xff0c;但无论我们创建什么样的场景&#xff0c;没有比这更多的了 比组成它的乐曲的排列和运动更重要。 要创建建筑效果图&#xff0c;我们必…

CDH数仓项目(一) —— CDH安装部署搭建详细流程

0 说明 本文以CDH搭建数据仓库&#xff0c;基于三台阿里云服务器从零开始搭建CDH集群&#xff0c;节点配置信息如下&#xff1a; 节点内存安装服务角色chen10216Gcloudera-scm-serverchen1038Gcloudera-scm-agentchen1048Gcloudera-scm-agent 上传需要用到的安装包&#xff1…

[NPUCTF2020]ezinclude

目录 前提知识 信息收集 解题思路 前提知识 PHP LFI 利用临时文件 Getshell 姿势-安全客 - 安全资讯平台 PHP LFI 利用临时文件Get shell php7 Segment Fault&#xff08;7.0.0 < PHP Version < 7.0.28&#xff09; PHPINFO特性 信息收集 查看源码 <!--md5($secr…

【进阶】Spring MVC程序开发

努力经营当下&#xff0c;直至未来明朗&#xff01; 文章目录一、Spring MVC概述1. MVC定义2. MVC和Spring MVC的关系二、为什么要学Spring MVC三、如何学Spring MVC一&#xff09; 实现用户和程序的映射方法1&#xff1a;路由RequestMapping(“/xxx”)方法2&#xff1a; 使用P…