【实时数仓】DWS层之地区主题表(FlinkSQL)、关键词主题表(FlinkSQL)

news2025/1/5 9:26:18

文章目录

  • 一 DWS层-地区主题表(FlinkSQL)
    • 1 分组开窗和聚合计算
      • (1)分组窗口
      • (2)选择分组窗口的开始和结束时间戳
      • (3)系统内置函数
      • (4)完整代码
    • 2 将动态表转换为流,写入ClickHouse
      • (1)定义地区统计宽表实体类ProvinceStats
      • (2)代码实现
      • (3)在ClickHouse中创建地区主题宽表
    • 3 整体测试
  • 二 DWS层-关键词主题表(FlinkSQL)
    • 1 需求分析与思路
      • (1) 关于分词
    • 2 IK分词器的使用
      • (1)在pom.xml中加入依赖
      • (2)封装分词工具类并进行测试
    • 3 自定义函数
      • (1)自定义函数分类
      • (2)封装KeywordUDTF函数
    • 4 创建KeywordStatsApp,定义流环境
    • 5 声明动态表和自定义函数
      • (1)`MAP`数据类型
      • (2)`FROM_UNIXTIME`函数
      • (3)代码

一 DWS层-地区主题表(FlinkSQL)

1 分组开窗和聚合计算

(1)分组窗口

SQL 查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。以下是批处理表和流处理表支持的分组窗口函数:

分组窗口函数描述
TUMBLE(time_attr, interval)定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
HOP(time_attr, interval, interval)定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
SESSION(time_attr, interval)定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。

详细说明。

(2)选择分组窗口的开始和结束时间戳

可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性:

辅助函数描述
TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval)返回相对应的滚动、滑动和会话窗口范围内的下界时间戳。
TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval)返回相对应的滚动、滑动和会话窗口范围以外的上界时间戳。注意: 范围以外的上界时间戳不可以 在随后基于时间的操作中,作为 行时间属性 使用,比如 interval join 以及 分组窗口或分组窗口上的聚合。
TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval)返回相对应的滚动、滑动和会话窗口范围以内的上界时间戳。返回的是一个可用于后续需要基于时间的操作的时间属性(rowtime attribute),比如interval join 以及 分组窗口或分组窗口上的聚合。
TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval)返回一个可用于后续需要基于时间的操作的 处理时间参数,比如interval join 以及 分组窗口或分组窗口上的聚合.

注意: 辅助函数必须使用与 GROUP BY 子句中的分组窗口函数完全相同的参数来调用。

(3)系统内置函数

将时间戳转化为字符串。

DATE_FORMAT(timestamp, string)Attention This function has serious bugs and should not be used for now. Please implement a custom UDF instead or use EXTRACT as a workaround.

获取当前系统时间,单位为秒。

UNIX_TIMESTAMP()Gets current Unix timestamp in seconds.Note: This function is not deterministic which means the value would be recalculated for each record.Only supported in blink planner.

(4)完整代码

Table provinceStatTable = tableEnv.sqlQuery("select " +
        "  DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as stt, " +
        "  DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as edt, " +
        "  province_id, " +
        "  province_name, " +
        "  province_area_code area_code, " +
        "  province_iso_code iso_code, " +
        "  province_3166_2_code iso_3166_2, " +
        "  count(distinct order_id) order_count, " +
        "  sum(split_total_amount) order_amount, " +
        "  UNIX_TIMESTAMP() * 1000 as ts" +
        " from " +
        "  order_wide " +
        " group by " +
        "  TUMBLE(rowtime, INTERVAL '10' SECOND), " +
        "  province_id, " +
        "  province_name, " +
        "  province_area_code, " +
        "  province_iso_code, " +
        "  province_3166_2_code");

2 将动态表转换为流,写入ClickHouse

官网说明。

(1)定义地区统计宽表实体类ProvinceStats

package com.hzy.gmall.realtime.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.util.Date;

/**
 * Desc:地区统计宽表实体类
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ProvinceStats {

    private String stt;
    private String edt;
    private Long province_id;
    private String province_name;
    private String area_code;
    private String iso_code;
    private String iso_3166_2;
    private BigDecimal order_amount;
    private Long  order_count;
    private Long ts;

    public ProvinceStats(OrderWide orderWide){
        province_id = orderWide.getProvince_id();
        order_amount = orderWide.getSplit_total_amount();
        province_name=orderWide.getProvince_name();
        area_code=orderWide.getProvince_area_code();
        iso_3166_2=orderWide.getProvince_iso_code();
        iso_code=orderWide.getProvince_iso_code();

        order_count = 1L;
        ts=new Date().getTime();
    }
}

(2)代码实现

        // TODO 5 将动态表转换为流
        DataStream<ProvinceStats> provinceStatsDS = tableEnv.toAppendStream(provinceStatTable, ProvinceStats.class);

        provinceStatsDS.print(">>>");

        // TODO 6 将流中的数据写到ClickHouse中
        provinceStatsDS.addSink(
                ClickhouseUtil.getJdbcSink("insert into  province_stats_2022  values(?,?,?,?,?,?,?,?,?,?)")
        );

(3)在ClickHouse中创建地区主题宽表

create table province_stats_2022 (
   stt DateTime,
   edt DateTime,
   province_id  UInt64,
   province_name String,
   area_code String ,
   iso_code String,
   iso_3166_2 String , 
   order_amount Decimal64(2),
   order_count UInt64 , 
   ts UInt64
)engine =ReplacingMergeTree(ts)
   partition by  toYYYYMMDD(stt)
   order by   (stt,edt,province_id );

3 整体测试

  • 启动ZK、Kafka、ClickHouse、Redis、HDFS、Hbase、Maxwell
  • 运行BaseDBApp
  • 运行OrderWideApp
  • 运行ProvinceStatsSqlApp
  • 运行rt_dblog目录下的jar包
  • 查看控制台输出
  • 查看ClickHouse中products_stats_2022表数据

注意:因为是事件时间,所以第一次运行rt_dblog的时候,不会触发watermark,第二次再运行rt_dblog的jar的时候,才会触发第一次运行的watermark。

二 DWS层-关键词主题表(FlinkSQL)

1 需求分析与思路

关键词主题主要是为了大屏展示中的字符云的展示效果,用于感性的让大屏观看者感知目前的用户都更关心的那些商品和关键词。

关键词的展示也是一种维度聚合的结果,根据聚合的大小来决定关键词的大小。

关键词的第一重要来源的就是用户在搜索栏的搜索,另外就是从以商品为主题的统计中获取关键词。

(1) 关于分词

因为无论是从用户的搜索栏中,还是从商品名称中文字都是可能是比较长的,且由多个关键词组成,如下图。

在这里插入图片描述
在这里插入图片描述

所以需要根据把长文本分割成一个一个的词,这种分词技术,在搜索引擎中可能会用到。对于中文分词,现在的搜索引擎基本上都是使用的第三方分词器,在计算数据中也可以使用和搜索引擎中一致的分词器,IK。

2 IK分词器的使用

(1)在pom.xml中加入依赖

<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
    <version>2012_u6</version>
</dependency>

(2)封装分词工具类并进行测试

package com.hzy.gmall.realtime.utils;
/**
 * 使用IK分子器进行分词
 */
public class KeywordUtil {
    // 分词方法
    public static List<String> analyze(String text){
        StringReader reader = new StringReader(text);
        IKSegmenter ikSegmenter = new IKSegmenter(reader,true);
        List<String> resList = new ArrayList<>();
        try {
            Lexeme lexeme = null;
            while ( (lexeme = ikSegmenter.next()) != null){
                resList.add(lexeme.getLexemeText());
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
        return resList;
    }

    public static void main(String[] args) {
        String text = "荣耀Play6T Pro 天玑810 40W超级快充 6nm疾速芯 4800万超清双摄 全网通 5G手机 8GB+256GB 钛空银";
        System.out.println(KeywordUtil.analyze(text));
    }
}

3 自定义函数

有了分词器,那么另外一个要考虑的问题就是如何把分词器的使用揉进FlinkSQL中。

因为SQL的语法和相关的函数都是Flink内定的,想要使用外部工具,就必须结合自定义函数。

(1)自定义函数分类

  • Scalar Function(相当于 Spark的 UDF)
  • Table Function(相当于 Spark 的 UDTF)
  • Aggregation Functions (相当于 Spark的UDAF)

考虑到一个词条包括多个词语所以分词是指上是一种一对多的拆分,一拆多的情况,应该选择Table Function。

(2)封装KeywordUDTF函数

官网示例。

跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果输出行只包含 1 列,会省略结构化信息并生成标量值,这个标量值在运行阶段会隐式地包装进行里。

要定义一个表值函数,你需要扩展 org.apache.flink.table.functions 下的 TableFunction,可以通过实现多个名为 eval 的方法对求值方法进行重载。像其他函数一样,输入和输出类型也可以通过反射自动提取出来。表值函数返回的表的类型取决于 TableFunction 类的泛型参数 T,不同于标量函数,表值函数的求值方法本身不包含返回类型,而是通过 collect(T) 方法来发送要输出的行。

在 Table API 中,表值函数是通过 .joinLateral(...) 或者 .leftOuterJoinLateral(...) 来使用的。joinLateral 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。leftOuterJoinLateral 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。

在 SQL 里面用 JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 LATERAL TABLE(<TableFunction>) 的使用。

package com.hzy.gmall.realtime.app.fun;
/**
 * 自定义UDTF函数
 */
// 注解表示Row(一行)中有几列,列名是什么
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class KeywordUDTF extends TableFunction<Row> {

    public void eval(String text) {
        List<String> keywordList = KeywordUtil.analyze(text);
        for (String keyword : keywordList) {
            // use collect(...) to emit a row
            collect(Row.of(keyword));
        }
    }
}

4 创建KeywordStatsApp,定义流环境

package com.hzy.gmall.realtime.app.dws;

import com.hzy.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KeywordStatsApp {
    public static void main(String[] args) throws Exception {
        // TODO 1 环境准备
        // 1.1 流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 1.3 设置并行度
        env.setParallelism(4);

        // TODO 2 检查点相关设置(略)
        env.execute();
    }
}

5 声明动态表和自定义函数

注意json格式的要定义为Map对象,数据类型说明。

(1)MAP数据类型

将键(包括 NULL)映射到值(包括 NULL)的关联数组的数据类型。映射不能包含重复的键;每个键最多可以映射到一个值。

元素类型没有限制;确保唯一性是用户的责任。

Map 类型是 SQL 标准的扩展。

声明MAP<kt, vt>,此类型用 MAP<kt, vt> 声明,其中 kt 是键的数据类型,vt 是值的数据类型。

(2)FROM_UNIXTIME函数

FROM_UNIXTIME(numeric[, string])Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig).E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone.Only supported in blink planner.

(3)代码

// TODO 3 从指定的数据源(kafka)读取数据,转换为动态表
String topic = "dwd_page_log";
String groupId = "keyword_stats_app_group";
// 表字段要和JSON的属性一一对应
tableEnv.executeSql("CREATE TABLE page_view (" +
        " common MAP<STRING,STRING>," +
        " page MAP<STRING,STRING>," +
        " ts BIGINT," +
        " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000, 'yyyy-MM-dd HH:mm:ss'))," +
        " WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND" +
        " ) WITH (" + MyKafkaUtil.getKafkaDDL(topic,groupId) + ")");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/114674.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机基础(五):C语言的程序的处理过程

一段C语言程序 打开任何一个C语言的教程&#xff0c;首先自然是展示一段 Hello World 的程序&#xff0c;类似于下面这段&#xff1a; #include <stdio.h>int main() {/* 我的第一个 C 程序 */printf("Hello, World! \n");return 0; }运行上面这段程序也很简…

Qt中的数据库(简单使用)

在Qt中支持对数据库的操作 Qt中数据库的类有&#xff1a; 驱动层&#xff1a;为具体的数据库和SQL接口层之间提供底层的桥梁SQL层&#xff1a;提供对数据库的访问 QSqlDateBase类用来创建连接QSqlQuery可以使用SQL语句实现交互用户接口层&#xff1a;实现将数据库中的数据链接…

36. 卷积神经网络(LeNet)

通过之前几节&#xff0c;我们学习了构建一个完整卷积神经网络的所需组件。 回想一下&#xff0c;之前我们将softmax回归模型和多层感知机模型应用于Fashion-MNIST数据集中的服装图片&#xff0c;为了能够应用softmax回归和多层感知机&#xff1a; 我们首先将每个大小为的图像…

【Web开发】Python实现Web服务器(Ubuntu下Flask使用MySQL数据库)

&#x1f37a;基于Python的Web服务器系列相关文章编写如下&#x1f37a;&#xff1a; &#x1f388;【Web开发】Python实现Web服务器&#xff08;Flask快速入门&#xff09;&#x1f388;&#x1f388;【Web开发】Python实现Web服务器&#xff08;Flask案例测试&#xff09;&a…

大数据-玩转数据-Kafka实战

一、kafka使用要点 要点一&#xff1a;Producer即生产者&#xff0c;向Kafka集群发送消息&#xff0c;在发送消息之前&#xff0c;会对消息进行分类&#xff0c;即Topic&#xff0c;topic1&#xff0c;topic2。Topic即主题&#xff0c;通过对消息指定主题可以将消息分类&#…

工具-能写会看正则表达式【强烈建议收藏!】

正则表达式,很常用的一个技能点,但是一般的开发流程中都是这样的: 需要验证数据上网搜一下正则表达式CV 搞定!!!今天有时间回看了一下文档,简单整理了一下里面需要注意的点,并且通过分析几个常见的正则表达式,下次遇到正则争取不再只依靠 CV 大法! 基础部分 基本语法 …

初识指针(9)

目录 1、指针是什么&#xff1f; 2、指针和指针类型 1、指针- 整数 2、指针的解引用 3、野指针 1、野指针成因 2、如何规避野指针 4、指针运算 1、指针- 整数 2、指针- 指针 3、指针的关系运算 5、指针和数组 6、二级指针 7、指针数组 1、指针是什么&#xff1f;…

04-Hystrix

服务熔断Hystrix 1. Hystrix是什么 分布式系统环境下&#xff0c;服务间类似依赖非常常见&#xff0c;一个业务调用通常依赖多个基础服务。如下图&#xff0c;对于同步调用&#xff0c;当库存服务不可用时&#xff0c;商品服务请求线程被阻塞&#xff0c;当有大批量请求调用库…

SpringBoot-2 读取properties;自动加载127个类原理总结;全部加载,按需配置

读取properties 方式一&#xff1a;非配置类填写&#xff1a;ComponentConfigurationProperties 1)建立bean&#xff1a; /只有在容器中的组件才拥有springboot提供的强大功能 Component ConfigurationProperties(prefix "mycar") public class Car {private Stri…

【机器学习】模型评估与选择

模型评估与选择 目录一、评估方法1、留出法2、交叉验证法3、自助法二、性能度量1、错误率与准确率2、查准率、查全率阈值对查准率、查全率的影响3、F1度量&#xff08;基于查准率与查全率的调和平均&#xff09;4、P-R Curve5、ROC CurvePRC和ROC的选用准则PRC和ROC的差异6、代…

python 中文转带音调的拼音

python 中文转带音调的拼音 前言python 中文转带音调的拼音1、1.1 安装pinyin模块1.2 试验1.3 效果图1.4 代码实现前言 今天整理中药材,每个药材上标上带音调的拼音,查了些,有的易形成乱码,如Shān Mi Dōnɡ,想到python自已动手转算了 python 中文转带音调的拼音 1、 …

(五)汇编语言——[bx]和loop指令

目录 [...]与&#xff08;...&#xff09; [...] &#xff08;...&#xff09; idata Loop指令 段前缀 总结 [...]与&#xff08;...&#xff09; [...] 这个我们其实见过&#xff0c;代表的是一个内存单元&#xff0c;段地址在DS中&#xff0c;偏移地址就是[bx]。 &am…

《图解TCP/IP》阅读笔记(第七章 7.4)—— RIP 路由信息协议

7.4 RIP RIP&#xff08;Routing Information Protocol&#xff0c;路由信息协议&#xff09;&#xff0c;是一种距离向量算法&#xff0c;广泛用于LAN。 该协议将路由控制信息定期&#xff08;30秒一次&#xff09;向全网广播。如果没有收到路由控制信息&#xff0c;连接就会…

【1739. 放置盒子】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 有一个立方体房间&#xff0c;其长度、宽度和高度都等于 n 个单位。请你在房间里放置 n 个盒子&#xff0c;每个盒子都是一个单位边长的立方体。放置规则如下&#xff1a; 你可以把盒子放在地板上的…

筛法(线性筛,厄拉多塞筛)

在前前前前前前…的博客中,我们主要谈了欧拉筛和埃氏筛. 今天我们主要来聊一聊线性筛和厄拉多塞筛(其实和埃氏筛和欧拉筛本质上没区别哎).其实在这两种筛法中厄拉多塞筛最好懂(就连本蒟蒻一看代码就明白了…别看这个名字,容易糊弄人) 首先是厄拉多塞筛"粉墨登场"::…

HRTransNet阅读理解

E. Dual-direction short connection fusion module HRFormer applies transformer blocks to enlarge receptive field of fused feature Frs, and uses exchange units to absorb the merits of multi-scales features. The process is described as: HRFormer使用TRM块来扩…

《教育的目的》笔记——如何让学生通过树木看见森林

目录 作者简介 个人感悟 经典摘录 1、学生所受的训练应该比他们最终投身的专业更为广泛 2、学习新语言用途 3、教师的责任 4、教育的主题 5、学到的有用之物 6、教育目的 7、所有事物都不是静态的、定型的&#xff0c;而是处于形成&#xff08;becoming&#xff09;过…

Merry Xmas | 用Matplotlib画个3D圣诞树送给你!~

1写在前面 Merry Christmas ! &#x1f973; 过完圣诞就要跨年了&#xff0c;希望2023年自己可以一扫霉运&#xff0c;顺顺利利&#xff01;&#xff01;&#xff01;&#x1f618; 从网上抄了个英文的祝福语送给大家&#xff08;主要是懒&#xff09;: &#x1f447; I hope S…

【Unity】VideoPlayer实现视频播放

【Unity】VideoPlayer实现视频播放 背景&#xff1a;开发影院场景需要在荧幕上播放视频 环境&#xff1a;Unity2021.3 VideoPlayer的简单使用&#xff1a;http://t.csdn.cn/K8665 局限&#xff1a;上述方法会使得视频播放窗口强制在相机前&#xff1b; 需求&#xff1a;视频播…

怎么看懂单片机时序图?

本人没有上过单片机相关的专业课&#xff0c;是在《计算机系统结构》里遇见的时序图。由于看不懂加之老师没有专门讲&#xff0c;因此自行查阅了相关的视频和博客。&#xff08;参考视频已放在文末&#xff09; 网上资源贫瘠&#xff0c;不过我也不需要太过深入的知识。 大家…