目录
一:流量域来源关键词粒度页面浏览各窗口汇总表(FlinkSQL)
1.1 主要任务:
1.2 思路分析:
1.3 图解:
1.4 ClickHouse建表语句:
二:流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表
2.1 主要任务
2.2 思路分析
2.3 图解
2.4 ClickHouse建表语句
三:流量域页面浏览各窗口汇总表
3.1 主要任务
3.2 思路分析
3.3 图解
3.4 ClickHouse建表语句
四:用户域用户登陆各窗口汇总表
4.1 主要任务
4.2 思路分析
4.3 图解
4.4 ClickHouse建表语句
DWS层设计要点:
(1)DWS层的设计参考指标体系;
(2)DWS层表名的命名规范为dws_数据域_统计粒度_业务过程_统计周期(window)
注:window 表示窗口对应的时间范围。
一:流量域来源关键词粒度页面浏览各窗口汇总表(FlinkSQL)
1.1 主要任务:
从 Kafka 页面浏览明细主题读取数据,过滤搜索行为,使用自定义 UDTF(一进多出)函数对搜索内容分词。统计各窗口各关键词出现频次,写入 ClickHouse。
1.2 思路分析:
本程序将使用 FlinkSQL 实现。分词是个一进多出的过程,需要一个 UDTF 函数来实现,FlinkSQL 没有提供相关的内置函数,所以要自定义 UDTF 函数。
自定义函数的逻辑在代码中实现,要完成分词功能,需要导入相关依赖,此处将借助 IK 分词器完成分词。
最终要将数据写入 ClickHouse,需要补充相关依赖,封装 ClickHouse 工具类和方法。本节任务分为两部分:分词处理和数据写出。
1)分词处理
分词处理分为八个步骤,如下。
(1)创建分词工具类
定义分词方法,借助 IK 分词器提供的工具将输入的关键词拆分成多个词,返回一个 List 集合。
(2)创建自定义函数类
继承 Flink 的 TableFunction 类,调用分词工具类的分词方法,实现分词逻辑。
(3)注册函数
(4)从 Kafka 页面浏览明细主题读取数据并设置水位线
(5)过滤搜索行为
满足以下三个条件的即为搜索行为数据:
① page 字段下 item 字段不为 null;
② page 字段下 last_page_id 为 search;
③ page 字段下 item_type 为 keyword。
(6)分词
(7)分组、开窗、聚合计算
按照拆分后的关键词分组。统计每个词的出现频次,补充窗口起始时间、结束时间和关键词来源(source)字段。调用 unix_timestamp() 函数获取以秒为单位的当前系统时间戳,转为毫秒(*1000),作为 ClickHouse 表的版本字段,用于数据去重。
(8)将动态表转换为流
2)将数据写入 ClickHouse
(1)建表
要将数据写入 ClickHouse,先要建表。首先要明确使用的表引擎。为了保证数据不重复,可以使用 ReplacingMergeTree(替换合并树) 或者 ReplicatedMergeTree(副本合并树),二者均可去重,区别如下。
① 副本通过对比插入的“数据块”(同一批次写入的数据)实现去重,如果插入的两批数据相似度达到 ClickHouse 的判断标准后插入的数据会被舍弃。副本的初衷是防止数据丢失,而非去重,如果重复数据夹杂在不同的数据块中并不能实现去重效果。假设向 ClickHouse 写入数据时 5 条一批,第一批次 ABCDE 第二批 FAGHI,只要没有达到 ClickHouse 对数据块重复的判断标准,重复的 A 依然会被写入。
② ReplacingMergeTree 在建表时需要定义版本字段,它会对比排序字段(在 ClickHouse 中排序字段可以唯一标识一行数据)相同数据的版本字段,如果设置了该字段,且多条数据的该字段值不同,则保留版本字段值最大的数据,如果没有设置该字段或者多条数据该字段的值相同,则按插入顺序保留最后一条。数据的去重只会在数据合并期间进行。合并操作会在后台一个不确定的时间执行,无法预先做出计划。因此无法保证每时每刻数据不会重复。可以执行 optimize table xxx final 手动对分区进行合并。
此处选择 ReplacingMergeTree,主要考虑到虽然去重有延迟,但在必要时可以通过optimize 去重。但这个命令会引发大量读写操作,对 ClickHouse 而言是非常重的,极其影响性能。生产环境不可能在每次查询前都做一次合并操作,不可过多依赖 optimize 去重。
(2)写出方式
调用Flink提供的JDBCSink.<T>sink(String sql,JdbcStatementBuilder<T> statementBuilder, JdbcExecutionOptions executionOptions, JdbcConnectionOptions connectionOptions) 方法创建 JDBC sink,返回 SinkFunction 类型的对象,将其作为流调用 addSink() 方法的参数,即可将数据以 JDBC 方式写入数据库。这种方式只能写入数据库中的一张表。参数解读如下
-
- sql:任意的 DML 语句。
- statementBuilder:构造者类 JDBCStatementBuilder 对象,用于为数据库操作对象 (PreparedStatement 对象)中的占位符传参。核心方法 accept(PreparedStatement preparedStatement, T obj),参数解读如下。
- preparedStatement:数据库操作对象。
- obj:流中数据对象。要给占位符传参,就必须将 SQL 中的占位符和流中数据对应起来。然而,不同 SQL 语句的占位符数量可能不同,不可能设置一个统一的数值指定占位符个数,然后简单地通过固定次数的循环完成传参。那么,如何在程序中将占位符和流中数据对应起来?可以这样做,用传入方法的流中数据对象(obj)获取类的 Class 对象,然后通过反射的方式获取所有属性的 Field 对象,再调用 field 对象的 setObject() 方法将流中数据传递给 SQL 中的占位符,完成传参。
- T:泛型,指定流中数据类型。
- executionOptions:SQL DML 语句是按照批次执行的,该参数用于设置执行参数,API 如下。
- withBatchIntervalMs(long intervalMs) 设置批处理时间间隔,单位毫秒。默认值为0,表示不会基于时间对批处理进行控制。
- withBatchSize(int size) 设置批次大小(数据的条数),默认为 5000 条。
- withMaxRetries(int maxRetries) 设置最大重试次数,默认为 3 次。
- 批处理触发条件(满足其一即可):
- 距离上次数据插入经过了 withBatchIntervalMs 设置的时间间隔
- 数据量达到批大小
- Flink 检查点启动时
- connectionOptions:设置数据库连接参数
- withUrl:数据库 URL
- withDriverName:数据库驱动名称
- withUsername:连接数据库的用户名
- withPassword:连接数据库的密码
(3)TransientSink
在实体类中某些字段是为了辅助指标计算而设置的,并不会写入到数据库。那么,如何告诉程序哪些字段不需要写入数据库呢?Java 的反射提供了解决思路。类的属性对象 Field 可以调用 getAnnotation(Class annotationClass) 方法获取写在类中属性定义语句上方的注解中的信息,若注解存在则返回值不为 null。
定义一个可以写在属性上的注解,对于不需要写入数据库的属性,在实体类中属性定义语句上方添加该注解。为数据库操作对象传参时判断注解是否存在,是则跳过属性,即可实现对属性的排除。
1.3 图解:
1.4 ClickHouse建表语句:
drop table if exists dws_traffic_source_keyword_page_view_window;
create table if not exists dws_traffic_source_keyword_page_view_window
(
stt DateTime,
edt DateTime,
source String,
keyword String,
keyword_count UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, source, keyword);
二:流量域版本-渠道-地区-访客类别粒度页面浏览各窗口汇总表
2.1 主要任务
DWS 层是为 ADS 层服务的,通过对指标体系的分析,本节汇总表中需要有会话数、页面浏览数、浏览总时长、独立访客数、跳出会话数五个度量字段。本节的任务是统计这五个指标,并将维度和度量数据写入 ClickHouse 汇总表。
2.2 思路分析
任务可以分为两部分:统计指标的计算和数据写出,数据写出在10.1 节已有介绍,不再赘述。此处仅对统计指标计算进行分析。
会话数、页面浏览数和浏览总时长三个指标均与页面浏览有关,可以由 DWD 层页面浏览明细表获得。独立访客数可以由 DWD 层的独立访客明细表获得,跳出会话数可以由 DWD 层的用户跳出明细表获得。
三个主题读取的数据会在程序中被封装为三条流。处理后的数据要写入 ClickHouse 的同一张表,那么三条流的数据结构必须完全一致,这个问题很好解决,只要定义与表结构对应的实体类,然后将流中数据结构转换为实体类即可。除此之外,还有个问题需要考虑,三条流是否需要合并?ClickHouse 表的字段将按照窗口 + 表中所有维度做 order by,排序键是 ClickHouse 中的唯一键。如果三条流分别将数据写出到 ClickHouse,则对于唯一键相同的数据,不考虑重复写入的情况下会存在三条需要保留的数据(度量数据分别存在于三条数据中)。我们使用了 ReplacingMergeTree,在分区合并时会按照排序键去重,排序字段相同的数据仅保留一条,将造成数据丢失。显然,这种方案是不可行的。此处将三条流合并为一条,对于每一排序键只生成一条数据。
1)知识储备
常见的多流合并算子及应用场景如下。
- union():用于两条及多条流之间的合并,对流的数量没有限制,但是要求所有流中的数据结构完全一致。
- connect():用于两条流的合并,其后紧邻的 process 算子中可以使用的 CoProcessFunction 是双流处理最底层的 API,可以通过键控状态和定时器的运用实现join、广播join、段join等各种关联。connect() 只能对两条流做关联,且对两条流的数据结构没有要求。
- intervalJoin:段 join,两条流的每一条数据都可以与另一条流某个时间范围内的数据做关联。底层实现原理:以 A.intervalJoin(B) 为例,A 流中的数据进入算子后,会被保存到键控状态中,同时注册一个定时器,定时器触发时清空 A 流状态中的数据。在定时器触发之前,B 流中的每一条数据都可以与状态中保存的 A 流数据关联。同理,B 流中也维护了状态定时器。由此实现了段 join。假定A流中的定时器存在时长为3s,B流中的定时器存在时长为5s,A 流中某条数据抵达时间为 tA,可与 tA – 5s ~ tA + 3s 时间范围内抵达的 B 流数据关联;B 流中某条数据抵达时间为 tB,可与 tB – 3s ~ tB + 5s 时间范围内抵达的 A 流数据关联。
- join():该算子的功能可以被其它算子替代,目前基本不用。
connect()、intervalJoin()、join() 都是双流合并算子,本节对三条流进行合并,且流中数据结构一致,选择 union() 更为合理。
2)执行步骤
(1)读取页面主题数据,封装为流
(2)统计页面浏览时长、页面浏览数、会话数,转换数据结构
创建实体类,将独立访客数、跳出会话数置为 0,将页面浏览数置为 1(只要有一条页面浏览日志,则页面浏览数加一),获取日志中的页面浏览时长,赋值给实体类的同名字段,最后判断 last_page_id 是否为 null,如果是,说明页面是首页,开启了一个新的会话,将会话数置为 1,否则置为 0。补充维度字段,窗口起始和结束时间置为空字符串。下游要根据水位线开窗,所以要补充事件时间字段,此处将日志生成时间 ts 作为事件时间字段即可。最后将实体类对象发往下游。
(3)读取用户跳出明细数据
(4)转换用户跳出流数据结构封装实体类,维度字段和时间戳处理与页面流相同,跳出数置为1,其余度量字段置为 0。将数据发往下游。
(5)读取独立访客明细数据
(6)转换独立访客流数据结构处理过程与跳出流同理。
(7)union 合并三条流
(8)设置水位线;
(9)按照维度字段分组;
(10)开窗
跳出行为判定的超时时间为 10s,假设某条日志属于跳出数据,如果它对应的事件时间为 15s,要判定是否跳出需要在水位线达到 25s 时才能做到,若窗口大小为 10s,这条数据应进入 10~20s 窗口,但是拿到这条数据时水位线已达到 25s,所属窗口已被销毁。这样就导致跳出会话数永远为 0,显然是有问题的。要避免这种情况,必须设置窗口延迟关闭,延迟关闭时间大于等于跳出判定的超时时间才能保证跳出数据不会被漏掉。但是这样会严重影响时效性,如果企业要求延迟时间设置为半小时,那么窗口就要延迟半小时关闭。要统计跳出行为相关的指标,就必须接受它对时效性带来的负面影响。
(11)聚合计算
度量字段求和,每个窗口数据聚合完毕之后补充窗口起始时间和结束时间字段。
在 ClickHouse 中,ts 将作为版本字段用于去重,ReplacingMergeTree 会在分区合并时对比排序字段相同数据的 ts,保留 ts 最大的数据。此处将时间戳字段置为当前系统时间,这样可以保证数据重复计算时保留的是最后一次计算的结果。
(12)将数据写入 ClickHouse。
2.3 图解
2.4 ClickHouse建表语句
drop table if exists dws_traffic_vc_ch_ar_is_new_page_view_window;
create table if not exists dws_traffic_vc_ch_ar_is_new_page_view_window
(
stt DateTime,
edt DateTime,
vc String,
ch String,
ar String,
is_new String,
uv_ct UInt64,
sv_ct UInt64,
pv_ct UInt64,
dur_sum UInt64,
uj_ct UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, vc, ch, ar, is_new);
三:流量域页面浏览各窗口汇总表
3.1 主要任务
从 Kafka 页面日志主题读取数据,统计当日的首页和商品详情页独立访客数。
3.2 思路分析
1)读取 Kafka 页面主题数据
2)转换数据结构
将流中数据由 String 转换为 JSONObject。
3)过滤数据
仅保留 page_id 为 home 或 good_detail 的数据,因为本程序统计的度量仅与这两个页面有关,其它数据无用。
4)设置水位线
5)按照 mid 分组
6)统计首页和商品详情页独立访客数,转换数据结构
运用 Flink 状态编程,为每个 mid 维护首页和商品详情页末次访问日期。如果 page_id 为 home,当状态中存储的日期为 null 或不是当日时,将 homeUvCt(首页独立访客数) 置为 1,并将状态中的日期更新为当日。否则置为 0,不做操作。商品详情页独立访客的统计同理。当 homeUvCt 和 detailUvCt 至少有一个不为 0 时,将统计结果和相关维度信息封装到定义的实体类中,发送到下游,否则舍弃数据。
7)开窗
8)聚合
9)将数据写出到 ClickHouse
3.3 图解
3.4 ClickHouse建表语句
drop table if exists dws_traffic_page_view_window;
create table if not exists dws_traffic_page_view_window
(
stt DateTime,
edt DateTime,
home_uv_ct UInt64,
good_detail_uv_ct UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt);
四:用户域用户登陆各窗口汇总表
4.1 主要任务
从 Kafka 页面日志主题读取数据,统计七日回流用户和当日独立用户数。
4.2 思路分析
之前的活跃用户,一段时间未活跃(流失),今日又活跃了,就称为回流用户。此处要求统计回流用户总数。规定当日登陆,且自上次登陆之后至少 7 日未登录的用户为回流用户。
1)读取 Kafka 页面主题数据
2)转换数据结构
流中数据由 String 转换为 JSONObject。
3)过滤数据
统计的指标与用户有关,uid 不为 null 的数据才是有用的。此外,登陆分为两种情况:(1)用户打开应用后自动登录;(2)用户打开应用后没有登陆,浏览部分页面后跳转到登录页面,中途登陆。对于情况(1),登录操作发生在会话首页,所以保留首页即可;对于情况(2),登陆操作发生在 login 页面,login 页面之后必然会跳转到其它页面,保留 login 之后的页面即可记录情况(2)的登陆操作。
综上,我们应保留 uid 不为 null 且 last_page_id 为 null 或 last_page_id 为 login 的浏览记录。
4)设置水位线
5)按照 uid 分组
不同用户的登陆记录互不相干,各自处理。
6)统计回流用户数和独立用户数
运用 Flink 状态编程,记录用户末次登陆日期。
(1)若状态中的末次登陆日期不为 null,进一步判断。
① 如果末次登陆日期不等于当天日期则独立用户数 uuCt 记为 1,并将状态中的末次登陆日期更新为当日,进一步判断。
a)如果当天日期与末次登陆日期之差大于等于 8 天则回流用户数 backCt 置为 1。
b)否则 backCt 置为 0。
② 若末次登陆日期为当天,则 uuCt 和 backCt 均为 0,此时本条数据不会影响统计结果,舍弃,不再发往下游。
(2)如果状态中的末次登陆日期为 null,将 uuCt 置为 1,backCt 置为 0,并将状态中的末次登陆日期更新为当日。
7)开窗,聚合
度量字段求和,补充窗口起始和结束时间,时间戳字段置为当前系统时间,用于 ClickHouse 数据去重。
8)写入 ClickHouse
4.3 图解
4.4 ClickHouse建表语句
drop table if exists dws_user_user_login_window;
create table if not exists dws_user_user_login_window
(
stt DateTime,
edt DateTime,
back_ct UInt64,
uu_ct UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt);