基于Flink+kafka实时告警

news2025/1/15 16:40:42

引出问题

项目使用告警系统的逻辑是将实时数据保存到本地数据库再使用定时任务做判断,然后产生告警数据。这种方式存在告警的延时实在是太高了。数据从产生到保存,从保存到判断都会存在时间差,按照保存数据定时5分钟一次,定时任务5分钟一次。最高会产生10分钟的误差,这种告警就没什么意义了。

demo设计

为了简单的还原业务场景,做了简单的demo假设

实现一个对于学生成绩评价的实时处理程序
数学成绩,基准范围是90-140,超出告警
物理成绩,基准范围是60-95,超出告警

环境搭建

使用windows环境演示

准备工作

1、安装jdk

2、安装zookeeper

解压压缩包

zoo_sample.cfg将它重命名为zoo.cfg

修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data

配置环境变量

3、安装kafka

解压压缩包

修改config/server.properties

log.dirs=D://tools//kafka_2.11-2.1.0//log

flink程序代码

pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

主程序

public class StreamAlertDemo {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
		DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer);

		DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper());
		resultStream.print().setParallelism(4);

		resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties));
		env.execute();
	}

}
主程序,配置告警规则后期可以使用推送或者拉去方式获取数据
public class RuleMap {

	private RuleMap(){}

	public final static Map<String,List<AlertRule>> initialRuleMap;

	private static List<AlertRule> ruleList = new ArrayList<>();

	private static List<String> ruleStringList = new ArrayList<>(Arrays.asList(
			"{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}",
			"{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}"));

	static {
		for (String i : ruleStringList) {
			ruleList.add(JSON.parseObject(i, AlertRule.class));
		}
		initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget));
	}


}

AlertFlatMapper,处理告警逻辑

public class AlertFlatMapper implements FlatMapFunction<String, String> {

	@Override
	public void flatMap(String inVal, Collector<String> out) throws Exception {
		Achievement user = JSON.parseObject(inVal, Achievement.class);
		Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap;
		List<AlertInfo> resList = new ArrayList<>();
		List<AlertRule> mathRule = initialRuleMap.get("MathVal");
		for (AlertRule rule : mathRule) {
			if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal");
		for (AlertRule rule : physicsRule) {
			if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		String result = JSON.toJSONString(resList);
		out.collect(result);
	}

	private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) {
		switch (type) {
			case 0:
				return actVal < targetVal;
			case 1:
				return actVal.equals(targetVal);
			case 2:
				return actVal > targetVal;
			default:
				return false;
		}
	}
}

三个实体类

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class Achievement implements Serializable {

    private static final long serialVersionUID = -1L;

    private String name;

    private Integer mathVal;

    private Integer physicsVal;

}

@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertInfo implements Serializable {
    
    private static final long serialVersionUID = -1L;

    private String name;

    private String descInfo;

}

@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertRule implements Serializable {

	private static final long serialVersionUID = -1L;

	private String target;

	//0小于 1等于 2大于
	private Integer type;

	private Integer criticalVal;

	private String descInfo;
}

 项目演示

创建kafka生产者 test
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

创建kafka消费者 demo
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning

启动flink应用

给topic test发送消息

{"name":"liu","MathVal":45,"PhysicsVal":76}

 消费topic demo

告警系统架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/166226.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

智慧水务能效管理平台在污水处理厂电气节能中的应用

摘要&#xff1a;污水处理属于高能耗行业&#xff0c;会消耗大量的电能、燃料和药剂等&#xff0c;高能耗不仅会提升污水处理成本&#xff0c;还会加剧能源危机。所以&#xff0c;本文首先探究了污水处理厂耗能的原因&#xff0c;分析了污水处理与节能降耗的关系&#xff0c;然…

MyBatis-Plus数据安全保护(加密解密)

项目创建POM依赖 <dependency><!--MyBatis-Plus 企业级模块--><groupId>com.baomidou</groupId><artifactId>mybatis-mate-starter</artifactId><version>1.2.8</version> </dependency> <!-- https://mvnrepository…

git commit 命令详解

文章目录前言1. git commit 介绍2. git commit 使用3. git commit -m4. git commit -am5. git commit --amend6. commit 多行提交信息7. commit 背后到底发生了什么前言 CSDN 只用来做博客主站文章的转载 博客主站&#xff1a;https://www.itqaq.com 下面地址路径可能会发生变…

Java---中间件---Redis的常见命令和客户端使用

Redis的常见命令和客户端使用1.初识Redis1.1.认识NoSQL1.1.1.结构化与非结构化1.1.2.关联和非关联1.1.3.查询方式1.1.4.事务1.1.5.总结1.2.认识Redis1.3.安装Redis1.3.1.依赖库1.3.2.上传安装包并解压1.3.3.启动1.3.4.默认启动1.3.5.指定配置启动1.3.6.开机自启1.4.Redis桌面客…

VulnHub2018_DeRPnStiNK靶机总结

VulnHub2018_DeRPnStiNK靶机渗透总结 靶机下载地址: https://download.vulnhub.com/derpnstink/VulnHub2018_DeRPnStiNK.ova https://www.dropbox.com/s/8jqor3tuc3jhe1w/VulnHub2018_DeRPnStiNK.ova?dl0 打开靶机,使用nmap扫描出靶机的ip和开放的所有端口 可以看到,靶机开放…

从零开始学习Linux

Linux Linux内核版本&#xff1a;Linux内核运维开发小组&#xff0c;源码在不开源 Linux发行版本&#xff1a;由各大互联网/软件公司定制&#xff0c;开源 一个内核版本是有多种多样的发行版本 Ubuntu&#xff1a;以强大的桌面应用为主&#xff0c;吸收不少Windows用户&…

Docker部署jeecgboot微服务使用记录

docker安装和基础命令 docker安装 docker安装详细步骤 Docker命令 #进入容器 sudo docker exec -it 775c7c9ee1e1 /bin/bash # docker中 启动所有的容器命令 docker start $(docker ps -a | awk { print $1} | tail -n 2) # docker中 关闭所有的容器命令 docker stop $(doc…

(黑马C++)L09 C++类型转换 异常 输入输出流

一、C类型转换 类型转换&#xff08;cast&#xff09;是将一种数据类型转换成另一种数据类型&#xff0c;一般情况下要尽量少的去使用类型转换&#xff0c;除非解决非常特殊的问题。 &#xff08;1&#xff09;静态转换&#xff08;static_cast&#xff09; static_cast使用…

联合证券|内外利好共振 今年A股可更乐观一点

在经历了开年首周的快速拉升后&#xff0c;上星期A股商场全体高位盘整。职业板块从普涨转为快速轮动&#xff0c;前期领涨的新能源及大消费主线均出现了必定程度的回撤&#xff0c;由金融、信创板块接力“领跑”。 展望后市&#xff0c;指数在盘整后能否持续上攻&#xff1f;外…

解决前后端分离Vue项目部署到服务器后出现的302重定向问题

解决前后端分离Vue项目部署到服务器后出现的302重定向问题问题描述问题原因定位问题解决方案校验修改效果相关阅读写在最后问题描述 最近发现自己开发的vue前后端分离项目因为使用了spring security 安全框架&#xff0c;即使在登录认证成功之后再调用一些正常的接口总是会莫名…

Xilinx 7系列FPGA之Spartan-7产品简介

以最低的成本获得无与伦比的性能和功耗效率如果您对功耗或性能的要求与成本要求一样严苛&#xff0c;那么请使用 Spartan -7 FPGA。该系列采用 TSMC&#xff08;台积电&#xff09; 的 28nm HPL 工艺制造&#xff0c;将小尺寸架构的Xilinx 7 系列FPGA 的广泛功能和符合 RoHS 标…

结构体专题详解

目录 &#x1f94e;什么是结构体&#xff1f; ⚾结构体的声明 &#x1f3c0;简单结构体的声明 &#x1f3d0;结构体的特殊声明 &#x1f3c8;结构体嵌套问题 &#x1f3c9;结构体的自引用 &#x1f3b3;结构体的内存大小 &#x1f94c;结构体的内存对齐 ⛳内存对齐的优点 ⚽还…

SAP 服务器参数文件详细解析

一、SAP参数的说明 SAP参数的学习需要了解SAP参数的作用、参数的启动顺序、参数的配置&#xff1b; 1、参数的启动顺序 a) 启动Start profileb) 启动default profilec) 启动instance profile 2、参数的位置 a) 启动参数Start profile的位置&#xff1a;/usr/sap//SYS/prof…

计讯物联数字乡村解决方案赋能乡村振兴

项目背景 数字乡村是乡村振兴的战略方向&#xff0c;是推动农村现代化的重要途径。当前&#xff0c;数字乡村建设正在加速推进&#xff0c;打造乡村数字治理新模式&#xff0c;提升乡村的数字化水平&#xff0c;进一步推动乡村振兴进入高质量发展新赛道。计讯物联作为数字乡村…

机器学习的几个公式

今天看了几个公式的推演过程&#xff0c;有些推演过程还不是很明白&#xff0c;再着担心自己后面会忘记&#xff0c;特来此记下笔记。python 是由自己特定的公式符号的&#xff0c;但推演过程需要掌握&#xff0c;其实过程不过程不是重点&#xff0c;只要是要记得公式的含义&am…

SocketCAN 命名空间 VCAN VXCAN CANGW 举例

文章目录NAMESPACESocketCAN最新 can-utils 安装VCAN 举例VXCAN 举例CANGW 举例参考NAMESPACE namespaces, 命名空间, 将全局系统资源包装在抽象中, 使命名空间中的进程看起来拥有自己全局资源的独立实例. 命名空间的一个用途是实现容器. Linux 命名空间类型及隔离物(Isolate…

译文 | Kubernetes 1.26:PodDisruptionBudget 守护不健康 Pod 时所用的驱逐策略

对于 Kubernetes 集群而言&#xff0c;想要确保日常干扰不影响应用的可用性&#xff0c;不是一个简单的任务。上月发布的 Kubernetes v1.26 增加了一个新的特性&#xff1a;允许针对 PodDisruptionBudget (PDB) 指定不健康 Pod 驱逐策略&#xff0c;这有助于在节点执行管理操作…

电商云仓是如何包装发货的?

包装不时是为了维护产品&#xff0c;而它从工厂地板移动到大型仓库&#xff0c;并最终经过批发或批发店抵达消费者。但是&#xff0c;自21世纪初以来&#xff0c;消费者希望与那些不时吸收着某种情感的品牌联络在一同&#xff0c;同时央求他们在心理上对品牌中止投资&#xff0…

【Java AWT 图形界面编程】Canvas 组件中使用 Graphics 绘图 ④ ( AWT 绘图窗口闪烁问题 )

文章目录一、AWT 绘图窗口闪烁问题二、完整代码示例一、AWT 绘图窗口闪烁问题 使用 Graphics 第一次绘图 完成后 , 如果在循环中 持续调用 Canvas#repaint() 函数刷新界面 , 代码如下 : import java.awt.*;public class HelloAWT {public static void main(String[] args) thr…

MySQL进阶——存储引擎

MySQL有9种存储引擎&#xff0c;不同的引擎&#xff0c;适合不同的场景&#xff0c;我们最常用的&#xff0c;可能就是InnoDB&#xff0c;应该是从5.5开始&#xff0c;就成为了MySQL的默认存储引擎。 show engines可以查询MySQL支持的这几种存储引擎&#xff0c;从表头能看出来…