一、恶意登录
对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。
因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。
二、数据源格式
937166,1715,beijing,beijing,1511661606
937166,1715,beijing,beijing,1511661607
937166,1715,beijing,beijing,1511661608
161501,36156,jiangsu,nanjing,1511661608
937166,1715,beijing,beijing,1511661609
937166,1715,beijing,beijing,1511661610
937166,1715,beijing,beijing,1511661611
937166,1715,beijing,beijing,1511661612
三、封装数据
package com.lyh.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginEvent {
private Long userId;
private String ip;
private String eventType;
private Long eventTime;
}
四、代码实现逻辑
实现逻辑:
统计连续失败的次数:
- 把失败的时间戳放入到List中,
- 当List中的长度到达2的时候, 判断这个两个时间戳的差是否小于等于2s
- 如果是, 则这个用户在恶意登录
- 否则不是, 然后删除List的第一个元素
- 用于保持List的长度为2
- 如果出现成功, 则需要清空List集合
五、代码实现
package com.lyh.flink11;
import com.lyh.bean.LoginEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.List;
import java.util.Map;
public class Login_ey {
public static void main(String[] args) throws Exception {
//创建流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
//创建水印策略
WatermarkStrategy<LoginEvent> wms = WatermarkStrategy.
<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {
@Override
public long extractTimestamp(LoginEvent element, long recordTimestamp) {
return element.getEventTime();
}
});
//读入数据
KeyedStream<LoginEvent, Long> watersencerStream = env.readTextFile("input/LoginLog.csv")
.map(line -> {
String[] datas = line.split(",");
return new LoginEvent(Long.valueOf(datas[0]),
datas[1],
datas[2],
Long.valueOf(datas[3]));
// 指定水印和时间戳
}).assignTimestampsAndWatermarks(wms)
// 按照用户ID分组
.keyBy(LoginEvent::getUserId);
// Flink CEP 也叫做Flink复杂事件处理,
// 可以在无穷无界的事件流中检测事件规则,通过模式规则匹配的方式对重要信息进行跟踪和分析,从而在实时数据中发掘出有价值的信息
//定义模式
Pattern<LoginEvent, LoginEvent> fail = Pattern.
<LoginEvent>begin("fail")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return "fail".equals(value.getEventType());
}
}).timesOrMore(2).consecutive()
.until(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return "success".equals(value.getEventType());
}
}).within(Time.seconds(2));
// 把模式用在流上
PatternStream<LoginEvent> ps = CEP.pattern(watersencerStream, fail);
//获取匹配到的结果
ps.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> pattern) throws Exception {
return pattern.get("fail").toString();
}
}).print();
env.execute();
}
}