Understanding Flink

news2025/1/19 7:59:45
  1. Flink 下载:
mkdir ~/flink && cd ~/flink

wget --no-check-certificate https://archive.apache.org/dist/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz 
wget --no-check-certificate https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.1.1.jar
wget --no-check-certificate https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.3/flink-connector-jdbc-1.15.3.jar
wget --no-check-certificate https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar

tar -xf flink-1.15.3-bin-scala_2.12.tgz
cp *.jar flink-1.15.3/lib/
cd flink-1.15.3
chmod -R 777 ./bin/*
./bin/start-cluster.sh
echo http://`hostname -i`:8081/
./bin/sql-client.sh

  1. 对 Flink 的理解

Flink 有 cdc 的 connector,有 jdbc 的 connector。其中:

  • 基于日志的CDC:cdc connector 用于做实时同步,jdbc 则用于数据写入。
    假设源端使用的是 cdc,那么一个 Flink SQL Job insert into target select from cdc_table 会一直在后台执行,监听数据的变化,并根据变化做计算。
  • 基于查询的CDC:假设源端使用的是 jdbc connector,那么 Flink SQL 会立即执行,读取源端的全部数据并做计算,然后 Job 退出。源端有新的插入,也不会做任何同步操作,因为 Job 已经结束。

// https://cloud.tencent.com/developer/article/2193358

  1. Flink 的市场定位

方案一、Debezium+Kafka+计算程序+存储系统
采用Debezium订阅MySql的Binlog传输到Kafka,后端是由计算程序从kafka里面进行消费,最后将数据写入到其他存储。

在这里插入图片描述

方案二、Debezium + Kafka + Flink Sql+存储系统
Flink Sql具备解析 Kafka 中 debezium-json 和 canal-json 格式的 binlog 能力,具体的框架如下

在这里插入图片描述

方案三、Flink CDC + JDBC Connector
方案一与方案二的相同点是组件维护复杂,Flink 1.11中CDC Connectors内置了 Debezium 引擎,可以替换 Debeziuum+Kafka.
在这里插入图片描述

总结:Flink 的市场定位就是干掉所有传输通道上的人。

通过Flink CDC Connector替换Debezium+Kafka的数据采集模块,实现 Flink Sql 采集+计算+传输(ETL)一体化。优点如下

  • 开箱即用,容易上手
  • 减少维护的组件,简化实时链路,减轻部署成本
  • 减少端到端延迟
  • Flink 自身支持Exactly Once的读取计算
  • 数据不落地,减少存储成本
  • 支持全量和增量流式读取
  • binlog采集位点可回溯

最后,Flink 得到下面这样一个架构图:

在这里插入图片描述

还有一点薄纱要翻开:Flink SQL 支持什么呢?

  • CTE
WITH orders_with_total AS (
    SELECT order_id, price + tax AS total
    FROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;
  • SELECT & WHERE
SELECT price + tax FROM Orders WHERE id = 10
  • SELECT DISTINCT
SELECT DISTINCT id FROM Orders
  • Windowing table-valued functions (Windowing TVFs)
    Apache Flink provides 3 built-in windowing TVFs: TUMBLE, HOP and CUMULATE. 利用这些 table function,可以把原始表的数据进行分组/扩行。例如下面的例子,用 TUMBLE 把 6 行数据按照 10 分钟的间隔分成了 2 组,然后基于这些组就能做进一步的聚合分析。

Flink 支持等间隔窗口(TUMBLE)、滑动窗口(HOP)、累积窗口(CUMULATE),可以根据实际业务场景选用。比如:

  • 实时统计每个小时的销量,用 TUMBLE 就比较合适,按小时划分间隔。
  • 实时统计最近 60 分钟的销量,则使用 HOP(60min)比较合适,每分钟更新一次。
Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
|          bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 |  4.00 | C    |
| 2020-04-15 08:07 |  2.00 | A    |
| 2020-04-15 08:09 |  5.00 | D    |
| 2020-04-15 08:11 |  3.00 | B    |
| 2020-04-15 08:13 |  1.00 | E    |
| 2020-04-15 08:17 |  6.00 | F    |
+------------------+-------+------+

Flink SQL> SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
Flink SQL> SELECT * FROM TABLE(
   TUMBLE(
     DATA => TABLE Bid,
     TIMECOL => DESCRIPTOR(bidtime),
     SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |            window_time  |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the tumbling windowed table
Flink SQL> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
  • Window Aggregate
    这就是基于上面的 TVF 的应用。上面 Windowing table-valued functions 提供了数据输入,基于这些数据输入做聚合,就能得到一些统计信息。
SELECT ...
FROM <windowed_table> -- relation applied windowing TVF
GROUP BY window_start, window_end, ...


-- hopping window aggregation
Flink SQL> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
| 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00  |
+------------------+------------------+-------+

  • GROUPING SET

基于 Window 做一些更高级的窗口函数,按照多种分组方式统计数据。

Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
+------------------+------------------+-------------+-------+
|     window_start |       window_end | supplier_id | price |
+------------------+------------------+-------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 |      (NULL) | 11.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier2 |  5.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier1 |  6.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |      (NULL) | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier2 |  9.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier1 |  1.00 |
+------------------+------------------+-------------+-------+
  • ROLLUP、CUBE
    这些都是 GROUPING SET 的简写,把所有的组合都枚举出来。计算不要钱么?坑爹! ▄█▀█●

  • GROUP
    这里就是各种分组聚合的语法,包括一般聚合函数,DISTINCT,GROUP BY、GROUPING SET、CUBE、ROLLUP、HAVING 等。

SELECT COUNT(*) FROM Orders; //  COUNT, SUM, AVG (average), MAX (maximum) and MIN (minimum) 

SELECT COUNT(DISTINCT order_id) FROM Orders;

SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ());

SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50;
  • OVER () 标准窗口函数
SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM Orders
  • INNER Equi-JOIN
    不是在 流上 join,而是和所有数据做 join,过去的、现在的。
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
  • OUTER Equi-JOIN
    不是在 流上 join,而是和所有数据做 join,过去的、现在的。
SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id
  • Temporal Joins
SELECT 
     order_id,
     price,
     orders.currency,
     conversion_rate,
     order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

等等。对于 Flink 来说,Join 是它的重头戏。

Flink 也支持常见的 funciton,字符串的,算术运算的,比较的等等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1587912.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PHP+MySQL组合开发 易企秀H5场景源码系统 带完整的安装代码包以及搭建教程

在数字化时代&#xff0c;企业对于宣传与推广的需求日益增长&#xff0c;而H5页面作为一种轻量级、跨平台的宣传方式&#xff0c;深受企业青睐。为了满足企业对于H5页面制作的需求&#xff0c;我们基于PHPMySQL组合开发了一套易企秀H5场景源码系统&#xff0c;并提供了完整的安…

PTA(题目集一 题目 代码 C++ 注解)

目录 题目一&#xff1a; 代码&#xff1a; 题目二&#xff1a; 代码&#xff1a; 题目三&#xff1a; 代码&#xff1a; 题目四&#xff1a; 代码&#xff1a; 题目五&#xff1a; 代码&#xff1a; 题目六&#xff1a; 代码&#xff1a; 题目七&#xff1a; 代…

VM虚拟机Linux系统Redhat7.4版本进行网络配置

日常中自己搭建的虚拟机一般用到两种网络方式&#xff0c;第一种是仅主机模式、还有一种是NAT模式。 1、仅主机模式&#xff1a;可以和自己本地电脑&#xff0c;或者虚拟机和虚拟机之间进行网络通信&#xff0c;相当于一个局域网&#xff0c;是不能连接外网的。 2、NAT模式&a…

Leetcode刷题之消失的数字(C语言版)

Leetcode刷题之消失的数字&#xff08;C语言版&#xff09; 一、题目描述二、题目解析 一、题目描述 数组nums包含从0到n的所有整数&#xff0c;但其中缺了一个。请编写代码找出那个缺失的整数。你有办法在O(n)时间内完成吗&#xff1f; 注意&#xff1a;本题相对书上原题稍作…

BIM信息如何整合到可视化大屏中,告诉你步骤。

BIM&#xff08;Building Information Modeling&#xff09;是一种数字化建筑信息模型技术&#xff0c;可以将建筑物的设计、施工和运营过程进行集成和管理。将BIM整合到可视化大屏中可以提供更直观、全面的建筑信息展示和分析。 BIM&#xff08;建筑信息模型&#xff09;可以通…

SFP光模块和媒体转换器的区别

SFP光模块和媒体转换器都是光电转换设备。它们是否可以互换使用&#xff1f;它们之间有什么区别&#xff1f; SFP光模块与媒体转换器&#xff1a;它们是什么&#xff1f; SFP模块是一种可热插拔的光模块&#xff0c;用于连接网络交换机。它可以将电信号转换为光信号&#xff…

Doris 内网安装部署,基于 CentOS 7

实测 CentOS 7.6 和 7.9都可用&#xff0c;CentOS安装包为&#xff1a;标准安装盘DVD版&#xff0c;如果系统安装的是精简版&#xff0c;需要挂载DVD版或者自行下载依赖。 参考文档 快速开始 - Apache Doris Doris 下载地址&#xff1a;2.1.1 ( Latest ) -> x64 ( avx2 )…

spring.rabbitmq.listener.simple.default-requeue-rejected = false 和放入死信队列的区别

目录 一、场景 二、使用 spring.rabbitmq.listener.simple.default-requeue-rejected false 2.1 特点 三、 放入死信队列 四、两种区别 一、场景 当我们使用RabbitMq的时候&#xff0c;我们如果业务中有异常&#xff0c;很有可能造成死循环&#xff0c;因为 在RabbitMQ和…

TSINGSEE青犀边缘计算AI智能分析网关V4客流统计算法的配置步骤及使用

TSINGSEE青犀AI智能分析网关V4内置了近40种AI算法模型&#xff0c;支持对接入的视频图像进行人、车、物、行为、烟火等实时检测分析&#xff0c;上报识别结果&#xff0c;并能进行语音告警播放。硬件支持RTSP、GB28181协议、以及厂家私有协议接入&#xff0c;可兼容市面上常见的…

Netty学习——实战篇1 BIO、NIO入门demo 备注

1 BIO 实战代码 Slf4j public class BIOServer {public static void main(String[] args) throws IOException {//1 创建线程池ExecutorService threadPool Executors.newCachedThreadPool();//2 创建ServerSocketServerSocket serverSocket new ServerSocket(8000);log.in…

java下载网络上的文件、图片保存到本地 FileUtils

java下载网络上的文件、图片保存到本地 FileUtils 1. 引入FileUtils依赖2. 实现代码3. 输出结果 1. 引入FileUtils依赖 <!--FileUtils依赖--> <!-- https://mvnrepository.com/artifact/commons-io/commons-io --> <dependency><groupId>commons-io&l…

(Java)数据结构——图(第九节)AOV网以及拓扑排序

前言 本博客是博主用于复习数据结构以及算法的博客&#xff0c;如果疏忽出现错误&#xff0c;还望各位指正。 AOV网 先前我们了解了有向无环图DAG的概念。 所有的工程或者某种流程可以分为若干个小的工程或者阶段&#xff0c;这些小的工程或者阶段就称为活动。若以图中的顶…

IPV6的相关网络问题

问题 ​​​​​​​ 目录 问题 一.什么是NAT64转换 1.NAT64的工作原理 IPv6到IPv4转换 IPv4到IPv6的响应转换 2.NAT64的优点 3.NAT64的缺点 二.NAT64转换如何实现 1.工作原理 2.实现步骤 DNS查询转换&#xff08;DNS64&#xff09; 地址转换&#xff08;NAT64&a…

ECharts的时间轴样式设置

timeline: {orient: vertical,axisType: category,autoPlay: false,inverse: true,right: 0,top: 5,bottom: 5,width: 100,realtime : true,symbolSize: 3,itemStyle: { // 轴默认样式color : #000000},checkpointStyle: { // 拖动按钮样式borderWidth: 0,width: 5,color: #7f8…

如何正确使用数字化仪前端信号调理?(一)

一、前言 板卡式的数字转换器和类似测量仪器&#xff0c;比如图1所示的德思特TS-M4i系列&#xff0c;都需要为各种各样的特性信号与内部模数转换器&#xff08;ADC&#xff09;的固定输入范围做匹配。 图1&#xff1a;德思特TS-M4i系列高速数字化仪&#xff0c;包括2或4通道版…

Nacos-默认token.secret.key-配置不当权限绕过漏洞复现

漏洞描述&#xff1a; Nacos 身份认证绕过漏洞(QVD-2023-6271)&#xff0c;开源服务管理平台 Nacos在默认配置下未对 token.secret.key 进行修改&#xff0c;导致远程攻击者可以绕过密钥认证进入后台&#xff0c;造成系统受控等后果。 漏洞信息 公开时间&#xff1a;2023-03…

很难不爱啊!颠覆认知的13个Edge神级插件

::: block-1 “时问桫椤”是一个致力于为本科生到研究生教育阶段提供帮助的不太正式的公众号。我们旨在在大家感到困惑、痛苦或面临困难时伸出援手。通过总结广大研究生的经验&#xff0c;帮助大家尽早适应研究生生活&#xff0c;尽快了解科研的本质。祝一切顺利&#xff01;—…

[Algorithm][双指针][有效三角形的个数]详细解读 + 代码实现

题目链接优化&#xff1a;对整个数组排序&#xff0c;可以简化比较模型&#xff0c;减少比较次数在有序的情况下&#xff0c;只需较⼩的两条边之和⼤于第三边即可设最⻓边枚举到max位置&#xff0c;区间[left, right]是max位置左边的区间(也就是⽐它⼩的区间) if (nums[left] …

前端React笔记(尚硅谷)

react 尚硅谷react教程 jsx语法规则 1.定义虚拟dom时不加引号&#xff08;不是字符串&#xff09; 2.标签中混入js表达式时要用{} js表达式与js语句不同。 js语句是if&#xff08;&#xff09;&#xff0c;for&#xff08;&#xff09;&#xff0c;switch&#xff08;&#x…

花趣短视频源码淘宝客系统全开源版带直播带货带自营商城流量主小游戏

首页设计 仿抖音短视频&#xff1a;采用短视频流的形式展示内容&#xff0c;用户可浏览、点赞、评论和分享短视频。关注与我的&#xff1a;提供用户关注列表和个人中心入口&#xff0c;方便用户管理关注对象和查看个人信息。本地直播&#xff1a;集成直播功能&#xff0c;支持…