文章目录
- 一 DWS层-商品主题计算
- 1 把JSON字符串数据流转换为统一数据对象的数据流
- (1)转换订单宽表流数据
- (2)转换支付宽表流数据
- 2 把统一的数据结构流合并为一个流
- (1)代码
- (2)测试
- 3 设定事件时间与水位线
- 4 分组、开窗、聚合
- 5 补充商品维度信息
- (1)关联商品维度
- (2)关联SPU维度
- (3)关联品类维度
- (4)关联品牌维度
- (5)测试
- 6 写入ClickHouse
- (1)在ClickHouse中创建商品主题宽表
- (2)为主程序增加写入ClickHouse的Sink
- (3)整体测试
- 二 DWS层-地区主题表(FlinkSQL)
- 1 需求分析与思路
- 2 在pom.xml文件中添加FlinkSQL相关依赖
- 3 创建ProvinceStatsSqlApp,定义Table流环境
- 4 MyKafkaUtil增加一个DDL的方法
- 5 把数据源定义为动态表并指定水位线
- (1)指定`WATERMARK`
- (2)系统内置函数
- (3)给计算列起别名
- (4)完整代码
一 DWS层-商品主题计算
1 把JSON字符串数据流转换为统一数据对象的数据流
(1)转换订单宽表流数据
// 4.6 转换订单宽表流数据
SingleOutputStreamOperator<ProductStats> orderWideStatsDS = orderWideStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
OrderWide orderWide = JSON.parseObject(jsonStr, OrderWide.class);
ProductStats productStats = ProductStats.builder()
.sku_id(orderWide.getSku_id())
.order_sku_num(orderWide.getSku_num())
.order_amount(orderWide.getSplit_total_amount())
.ts(DateTimeUtil.toTs(orderWide.getCreate_time()))
.orderIdSet(new HashSet(Collections.singleton(orderWide.getOrder_id())))
.build();
return productStats;
}
}
);
(2)转换支付宽表流数据
// 4.7 转换支付宽表流数据
SingleOutputStreamOperator<ProductStats> paymentWideStatsDS = paymentWideStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
PaymentWide paymentWide = JSON.parseObject(jsonStr, PaymentWide.class);
ProductStats productStats = ProductStats.builder()
.sku_id(paymentWide.getSku_id())
.payment_amount(paymentWide.getSplit_total_amount())
.paidOrderIdSet(new HashSet(Collections.singleton(paymentWide.getOrder_id())))
.ts(DateTimeUtil.toTs(paymentWide.getCallback_time()))
.build();
return productStats;
}
}
);
2 把统一的数据结构流合并为一个流
(1)代码
// TODO 5 将不同流的数据通过union合并到一起
DataStream<ProductStats> unionDS = clickAndDisplayStatsDS.union(
favorStatsDS,
cartStatsDS,
refundStatsDS,
commentStatsDS,
orderWideStatsDS,
paymentWideStatsDS
);
unionDS.print(">>>");
(2)测试
-
启动ZK、Kafka、logger.sh、ClickHouse、Redis、HDFS、Hbase、Maxwell
redis-server /home/hzy/redis2022.conf sudo systemctl start clickhouse-server
-
运行BaseLogApp
-
运行BaseDBApp
-
运行OrderWideApp
-
运行PaymentWideApp
-
运行ProductsStatsApp
-
运行rt_applog目录下的jar包(可以获取到曝光 display_ct和点击数据click_ct)
-
运行rt_dblog目录下的jar包(可以获取到sku_id、cart_ct、favor_ct,下单数量,下单金额、orderIdSet等)
-
查看控制台输出(如电脑性能不够,可以将日志和业务两条线分开测试)
3 设定事件时间与水位线
// TODO 6 指定watermark以及提取时间时间字段
SingleOutputStreamOperator<ProductStats> productStatsWithWatermarkDS = unionDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
new SerializableTimestampAssigner<ProductStats>() {
@Override
public long extractTimestamp(ProductStats productStats, long recordTimestamp) {
return productStats.getTs();
}
}
)
);
4 分组、开窗、聚合
// TODO 7 分组 -- 按照商品id分组
KeyedStream<ProductStats, Long> keyedDS = productStatsWithWatermarkDS.keyBy(ProductStats::getSku_id);
// TODO 8 开窗
WindowedStream<ProductStats, Long, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
// TODO 9 聚合计算
SingleOutputStreamOperator<ProductStats> reduceDS = windowDS.reduce(
new ReduceFunction<ProductStats>() {
@Override
public ProductStats reduce(ProductStats stats1, ProductStats stats2) throws Exception {
stats1.setDisplay_ct(stats1.getDisplay_ct() + stats2.getDisplay_ct());
stats1.setClick_ct(stats1.getClick_ct() + stats2.getClick_ct());
stats1.setCart_ct(stats1.getCart_ct() + stats2.getCart_ct());
stats1.setFavor_ct(stats1.getFavor_ct() + stats2.getFavor_ct());
stats1.setOrder_amount(stats1.getOrder_amount().add(stats2.getOrder_amount()));
stats1.getOrderIdSet().addAll(stats2.getOrderIdSet());
stats1.setOrder_ct(stats1.getOrderIdSet().size() + 0L);
stats1.setOrder_sku_num(stats1.getOrder_sku_num() + stats2.getOrder_sku_num());
stats1.setPayment_amount(stats1.getPayment_amount().add(stats2.getPayment_amount()));
stats1.getRefundOrderIdSet().addAll(stats2.getRefundOrderIdSet());
stats1.setRefund_order_ct(stats1.getRefundOrderIdSet().size() + 0L);
stats1.setRefund_amount(stats1.getRefund_amount().add(stats2.getRefund_amount()));
stats1.getPaidOrderIdSet().addAll(stats2.getPaidOrderIdSet());
stats1.setPaid_order_ct(stats1.getPaidOrderIdSet().size() + 0L);
stats1.setComment_ct(stats1.getComment_ct() + stats2.getComment_ct());
stats1.setGood_comment_ct(stats1.getGood_comment_ct() + stats2.getGood_comment_ct());
return stats1;
}
},
new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {
@Override
public void process(Long aLong, Context context, Iterable<ProductStats> elements, Collector<ProductStats> out) throws Exception {
for (ProductStats productStats : elements) {
productStats.setStt(DateTimeUtil.toYMDHMS(new Date(context.window().getStart())));
productStats.setEdt(DateTimeUtil.toYMDHMS(new Date(context.window().getEnd())));
productStats.setTs(new Date().getTime());
out.collect(productStats);
}
}
}
);
5 补充商品维度信息
因为除了下单操作之外,其它操作,只获取到了商品的id,其它维度信息是没有的。
(1)关联商品维度
SingleOutputStreamOperator<ProductStats> productStatsWithSkuDS = AsyncDataStream.unorderedWait(
reduceDS,
new DimAsyncFunction<ProductStats>("DIM_SKU_INFO") {
@Override
public void join(ProductStats productStats, JSONObject dimJsonObj) throws Exception {
productStats.setSku_name(dimJsonObj.getString("SKU_NAME"));
productStats.setSku_price(dimJsonObj.getBigDecimal("PRICE"));
productStats.setCategory3_id(dimJsonObj.getLong("CATEGORY3_ID"));
productStats.setSpu_id(dimJsonObj.getLong("SPU_ID"));
productStats.setTm_id(dimJsonObj.getLong("TM_ID"));
}
@Override
public String getKey(ProductStats productStats) {
return productStats.getSku_id().toString();
}
},
60, TimeUnit.SECONDS
);
(2)关联SPU维度
SingleOutputStreamOperator<ProductStats> productStatsWithSpuDS =
AsyncDataStream.unorderedWait(productStatsWithSkuDS,
new DimAsyncFunction<ProductStats>("DIM_SPU_INFO") {
@Override
public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
productStats.setSpu_name(jsonObject.getString("SPU_NAME"));
}
@Override
public String getKey(ProductStats productStats) {
return String.valueOf(productStats.getSpu_id());
}
}, 60, TimeUnit.SECONDS
);
(3)关联品类维度
SingleOutputStreamOperator<ProductStats> productStatsWithCategory3DS =
AsyncDataStream.unorderedWait(productStatsWithSpuDS,
new DimAsyncFunction<ProductStats>("DIM_BASE_CATEGORY3") {
@Override
public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
productStats.setCategory3_name(jsonObject.getString("NAME"));
}
@Override
public String getKey(ProductStats productStats) {
return String.valueOf(productStats.getCategory3_id());
}
}, 60, TimeUnit.SECONDS
);
(4)关联品牌维度
SingleOutputStreamOperator<ProductStats> productStatsWithTmDS =
AsyncDataStream.unorderedWait(productStatsWithCategory3DS,
new DimAsyncFunction<ProductStats>("DIM_BASE_TRADEMARK") {
@Override
public void join(ProductStats productStats, JSONObject jsonObject) throws Exception {
productStats.setTm_name(jsonObject.getString("TM_NAME"));
}
@Override
public String getKey(ProductStats productStats) {
return String.valueOf(productStats.getTm_id());
}
}, 60, TimeUnit.SECONDS
);
productStatsWithTmDS.print(">>>");
(5)测试
- 运行rt_applog目录下的jar包。
- 运行rt_dblog目录下的jar包,执行两次以改变水位线,触发窗口提交操作。
6 写入ClickHouse
(1)在ClickHouse中创建商品主题宽表
create table product_stats_2022 (
stt DateTime,
edt DateTime,
sku_id UInt64,
sku_name String,
sku_price Decimal64(2),
spu_id UInt64,
spu_name String ,
tm_id UInt64,
tm_name String,
category3_id UInt64,
category3_name String ,
display_ct UInt64,
click_ct UInt64,
favor_ct UInt64,
cart_ct UInt64,
order_sku_num UInt64,
order_amount Decimal64(2),
order_ct UInt64 ,
payment_amount Decimal64(2),
paid_order_ct UInt64,
refund_order_ct UInt64,
refund_amount Decimal64(2),
comment_ct UInt64,
good_comment_ct UInt64 ,
ts UInt64
)engine =ReplacingMergeTree( ts)
partition by toYYYYMMDD(stt)
order by (stt,edt,sku_id );
(2)为主程序增加写入ClickHouse的Sink
// TODO 11 将结果写入到ClickHouse
productStatsWithTmDS.addSink(
ClickhouseUtil.getJdbcSink(
"insert into product_stats_2022 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
);
(3)整体测试
- 启动ZK、Kafka、logger.sh、ClickHouse、Redis、HDFS、Hbase、Maxwell
- 运行BaseLogApp
- 运行BaseDBApp
- 运行OrderWideApp
- 运行PaymentWideApp
- 运行ProductsStatsApp
- 运行rt_applog目录下的jar包
- 运行rt_dblog目录下的jar包
- 查看控制台输出
- 查看ClickHouse中products_stats_2022表数据
注意:一定要匹配两个数据生成模拟器的日期,否则窗口无法匹配上。
二 DWS层-地区主题表(FlinkSQL)
统计主题 | 需求指标 | 输出方式 | 计算来源 | 来源层级 |
---|---|---|---|---|
地区 | pv | 多维分析 | page_log直接可求 | dwd |
uv | 多维分析 | 需要用page_log过滤去重 | dwm | |
下单(单数,金额) | 可视化大屏 | 订单宽表 | dwm |
地区主题主要是反映各个地区的销售情况。从业务逻辑上地区主题比起商品更加简单,业务逻辑也没有什么特别的就是做一次轻度聚合然后保存,所以使用flinkSQL,来完成该业务。
1 需求分析与思路
- 定义Table流环境
- 把数据源定义为动态表
- 通过SQL查询出结果表
- 把结果表转换为数据流
- 把数据流写入目标数据库
如果是Flink官方支持的数据库,也可以直接把目标数据表定义为动态表,用insert into 写入。由于ClickHouse目前官方没有支持的jdbc连接器(目前支持Mysql、 PostgreSQL、Derby)。也可以制作自定义sink,实现官方不支持的连接器。但是比较繁琐。
2 在pom.xml文件中添加FlinkSQL相关依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
3 创建ProvinceStatsSqlApp,定义Table流环境
package com.hzy.gmall.realtime.app.dws;
/**
* 地区主题统计 -- SQL
*/
public class ProvinceStatsSqlApp {
public static void main(String[] args) throws Exception {
// TODO 1 环境准备
// 1.1 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.2 表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1.3 设置并行度
env.setParallelism(4);
// TODO 2 检查点相关设置(略)
env.execute();
}
}
4 MyKafkaUtil增加一个DDL的方法
public static String getKafkaDDL(String topic,String groupId){
String ddl = "'connector' = 'kafka'," +
" 'topic' = '"+topic+"'," +
" 'properties.bootstrap.servers' = '"+KAFKA_SERVER+"'," +
" 'properties.group.id' = '"+groupId+"'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'format' = 'json'";
return ddl;
}
5 把数据源定义为动态表并指定水位线
(1)指定WATERMARK
WATERMARK
定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
。
rowtime_column_name
把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)
,且是 schema 中的顶层列,它也可以是一个计算列。相当于是API格式中的提取时间字段操作。
watermark_strategy_expression
定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3)
,表示了从 Epoch 以来的经过的时间。 返回的 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval
中所配置的间隔发出。 若 watermark 的间隔是 0ms
,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。
使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。
Flink 提供了几种常用的 watermark 策略。
-
严格递增时间戳:
WATERMARK FOR rowtime_column AS rowtime_column
。发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。
-
递增时间戳:
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
。发出到目前为止已观察到的最大时间戳减 1 的 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。相当于是API格式中的单调递增策略。
-
有界乱序时间戳:
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
。发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如,
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
是一个 5 秒延迟的 watermark 策略。
其中WATERMARK FOR rowtime AS rowtime是把某个字段设定为EVENT_TIME。
详细说明。
(2)系统内置函数
将字符串转换为时间戳。
TO_TIMESTAMP(string1[, string2]) | Converts date time string string1 with format string2 (by default: ‘yyyy-MM-dd HH:mm:ss’) under the session time zone (specified by TableConfig) to a timestamp.Only supported in blink planner. |
---|---|
详细说明。
(3)给计算列起别名
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
(4)完整代码
字段名要和kafka中的json属性完全一致。
// TODO 3 从指定的数据源(kafka)读取数据,转换为动态表,并指定水位线
String orderWideTopic = "dwm_order_wide";
String groupId = "province_stats";
tableEnv.executeSql("CREATE TABLE order_wide (" +
" province_id BIGINT," +
" province_name STRING," +
" province_area_code STRING," +
" province_iso_code STRING," +
" province_3166_2_code STRING," +
" order_id STRING," +
" split_total_amount DOUBLE," +
" create_time STRING," +
" rowtime as TO_TIMESTAMP(create_time)," +
" WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND" +
" ) WITH (" + MyKafkaUtil.getKafkaDDL(orderWideTopic,groupId) +")");