一、说明
在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。
二、思路
对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。
三、数据准备
订单数据从OrderLog.csv中读取,交易数据从ReceiptLog.csv中读取
JavaBean类的准备
四、代码
package com.lyh.flink06;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
public class Project_Order {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
SingleOutputStreamOperator<OrderEvent> orderEventString = env.readTextFile("input/OrderLog.csv")
.map(line -> {
String[] data = line.split(",");
return new OrderEvent(
Long.valueOf(data[0]),
data[1],
data[2],
Long.valueOf(data[3])
);
}).filter(log -> "pay".equals(log.getEventType()));
SingleOutputStreamOperator<TxEvent> txEventString = env.readTextFile("input/ReceiptLog.csv")
.map(line -> {
String[] data = line.split(",");
return new TxEvent(
data[0],
data[1],
Long.valueOf(data[2])
);
});
orderEventString.connect(txEventString)
.keyBy(OrderEvent::getTxId,TxEvent::getTxId)
.process(new KeyedCoProcessFunction<String, OrderEvent, TxEvent, String>() {
Map<String,OrderEvent> OrderEventmap = new HashMap<>();
Map<String,TxEvent> TxEventmap = new HashMap<>();
@Override
public void processElement1(OrderEvent value,
Context ctx,
Collector<String> out) throws Exception {
TxEvent txEvent = TxEventmap.get(ctx.getCurrentKey());
if (txEvent != null) {
out.collect("订单" + value.getOrderId() + "对账成功");
}else {
OrderEventmap.put(ctx.getCurrentKey(),value);
}
}
@Override
public void processElement2(TxEvent value,
Context ctx,
Collector<String> out) throws Exception {
OrderEvent orderEvent = OrderEventmap.get(ctx.getCurrentKey());
if (orderEvent != null) {
out.collect("订单" + orderEvent.getOrderId() + "对账成功");
}else {
TxEventmap.put(ctx.getCurrentKey(),value);
}
}
}).print();
env.execute();
}
}