记录使用FlinkSql进行实时工作流开发

news2025/1/11 5:45:04

使用FlinkSql进行实时工作流开发

  • 引言
  • Flink SQL实战
    • 常用的Connector
      • 1. MySQL-CDC 连接器配置
      • 2. Kafka 连接器配置
      • 3. JDBC 连接器配置
      • 4. RabbitMQ 连接器配置
      • 5. REST Lookup 连接器配置
      • 6. HDFS 连接器配置
    • FlinkSql数据类型
      • 1. 基本数据类型
      • 2. 字符串数据类型
      • 3. 日期和时间数据类型
      • 4. 复杂数据类型
      • 5. 特殊数据类型
      • 数据类型的使用示例

引言

在这里插入图片描述

在大数据时代,实时数据分析和处理变得越来越重要。Apache Flink,作为流处理领域的佼佼者,提供了一套强大的工具集来处理无界和有界数据流。其中,Flink SQL是其生态系统中一个重要的组成部分,允许用户以SQL语句的形式执行复杂的数据流操作,极大地简化了实时数据处理的开发流程。

什么是Apache Flink?

Apache Flink是一个开源框架,用于处理无边界(无尽)和有边界(有限)数据流。它提供了低延迟、高吞吐量和状态一致性,使开发者能够构建复杂的实时应用和微服务。Flink的核心是流处理引擎,它支持事件时间处理、窗口操作以及精确一次的状态一致性。

为什么选择Flink SQL?

易用性:Flink SQL使得非专业程序员也能快速上手,使用熟悉的SQL语法进行实时数据查询和处理。
灵活性:可以无缝地将SQL与Java/Scala API结合使用,为用户提供多种编程模型的选择。
性能:利用Flink的高性能流处理引擎,Flink SQL能够实现实时响应和低延迟处理。
集成能力:支持多种数据源和数据接收器,如Kafka、JDBC、HDFS等,易于集成到现有的数据生态系统中。

Flink SQL实战

常用的Connector

在配置FlinkSQL实时开发时,使用mysql-cdc、Kafka、jdbc和rabbitmq作为连接器是一个很常见的场景。以下是详细的配置说明,你可以基于这些信息来撰写你的博客:

1. MySQL-CDC 连接器配置

MySQL-CDC(Change Data Capture)连接器用于捕获MySQL数据库中的变更数据。配置示例如下:

CREATE TABLE mysql_table (
    -- 定义表结构
    id INT,
    name STRING,
    -- 其他列
) WITH (
    'connector' = 'mysql-cdc',  			-- 使用mysql-cdc连接器
    'hostname' = 'mysql-host',  			-- MySQL服务器主机名
    'port' = '3306',            			-- MySQL端口号
    'username' = 'user',        			-- MySQL用户名
    'password' = 'password',    			-- MySQL密码
    'database-name' = 'db',     			-- 数据库名
    'table-name' = 'table'      			-- 表名
  	'server-time-zone' = 'GMT+8',           -- 服务器时区
    'debezium.snapshot.mode' = 'initial',  	-- 初始快照模式,initial表示从头开始读取所有数据;latest-offset表示从最近的偏移量开始读取;timestamp则可以指定一个时间戳,从该时间戳之后的数据开始读取。
    'scan.incremental.snapshot.enabled' = 'true'	-- 可选,设置为true时,Flink会尝试维护一个数据库表的增量快照。这意味着Flink不会每次都重新读取整个表,而是只读取自上次读取以来发生变化的数据。这样可以显著提高读取效率,尤其是在处理大量数据且频繁更新的场景下。
    'scan.incremental.snapshot.chunk.size' = '1024'  -- 可选, 增量快照块大小
    'debezium.snapshot.locking.mode' = 'none', 		 -- 可选,控制在快照阶段锁定表的方式,以防止数据冲突。none表示不锁定,lock-tables表示锁定整个表,transaction表示使用事务来锁定。
    'debezium.properties.include-schema-changes' = 'true',  -- 可选,如果设置为true,则在CDC事件中会包含模式变更信息。
    'debezium.properties.table.whitelist' = 'mydatabase.mytable',  -- 可选,指定要监控的表的白名单。如果table-name未设置,可以通过这个属性来指定。
   	'debezium.properties.database.history' = 'io.debezium.relational.history.FileDatabaseHistory'  -- 可选,设置数据库历史记录的实现类,通常使用FileDatabaseHistory来保存历史记录,以便在重启后能恢复状态。
);

2. Kafka 连接器配置

Kafka连接器用于读写Kafka主题中的数据。配置示例如下:

CREATE TABLE kafka_table (
    -- 定义表结构
    id INT,
    name STRING,
    -- 其他列
) WITH (
    'connector' = 'kafka',      -- 使用kafka连接器
    'topic' = 'topic_name',     -- Kafka主题名
    'properties.bootstrap.servers' = 'kafka-broker:9092',  -- Kafka服务器地址
    'format' = 'json'           -- 数据格式,例如json
    'properties.group.id' = 'flink-consumer-group',  -- 消费者组ID
    'scan.startup.mode' = 'earliest-offset',  -- 启动模式(earliest-offset, latest-offset, specific-offset, timestamp)
    'format' = 'json',  -- 数据格式
    'json.fail-on-missing-field' = 'false',  -- 是否在字段缺失时失败
    'json.ignore-parse-errors' = 'true',     -- 是否忽略解析错误
    'properties.security.protocol' = 'SASL_SSL', -- 安全协议(可选)
    'properties.sasl.mechanism' = 'PLAIN',       -- SASL机制(可选)
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";'  -- SASL配置(可选)
);

3. JDBC 连接器配置

JDBC连接器用于与其他关系型数据库进行交互。配置示例如下:

CREATE TABLE jdbc_table (
    -- 定义表结构
    id INT,
    name STRING,
    -- 其他列
) WITH (
    'connector' = 'jdbc',       -- 使用jdbc连接器
    'url' = 'jdbc:mysql://mysql-host:3306/db',  -- JDBC连接URL
    'table-name' = 'table_name', -- 数据库表名
    'username' = 'user',        -- 数据库用户名
    'password' = 'password'     -- 数据库密码
    'driver' = 'com.mysql.cj.jdbc.Driver',   -- JDBC驱动类
    'lookup.cache.max-rows' = '5000',        -- 可选,查找缓存的最大行数
    'lookup.cache.ttl' = '10min',            -- 可选,查找缓存的TTL(时间到期)
    'lookup.max-retries' = '3',              -- 可选,查找的最大重试次数
    'sink.buffer-flush.max-rows' = '1000',   -- 可选,缓冲区刷新最大行数
    'sink.buffer-flush.interval' = '2s'      -- 可选,缓冲区刷新间隔
);

4. RabbitMQ 连接器配置

RabbitMQ连接器用于与RabbitMQ消息队列进行交互。配置示例如下:

CREATE TABLE rabbitmq_table (
    -- 定义表结构
    id INT,
    name STRING,
    -- 其他列
) WITH (
    'connector' = 'rabbitmq',   -- 使用rabbitmq连接器
    'host' = 'rabbitmq-host',   -- RabbitMQ主机名
    'port' = '5672',            -- RabbitMQ端口号
    'username' = 'user',        -- RabbitMQ用户名
    'password' = 'password',    -- RabbitMQ密码
    'queue' = 'queue_name',     -- RabbitMQ队列名
    'exchange' = 'exchange_name' -- RabbitMQ交换机名
    'routing-key' = 'routing_key',   -- 路由键
    'delivery-mode' = '2',           -- 投递模式(2表示持久)
    'format' = 'json',               -- 数据格式
    'json.fail-on-missing-field' = 'false',  -- 是否在字段缺失时失败
    'json.ignore-parse-errors' = 'true'      -- 是否忽略解析错误
);

5. REST Lookup 连接器配置

REST Lookup 连接器允许在 SQL 查询过程中,通过 REST API 进行查找操作。

CREATE TABLE rest_table (
    id INT,
    name STRING,
    price DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'rest-lookup',
    'url' = 'http://api.example.com/user/{id}',  -- REST API URL,使用占位符 {product_id}
    'lookup-method' = 'POST'	-- 'GET' 或 'POST'
    'format' = 'json',  -- 数据格式
    'asyncPolling' = 'false'	-- 可选,指定查找操作是否使用异步轮询模式。默认值为 'false'。当设置为 'true' 时,查找操作会以异步方式执行,有助于提高性能。
    'gid.connector.http.source.lookup.header.Content-Type' = 'application/json'	-- 可选,设置 Content-Type 请求头。用于指定请求体的媒体类型。例如,设置为 application/json 表示请求体是 JSON 格式。
    'gid.connector.http.source.lookup.header.Origin' = '*'	-- 可选,设置 Origin 请求头。通常用于跨域请求。
    'gid.connector.http.source.lookup.header.X-Content-Type-Options' = 'nosniff'	-- 可选,设置 X-Content-Type-Options 请求头。用于防止 MIME 类型混淆攻击。
	'json.fail-on-missing-field' = 'false',  -- 可选,是否在字段缺失时失败
	'json.ignore-parse-errors' = 'true'  -- 可选,是否忽略解析错误
    'lookup.cache.max-rows' = '5000',  -- 可选,查找缓存的最大行数
    'lookup.cache.ttl' = '10min',  -- 可选,查找缓存的TTL(时间到期)
    'lookup.max-retries' = '3'  -- 可选,查找的最大重试次数
);

6. HDFS 连接器配置

HDFS connector用于读取或写入Hadoop分布式文件系统中的数据。

创建HDFS Source

CREATE TABLE hdfsSource (
   line STRING
) WITH (
   'connector' = 'filesystem',
   'path' = 'hdfs://localhost:9000/data/input',		-- HDFS上的路径。
   'format' = 'csv'		-- 文件格式。
);

创建HDFS Sink

CREATE TABLE hdfsSink (
   line STRING
) WITH (
   'connector' = 'filesystem',
   'path' = 'hdfs://localhost:9000/data/output',
   'format' = 'csv'
);

FlinkSql数据类型

在FlinkSQL中,数据类型的选择和定义是非常重要的,因为它们直接影响数据的存储和处理方式。FlinkSQL提供了多种数据类型,可以满足各种业务需求。以下是FlinkSQL中的常见数据类型及其详细介绍:

1. 基本数据类型

  • BOOLEAN: 布尔类型,表示TRUEFALSE

    CREATE TABLE example_table (
        is_active BOOLEAN
    );
    
  • TINYINT: 8位带符号整数,范围是-128127

    CREATE TABLE example_table (
        tiny_value TINYINT
    );
    
  • SMALLINT: 16位带符号整数,范围是-3276832767

    CREATE TABLE example_table (
        small_value SMALLINT
    );
    
  • INT: 32位带符号整数,范围是-21474836482147483647

    CREATE TABLE example_table (
        int_value INT
    );
    
  • BIGINT: 64位带符号整数,范围是-92233720368547758089223372036854775807

    CREATE TABLE example_table (
        big_value BIGINT
    );
    
  • FLOAT: 单精度浮点数。

    CREATE TABLE example_table (
        float_value FLOAT
    );
    
  • DOUBLE: 双精度浮点数。

    CREATE TABLE example_table (
        double_value DOUBLE
    );
    
  • DECIMAL(p, s): 精确数值类型,p表示总精度,s表示小数位数。

    CREATE TABLE example_table (
        decimal_value DECIMAL(10, 2)
    );
    

2. 字符串数据类型

  • CHAR(n): 定长字符串,n表示字符串的长度。

    CREATE TABLE example_table (
        char_value CHAR(10)
    );
    
  • VARCHAR(n): 可变长字符串,n表示最大长度。

    CREATE TABLE example_table (
        varchar_value VARCHAR(255)
    );
    
  • STRING: 可变长字符串,无长度限制。

    CREATE TABLE example_table (
        string_value STRING
    );
    

3. 日期和时间数据类型

  • DATE: 日期类型,格式为YYYY-MM-DD

    CREATE TABLE example_table (
        date_value DATE
    );
    
  • TIME§: 时间类型,格式为HH:MM:SSp表示秒的小数位精度。

    CREATE TABLE example_table (
        time_value TIME(3)
    );
    
  • TIMESTAMP§: 时间戳类型,格式为YYYY-MM-DD HH:MM:SS.sssp表示秒的小数位精度。

    CREATE TABLE example_table (
        timestamp_value TIMESTAMP(3)
    );
    
  • TIMESTAMP§ WITH LOCAL TIME ZONE: 带有本地时区的时间戳类型。

    CREATE TABLE example_table (
        local_timestamp_value TIMESTAMP(3) WITH LOCAL TIME ZONE
    );
    

4. 复杂数据类型

  • ARRAY: 数组类型,T表示数组中的元素类型。

    CREATE TABLE example_table (
        array_value ARRAY<INT>
    );
    
  • MAP<K, V>: 键值对映射类型,K表示键的类型,V表示值的类型。

    CREATE TABLE example_table (
        map_value MAP<STRING, INT>
    );
    
  • ROW<…>: 行类型,可以包含多个字段,每个字段可以有不同的类型。

    CREATE TABLE example_table (
        row_value ROW<name STRING, age INT>
    );
    

5. 特殊数据类型

  • BINARY(n): 定长字节数组,n表示长度。

    CREATE TABLE example_table (
        binary_value BINARY(10)
    );
    
  • VARBINARY(n): 可变长字节数组,n表示最大长度。

    CREATE TABLE example_table (
        varbinary_value VARBINARY(255)
    );
    

数据类型的使用示例

以下是一个包含各种数据类型的表的定义示例:

CREATE TABLE example_table (
    id INT,
    name STRING,
    is_active BOOLEAN,
    salary DECIMAL(10, 2),
    birth_date DATE,
    join_time TIMESTAMP(3),
    preferences ARRAY<STRING>,
    attributes MAP<STRING, STRING>,
    address ROW<street STRING, city STRING, zip INT>
);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1969173.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring 如何集成日志框架

文章目录 一、日志依赖1.1 Spring 如何集成日志1.2 SpringBoot 的默认日志门面和日志系统 二、日志配置2.1 SpringBoot 日志配置方式2.2 SpringBoot 日志重定向到文件 参考资料 一、日志依赖 1.1 Spring 如何集成日志 从Spring Framework 5.0开始&#xff0c;Spring 在 sprin…

淘宝的商品信息缓存体系是如何构建的?

0 前言 在电商系统中&#xff0c;商品信息的快速获取对用户体验至关重要。本文将详细讲解一个多层级的商品信息缓存体系&#xff0c;旨在提高系统性能和可靠性。 开局一张图&#xff0c;剩下全靠编&#xff01; 1 整体架构 该缓存体系采用了多级缓存策略&#xff0c;从前端到…

influxDB的常用命令

目录 1.查看数据库命令 2.进入某数据库命令 3.创建表的命令 (host 和region 字段是必须的) 4.显示所有的表命令 5. 删除表 6.查询表数据 7.显示数据库用户 8.创建用户 9.创建管理员用户 10.修改密码(密码用单引号括住&#xff0c;不要用双引号) 11. 分配数据库访问权…

满客宝后台管理系统 downloadWebFile 任意文件读取漏洞复现(XVE-2024-18926)

0x01 产品简介 满客宝后台管理系统由正奇晟业&#xff08;北京&#xff09;科技有限公司开发&#xff0c;满客宝智慧食堂系统的重要组成部分&#xff0c;它为餐饮管理者提供了一个全面的、智能化的管理平台。该系统集成了用户管理、消费限制、菜谱管理、卡务管理、进销存管理、…

Linux进程间通信1

文章目录 前言管道命名管道 / FIFO消息队列 前言 进程之间可能会存在特定的协同工作的场景&#xff0c;而协同就必须要进行进程间通信&#xff0c;协同工作可能有以下场景。 数据传输:一个进程需要将它的数据发送给另一个进程 资源共享:多个进程之间共享同样的资源。 通知事件…

【TDH社区版大事件】图分析、全文检索、小文件治理、数据开发工具通通都有!

星环科技大数据基础平台TDH社区版&#xff0c;在保留了商业版核心技术优势的基础上最大程度地降低了用户使用大数据技术的门槛与成本&#xff0c;具有更轻量、更简单、更易用等特性。 此次TDH社区开发版、社区版、社区订阅版均发布了新版本&#xff0c;带来新的产品组件和新的…

我是如何给阿里大神Tree工具类做CodeReview并优化的

首发公众号&#xff1a;赵侠客 引言 前段时间我写了一篇关于树操作的工具类《解密阿里大神写的天书般的Tree工具类&#xff0c;轻松搞定树结构&#xff01;》&#xff0c;当时主要把精力集中在分析代码的实现层面&#xff0c;没有从设计层面、性能层考虑&#xff0c;然后就被很…

Linux网络之多路转接——老派的select

目录 一、高级IO 1.1 概念 1.2 五种IO模型 1.3 小结 二、多路转接的老派 2.1 select 的作用 2.2 select 的接口 三、select 的编写 3.1 类的预先准备 3.2 类的整体框架 3.3 类的执行 Loop 四、Loop 中的回调函数 4.1 HandlerEvent 4.2 AcceptClient 4.3 Service…

二、4 函数的递归与迭代

1、n 的阶乘 2、斐波那契数列 &#xff08;1&#xff09;递归 用递归解决这个问题&#xff0c;由于需要多次重复计算&#xff0c;当 n 较大时&#xff0c;计算效率就非常慢 &#xff08;2&#xff09;迭代&#xff08;速度更快&#xff09;

C# 下⽀持表达式树的框架类型详解与示例

文章目录 什么是表达式树&#xff1f;表达式树的主要用途包括&#xff1a;表达式树节点类型示例&#xff1a;创建一个简单的加法表达式树示例&#xff1a;使用表达式树进行数据绑定示例&#xff1a;动态生成代码总结 在C#中&#xff0c;表达式树&#xff08;Expression Tree&am…

首届「中国可观测日」圆满落幕

首届中国可观测日&#xff08;Observability Day&#xff09;在上海圆满落幕&#xff0c;为监控观测领域带来了一场技术盛宴。作为技术交流的重要平台&#xff0c;此次活动不仅促进了观测云与亚马逊云科技之间的深化合作&#xff0c;更标志着双方共同推动行业发展的重要里程碑。…

红蓝绿三巨头集体拉胯,NVIDIA新显卡被核显秒了

最近蓝厂 intel 的瓜想必大家都已经吃上了吧&#xff1f;13-14 代中高端 CPU 大面积故障崩溃事件。 后续是 intel 官方回应&#xff0c;系微代码错误&#xff0c;请求电压较高导致的。 intel 目前给出的方案是&#xff0c;出现了问题的 CPU&#xff0c;intel 给予免费换新售后…

微信小程序之单选框

微信小程序中的单选框&#xff08;Radio&#xff09;是一个常用的输入组件&#xff0c;用于在多个选项中进行选择。常见的应用场景有性别选择、选项过滤、问卷调查等。本文将介绍小程序中单选框的特点和作用及相应示例。 一、单选框的特点和作用 特点&#xff1a; 单一选择&a…

php yii2 foreach中使用事务,事务中使用了 continue

问题描述&#xff1a;使用yii2&#xff0c;在foreach中使用事务&#xff0c;每个循环一个事务&#xff0c;在事务进行判断,然后直接continue,导致后面的循环数据没有保存成功 如下图&#xff1a; 修改后&#xff1a;如下图

【人工智能学习之商品检测实战】

【人工智能学习之商品检测实战】 1 开发过程2 网络训练效果2.1 分割网络2.2 特征网络 3 跟踪与后处理4 特征库优化5 项目源码解析5.1 yolo训练train_yolo.pygood_net.pydataset.pygood_cls_data.pysave_feature.pyanalyse_good.pyshop_window.pytest.py 6 结语 1 开发过程 拍摄…

Spring boot 整合influxdb2

一.服务安装 docker search influxdb docker pull influxdb docker run -dit --name influxdb --restart always -p 8086:8086 -v /dp/docker/file/influxdb:/var/lib/influxdb influxdb 访问8086 初始化 账号组织和新建bucket 创建密钥 这些豆记录下来 二.项目配置 引入依赖…

什么是物流锁控,RFID物流智能锁对于物流锁控有什么意义

在当今竞争激烈的全球商业环境中&#xff0c;物流行业作为经济发展的重要支撑&#xff0c;其高效、安全的运作至关重要。物流锁控作为保障物流运输过程中货物安全、准确和及时交付的关键环节&#xff0c;正面临着日益复杂的挑战。 一、物流锁控的定义与范畴 物流锁控&#xf…

JavaScript学习笔记(十一):JS Browser BOM

1、JavaScript Window - 浏览器对象模型 浏览器对象模型&#xff08;Browser Object Model (BOM)&#xff09;允许 JavaScript 与浏览器对话。 1.1 浏览器对象模型&#xff08;Browser Object Model (BOM)&#xff09; 不存在浏览器对象模型&#xff08;BOM&#xff09;的官方…

【周易哲学】生辰八字入门讲解(一)

&#x1f60a;你好&#xff0c;我是小航&#xff0c;一个正在变秃、变强的文艺倾年。 &#x1f514;本文讲解【周易哲学】生辰八字入门讲解&#xff0c;期待与你一同探索、学习、进步&#xff0c;一起卷起来叭&#xff01; 目录 生辰八字阴阳五行天干地支天干天干天干五合天干…