学Flink第八章多流转换的时候,进行合流操作.connect()使用到了第九章状态编程的知识,感觉总体不是很清晰,因此学完状态编程后现在进行重温并细化一些细节
- 业务背景
- 步骤一:
用户进行支付的时候,后台是需要调用第三方服务平台进行服务,即用户支付请求,页面将会跳转到第三方支付平台支付
- 步骤二:
用户进行支付之后,第三方支付平台给到用户前端支出反馈,并且给我们平台发送用户已经付款的消息
- 步骤三:
第三方支付平台需要将钱再转入到我们平台账户
- 出现的问题以及需求
- 问题
如果进行到图中④,如果发生数据丢失,那么用户已经支付的消息无法传达给到后台,而后不能关闭订单
- 需求
因此需要进行实时对账操作,即用户提交的支付请求(客户端),以及第三方支付平台给到的请求(三方端),两者可以当成两条流
- 结果
如果进行两条流的操作后不匹配,那么将进行预警
- 一些细节考虑
-
两个流都给他标上时间戳(使用watermark标志)
-
使用状态编程保存状态以及设置定时器,来进行两条流的连接以及等待
-
如果对方流中有我流的数据,那么直接输出成功;如果没有则更新我流状态,注册定时器等待另一个流
-
然后用ontimer()触发定时器:判断条件如果两条流中还有状态没被清空,说明没匹配上
-
- 上代码
- 代码
public class BillCheckExample {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//来自app的支付日志
SingleOutputStreamOperator<Tuple3<String,String,Long>> appStream = env.fromElements(
Tuple3.of("order-1", "app", 1000L),
Tuple3.of("order-2", "app", 2000L),
Tuple3.of("order-3", "app", 3500L)
).assignTimestampsAndWatermarks(WatermarkStrategy.
<Tuple3<String,String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
return element.f2;
}
})
);
//来自第三方平台的支付日志
SingleOutputStreamOperator<Tuple4<String,String,String,Long>> thirdpartStream = env.fromElements(
Tuple4.of("order-1", "third-party", "success", 3000L),
Tuple4.of("order-3", "third-party", "success", 4000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.
<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String,String,String,Long>>() {
@Override
public long extractTimestamp(Tuple4<String,String,String,Long> element, long recordTimestamp) {
return element.f3;
}
})
);
//检测同一支付单在两条流中是否匹配,等待一段时间后,不匹配就报警
// //这种也可以
// appStream.keyBy(data->data.f0)
// .connect(thirdpartStream.keyBy(data -> data.f0));
//
appStream.connect(thirdpartStream)
.keyBy(data->data.f0,data-> data.f0)
.process(new OrderMatchResult())
.print();
env.execute();
}
//自定义实现CoFunction
public static class OrderMatchResult extends CoProcessFunction<Tuple3<String,String,Long>,
Tuple4<String,String,String,Long>,String>{
//定义状态变量,用来保存已经到达的事件
private ValueState<Tuple3<String, String, Long>> appEventState;
private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;
//运行上下文环境中获取状态
@Override
public void open(Configuration parameters) throws Exception {
appEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG))
);
thirdPartyEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
);
}
@Override
public void processElement1(Tuple3<String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
//来的是app event,看另一条流中事件是否来过
if(thirdPartyEventState.value()!=null){
out.collect("对账成功:"+value+" "+thirdPartyEventState.value());
//清空状态
thirdPartyEventState.clear();
}else{
//如果没来就等待,并且更新状态
appEventState.update(value);
//注册一个5秒后的定时器,开始等待另一条的事件
ctx.timerService().registerEventTimeTimer(value.f2+5000L);
}
}
@Override
public void processElement2(Tuple4<String, String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
//来的是app event,看另一条流中事件是否来过
if(appEventState.value()!=null){
out.collect("对账成功:"+appEventState.value()+" "+value);
//清空状态
appEventState.clear();
}else{
//如果没来就等待,并且更新状态
thirdPartyEventState.update(value);
//注册一个5秒后的定时器,开始等待另一条的事件
ctx.timerService().registerEventTimeTimer(value.f3);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//定时器触发,判断状态,如果某个状态不为空,说明另一条中事件没来
//并且不会存在两个都不为空,因为其中一个不为空后会被清除
//没有没清空表示失败
if(appEventState.value()!=null){
out.collect("对账失败:"+appEventState.value()+" "+"第三方支付平台信息未到");
}
if(thirdPartyEventState.value()!=null){
out.collect("对账失败:"+thirdPartyEventState.value()+" "+"APP信息信息未到");
}
//清空所有数据
appEventState.clear();
thirdPartyEventState.clear();
}
}
}
- 结果
对账成功:(order-1,app,1000) (order-1,third-party,success,3000)
对账成功:(order-3,app,3500) (order-3,third-party,success,4000)
对账失败:(order-2,app,2000) 第三方支付平台信息未到