【flink应用系列】1.Flink银行反欺诈系统设计方案
- 1. 经典案例:短时间内多次大额交易
- 1.1 场景描述
- 1.2 风险判定逻辑
- 2. 使用Flink实现
- 2.1 实现思路
- 2.2 代码实现
- 2.3 使用Flink流处理
- 3. 使用Flink CEP实现
- 3.1 实现思路
- 3.2 代码实现
- 4. 总结
1. 经典案例:短时间内多次大额交易
1.1 场景描述
规则1:单笔交易金额超过10,000元。
规则2:同一用户在10分钟内进行了3次或更多次交易。
风险行为:同时满足规则1和规则2的交易行为。
1.2 风险判定逻辑
检测每笔交易是否满足“单笔交易金额超过10,000元”。
对同一用户,统计10分钟内的交易次数。
如果交易次数达到3次或更多,则判定为风险行为。
2. 使用Flink实现
2.1 实现思路
使用Flink的KeyedStream按用户分组。
使用ProcessFunction实现自定义窗口逻辑,统计10分钟内的交易次数。
结合规则1和规则2,判断是否为风险行为。
2.2 代码实现
// 定义交易数据POJO
public class Transaction {
private String transactionId;
private String userId;
private Double amount;
private Long timestamp;
// getters and setters
}
// 定义风控结果POJO
public class RiskResult {
private String userId;
private String transactionId;
private String riskLevel;
private String actionTaken;
private Long createTime;
// getters and setters
}
// 实现风控逻辑
public class FraudDetectionProcessFunction extends KeyedProcessFunction<String, Transaction, RiskResult> {
private transient ValueState<Integer> transactionCountState;
private transient ValueState<Long> timerState;
@Override
public void open(Configuration parameters) {
// 初始化状态
ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>(
"transactionCount",
Types.INT
);
transactionCountState = getRuntimeContext().getState(countDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timerState",
Types.LONG
);
timerState = getRuntimeContext().getState(timerDescriptor);
}
@Override
public void processElement(
Transaction transaction,
Context ctx,
Collector<RiskResult> out) throws Exception {
// 规则1:单笔交易金额超过10,000元
if (transaction.getAmount() > 10000) {
// 更新交易次数
Integer count = transactionCountState.value();
if (count == null) {
count = 0;
}
count += 1;
transactionCountState.update(count);
// 如果是第一次满足规则1,设置10分钟的定时器
if (count == 1) {
long timer = ctx.timestamp() + 10 * 60 * 1000; // 10分钟
ctx.timerService().registerEventTimeTimer(timer);
timerState.update(timer);
}
// 规则2:10分钟内交易次数达到3次
if (count >= 3) {
RiskResult result = new RiskResult();
result.setUserId(transaction.getUserId());
result.setTransactionId(transaction.getTransactionId());
result.setRiskLevel("HIGH");
result.setActionTaken("ALERT");
result.setCreateTime(System.currentTimeMillis());
out.collect(result);
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<RiskResult> out) throws Exception {
// 定时器触发时,重置状态
transactionCountState.clear();
timerState.clear();
}
}
2.3 使用Flink流处理
java
DataStream<Transaction> transactionStream = env.addSource(transactionSource);
DataStream<RiskResult> riskResultStream = transactionStream
.keyBy(Transaction::getUserId)
.process(new FraudDetectionProcessFunction());
riskResultStream.addSink(new AlertSink());
3. 使用Flink CEP实现
Flink CEP(Complex Event Processing)是Flink提供的复杂事件处理库,适合处理基于时间序列的模式匹配。以下是使用Flink CEP实现上述风控规则的示例。
3.1 实现思路
定义模式:检测10分钟内3次或更多次大额交易。
使用Flink CEP的模式匹配功能,匹配符合条件的事件序列。
3.2 代码实现
java
// 定义交易数据POJO
public class Transaction {
private String transactionId;
private String userId;
private Double amount;
private Long timestamp;
// getters and setters
}
// 定义风控结果POJO
public class RiskResult {
private String userId;
private List<String> transactionIds;
private String riskLevel;
private String actionTaken;
private Long createTime;
// getters and setters
}
// 实现风控逻辑
public class FraudDetectionCEP {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 交易数据流
DataStream<Transaction> transactionStream = env.addSource(transactionSource)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
// 按用户分组
KeyedStream<Transaction, String> keyedStream = transactionStream
.keyBy(Transaction::getUserId);
// 定义CEP模式:10分钟内3次或更多次大额交易
Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction transaction) {
return transaction.getAmount() > 10000;
}
})
.next("second")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction transaction) {
return transaction.getAmount() > 10000;
}
})
.next("third")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction transaction) {
return transaction.getAmount() > 10000;
}
})
.within(Time.minutes(10));
// 应用模式
PatternStream<Transaction> patternStream = CEP.pattern(keyedStream, pattern);
// 生成风控结果
DataStream<RiskResult> riskResultStream = patternStream.process(
new PatternProcessFunction<Transaction, RiskResult>() {
@Override
public void processMatch(
Map<String, List<Transaction>> match,
Context ctx,
Collector<RiskResult> out) throws Exception {
RiskResult result = new RiskResult();
result.setUserId(match.get("first").get(0).getUserId());
result.setTransactionIds(
match.values().stream()
.flatMap(List::stream)
.map(Transaction::getTransactionId)
.collect(Collectors.toList())
);
result.setRiskLevel("HIGH");
result.setActionTaken("ALERT");
result.setCreateTime(System.currentTimeMillis());
out.collect(result);
}
}
);
// 输出结果
riskResultStream.addSink(new AlertSink());
env.execute("Fraud Detection with Flink CEP");
}
}
4. 总结
Flink实现:通过KeyedProcessFunction和状态管理实现多规则匹配。
Flink CEP实现:通过定义复杂事件模式,简化多规则匹配的逻辑。
适用场景:
Flink适合需要自定义逻辑的场景。
Flink CEP适合基于时间序列的模式匹配场景。
通过以上实现,可以高效检测银行交易中的风险行为,并根据需要扩展更多规则