flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

news2024/12/28 20:24:42

flink sql 实战实例 及延伸问题

  • Flink SQL 计算用户分布
  • Flink SQL 计算 DAU
  • 多topic 数据更新mysql topic接入mysql
    • 引入 upsert-kafka-connector 以1.14.4版本为例
  • 数据倾斜问题:
  • 让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?
  • 多维高阶聚合
  • FlinkSql Upsert 与 Primary Key
  • flinksql Hive 流批一体
    • Streaming Sink
    • Streaming Source
    • Hive Dialect
    • Filesystem Connector

Flink SQL 计算用户分布

⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。

- 如果需要可以打开 minibatch
select  
    level
    , count(1) as uv
    , max(time) as time
from (
    select 
        uid
        , level
        , time
        , row_number() over (partition by uid order by time desc) rn 
    from source
) tmp
where rn =1 
group by 
    level

Flink SQL 计算 DAU

⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。

SELECT  
    window_start
    , window_end
    , platform
    , sum(bucket_dau) as dau
from (
    SELECT
        window_start
        , window_end
        , platform
        , count(distinct uid) as bucket_dau
    FROM TABLE(
        CUMULATE(
        TABLE user_log,
        DESCRIPTOR(time),
        INTERVAL '60' SECOND
        , INTERVAL '1' DAY))
    GROUP BY                                  
        window_start
        , window_end
        , platform
        , MOD(HASH_CODE(user_id), 1024)
) tmp
GROUP by   
    window_start
    , window_end
    , platform

优点:如果是曲线图的需求,可以完美回溯曲线图。
缺点:大窗口之间如果有数据乱序,有丢数风险;并且由于是 watermark 推动产出,所以数据产出会有延迟。








或或或或或或或或或或或或或或或或或或或或或或或
-- 如果需要可以打开 minibatch
select 
    platform
    , count(1) as dau
    , max(time) as time
from (
    select 
        uid
        , platform
        , time
        , row_number() over (partition by uid, platform, time / 24 / 3600 / 1000 order by time desc) rn
    from source
) tmp
where rn = 1
group by
    platform
 优点:计算快。
 缺点:任务发生 failover,曲线图不能很好回溯。没法支持 cube 计算。




或或或或或或或或或或或或或或或或或或或或或或或
-- 如果需要可以打开 minibatch
SELECT   
    max(time) as time
    , platform
    , sum(bucket_dau) as dau
from (
    SELECT
        max(time) as time
        , platform
        , count(distinct uid) as bucket_dau
    FROM source
    GROUP BY
        platform
        , MOD(HASH_CODE(user_id), 1024)
) t 
GROUP by   
    platform
    
优点:计算快,支持 cube 计算。
缺点:任务发生 failover,曲线图不能很好回溯。

    

多topic 数据更新mysql topic接入mysql

-- 作业开发逻辑

 -- mysql -h数据库 -ubigdata_rw -pe20ycoy3yp09qij0kj8ngpcgxyywgmc9
 -- -Dyarn.application.queue=stream_data   -Dyarn.provided.lib.dirs=/streamx/flink/flink-1.12.5/lib/
CREATE TABLE Direction_Wind_create_source (

       properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()
 ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'Direction_Wind_create_feed',
        'connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092',
        'connector.startup-mode' = 'latest-offset',
        'update-mode' = 'append',
        'format.type' = 'json',
        'connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test',
        'format.derive-schema' = 'true'
 );

 CREATE TABLE Direction_Wind_like_source (
       properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()
 ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'Direction_Wind_like_feed',
        'connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092',
        'connector.startup-mode' = 'latest-offset',
        'update-mode' = 'append',
        'format.type' = 'json',
        'connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test',
        'format.derive-schema' = 'true'
 );

 CREATE TABLE Direction_Wind_comment_source (

       properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()
 ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'Direction_Wind_comment_feed',
        'connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092',
        'connector.startup-mode' = 'latest-offset',
        'update-mode' = 'append',
        'format.type' = 'json',
        'connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test',
        'format.derive-schema' = 'true'
 );


 CREATE TABLE Direction_Wind_share_source (

       properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()
 ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'Direction_Wind_share_feed',
        'connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092',
        'connector.startup-mode' = 'latest-offset',
        'update-mode' = 'append',
        'format.type' = 'json',
        'connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test',
        'format.derive-schema' = 'true'
 );




CREATE TABLE Direction_Wind_create_sink (
  feed_id VARCHAR
  ,user_id VARCHAR
  ,feed_type VARCHAR
  ,time_stamp timestamp(3)
  ,area VARCHAR
  , PRIMARY KEY (feed_id) NOT ENFORCED
 ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false',
        'connector.用户' = 'bigdata_all',
        'connector.pass@word' = '密码',
                'connector.table' = 'Direction_Wind_dynamic_test',

        'connector.write.flush.max-rows' = '1',
        'connector.write.flush.interval' = '2s'
       --  'connector' = 'print'
 );

INSERT INTO Direction_Wind_create_sink
select * from (
SELECT  properties.feed_id as feed_id
        ,properties.user_id as user_id
        ,cast(properties.feed_type as varchar) as feed_type
       --  ,TO_DATE(properties.`timestamp`) as time_stamp
       ,TO_TIMESTAMP(FROM_UNIXTIME(cast(properties.`timestamp` as bigint))) as time_stamp
        ,properties.area  as area
        ,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_create_source
) where rowNum = 1
;


CREATE TABLE Direction_Wind_like_sink (
  feed_id VARCHAR
  ,like_count BIGINT
  ,feed_type VARCHAR
  ,area VARCHAR
  , PRIMARY KEY (feed_id) NOT ENFORCED
 ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false',
        'connector.用户' = 'bigdata_all',
        'connector.pass@word' = '密码',
        'connector.table' = 'Direction_Wind_dynamic_test',
                'connector.write.flush.max-rows' = '1',
        'connector.write.flush.interval' = '2s'
       --  'connector' = 'print'
 );


INSERT INTO Direction_Wind_like_sink
select * from (
SELECT  properties.feed_id as feed_id
        ,properties.like_count  as like_count
        ,cast(properties.feed_type as varchar) as feed_type
        ,properties.area  as area
                ,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_like_source
) where rowNum = 1
;




CREATE TABLE Direction_Wind_comment_sink (
  feed_id VARCHAR
  ,comment_count BIGINT
  ,feed_type VARCHAR
  ,area VARCHAR
  , PRIMARY KEY (feed_id) NOT ENFORCED
 ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false',
        'connector.用户' = 'bigdata_all',
        'connector.pass@word' = '密码',
        'connector.table' = 'Direction_Wind_dynamic_test',
                'connector.write.flush.max-rows' = '1',
        'connector.write.flush.interval' = '2s'
       --  'connector' = 'print'
 );


INSERT INTO Direction_Wind_comment_sink
select * from (
SELECT  properties.feed_id as feed_id
        ,properties.comment_count  as comment_count
        ,cast(properties.feed_type as varchar) as feed_type
        ,properties.area  as area
        ,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_comment_source
) where rowNum = 1
;





CREATE TABLE Direction_Wind_share_sink (
  feed_id VARCHAR
  ,use_count BIGINT
  ,feed_type VARCHAR
  ,area VARCHAR
  , PRIMARY KEY (feed_id) NOT ENFORCED
 ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false',
        'connector.用户' = 'bigdata_all',
        'connector.pass@word' = '密码',
        'connector.table' = 'Direction_Wind_dynamic_test',
                'connector.write.flush.max-rows' = '1',
        'connector.write.flush.interval' = '2s'
       --  'connector' = 'print'
 );


INSERT INTO Direction_Wind_share_sink
select * from (
SELECT  properties.feed_id as feed_id
        ,properties.share_count  as   use_count
        ,cast(properties.feed_type as varchar) as feed_type
        ,properties.area  as area
        ,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_share_source
) where rowNum = 1
;
这里要注意,如果去重直接用group by的方式,在批处理中还好,流式处理中,这部分数据会存放到内容中,并且越积越大,没有ttl,时间一长就会oom了,
Flink SQL Deduplicate 写法,row_number partition by user_id order by proctime asc,此 SQL 最后生成的算子只会在第一条数据来的时候更新 state,后续访问不会更新 state TTL,因此 state 会在用户设置的 state TTL 时间之后过期。  所以按理说 这种去重方式 不会百分百 有用,只能保持一段时间的 去重,感觉是不对的,正在测试中。






经过测试 在flink 1.12 版本时,flinksql的upsert into 功能 ,也就是 这种写法
在这里插入图片描述
是可以实现 update 功能的,但 必须要 group by 数据才行,并且要求把 把所有select 语句中的字段 都加入到 group by 语句,但这么写,又会导致 state 不停增大,过一段时间就会 OOM

引入 upsert-kafka-connector 以1.14.4版本为例

基本工作机制:

  • source:

在这里插入图片描述

  • sink:
    在这里插入图片描述
tenv.executeSql(
    "CREATE TABLE upsert_kafka ("
        + "province STRING, "
        + "pv BIGINT, "
        + "PRIMARY KEY (province) NOT ENFORCED"
        + ") WITH ("
        + "'connector' = 'upsert-kafka', "
        + "'topic' = 'upsert_kafka2', "
        + "'properties.bootstrap.servers' = 'doitedu:9092', "
        + "'key.format' = 'csv', "
        + "'value.format' = 'csv'"
        + ")"
);

DataStreamSource<Row> stream = env.fromElements(
    Row.ofKind(RowKind.INSERT, "sx", 1),
    Row.ofKind(RowKind.INSERT, "sx", 2),
    Row.ofKind(RowKind.INSERT, "gx", 1),
    Row.ofKind(RowKind.INSERT, "sx", 2),
    Row.ofKind(RowKind.INSERT, "gx", 2)
);

tenv.createTemporaryView("s", stream, Schema.newBuilder()
    .column("f0", DataTypes.STRING().notNull())
    .column("f1", DataTypes.INT())
    .build());
// 将查询结果(changelog 流),写入 kafka
tenv.executeSql("insert into upsert_kafka select f0, sum(f1) as pv from s group by f0");

写入的数据为
在这里插入图片描述

// 从 kafka 再读出上面的 changelog 结果 tenv.executeSql(" select * from upsert_kafka").print();
tenv.executeSql("select * from upsert_kafka").print();

读出的数据为
在这里插入图片描述

数据倾斜问题:

⭐ 场景:拿计算直播间的同时在线观看用户数来说,大 v 直播间的人数会比小直播间的任务多几个量级,因此如果计算一个直播间的数据需要注意这种业务数据倾斜的特点
⭐ 解决方案:计算这种数据时,我们可以先按照直播间 id 将数据进行打散,如下 SQL 案例所示(DataStream 也是相同的解决方案),内层打散,外层合并:

select 
    id
    , sum(bucket_uv) as uv
from (
    select 
        id
        , count(distinct uid) as bucket_uv 
    from source 
    group by
        id
        , mod(uid, 1000) -- 将大 v 分桶打散
)
group by id

⭐ 数据任务处理时参数\代码处理逻辑导致倾斜:
⭐ 场景:比如有时候虽然用户已经按照 key 进行分桶计算,但是【最大并发度】设置为 150,【并发度】设置为 100,会导致 keygroup 在 sub-task 的划分不均匀(其中 50 个 sub-task 的 keygroup 为 2 个,剩下的 50 个 sub-task 为 1 个)导致数据倾斜。
⭐ 解决方案:设置合理的【最大并发度】【并发度】,【最大并发度】最好为【并发度】的倍数关系,比如【最大并发度】1024,【并发度】512
⭐ 我已经设置【数据分桶打散】+【最大并发为并发 n 倍】,为啥还出现数据倾斜?
⭐ 场景:你的【数据分桶】和【最大并发数】之间可能是不均匀的。因为 Flink 会将 keyby 的 key 拿到之后计算 hash 值,然后根据 hash 值去决定发送到那个 sub-task 去计算。这是就有可能出现你的【数据分桶】key 经过 hash 计算完成之后,并不能均匀的发到所有的 keygroup 中。比如【最大并发数】4096,【数据分桶】key 只有 1024 个,那么这些数据必然最多只能到 1024 个 keygroup 中,有可能还少于 1024,从而导致剩下的 3072 个 keygroup 没有任何数据
⭐ 解决方案:其实可以利用【数据分桶】key 和【最大并行度】两个参数,在 keyby 中实现和 Flink key hash 选择 keygroup 的算法一致的算法,在【最大并发数】4096,【数据分桶】为 4096 时,做到分桶值为 1 的数据一定会发送到 keygroup1 中,2 一定会发到 keygroup2 中,从而缓解数据倾斜。

最大并行度的设置:最大并行度可以自己设置,也可以框架默认生成;默认的算法是取当前算子并行度的 1.5 倍和 2 的 7 次方比较,取两者之间的最大值,然后用上面的结果和 2 的 15 次方比较,取其中的最小值为默认的最大并行度,非常不建议自动生成,建议用户自己设置。

让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?

在这里插入图片描述
⭐ 有提到将用户上线标记为 1,下线标记为 0 的,然后将上线下线数据发到消息队列用实时计算引擎统计的

⭐ 有提到将用户心跳日志借助 Session Window Dynamic Gap 计算的

在这里插入图片描述

多维高阶聚合

在这里插入图片描述

FlinkSql Upsert 与 Primary Key

在flink1.11 及以后,flinksql 与blink 做了merge 所以有重大变更
流计算的一个典型场景是把聚合的数据写入到 Upsert Sink 中,比如 JDBC、HBase,当遇到复杂的 SQL 时,时常会出现:
在这里插入图片描述
UpsertStreamTableSink 需要上游的 Query 有完整的 Primary Key 信息,不然就直接抛异常。这个现象涉及到 Flink 的 UpsertStreamTableSink 机制。顾名思义,它是一个更新的 Sink,需要按 Key 来更新,所以必须要有 Key 信息。

如何发现 Primary Key?一个方法是让优化器从 Query 中推断,如下图发现 Primary Key 的例子。

这种情况下在简单 Query 当中很好,也满足语义,也非常自然。但是如果是一个复杂的 Query,比如聚合又 Join 再聚合,那就只有报错了。不能期待优化器有多智能,很多情况它都不能推断出 PK,而且,可能业务的 SQL 本身就不能推断出 PK,所以导致了这样的异常。

在这里插入图片描述
怎么解决问题?Flink 1.11 彻底的抛弃了这个机制,不再从 Query 来推断 PK 了,而是完全依赖 Create table 语法。比如 Create 一个 jdbc_table,需要在定义中显式地写好 Primary Key(后面 NOT ENFORCED 的意思是不强校验,因为 Connector 也许没有具备 PK 的强校验的能力)。当指定了 PK,就相当于就告诉框架这个Jdbc Sink 会按照对应的 Key 来进行更新。如此,就跟 Query 完全没有关系了,这样的设计可以定义得非常清晰,如何更新完全按照设置的定义来。

CREATE TABLE jdbc_table (
    id BIGINT,
    ...
    PRIMARY KEY (id) NOT ENFORCED
)

flinksql Hive 流批一体

首先看传统的 Hive 数仓。一个典型的 Hive 数仓如下图所示。一般来说,ETL 使用调度工具来调度作业,比如作业每天调度一次或者每小时调度一次。这里的调度,其实也是一个叠加的延迟。调度产生 Table1,再产生 Table2,再调度产生 Table3,计算延时需要叠加起来
在这里插入图片描述
问题是慢,延迟大,并且 Ad-hoc 分析延迟也比较大,因为前面的数据入库,或者前面的调度的 ETL 会有很大的延迟。Ad-hoc 分析再快返回,看到的也是历史数据。

所以现在流行构建实时数仓,从 Kafka 读计算写入 Kafka,最后再输出到 BI DB,BI DB 提供实时的数据服务,可以实时查询。Kafka 的 ETL 为实时作业,它的延时甚至可能达到毫秒级。实时数仓依赖 Queue,它的所有数据存储都是基于 Queue 或者实时数据库,这样实时性很好,延时低。但是:

第一,基于 Queue,一般来说就是行存加 Queue,存储效率其实不高。
第二,基于预计算,最终会落到 BI DB,已经是聚合好的数据了,没有历史数据。而且 Kafka 存的一般来说都是 15 天以内的数据,没有历史数据,意味着无法进行 Ad-hoc 分析。所有的分析全是预定义好的,必须要起对应的实时作业,且写到 DB 中,这样才可用。对比来说,Hive 数仓的好处在于它可以进行 Ad-hoc 分析,想要什么结果,就可以随时得到什么结果。
在这里插入图片描述
能否结合离线数仓和实时数仓两者的优势,然后构建一个 Lambda 的架构?

核心问题在于成本过高。无论是维护成本、计算成本还是存储成本等都很高。并且两边的数据还要保持一致性,离线数仓写完 Hive 数仓、SQL,然后实时数仓也要写完相应 SQL,将造成大量的重复开发。还可能存在团队上分为离线团队和实时团队,两个团队之间的沟通、迁移、对数据等将带来大量人力成本。如今,实时分析会越来越多,不断的发生迁移,导致重复开发的成本也越来越高。少部分重要的作业尚可接受,如果是大量的作业,维护成本其实是非常大的。

如何既享受 Ad-hoc 的好处,又能实现实时化的优势?一种思路是将 Hive 的离线数仓进行实时化,就算不能毫秒级的实时,准实时也好。所以,Flink 1.11 在 Hive 流批一体上做了一些探索和尝试,如下图所示。它能实时地按 Streaming 的方式来导出数据,写到 BI DB 中,并且这套系统也可以用分析计算框架来进行 Ad-hoc 的分析。这个图当中,最重要的就是 Flink Streaming 的导入。
在这里插入图片描述

Streaming Sink

早期 Flink 版本在 DataStreaming 层,已经有一个强大的 StreamingFileSink 将流数据写到文件系统。它是一个准实时的、Exactly-once 的系统,能实现一条数据不多,一条数据不少的 Sink。
在这里插入图片描述
具体原理是基于两阶段提交:

第一阶段:SnapshotPerTask,关闭需要 Commit 的文件,或者记录正在写的文件的 Offset。
第二阶段:NotifyCheckpointComplete,Rename 需要 Commit 的文件。注意,Rename 是一个原子且幂等的操作,所以只要保证 Rename 的 At-least-once,即可保证数据的 Exactly-once。

这样一个 File system 的 Writer 看似比较完美了。但是在 Hive 数仓中,数据的可见性是依赖 Hive Metastore 的,那在这个流程中,谁来通知 Hive Metastore 呢?
在这里插入图片描述
SQL 层在 StreamingFileSink,扩展了 Partition 的 Committer。

相当于不仅要进行 File 的 Commit,还要进行 Partition 的 Commit。如图所示,FileWriter 对应之前的 StreamingFileSink,它提供的是 Exactly-once 的 FileWriter。而后面再接了一个节点 PartitionCommitter。支持的 Commit Policy 有:

  • 内置支持 Add partition 到 Hive metastore;
  • 支持写 SuccessFile 到文件系统当中;
  • 并且也可以自定义 Committer,比如可以 analysis partition、合并 partition 里面的小文件。

Committer 挂在 Writer 后, 由 Commit Trigger 决定什么时机来 commit :

  • 默认的 commit 时机是,有文件就立即 commit。因为所有 commit 都是可重入的,所以这一点是可允许的。

  • 另外,也支持通过 partition 时间和 Watermark 来共同决定的。比如小时分区,如果现在时间到 11 点,10 点的分区就可以 commit 了。Watermark 保证了作业当前的准确性。

Streaming Source

Hive 数仓中存在大量的 ETL 任务,这些任务往往是通过调度工具来周期性的运行,这样做主要有两个问题:

  • 实时性不强,往往调度最小也是小时级。
  • 流程复杂,组件多,容易出现问题。

针对这些离线的 ETL 作业,Flink 1.11 为此开发了实时化的 Hive 流读,支持:

  • Partition 表,监控 Partition 的生成,增量读取新的 Partition。
  • 非 Partition 表,监控文件夹内新文件的生成,增量读取新的文件。

甚至可以使用 10 分钟级别的分区策略,使用 Flink 的 Hive streaming source 和 Hive streaming sink ,可以大大提高 Hive 数仓的实时性到准实时分钟级,在实时化的同时,也支持针对 Table 全量的 Ad-hoc 查询,提高灵活性。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'=’true’, 'streaming-source.consume-start-offset'='2020-05-20') */;


/*+ OPTIONS('streaming-source.enable' = 'true','streaming-source.partition.include' = 'latest',
            'streaming-source.partition-order' = 'create-time','streaming-source.monitor-interval' = '1 h') */

另外除了 Scan 的读取方式,Flink 1.11 也支持了 Temporal Join 的方式,也就是以前常说的 Streaming Dim Join。

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

目前支持的方式是 Cache All,并且是不感知分区的,比较适合小表的情况。

Hive Dialect

Flink SQL 遵循的是 ANSI-SQL 的标准,而 Hive SQL 有它自己的 HQL 语法,它们之间的语法、语义都有些许不同。

如何让 Hive 用户迁移到 Flink 生态中,同时避免用户太大的学习成本?为此, Flink SQL 1.11 提供了 Hive Dialect,可以使得用户在 Flink 生态中使用 HQL 语言来计算。目前只支持 DDL,后续版本会逐步攻坚 Qeuries。

Filesystem Connector

Hive Integration 提供了一个重量级的集成,功能丰富,但是环境比较复杂。如果只是想要一个轻量级的 Filesystem 读写呢?

Flink table 在长久以来只支持一个 CSV 的 Filesystem Table,并且还不支持 Partition,行为上在某些方面也有些不符合大数据计算的直觉。

Flink 1.11 重构了整个 Filesystem connector 的实现:

  • 结合 Partition,现在,Filesystem connector 支持 SQL 中 Partition 的所有语义,支持 Partition 的 DDL,支持 Partition Pruning,支持静态 / 动态 Partition 的插入,支持 Overwrite 的插入。
  • 支持各种 Formats: ■ CSV ■ JSON ■ Aparch AVRO ■ Apache Parquet ■ Apache ORC
  • 支持 Batch 的读写。
  • 支持 Streaming sink,也支持 Partition commit,支持写 Success 文件。

用几句简单的 SQL,不用搭建 Hive 集成环境即可:

  • 启动一个流作业写入 Filesystem 中,然后在 Hive 端即可查询到 Filesystem 上的数据,相比之前 Datastream 的作业,简单 SQL 即可搞定离线数据的入库。
  • 通过 Filesystem Connector 来查询 Hive 数仓中的数据,功能没有 Hive 集成那么全,但是定义简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1460379.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Github代码仓库SSH配置流程

作者&#xff1a; Herman Ye Auromix 测试环境&#xff1a; Ubuntu20.04 更新日期&#xff1a; 2024/02/21 注1&#xff1a; Auromix 是一个机器人爱好者开源组织。 注2&#xff1a; 由于笔者水平有限&#xff0c;以下内容可能存在事实性错误。 相关背景 在为Github代码仓库配…

秦岭天台山隧道群荣获交通运输部科技示范工程,恒星科通群载波应急广播与无线调度系统产品应用其中

2023年9月12日&#xff0c;全国交通运输科技示范工程现场推进会在河南省平顶山市召开&#xff0c;会上为全国已通过验收的10项科技示范工程进行了授牌&#xff0c;其中由陕西交控集团负责实施的“秦岭天台山超长隧道群安全绿色科技示范工程”名列其中。 该科技示范工程为陕西省…

2024牛客寒假算法基础集训营4(视频讲解题目)

2024牛客寒假算法基础集训营4&#xff08;视频讲解题目&#xff09; 视频链接ABCDEFG、H&#xff08;下面是hard版本的代码两个都可以过&#xff09; 视频链接 2024牛客寒假算法基础集训营4&#xff08;视频讲解题目&#xff09; A #include<bits/stdc.h> #define en…

为全志D1开发板移植LVGL日历控件和显示天气

利用TCP封装HTTP包请求天气信息 Linux还真是逐步熟悉中&#xff0c;现在才了解到Linux即没有原生的GUI&#xff0c;也没有应用层协议栈&#xff0c;所以要实现HTTP应用&#xff0c;必须利用TCP然后自己封装HTTP数据包。本篇即记录封装HTTP数据包&#xff0c;到心知天气请求天气…

亿道丨三防平板电脑厂家丨三防平板PDA丨三防工业平板:数字时代

在当今数字化时代&#xff0c;我们身边的世界变得越来越依赖于智能设备和无线连接。其中&#xff0c;三防平板PDA&#xff08;Personal Digital Assistant&#xff09;作为一种功能强大且耐用的数字工具&#xff0c;正在引领我们进入数字世界的全新征程。 三防平板PDA结合了平板…

在Win系统部署WampServer并实现公网访问本地服务【内网穿透】

目录 推荐 前言 1.WampServer下载安装 2.WampServer启动 3.安装cpolar内网穿透 3.1 注册账号 3.2 下载cpolar客户端 3.3 登录cpolar web ui管理界面 3.4 创建公网地址 4.固定公网地址访问 推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0…

【Pytorch深度学习开发实践学习】B站刘二大人课程笔记整理lecture02 线性模型

课程网址 Pytorch深度学习实践 部分课件内容&#xff1a; 代码部分如下&#xff1a; import numpy as np import matplotlib.pyplot as pltx_data [1.0,2.0,3.0] y_data [2.0,4.0,6.0]def forward(x):return x * wdef loss(x,y):y_pred forward(x)return (y_pred - y) ** …

vue实现列表自动无缝滚动列表

大家好&#xff0c;今天给大家分享的知识是vue基于vue-seamless-scroll实现自动无缝滚动列表 一、实现自动滚动 最近在开发过程中遇到一个问题&#xff0c;就是需要实现自动滚动列表&#xff0c;效果图如下 就是这样一个列表在自动循环展示。在这里我是运用的 vue-seamless-sc…

Springboot AOP开发

Springboot AOP开发 一 AOP概述 AOP&#xff0c;即面向切面编程&#xff0c;简言之&#xff0c;面向方法编程。 针对方法&#xff0c;在方法的执行前或执行后使用&#xff0c;用于增强方法&#xff0c;或拓展。 二 AOP开发 1.引入 spring-boot-starter-aop 在SpringBoot项…

计算机视觉基础【OpenCV轻松入门】:获取图像的ROI

OpenCV的基础是处理图像&#xff0c;而图像的基础是矩阵。 因此&#xff0c;如何使用好矩阵是非常关键的。 下面我们通过一个具体的实例来展示如何通过Python和OpenCV对矩阵进行操作&#xff0c;从而更好地实现对图像的处理。 ROI&#xff08;Region of Interest&#xff09;是…

十三、集合进阶——单列集合 及 数据结构

单列集合 及 数据结构 13.1 集合体系结构13.1.2 单列集合1. Collection2.Collection 的遍历方式迭代器遍历增强for遍历Lambda表达式遍历 3.List集合List集合的特有方法List集合的遍历方式五种遍历方式对比 4.数据结构1).栈2).队列3&#xff09;数组4&#xff09;链表小结5&…

【程序员英语】【美语从头学】初级篇(入门)(笔记)Lesson 15 At the Department Store 在百货商店

《美语从头学初级入门篇》 注意&#xff1a;被 删除线 划掉的不一定不正确&#xff0c;只是不是标准答案。 文章目录 Lesson 15 At the Department Store 在百货商店会话A会话B笔记 Lesson 15 At the Department Store 在百货商店 会话A A: Can you help me, please? B: Sur…

世强硬创与SMT激光模板制造商光盛激光达成平台合作

近日&#xff0c;世强先进&#xff08;深圳&#xff09;科技股份有限公司&#xff08;下称“世强先进”&#xff09;与一家专注生产研发电子工业用工模具的企业——东莞光盛激光科技有限公司&#xff08;下称“光盛激光”&#xff09;达成平台合作。 据了解&#xff0c;光盛激光…

Facebook与数字创新:引领社交媒体的数字化革命

在当今数字化时代&#xff0c;社交媒体已经成为了人们日常生活中不可或缺的一部分。而在众多社交媒体平台中&#xff0c;Facebook作为领头羊&#xff0c;一直致力于推动数字创新&#xff0c;引领着社交媒体的数字化革命。本文将探讨Facebook在数字创新方面的表现&#xff0c;以…

绝地求生:图纸的加量不加价是否预示着蓝洞经营模式的转变

成长型武器目前作为PUBG中除了究极异色皮肤外的最高等级武器&#xff08;传说级&#xff09;&#xff0c;也是PUBG核心利润来源&#xff0c;十分的珍贵。 一把成长型武器的保底价格为3000碎片&#xff0c;而每次通过G-coin抽取会赠送10个碎片&#xff0c;也就是需要抽取三百次&…

外汇交易解决方案丨实时选取外汇行情多价源最优价

在外汇交易中&#xff0c;存在多个价源。多个价源之间&#xff0c;同一时刻的报价可能存在差异。在多个价源之间&#xff0c;实时选取最优价格具有重要意义&#xff0c;它有助于投资者获得更有利的交易执行价&#xff0c;降低交易成本&#xff0c;优化流动性&#xff0c;分散交…

中科大计网学习记录笔记(十四):多路复用与解复用 | 无连接传输:UDP

前言&#xff1a; 学习视频&#xff1a;中科大郑烇、杨坚全套《计算机网络&#xff08;自顶向下方法 第7版&#xff0c;James F.Kurose&#xff0c;Keith W.Ross&#xff09;》课程 该视频是B站非常著名的计网学习视频&#xff0c;但相信很多朋友和我一样在听完前面的部分发现信…

YOLO-NAS浅析

YOLO-NAS&#xff08;You Only Look Once - Neural Architecture Search&#xff09;是一种基于YOLO&#xff08;You Only Look Once&#xff09;的目标检测算法&#xff0c;结合神经架构搜索&#xff08;NAS&#xff09;技术来优化模型性能。 YOLO是一种实时目标检测算法&…

攻防世界-web-Training-WWW-Robots

题目信息 In this little training challenge, you are going to learn about the Robots_exclusion_standard. The robots.txt file is used by web crawlers to check if they are allowed to crawl and index your website or only parts of it. Sometimes these files rev…

pytest框架学习总结:失败用例如何处理?

当我们跑用例的时候&#xff0c;有些用例可能会失败&#xff0c;可以对失败的用例设置做如下管理&#xff1a; 1、失败重跑: --reruns 2 --reruns-delay 5 2、失败了停止执行后续的用例&#xff1a;pytest -x 3、设置最多失败多少用例会停止执行:pytest --maxfail2 4、跳过用例…