前言
今天开始 DWS 层的搭建,不知不觉又是周一,都忘了昨天是周末,近两年对我来说,周六日晚上八九点能打一小会篮球就算一周的休息了。不得不说自己真的是天生打工体质,每天不管多累,晚上十二点睡,第二天六点多七点准时自然醒,依然精神焕发,中午都不带困的;那既然老天给我这个特质让我像牛一样可以不知疲倦的工作,那我也希望是让我在热爱的领域发光发热;那既然这样,总得先让我找到个满意的工作吧哈哈哈 ...
1、DWS 层搭建
设计要点:
- DWS层的设计参考指标体系(需求驱动);(前面的 DIM 和 DWD 的设计都是参考建模理论,是业务驱动)
- DWS层表名的命名规范为dws_数据域_统计粒度_业务过程_统计周期(window)
离线数仓中的 DWS 层的统计周期我们当时做的是 1/7/30 ,那实时数仓的统计周期当然不能这么大;离线数仓中每一天就相当于一个窗口,而在实时数仓当中,窗口都是秒级别的,我们这里开窗的大小选择 10 s,因为我们的可视化平台只能 10s 刷新一次,开得太小没有意义;(生产环境中可以更小比如 1s ,甚至可以不开窗。开窗还是不开窗是性能和时效性的取舍)
1.1、流量域来源关键词粒度页面浏览各窗口汇总表
主要任务:
从 Kafka 页面浏览明细(dwd_traffic_page_log)主题读取数据,过滤搜索行为,使用自定义 UDTF(一进多出)函数对搜索内容分词。统计各窗口各关键词出现频次,写入 ClickHouse。
1.1.1、思路分析
- 在 DWD 层,我们对日志根据日志类型进行了分流,写入到了 5 个不同的主题当中
- 现在我们需要统计搜索内容中的关键词,所以需要消费页面浏览日志
- 使用分词器将搜索内容分为多个关键词
- 划分窗口,词频统计后存储进 clickhouse
思考:既然用到分词,为啥不直接用 ES 存呢?
答:确实是要分词,但是我们这里是要做词频统计,ES 是对关键词做索引,相当于用 key(关键词)去获得 value(文档),而我们这里是要对 key 进行统计,所以不合适;
1.1.2、代码实现
1)IK 分词器工具类
public class KeywordUtil {
public static List<String> analyze(String text){
// 创建集合用于存放切分或的数据
List<String> keywordList = new ArrayList<>();
// 封装待分词内容
StringReader reader = new StringReader(text);
// 创建 IK 分词器(ik_smart 智能分词,ik_max_word: 尽可能分最多的词)
IKSegmenter ikSegmenter = new IKSegmenter(reader,true);
try {
// 取出切分好的词
Lexeme lexeme = null;
while((lexeme = ikSegmenter.next())!=null){
String keyword = lexeme.getLexemeText();
keywordList.add(keyword);
}
} catch (IOException e) {
e.printStackTrace();
}
return keywordList;
}
public static void main(String[] args) {
List<String> list = analyze("Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待");
System.out.println(list);
}
}
2)自定义 UDTF
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class IkUDTF extends TableFunction<Row> {
public void eval(String str){
for (String word : KeywordUtil.analyze(str)) {
collect(Row.of(word));
}
}
}
3) 消费页面浏览日志主题
我们相当于一个消费者去消费页面浏览主题,那么就需要先创建该表,也就需要先确定我们要的字段。在事件时间语义下使用窗口函数的时候我们需要指定事件时间的字段;
前面我们为了 join lookup 表的时候那样(要想 join lookup 表,必须要有一个处理时间字段):
只不过我们现在需要指定一个事件时间,我们同样可以通过 DDL 中来指定:
对于这里的关键词需求而言,我们不需要保留 common 字段,所以建表如下:
// TODO 3. 消费 Kafka dwd_traffic_page_log 主题
String groupId = "dws_traffic_source_keyword_page_view_window";
tableEnv.executeSql(
"CREATE TABLE dwd_traffic_page_log " +
" page map<string,string>, " +
" ts bigint , " +
" time_ltz AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)), " +
" WATERMARK FOR time_ltz AS time_ltz - INTERVAL '2' SECOND " +
MyKafkaUtil.getKafkaDDL("dwd_traffic_page_log", groupId));
这里我们指定了 time_ltz 为事件时间字段以及乱序延迟时间最大为 2s,这里为什么不直接使用 ts 字段呢?这是因为 json 默认把数值类型都当做 bigint 来处理,而 Flink SQL 中,表的事件时间必须为 timestamp 类型,所以我们需要进行转换;
注意:建表语句中尽量加 AS ,尤其字段涉及函数!
4)过滤出搜索数据
// TODO 4. 过滤出搜索数据
Table searchLog = tableEnv.sqlQuery("SELECT " +
"page['item'] item, " +
"time_ltz " +
"FROM dwd_traffic_page_log " +
"WHERE page['last_page_id' = 'search'] " +
"AND page['item_type'] = 'keyword' " +
"AND page['item'] is not null "
);
tableEnv.createTemporaryView("search_log_table",searchLog);
5)注册 udtf 函数并进行分词
// TODO 5. 注册 udtf & 分词
tableEnv.createTemporaryFunction("ik", IkUDTF.class);
Table splitTable = tableEnv.sqlQuery("SELECT " +
"word, " +
"time_ltz " +
"FROM search_log_table " +
"LATERAL TABLE(ik(item))"
);
tableEnv.createTemporaryView("split_table",splitTable);
6)分组、开窗、聚合
之前离线数仓写过窗口函数,但是都是没有边界的窗口。这里我们学习一下 Flink 中的三种窗口怎么用 Flink SQL 去写:
上面三种窗口分别对应:滚动,滑动和会话,下面是使用案例:
现在我们需要考虑将来写入到 ck 时,ck 应该采用什么引擎?
- 选择 SummingMergeTree
- 优点:自动预聚合,存储的内容少了,查询效率高
- 缺点:只能做求和指标,比如峰值指标就做不了。再有假如数据消费后挂了(Flink 读取后数据写入到 ck 了,但是这时候挂了,Flink 恢复后会重新消费,ck 就会重复处理。如果是别的引擎还好,因为数据不是聚合的状态,而是一条一条存储的,我们可以对数据根据 uuid 进行区分是否已经处理过)
- 选择ReplacingMergeTree
- 它有去重的功能,但是是在任务挂掉的时候我们才用得到(保证一致性)
- 可以做更多的指标
- 缺点就是会存储更多的数据
那么,我们当然选择 ReplacingMergeTree ,现在我们需要考虑去重字段(在 ck 中去重字段比主键都重要):
- 去重(order by 字段):
- 根据 窗口时间(起始+终止)+关键词 进行去重(这里会添加一个 source 字段区分日志的来源,比如 search、cart、order)
窗口的起始和终止时间同样有特定的函数来获取:
最终,我们的代码:
// TODO 6. 分组、开窗、聚合
Table resultTable = tableEnv.sqlQuery("SELECT " +
" date_format(tumble_start(time_ltz,interval '10' second),'yyyy-MM-dd HH:mm:ss') stt," +
" date_format(tumble_end(time_ltz,interval '10' second),'yyyy-MM-dd HH:mm:ss') edt," +
" 'search' source," +
" word keyword," +
" count(*) keyword_count," +
" unix_timestamp() ts" +
"FROM split_table" +
"GROUP BY word,tumble(time_ltz,interval '10' second)");
7)创建 ck 表格
create table if not exists dws_traffic_source_keyword_page_view_window
(
stt DateTime,
edt DateTime,
source String,
keyword String,
keyword_count UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, source, keyword);
8)ck 工具类
上面第 6 步之后,我们得到了开窗聚合后的一个结果,要写入 ck 我们需要先将动态表转为流:
// TODO 7. 将动态表转换为流
DataStream<KeywordBean> dataStream = tableEnv.toAppendStream(resultTable, KeywordBean.class);
接着我们需要通过 JdbcSink 写出到 ck 集群中,因为之后每个聚合结果都是存在 DWS 层的,所以都会用到该 JdbcSink,所以我们统一封装成一个工具类:
public class ClickHouseUtil {
// 泛型方法需要再返回值类型前面放一个泛型
public static <T> SinkFunction<T> getSinkFunction(String sql) {
return JdbcSink.sink(
sql,
new JdbcStatementBuilder<T>() {
@SneakyThrows
@Override
public void accept(PreparedStatement preparedStatement, T t) throws SQLException {
// 利用反射获得 t 对象的属性
Class<?> tClz = t.getClass();
int index = 1;
for (Field field : tClz.getDeclaredFields()) {
field.setAccessible(true); // 防止访问呢 private 属性失败
// 尝试获得字段上的注解
TransientSink transientSink = field.getAnnotation(TransientSink.class);
if (transientSink != null){
continue;
}
// 获得字段值
Object value = field.get(t);
// 给占位符赋值
preparedStatement.setObject(index++,value);
}
}
}, new JdbcExecutionOptions.Builder()
.withBatchSize(5)
.withBatchIntervalMs(1000L)
.build()
, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName(GmallConfig.CLICKHOUSE_DRIVER)
.withUrl(GmallConfig.CLICKHOUSE_URL)
.build()
);
}
}
在代码中我们有一个获取注解的操作,是为了防止 JavaBean 中的字段(可能是辅助字段)在 ck 表中并没有能对应上的,所以我们通过注解来甄别:
@Retention(RetentionPolicy.RUNTIME) // 生效时机: 运行时
@Target(ElementType.FIELD) // 该注解的作用域: 属性上
public @interface TransientSink {
}
补全主程序:
// TODO 8. 写入 clickhouse
// 插入字段顺序尽量和ck库的表保持一致
dataStream.addSink(ClickHouseUtil.getSinkFunction(
"insert into dws_traffic_source_keyword_page_view_window " +
"values(?,?,?,?,?,?)"
));
// TODO 9. 启动任务
env.execute("DwsTrafficSourceKeywordPageViewWindow");
注意:因为我们是通过反射获取 Bean 对象字段来向 ck 表插入数据的,所以一定要保证 Bean 对象的顺序要和 ck 表对应上;
1.2、流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表
上面 DWS 的第一个需求我们是用 Flink SQL 来实现的,从这个需求开始,我们将使用 DataStream API 来实现;
1.2.1、需求分析
- 维度有 4 个:版本,渠道,地区和访客类别;
- 度量值有 5 个:会话数、页面浏览数、浏览总时长、独立访客数、跳出会话数等;
关于独立访客数和跳出会话数我们之前在 DWD 层已经实现并分别写入到了 dwd_traffic_unique_visitor_detail(状态编程保存 lastVisitDate 实现) 和 dwd_traffic_user_jump_detail(Flink CEP 实现) 主题了;所以这里只需要分析前 2 个度量值怎么计算:
- 会话数
- 我们的数据中没有 session_id,但是要求也很简单:last_page_id 为 null 即代表一个新会话的开始
- 页面浏览数(PV)
- 页面浏览记录中每一行数据就是一个浏览记录(count(1) )
- 浏览总时长
- 还是从页面浏览记录中获取浏览时间(during_time)
思考:这三个度量值都可以从 dwd_traffic_page_log 中一次计算出来,但是怎么和另外两个来自不同主题的度量值聚合呢?
答:使用 join,根据 dws 表的粒度进行 join,但是在 SQL 中使用 join 的话也许好一点,可是我们使用 API 就比较复杂;所以其实我们还可以使用另一种方式实现——使用 union + 分组聚合也可以实现;
那么最终写入到 ck 中的字段其实一共有 12 个:4个维度字段 + 5 个度量值 + ts + 窗口起始、终止时间字段;
1.2.2、代码实现
1)建表语句
create table if not exists dws_traffic_vc_ch_ar_is_new_page_view_window
(
stt DateTime,
edt DateTime,
vc String,
ch String,
ar String,
is_new String,
uv_ct UInt64,
sv_ct UInt64,
pv_ct UInt64,
dur_sum UInt64,
uj_ct UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, vc, ch, ar, is_new);
2)创建 ck 表对应的 Bean
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class TrafficPageViewBean {
// 窗口起始时间
String stt;
// 窗口结束时间
String edt;
// app 版本号
String vc;
// 渠道
String ch;
// 地区
String ar;
// 新老访客状态标记
String isNew ;
// 独立访客数
Long uvCt;
// 会话数
Long svCt;
// 页面浏览数
Long pvCt;
// 累计访问时长
Long durSum;
// 跳出会话数
Long ujCt;
// 时间戳
Long ts;
}
3)读取三个主题的数据
// TODO 3. 读取三个主题的数据
String uvTopic = "dwd_traffic_unique_visitor_detail";
String ujdTopic = "dwd_traffic_user_jump_detail";
String topic = "dwd_traffic_page_log";
String groupId = "dws_traffic_channel_page_view_window";
DataStreamSource<String> uvDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(uvTopic, groupId));
DataStreamSource<String> ujdDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(ujdTopic, groupId));
DataStreamSource<String> pageDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));
4)统一数据格式
将 3 个主题的数据进行格式统一,方便后面 union + 聚合
// TODO 4. 统一数据格式
SingleOutputStreamOperator<TrafficPageViewBean> trafficWithUvDS = uvDS.map(line -> {
JSONObject jsonObject = JSONObject.parseObject(line);
JSONObject common = jsonObject.getJSONObject("common");
return new TrafficPageViewBean("", "",
common.getString("vc"),
common.getString("ch"),
common.getString("ar"),
common.getString("is_new"),
1L, 0L, 0L, 0L, 0L,
common.getLong("ts")
);
});
SingleOutputStreamOperator<TrafficPageViewBean> trafficWithUJ = ujdDS.map(line -> {
JSONObject jsonObject = JSONObject.parseObject(line);
JSONObject common = jsonObject.getJSONObject("common");
return new TrafficPageViewBean("", "",
common.getString("vc"),
common.getString("ch"),
common.getString("ar"),
common.getString("is_new"),
0L, 0L, 0L, 0L, 1L,
common.getLong("ts")
);
});
SingleOutputStreamOperator<TrafficPageViewBean> trafficWithSvPvDurSumDS = pageDS.map(line -> {
JSONObject jsonObject = JSONObject.parseObject(line);
JSONObject common = jsonObject.getJSONObject("common");
JSONObject page = jsonObject.getJSONObject("page");
return new TrafficPageViewBean("", "",
common.getString("vc"),
common.getString("ch"),
common.getString("ar"),
common.getString("is_new"),
0L,
page.getString("last_page_id") == null ? 1L : 0L,
1L,
page.getLong("during_time"),
0L,
common.getLong("ts")
);
});
注意: trafficWithUJ 这条流本就存在延时,所以很可能下面在 union 的时候,窗口都关闭了它还没来,所以我们只能给水位线的最大乱序等待时间 + 判定为用户跳出的最大时间(也就是超时时间);
5)三流 union
对三条流进行 union 然后提取出事件时间生成水位线,之后就需要开窗聚合了,而开窗聚合我们一般都会指定 keyby 再开窗,全窗口几乎不用;而 keyby 的字段我们选择 4 个维度(可以用 String 拼接也可以用一个四元组 Tuple4)
窗口分类:
- OpWindow:windowAll()
- KeyedWindow:window()
- 时间:滚动、滑动、会话
- 计数:滚动、滑动
// TODO 5. 三条流进行 union
DataStream<TrafficPageViewBean> unionDS = trafficWithUvDS.union(trafficWithUJDS, trafficWithSvPvDurSumDS);
// TODO 6. 提取事件时间(去 ts 字段生成水位线)
SingleOutputStreamOperator<TrafficPageViewBean> trafficPageViewWithWaterMarkDS = unionDS.assignTimestampsAndWatermarks(WatermarkStrategy.<TrafficPageViewBean>forBoundedOutOfOrderness(Duration.ofSeconds(14))
.withTimestampAssigner(new SerializableTimestampAssigner<TrafficPageViewBean>() {
@Override
public long extractTimestamp(TrafficPageViewBean element, long recordTimestamp) {
return element.getTs();
}
})
);
// TODO 7. 分组开窗聚合(按照维度做keyby)
WindowedStream<TrafficPageViewBean, Tuple4<String, String, String, String>, TimeWindow> windowedStream = trafficPageViewWithWaterMarkDS.keyBy(new KeySelector<TrafficPageViewBean, Tuple4<String, String, String, String>>() {
@Override
public Tuple4<String, String, String, String> getKey(TrafficPageViewBean value) throws Exception {
return Tuple4.of(value.getAr(),
value.getCh(),
value.getIsNew(),
value.getVc());
}
}).window(TumblingEventTimeWindows.of(Time.seconds(10)));
注意:这里在设置水位线延迟时间时,我们设置为 14,因为需求中包含用户跳出会话数,而跳出这个需求本就存在延迟(我们在 DWD 层设置了两种判断跳出策略(前提是按照 mid 分区):1. last_page_id = null & 下一条数据的 last_page_id 也为 null 2. last_page_id = null & 超时时间达到 10s 视作跳出)
6) 聚合
回顾一下窗口聚合函数:
- 增量聚合函数:来一条计算一条(效率高,存储数据量小)
- 全量聚合函数:可以求平均值和百分比,可以获取窗口信息
与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程。
但是把计算放到窗口关闭才去计算无疑是低效的,毕竟如果数据量比较大的时候,这种方式肯定没有增量聚合函数计算的快。那为什么还要使用这种方式呢?这是因为有些场景下,我们要做的计算必须基于全部的数据才有效(比如求平均值),这时做增量聚合就没什么意义了;
那么,现在我们应该对 keyby 后的数据流进行聚合,把相同 key 的度量值进行累加,那么我们应该选用哪种聚合函数呢?
选用增量聚合函数其实可以实现度量值的累加,但是由于我们的 ck 表中还有两个窗口字段需要补充(窗口起始和终止时间),所以我们需要获取窗口信息,那这就只能使用全量聚合函数了,毕竟全量窗口函数才能获得窗口信息;但是全窗口函数的计算往往是放到最后才执行的,这就很难受,那能不能结合二者的优点呢?
其实是可以的,我们在之前学习Flink 窗口的时候是讲过的:
增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果我们采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。
而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。
所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
SingleOutputStreamOperator<TrafficPageViewBean> resultDS = windowedStream.reduce(new ReduceFunction<TrafficPageViewBean>() {
@Override
public TrafficPageViewBean reduce(TrafficPageViewBean value1, TrafficPageViewBean value2) throws Exception {
value1.setSvCt(value1.getSvCt() + value2.getSvCt());
value1.setUvCt(value1.getUvCt() + value2.getUvCt());
value1.setUvCt(value1.getUjCt() + value2.getUjCt());
value1.setPvCt(value1.getPvCt() + value2.getPvCt());
value1.setDurSum(value1.getDurSum() + value2.getDurSum());
return value1;
}
}, new WindowFunction<TrafficPageViewBean, TrafficPageViewBean, Tuple4<String, String, String, String>, TimeWindow>() {
@Override
public void apply(Tuple4<String, String, String, String> stringStringStringStringTuple4, TimeWindow window, Iterable<TrafficPageViewBean> input, Collector<TrafficPageViewBean> out) throws Exception {
// 获取数据
TrafficPageViewBean next = input.iterator().next();
// 补充信息
next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
// 修改 ts
next.setTs(System.currentTimeMillis());
// 输出数据
out.collect(next);
}
});
这样,我们既高效地完成了窗口聚合(增量聚合),也拿到了窗口信息(全量聚合获得起止时间);
7)写出到 clickhouse
// TODO 8. 写入 clickhouse
resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_traffic_channel_page_view_window " +
"values(?,?,?,?,?,?,?,?,?,?,?,?)"));
// TODO 9. 启动任务
env.execute("DwsTrafficVcChArIsNewPageViewWindow");
总结
至此,流量域两张汇总表创建完毕,关于流量域就剩一张表明天完成,先去吃饭;