文章目录
- 一 DWS层-地区主题表(FlinkSQL)
- 1 分组开窗和聚合计算
- (1)分组窗口
- (2)选择分组窗口的开始和结束时间戳
- (3)系统内置函数
- (4)完整代码
- 2 将动态表转换为流,写入ClickHouse
- (1)定义地区统计宽表实体类ProvinceStats
- (2)代码实现
- (3)在ClickHouse中创建地区主题宽表
- 3 整体测试
- 二 DWS层-关键词主题表(FlinkSQL)
- 1 需求分析与思路
- (1) 关于分词
- 2 IK分词器的使用
- (1)在pom.xml中加入依赖
- (2)封装分词工具类并进行测试
- 3 自定义函数
- (1)自定义函数分类
- (2)封装KeywordUDTF函数
- 4 创建KeywordStatsApp,定义流环境
- 5 声明动态表和自定义函数
- (1)`MAP`数据类型
- (2)`FROM_UNIXTIME`函数
- (3)代码
一 DWS层-地区主题表(FlinkSQL)
1 分组开窗和聚合计算
(1)分组窗口
SQL 查询的分组窗口是通过 GROUP BY
子句定义的。类似于使用常规 GROUP BY
语句的查询,窗口分组语句的 GROUP BY
子句中带有一个窗口函数为每个分组计算出一个结果。以下是批处理表和流处理表支持的分组窗口函数:
分组窗口函数 | 描述 |
---|---|
TUMBLE(time_attr, interval) | 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 |
HOP(time_attr, interval, interval) | 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 |
SESSION(time_attr, interval) | 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。 |
详细说明。
(2)选择分组窗口的开始和结束时间戳
可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性:
辅助函数 | 描述 |
---|---|
TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) | 返回相对应的滚动、滑动和会话窗口范围内的下界时间戳。 |
TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) | 返回相对应的滚动、滑动和会话窗口范围以外的上界时间戳。注意: 范围以外的上界时间戳不可以 在随后基于时间的操作中,作为 行时间属性 使用,比如 interval join 以及 分组窗口或分组窗口上的聚合。 |
TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) | 返回相对应的滚动、滑动和会话窗口范围以内的上界时间戳。返回的是一个可用于后续需要基于时间的操作的时间属性(rowtime attribute),比如interval join 以及 分组窗口或分组窗口上的聚合。 |
TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) | 返回一个可用于后续需要基于时间的操作的 处理时间参数,比如interval join 以及 分组窗口或分组窗口上的聚合. |
注意: 辅助函数必须使用与 GROUP BY
子句中的分组窗口函数完全相同的参数来调用。
(3)系统内置函数
将时间戳转化为字符串。
DATE_FORMAT(timestamp, string) | Attention This function has serious bugs and should not be used for now. Please implement a custom UDF instead or use EXTRACT as a workaround. |
---|---|
获取当前系统时间,单位为秒。
UNIX_TIMESTAMP() | Gets current Unix timestamp in seconds.Note: This function is not deterministic which means the value would be recalculated for each record.Only supported in blink planner. |
---|---|
(4)完整代码
Table provinceStatTable = tableEnv.sqlQuery("select " +
" DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as stt, " +
" DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as edt, " +
" province_id, " +
" province_name, " +
" province_area_code area_code, " +
" province_iso_code iso_code, " +
" province_3166_2_code iso_3166_2, " +
" count(distinct order_id) order_count, " +
" sum(split_total_amount) order_amount, " +
" UNIX_TIMESTAMP() * 1000 as ts" +
" from " +
" order_wide " +
" group by " +
" TUMBLE(rowtime, INTERVAL '10' SECOND), " +
" province_id, " +
" province_name, " +
" province_area_code, " +
" province_iso_code, " +
" province_3166_2_code");
2 将动态表转换为流,写入ClickHouse
官网说明。
(1)定义地区统计宽表实体类ProvinceStats
package com.hzy.gmall.realtime.beans;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.util.Date;
/**
* Desc:地区统计宽表实体类
*/
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ProvinceStats {
private String stt;
private String edt;
private Long province_id;
private String province_name;
private String area_code;
private String iso_code;
private String iso_3166_2;
private BigDecimal order_amount;
private Long order_count;
private Long ts;
public ProvinceStats(OrderWide orderWide){
province_id = orderWide.getProvince_id();
order_amount = orderWide.getSplit_total_amount();
province_name=orderWide.getProvince_name();
area_code=orderWide.getProvince_area_code();
iso_3166_2=orderWide.getProvince_iso_code();
iso_code=orderWide.getProvince_iso_code();
order_count = 1L;
ts=new Date().getTime();
}
}
(2)代码实现
// TODO 5 将动态表转换为流
DataStream<ProvinceStats> provinceStatsDS = tableEnv.toAppendStream(provinceStatTable, ProvinceStats.class);
provinceStatsDS.print(">>>");
// TODO 6 将流中的数据写到ClickHouse中
provinceStatsDS.addSink(
ClickhouseUtil.getJdbcSink("insert into province_stats_2022 values(?,?,?,?,?,?,?,?,?,?)")
);
(3)在ClickHouse中创建地区主题宽表
create table province_stats_2022 (
stt DateTime,
edt DateTime,
province_id UInt64,
province_name String,
area_code String ,
iso_code String,
iso_3166_2 String ,
order_amount Decimal64(2),
order_count UInt64 ,
ts UInt64
)engine =ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt,edt,province_id );
3 整体测试
- 启动ZK、Kafka、ClickHouse、Redis、HDFS、Hbase、Maxwell
- 运行BaseDBApp
- 运行OrderWideApp
- 运行ProvinceStatsSqlApp
- 运行rt_dblog目录下的jar包
- 查看控制台输出
- 查看ClickHouse中products_stats_2022表数据
注意:因为是事件时间,所以第一次运行rt_dblog的时候,不会触发watermark,第二次再运行rt_dblog的jar的时候,才会触发第一次运行的watermark。
二 DWS层-关键词主题表(FlinkSQL)
1 需求分析与思路
关键词主题主要是为了大屏展示中的字符云的展示效果,用于感性的让大屏观看者感知目前的用户都更关心的那些商品和关键词。
关键词的展示也是一种维度聚合的结果,根据聚合的大小来决定关键词的大小。
关键词的第一重要来源的就是用户在搜索栏的搜索,另外就是从以商品为主题的统计中获取关键词。
(1) 关于分词
因为无论是从用户的搜索栏中,还是从商品名称中文字都是可能是比较长的,且由多个关键词组成,如下图。
所以需要根据把长文本分割成一个一个的词,这种分词技术,在搜索引擎中可能会用到。对于中文分词,现在的搜索引擎基本上都是使用的第三方分词器,在计算数据中也可以使用和搜索引擎中一致的分词器,IK。
2 IK分词器的使用
(1)在pom.xml中加入依赖
<dependency>
<groupId>com.janeluo</groupId>
<artifactId>ikanalyzer</artifactId>
<version>2012_u6</version>
</dependency>
(2)封装分词工具类并进行测试
package com.hzy.gmall.realtime.utils;
/**
* 使用IK分子器进行分词
*/
public class KeywordUtil {
// 分词方法
public static List<String> analyze(String text){
StringReader reader = new StringReader(text);
IKSegmenter ikSegmenter = new IKSegmenter(reader,true);
List<String> resList = new ArrayList<>();
try {
Lexeme lexeme = null;
while ( (lexeme = ikSegmenter.next()) != null){
resList.add(lexeme.getLexemeText());
}
} catch (IOException e) {
e.printStackTrace();
}
return resList;
}
public static void main(String[] args) {
String text = "荣耀Play6T Pro 天玑810 40W超级快充 6nm疾速芯 4800万超清双摄 全网通 5G手机 8GB+256GB 钛空银";
System.out.println(KeywordUtil.analyze(text));
}
}
3 自定义函数
有了分词器,那么另外一个要考虑的问题就是如何把分词器的使用揉进FlinkSQL中。
因为SQL的语法和相关的函数都是Flink内定的,想要使用外部工具,就必须结合自定义函数。
(1)自定义函数分类
- Scalar Function(相当于 Spark的 UDF)
- Table Function(相当于 Spark 的 UDTF)
- Aggregation Functions (相当于 Spark的UDAF)
考虑到一个词条包括多个词语所以分词是指上是一种一对多的拆分,一拆多的情况,应该选择Table Function。
(2)封装KeywordUDTF函数
官网示例。
跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果输出行只包含 1 列,会省略结构化信息并生成标量值,这个标量值在运行阶段会隐式地包装进行里。
要定义一个表值函数,你需要扩展 org.apache.flink.table.functions
下的 TableFunction
,可以通过实现多个名为 eval
的方法对求值方法进行重载。像其他函数一样,输入和输出类型也可以通过反射自动提取出来。表值函数返回的表的类型取决于 TableFunction
类的泛型参数 T
,不同于标量函数,表值函数的求值方法本身不包含返回类型,而是通过 collect(T)
方法来发送要输出的行。
在 Table API 中,表值函数是通过 .joinLateral(...)
或者 .leftOuterJoinLateral(...)
来使用的。joinLateral
算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。leftOuterJoinLateral
算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。
在 SQL 里面用 JOIN
或者 以 ON TRUE
为条件的 LEFT JOIN
来配合 LATERAL TABLE(<TableFunction>)
的使用。
package com.hzy.gmall.realtime.app.fun;
/**
* 自定义UDTF函数
*/
// 注解表示Row(一行)中有几列,列名是什么
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class KeywordUDTF extends TableFunction<Row> {
public void eval(String text) {
List<String> keywordList = KeywordUtil.analyze(text);
for (String keyword : keywordList) {
// use collect(...) to emit a row
collect(Row.of(keyword));
}
}
}
4 创建KeywordStatsApp,定义流环境
package com.hzy.gmall.realtime.app.dws;
import com.hzy.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class KeywordStatsApp {
public static void main(String[] args) throws Exception {
// TODO 1 环境准备
// 1.1 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.2 表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1.3 设置并行度
env.setParallelism(4);
// TODO 2 检查点相关设置(略)
env.execute();
}
}
5 声明动态表和自定义函数
注意json格式的要定义为Map对象,数据类型说明。
(1)MAP
数据类型
将键(包括 NULL
)映射到值(包括 NULL
)的关联数组的数据类型。映射不能包含重复的键;每个键最多可以映射到一个值。
元素类型没有限制;确保唯一性是用户的责任。
Map 类型是 SQL 标准的扩展。
声明MAP<kt, vt>
,此类型用 MAP<kt, vt>
声明,其中 kt
是键的数据类型,vt
是值的数据类型。
(2)FROM_UNIXTIME
函数
FROM_UNIXTIME(numeric[, string]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig).E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone.Only supported in blink planner. |
---|---|
(3)代码
// TODO 3 从指定的数据源(kafka)读取数据,转换为动态表
String topic = "dwd_page_log";
String groupId = "keyword_stats_app_group";
// 表字段要和JSON的属性一一对应
tableEnv.executeSql("CREATE TABLE page_view (" +
" common MAP<STRING,STRING>," +
" page MAP<STRING,STRING>," +
" ts BIGINT," +
" rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000, 'yyyy-MM-dd HH:mm:ss'))," +
" WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND" +
" ) WITH (" + MyKafkaUtil.getKafkaDDL(topic,groupId) + ")");