读源码系列文章--开源项目openjob之alarm告警模块

news2024/12/25 13:51:20

一、背景

告警模块,作为大多数应用都存在的一个基础功能,今天我们就以开源项目openjob 为例,分析其设计及实现。

首先,我们梳理一下需求:

  • 支持多种告警方式,包括钉钉、飞书、微信和webhook。
  • 方便业务模块的接入,这里采用本地事件驱动的方式来解耦模块间的依赖。

本文将针对这两个问题,先画出多个告警的类及接口设计,再讲述本地事件驱动,最后是异步消费队列中的任务。

二、接口及类的设计

在这里插入图片描述

在这里插入图片描述
源码内容就不在这里赘述了。

四个实现类的差异在于方法send()和channel(),大多数公共的实现在抽象类AbstractChannel中。

三、模块架构图

在这里插入图片描述
我们可以看到,通过本地事件驱动机制,告警模块和其他业务模块做到了解耦。

事件监听者,订阅事件,转换为任务存入到LinkedBlockingQueue队列中。

同时,启动两个线程池(pullExecutor线程池负责拉取队列中的任务,consumerExecutor线程池负责执行任务,也即告警)

在这里插入图片描述
还有两个基础类 io.openjob.common.task.BaseConsumer和io.openjob.common.task.TaskQueue。
在这里插入图片描述
TaskQueue是对LinkedBlockingQueue的一个简单封装,入队在本地事件监听者,出队则在下一个类。
抽象类BaseConsumer包括两个线程池:pullExecutor和consumerExecutor。

  • 线程池pullExecutor的作用是:读取TaskQueue中的任务保存至第二个线程池consumerExecutor里
  • 线程池consumerExecutor的作用是:异步执行任务,调用AlarmService.alarm()。

四、本地事件驱动机制

1、定义事件AlarmEvent

package io.openjob.server.alarm.event;

import io.openjob.server.alarm.dto.AlarmEventDTO;
import org.springframework.context.ApplicationEvent;

/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
public class AlarmEvent extends ApplicationEvent {
    public AlarmEvent(AlarmEventDTO alarmEventDTO) {
        super(alarmEventDTO);
    }
}

2、定义事件的发布者AlarmEventPublisher

package io.openjob.server.alarm.event;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
@Component
public class AlarmEventPublisher implements ApplicationEventPublisherAware {

    private static ApplicationEventPublisher applicationEventPublisher;

    /**
     * Publish event
     *
     * @param applicationEvent applicationEvent
     */
    public static void publishEvent(ApplicationEvent applicationEvent) {
        applicationEventPublisher.publishEvent(applicationEvent);
    }

    @Override
    public void setApplicationEventPublisher(@NonNull ApplicationEventPublisher applicationEventPublisher) {
        AlarmEventPublisher.applicationEventPublisher = applicationEventPublisher;
    }
}

业务模块发布事件:

AlarmEventPublisher.publishEvent(new AlarmEvent(alarmEventDTO));

3、事件监听者AlarmEventListener(重点)

类初始化的时候,初始化任务队列TaskQueue,然后启动两个线程池。
作为事件监听者,使用注解EventListener,把事件内容保存至任务队列TaskQueue。

package io.openjob.server.alarm.event;

import io.openjob.common.task.TaskQueue;
import io.openjob.server.alarm.dto.AlarmEventDTO;
import io.openjob.server.alarm.task.EventTaskConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
@Slf4j
@Component
public class AlarmEventListener {
    private final TaskQueue<AlarmEventDTO> queue;

    @Autowired
    public AlarmEventListener() {
        queue = new TaskQueue<>(0L, 1024);
        EventTaskConsumer consumer = new EventTaskConsumer(
                0L,
                1,
                4,
                "Openjob-heartbeat-executor",
                1024,
                "Openjob-heartbeat-consumer",
                queue
        );
        consumer.start();
    }

    /**
     * Alarm listener
     *
     * @param alarmEvent alarmEvent
     */
    @EventListener
    public void alarmListener(AlarmEvent alarmEvent) {
        try {
            AlarmEventDTO event = (AlarmEventDTO) alarmEvent.getSource();
            // 取得事件的内容,放入任务队列
            queue.submit(event);
        } catch (Throwable throwable) {
            log.error("Alarm event add failed!", throwable);
        }
    }
}

  • start()方法,包括两个线程池:consumerExecutor和pullExecutor,
    consumerExecutor这里只有初始化,并没有放入任务,待类EventTaskConsumer的consume()实现。pullExecutor详见下文。
		consumerExecutor = new ThreadPoolExecutor(
                this.consumerCoreThreadNum,
                this.consumerMaxThreadNum,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10240),
                new ThreadFactory() {
                    private final AtomicInteger index = new AtomicInteger(1);

                    @Override
                    public Thread newThread(@Nonnull Runnable r) {
                        return new Thread(r, String.format("%s-%d-%d", consumerThreadName, id, index.getAndIncrement()));
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        consumerExecutor.allowCoreThreadTimeOut(true);

4、生产线程池pullExecutor

  • start()方法,每次从队列中拉取一定数量的任务
		this.pullExecutor = new ThreadPoolExecutor(
                1,
                1,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(1), r -> new Thread(r, "pull"));

        this.pullExecutor.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    List<T> tasks = this.pollTasks();
                    if (tasks.size() < this.pollSize) {
                        if (tasks.isEmpty()) {
                            Thread.sleep(this.pollIdleTime);
                            continue;
                        }
                        Thread.sleep(this.pollSleepTime);
                    }
                }
            } catch (Throwable ex) {
                log.warn("Task consumer failed! message={}", ex.getMessage());
            }
        });
  • pollTasks(),每次拉取一定量的任务,转放入消费线程池(消费逻辑不一)
    private synchronized List<T> pollTasks() {
        // 每次拉取一定量的任务
        List<T> tasks = queues.poll(this.pollSize);
        if (!tasks.isEmpty()) {
            this.activePollNum.incrementAndGet();
            // 放入消费线程池,异步执行任务
            this.consume(id, tasks);
        }
        return tasks;
    }

其中consume()是一个抽象方法,交由具体类去实现。见下文类EventTaskConsumer

public abstract void consume(Long id, List<T> tasks);

5、消费线程EventTaskConsumer

package io.openjob.server.alarm.task;

import io.openjob.server.alarm.dto.AlarmEventDTO;
import io.openjob.server.alarm.service.AlarmService;
import io.openjob.common.OpenjobSpringContext;
import io.openjob.common.task.BaseConsumer;
import io.openjob.common.task.TaskQueue;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
@Slf4j
public class EventTaskConsumer extends BaseConsumer<AlarmEventDTO> {
    public EventTaskConsumer(Long id,
                             Integer consumerCoreThreadNum,
                             Integer consumerMaxThreadNum,
                             String consumerThreadName,
                             Integer pollSize,
                             String pollThreadName, TaskQueue<AlarmEventDTO> queues) {
        super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 5000L, 5000L);
    }

    @Override
    public void consume(Long id, List<AlarmEventDTO> tasks) {
         // 异步执行任务
        this.consumerExecutor.submit(new EventTaskRunnable(tasks));
    }

    private static class EventTaskRunnable implements Runnable {
        private final List<AlarmEventDTO> tasks;

        private EventTaskRunnable(List<AlarmEventDTO> tasks) {
            this.tasks = tasks;
        }

        @Override
        public void run() {
            try {
                // 执行告警,详细实现见后
                OpenjobSpringContext.getBean(AlarmService.class).alarm(this.tasks);
            } catch (Throwable throwable) {
                log.error("Alarm event consume failed!", throwable);
            }
        }
    }
}

五、执行告警任务

前文我们讲到告警的策略有多种,具体采用哪种策略,是由任务决定。
所以,首先保存策略对应的实现,再取得任务的属性后,反查其实现类,最后执行调用。

  • 保存策略对应的告警实现
@Service
public class AlarmService {
    private final AlertRuleDAO alertRuleDAO;
    private final DelayDAO delayDAO;
    private final JobDAO jobDAO;
    private final AppDAO appDAO;
    private final Map<String, AlarmChannel> channelMap = new HashMap<>();

    @Autowired
    public AlarmService(List<AlarmChannel> channels, AlertRuleDAO alertRuleDAO, DelayDAO delayDAO, JobDAO jobDAO, AppDAO appDAO) {
        this.alertRuleDAO = alertRuleDAO;
        this.delayDAO = delayDAO;
        this.jobDAO = jobDAO;
        this.appDAO = appDAO;
        channels.forEach(c -> channelMap.put(c.channel().getType(), c));
    }

注入接口AlarmChannel的所有实现类(这种写法的好处是不需要枚举),在类实例化的时候,遍历所有的实现类,保存至Map集合。

private AlarmChannel getChannel(String alertMethod) {
        return Optional.ofNullable(this.channelMap.get(alertMethod))
                .orElseThrow(() -> new RuntimeException("Alarm method not supported! method=" + alertMethod));
    }

根据任务的属性,反查接口的实现类。

AlarmChannel channel = this.getChannel(r.getMethod());
channel.send(alarmDTO);

最后调用实现类的send()方法,这样就实现了告警的灵活配置。

六、总结

当遇到一个接口多种实现的时候,利用jdk的多态性和抽象类,源码实现可以看到设计模式中的策略模式与工厂模式。

通过本文,使用事件驱动机制,降低模块之间的耦合。

顺便说一下,openjob是支持延迟任务的,不过它的实现比较复杂,并没有采用常见的开源方案。

本文就告警模块的源码给出了一个梳理与分析,希望可以帮助到你。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1648574.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多模态大模型学杂了能力反下降?新研究:MoE+通用专家解决冲突

微调&#xff0c;能让通用大模型更加适配具体的行业应用。 但现在&#xff0c;研究人员们却发现&#xff1a; 对多模态大模型做“多任务指令微调”&#xff0c;大模型可能会“学得多错得多”&#xff0c;因为不同任务之间的冲突&#xff0c;导致泛化能力下降。 △多模态指令微…

扩展学习|一文读懂知识图谱

一、知识图谱的技术实现流程及相关应用 文献来源&#xff1a;曹倩,赵一鸣.知识图谱的技术实现流程及相关应用[J].情报理论与实践,2015, 38(12):127-132. &#xff08;一&#xff09;知识图谱的特征及功能 知识图谱是为了适应新的网络信息环境而产生的一种语义知识组织和服务的方…

WRT1900ACS搭建openwrt服务器小记

参考链接 wrt1900acs openwrt wrt1900acs openwrt 刷机 wrt1900acs原生固件刷openwrt-23.05.3-mvebu-cortexa9-linksys_wrt1900acs-squashfs-factory.img wrt1900acs openwrt更新刷openwrt-23.05.3-mvebu-cortexa9-linksys_wrt1900acs-squashfs-sysupgrade.bin 通过WEB UI来…

linux的信号量的使用

1.信号量 在多线程情况下&#xff0c;线程要进入关键代码就得获取信号量&#xff08;钥匙&#xff09;{sem_init(&sem, 0, 0);}&#xff0c;没有信号量的情况下就一直等待sem_wait(&sem)&#xff0c;只到别人把钥匙&#xff08;sem_post(&sem)&#xff09;给你。 …

从哪些方面可以看出现货黄金价格走势?

现货黄金价格的走势受到多种因素的影响&#xff0c;我们可以从宏观经济环境、货币政策、供需关系、市场情绪和技术分析几个主要方面来观察和分析这一贵金属的价格动态。现货黄金作为全球投资市场中的避险资产&#xff0c;其价格波动往往能体现出复杂的经济和政治变化。 宏观经济…

QT+多线程编程

QT的多线程编程有两种 1、自定义类继承QThread 第一种是自定义一个类继承于QThread&#xff0c;重写run()方法来实现。然后当需要使用线程的时候你就新建一个自定义对象&#xff0c;然后调用start方法开始运行。 下面的例子是widget里面创建一个线程&#xff0c;然后调用sta…

【docker】常用的把springboot打包为docker镜像的maven插件

Spring Boot Maven Plugin: Spring Boot 自带的 Maven 插件 (spring-boot-maven-plugin) 支持直接生成 Docker 镜像。通过配置&#xff0c;可以在 Maven 构建过程中自动构建 Docker 镜像&#xff0c;而无需单独编写 Dockerfile。这种方法简化了将应用打包为 Docker 镜像的过程。…

2024年第七届大数据技术国际会议(ICBDT 2024)即将召开!

2024年第七届大数据技术国际会议&#xff08;ICBDT 2024&#xff09;将于2024年9月20-22日在中国杭州的浙江工商大学举行。数据驱动未来&#xff0c;技术引领潮流。从数据挖掘算法的优化&#xff0c;到数据处理速度的提升&#xff0c;再到数据安全与隐私保护的进步&#xff0c;…

男士内裤品牌哪个好?口碑最好的男士内裤汇总

许多男士选内裤可能比较随意&#xff0c;但实际上作为长时间贴合身体皮肤的贴身衣物&#xff0c;经常会出很多汗。而普通的内裤会吸附很多汗水却不易干&#xff0c;导致细菌含量超标&#xff0c;摩擦力增强&#xff0c;容易造成破皮感染从而影响健康。 但是现在的男士内裤种类和…

专注 APT 攻击与防御——红蓝对抗渗透测试

在团体渗透测试的项目中&#xff0c;如红蓝对抗&#xff0c;团队渗透测试比赛等&#xff0c;最重要的是过程与结果实时共享于团队&#xff0c;例如&#xff1a;A同学nmap目标站&#xff0c;B同学也nmap目标站&#xff0c;这在对抗比赛中是极其浪费时间也是非常容易引起防火墙&a…

吴恩达2022机器学习专项课程C2(高级学习算法)W1(神经网络):2.4 神经网络层

目录 神经网络第一层&#xff08;隐藏层&#xff09;计算过程1.输入向量2.神经元的计算2.标识不同神经元3.层输出&#xff08;激活&#xff09;向量4.神经网络分层5.标识不同层 神经网络第二层&#xff08;输出层&#xff09;计算过程1.输入向量2.层输出&#xff08;激活&#…

Java反序列化-CC11链

前言 这条链子的主要作用是为了可以在 Commons-Collections 3.2.1 版本中使用&#xff0c;而且还是无数组的方法。这条链子适用于 Shiro550漏洞 CC11链子流程 CC2 CC6的结合体 CC2 这是CC2的流程图&#xff0c;我们取的是后面那三个链子&#xff0c;但是由于CC2 只能在 c…

美易官方:英伟达业绩将难以撑起股价?

美股市场似乎总是对各大公司的业绩表现抱有极大的期待&#xff0c;就像一个永远填不饱的“巨胃”。在这样的市场环境下&#xff0c;即使是业绩骄人的公司也可能难以支撑其股价。英伟达&#xff0c;这家在图形处理单元&#xff08;GPU&#xff09;领域享有盛誉的公司&#xff0c…

echarts双Y轴,并实现图例等

一个Y轴时yAxis为对象 yAxis: {type: value,name: 占比(%) },两个Y轴时yAxis为数组 yAxis: [{ // 左侧的type: value,name: 占比(%),nameTextStyle: {padding: [0, 0, 10, -50]},min: 0,max: 100,splitNumber: this.splitNumber, // 设置坐标轴的分割段数interval: 20, // 标轴…

doris经典bug

在部署完登录web页面查看的时候会发现只有一个节点可以读取信息剩余的节点什么也没读取到 在发现问题后&#xff0c;我们去对应的节点去看log日志&#xff0c;发现它自己绑定到前端的地址上了 现在我们已经发现问题了&#xff0c;以下就开始解决问题 重置doris 首先对be进行操…

一键 input 苹果 OpenELM,零门槛 Llama 3 教程,40+ 优质模型/数据集/教程,建议收藏!...

现在 AI 行业什么最火&#xff1f; 「大模型」一定仍然排在前三甲的位置。 自从 2022 年底 ChatGPT 面世以来&#xff0c;大模型在各行各业已经带来了太多的惊喜&#xff0c;应用场景愈发丰富&#xff0c;同时也带动了相关底层技术的普及与普适化。尤其是在开源模型繁荣发展之下…

解决 SyntaxError: Unexpected token ‘.‘ 报错问题

这个报错一般是编译问题&#xff0c;浏览器的版本过低没通过代码 解决办法&#xff1a; 在package.json文件中加上这个 "browserslist": ["> 1%","last 2 versions","not dead","not ie < 6","Android > 4&…

TradingView 使用方法

【前言】最近项目中用到了Tradingview中的K线图,基于以前从未使用过,写此篇文章记录一下Tradingview的使用。 【目标】 1 会使用Tradingview中k线图的渲染方式 2 了解一些基本的用法 一 简介 Tradingview是一个价格图表和分析软件,提供免费和付费选项,为优秀的交易技术分析…

Python数据爬取超简单入门

## 什么是网络爬虫&#xff1f; 网络爬虫是一种自动浏览器程序&#xff0c;能够自动地从互联网获取数据。爬虫的主要任务是访问网页&#xff0c;分析网页内容&#xff0c;然后提取所需的信息。爬虫广泛应用于数据收集、数据分析、网页内容监控等领域。 ## 爬虫的基本步骤 1.…

C++实现二叉搜索树(模型)

目录 1.二叉搜索树的概念 2.二叉搜索树的实现 2.1总体代码预览 2.2各个函数实现原理 链表结构体 二叉搜索树的成员变量 二叉搜索树的插入 二叉搜索树的查找 二叉搜索树的遍历 二叉搜索树的删除 1.二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#…