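The problem
The project's alerting system saves real-time data to a local database and then uses a scheduled task to evaluate it and generate alert records. The alert latency of this approach is far too high. There is a gap between when the data is produced and when it is saved, and another gap between saving and evaluation. With data saved every 5 minutes and the scheduled task also running every 5 minutes, an alert can arrive up to 10 minutes late, at which point it is of little use.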
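The 10-minute figure is just the two polling intervals added together. A minimal sketch of that arithmetic, assuming the two jobs are not synchronized with each other (the class and method names are hypothetical and not part of the project):

```java
import java.time.Duration;

// Hypothetical helper, not part of the original project.
final class PollingDelay {
    // Worst case: a reading arrives just after a save run, and the saved row
    // then just misses a check run, so the two intervals add up.
    static Duration worstCase(Duration saveInterval, Duration checkInterval) {
        return saveInterval.plus(checkInterval);
    }

    public static void main(String[] args) {
        Duration delay = worstCase(Duration.ofMinutes(5), Duration.ofMinutes(5));
        System.out.println(delay.toMinutes() + " minutes"); // prints "10 minutes"
    }
}
```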
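Demo design
To reproduce the business scenario simply, the demo makes the following assumptions
Implement a real-time processing program that evaluates student scores
Math score: the baseline range is 90-140, and a score outside it raises an alert
Physics score: the baseline range is 60-95, and a score outside it raises an alert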
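The two ranges above can be sketched as a plain range check. This is only an illustration with hypothetical names (ScoreRange, outOfRange); it assumes the bounds themselves count as in range, which matches the strict less-than and greater-than comparisons used by the rules later in the post.

```java
// Hypothetical illustration of the demo's two baseline ranges.
final class ScoreRange {
    static final ScoreRange MATH = new ScoreRange(90, 140);
    static final ScoreRange PHYSICS = new ScoreRange(60, 95);

    final int low;
    final int high;

    ScoreRange(int low, int high) {
        this.low = low;
        this.high = high;
    }

    // True when the score falls strictly below the lower bound or strictly above the upper bound.
    boolean outOfRange(int score) {
        return score < low || score > high;
    }
}
```

For example, ScoreRange.MATH.outOfRange(45) is true, while ScoreRange.PHYSICS.outOfRange(76) is false.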
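Environment setup
The walkthrough uses Windows
Prerequisites
1. Install the JDK
2. Install ZooKeeper
Extract the archive
Rename zoo_sample.cfg (in the conf directory) to zoo.cfg
Edit the configuration: dataDir=D://tools//apache-zookeeper-3.5.10-bin//data
Configure the environment variables
3. Install Kafka
Extract the archive
Edit config/server.properties
log.dirs=D://tools//kafka_2.11-2.1.0//log
Flink program code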
pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
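Main program
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class StreamAlertDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
        // The same broker address is used for both the consumer and the producer.
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Source: raw JSON score messages from topic "test".
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer);
        DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper());
        resultStream.print().setParallelism(4);
        // Sink: alert JSON written to topic "demo".
        resultStream.addSink(new FlinkKafkaProducer<>("demo", new SimpleStringSchema(), properties));
        env.execute();
    }
}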
RuleMap: the alert rule configuration. The rules are hard-coded here; later they could be obtained by push or by pull
import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RuleMap {
    private RuleMap() {}
    // Rules grouped by the field they apply to ("MathVal" or "PhysicsVal").
    public final static Map<String, List<AlertRule>> initialRuleMap;
    private static List<AlertRule> ruleList = new ArrayList<>();
    private static List<String> ruleStringList = new ArrayList<>(Arrays.asList(
            "{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"Your Math score is too low\"}",
            "{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"Your Math score is too high\"}",
            "{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"Your Physics score is too low\"}",
            "{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"Your Physics score is too high\"}"));
    static {
        for (String i : ruleStringList) {
            ruleList.add(JSON.parseObject(i, AlertRule.class));
        }
        initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget));
    }
}
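The pull option mentioned for the rules could look roughly like the sketch below. It is not the project's implementation: RefreshableRules, its loader, and refresh() are hypothetical names, and where the loader reads the rules from (a database, an HTTP endpoint, a file) is left open. Note also that in a Flink job each parallel task would hold its own copy of such an object; Flink's broadcast state is the usual mechanism when rules have to be pushed to all tasks.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical holder that swaps in a freshly loaded rule map on demand.
final class RefreshableRules<R> {
    private final Supplier<Map<String, List<R>>> loader;
    private final AtomicReference<Map<String, List<R>>> current;

    RefreshableRules(Supplier<Map<String, List<R>>> loader) {
        this.loader = loader;
        this.current = new AtomicReference<>(Collections.unmodifiableMap(loader.get()));
    }

    // Pull: reload the rules and replace the old map atomically.
    void refresh() {
        current.set(Collections.unmodifiableMap(loader.get()));
    }

    // Readers always see one complete map, either the old one or the new one.
    List<R> rulesFor(String target) {
        return current.get().getOrDefault(target, Collections.emptyList());
    }
}
```

A scheduled task could call refresh() periodically, while the alerting code only ever calls rulesFor("MathVal") or rulesFor("PhysicsVal").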
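AlertFlatMapper: the alerting logic
import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class AlertFlatMapper implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String inVal, Collector<String> out) throws Exception {
        // Parse the incoming JSON message into a score record.
        Achievement user = JSON.parseObject(inVal, Achievement.class);
        Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap;
        List<AlertInfo> resList = new ArrayList<>();
        // Check the math score against every math rule.
        List<AlertRule> mathRule = initialRuleMap.get("MathVal");
        for (AlertRule rule : mathRule) {
            if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) {
                resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
            }
        }
        // Check the physics score against every physics rule.
        List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal");
        for (AlertRule rule : physicsRule) {
            if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) {
                resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
            }
        }
        // Emit the triggered alerts as one JSON array; emit nothing when no rule fired.
        if (!resList.isEmpty()) {
            out.collect(JSON.toJSONString(resList));
        }
    }

    // type: 0 = less than, 1 = equal to, 2 = greater than. A missing value never matches.
    private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) {
        if (actVal == null || targetVal == null || type == null) {
            return false;
        }
        switch (type) {
            case 0:
                return actVal < targetVal;
            case 1:
                return actVal.equals(targetVal);
            case 2:
                return actVal > targetVal;
            default:
                return false;
        }
    }
}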
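The numeric type codes 0, 1 and 2 are easy to mix up. One possible alternative, shown here only as a sketch and not as the author's design, is to map each code to an enum constant. The names Comparison, fromCode and matches are hypothetical. Unlike checkVal, this version rejects an unknown code with an exception instead of silently returning false.

```java
// Hypothetical enum for the three comparison types used by the alert rules.
enum Comparison {
    LESS_THAN(0), EQUAL(1), GREATER_THAN(2);

    final int code;

    Comparison(int code) {
        this.code = code;
    }

    // Translate the numeric type stored in a rule into an enum constant.
    static Comparison fromCode(int code) {
        for (Comparison c : values()) {
            if (c.code == code) {
                return c;
            }
        }
        throw new IllegalArgumentException("Unknown comparison type: " + code);
    }

    // True when the actual value triggers a rule with the given threshold.
    boolean matches(int actual, int threshold) {
        switch (this) {
            case LESS_THAN:
                return actual < threshold;
            case EQUAL:
                return actual == threshold;
            case GREATER_THAN:
                return actual > threshold;
            default:
                return false;
        }
    }
}
```

With this, Comparison.fromCode(0).matches(45, 90) is true (45 is below the math lower bound), and Comparison.fromCode(2).matches(76, 95) is false.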
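Three entity classes
// Each class lives in its own file and needs the relevant imports below.
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

// One incoming score message.
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class Achievement implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private Integer mathVal;
    private Integer physicsVal;
}

// One triggered alert: who it concerns and why.
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertInfo implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private String descInfo;
}

// One alert rule. The no-args constructor lets fastjson instantiate it
// without relying on constructor-parameter name lookup.
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertRule implements Serializable {
    private static final long serialVersionUID = -1L;
    private String target;
    // 0 = less than, 1 = equal to, 2 = greater than
    private Integer type;
    private Integer criticalVal;
    private String descInfo;
}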
Running the demo
Start a Kafka console producer on topic test
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
Start a Kafka console consumer on topic demo
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning
Start the Flink application
Send a message to topic test
{"name":"liu","MathVal":45,"PhysicsVal":76}
Consume topic demo to see the resulting alert
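To see which alert the sample message should trigger, the four rules can be traced by hand. The sketch below does that in plain Java without Flink or fastjson. SampleTrace and its short labels are hypothetical and stand in for the rule descriptions; only the thresholds are taken from RuleMap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone trace of the four rules from RuleMap.
final class SampleTrace {
    static List<String> alertsFor(int mathVal, int physicsVal) {
        List<String> alerts = new ArrayList<>();
        if (mathVal < 90) {
            alerts.add("MathVal below 90");
        }
        if (mathVal > 140) {
            alerts.add("MathVal above 140");
        }
        if (physicsVal < 60) {
            alerts.add("PhysicsVal below 60");
        }
        if (physicsVal > 95) {
            alerts.add("PhysicsVal above 95");
        }
        return alerts;
    }

    public static void main(String[] args) {
        // The sample message: MathVal 45, PhysicsVal 76.
        System.out.println(alertsFor(45, 76)); // prints "[MathVal below 90]"
    }
}
```

So for the message above, only the "math score too low" rule fires; the physics score of 76 lies inside 60-95 and produces no alert.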