一、背景
告警模块,作为大多数应用都存在的一个基础功能,今天我们就以开源项目openjob 为例,分析其设计及实现。
首先,我们梳理一下需求:
- 支持多种告警方式,包括钉钉、飞书、微信和webhook。
- 方便业务模块的接入,这里采用本地事件驱动的方式来解耦模块间的依赖。
本文将针对这两个问题,先画出多个告警的类及接口设计,再讲述本地事件驱动,最后是异步消费队列中的任务。
二、接口及类的设计
源码内容就不在这里赘述了。
四个实现类的差异在于方法send()和channel(),大多数公共的实现在抽象类AbstractChannel中。
三、模块架构图
我们可以看到,通过本地事件驱动机制,告警模块和其他业务模块做到了解耦。
事件监听者,订阅事件,转换为任务存入到LinkedBlockingQueue队列中。
同时,启动两个线程池(pullExecutor线程池负责拉取队列中的任务,consumerExecutor线程池负责执行任务,也即告警)
还有两个基础类 io.openjob.common.task.BaseConsumer和io.openjob.common.task.TaskQueue。
TaskQueue是对LinkedBlockingQueue的一个简单封装,入队在本地事件监听者,出队则在下一个类。
抽象类BaseConsumer包括两个线程池:pullExecutor和consumerExecutor。
- 线程池pullExecutor的作用是:读取TaskQueue中的任务保存至第二个线程池consumerExecutor里
- 线程池consumerExecutor的作用是:异步执行任务,调用AlarmService.alarm()。
四、本地事件驱动机制
1、定义事件AlarmEvent
package io.openjob.server.alarm.event;
import io.openjob.server.alarm.dto.AlarmEventDTO;
import org.springframework.context.ApplicationEvent;
/**
* @author stelin swoft@qq.com
* @since 1.0.6
*/
public class AlarmEvent extends ApplicationEvent {
public AlarmEvent(AlarmEventDTO alarmEventDTO) {
super(alarmEventDTO);
}
}
2、定义事件的发布者AlarmEventPublisher
package io.openjob.server.alarm.event;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
/**
* @author stelin swoft@qq.com
* @since 1.0.6
*/
@Component
public class AlarmEventPublisher implements ApplicationEventPublisherAware {
private static ApplicationEventPublisher applicationEventPublisher;
/**
* Publish event
*
* @param applicationEvent applicationEvent
*/
public static void publishEvent(ApplicationEvent applicationEvent) {
applicationEventPublisher.publishEvent(applicationEvent);
}
@Override
public void setApplicationEventPublisher(@NonNull ApplicationEventPublisher applicationEventPublisher) {
AlarmEventPublisher.applicationEventPublisher = applicationEventPublisher;
}
}
业务模块发布事件:
AlarmEventPublisher.publishEvent(new AlarmEvent(alarmEventDTO));
3、事件监听者AlarmEventListener(重点)
类初始化的时候,初始化任务队列TaskQueue,然后启动两个线程池。
作为事件监听者,使用注解EventListener,把事件内容保存至任务队列TaskQueue。
package io.openjob.server.alarm.event;
import io.openjob.common.task.TaskQueue;
import io.openjob.server.alarm.dto.AlarmEventDTO;
import io.openjob.server.alarm.task.EventTaskConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
* @author stelin swoft@qq.com
* @since 1.0.6
*/
@Slf4j
@Component
public class AlarmEventListener {
private final TaskQueue<AlarmEventDTO> queue;
@Autowired
public AlarmEventListener() {
queue = new TaskQueue<>(0L, 1024);
EventTaskConsumer consumer = new EventTaskConsumer(
0L,
1,
4,
"Openjob-heartbeat-executor",
1024,
"Openjob-heartbeat-consumer",
queue
);
consumer.start();
}
/**
* Alarm listener
*
* @param alarmEvent alarmEvent
*/
@EventListener
public void alarmListener(AlarmEvent alarmEvent) {
try {
AlarmEventDTO event = (AlarmEventDTO) alarmEvent.getSource();
// 取得事件的内容,放入任务队列
queue.submit(event);
} catch (Throwable throwable) {
log.error("Alarm event add failed!", throwable);
}
}
}
- start()方法,包括两个线程池:consumerExecutor和pullExecutor,
consumerExecutor这里只有初始化,并没有放入任务,待类EventTaskConsumer的consume()实现。pullExecutor详见下文。
consumerExecutor = new ThreadPoolExecutor(
this.consumerCoreThreadNum,
this.consumerMaxThreadNum,
30,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10240),
new ThreadFactory() {
private final AtomicInteger index = new AtomicInteger(1);
@Override
public Thread newThread(@Nonnull Runnable r) {
return new Thread(r, String.format("%s-%d-%d", consumerThreadName, id, index.getAndIncrement()));
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
consumerExecutor.allowCoreThreadTimeOut(true);
4、生产线程池pullExecutor
- start()方法,每次从队列中拉取一定数量的任务
this.pullExecutor = new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(1), r -> new Thread(r, "pull"));
this.pullExecutor.submit(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
List<T> tasks = this.pollTasks();
if (tasks.size() < this.pollSize) {
if (tasks.isEmpty()) {
Thread.sleep(this.pollIdleTime);
continue;
}
Thread.sleep(this.pollSleepTime);
}
}
} catch (Throwable ex) {
log.warn("Task consumer failed! message={}", ex.getMessage());
}
});
- pollTasks(),每次拉取一定量的任务,转放入消费线程池(消费逻辑不一)
private synchronized List<T> pollTasks() {
// 每次拉取一定量的任务
List<T> tasks = queues.poll(this.pollSize);
if (!tasks.isEmpty()) {
this.activePollNum.incrementAndGet();
// 放入消费线程池,异步执行任务
this.consume(id, tasks);
}
return tasks;
}
其中consume()是一个抽象方法,交由具体类去实现。见下文类EventTaskConsumer
public abstract void consume(Long id, List<T> tasks);
5、消费线程EventTaskConsumer
package io.openjob.server.alarm.task;
import io.openjob.server.alarm.dto.AlarmEventDTO;
import io.openjob.server.alarm.service.AlarmService;
import io.openjob.common.OpenjobSpringContext;
import io.openjob.common.task.BaseConsumer;
import io.openjob.common.task.TaskQueue;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* @author stelin swoft@qq.com
* @since 1.0.6
*/
@Slf4j
public class EventTaskConsumer extends BaseConsumer<AlarmEventDTO> {
public EventTaskConsumer(Long id,
Integer consumerCoreThreadNum,
Integer consumerMaxThreadNum,
String consumerThreadName,
Integer pollSize,
String pollThreadName, TaskQueue<AlarmEventDTO> queues) {
super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 5000L, 5000L);
}
@Override
public void consume(Long id, List<AlarmEventDTO> tasks) {
// 异步执行任务
this.consumerExecutor.submit(new EventTaskRunnable(tasks));
}
private static class EventTaskRunnable implements Runnable {
private final List<AlarmEventDTO> tasks;
private EventTaskRunnable(List<AlarmEventDTO> tasks) {
this.tasks = tasks;
}
@Override
public void run() {
try {
// 执行告警,详细实现见后
OpenjobSpringContext.getBean(AlarmService.class).alarm(this.tasks);
} catch (Throwable throwable) {
log.error("Alarm event consume failed!", throwable);
}
}
}
}
五、执行告警任务
前文我们讲到告警的策略有多种,具体采用哪种策略,是由任务决定。
所以,首先保存策略对应的实现,再取得任务的属性后,反查其实现类,最后执行调用。
- 保存策略对应的告警实现
@Service
public class AlarmService {
private final AlertRuleDAO alertRuleDAO;
private final DelayDAO delayDAO;
private final JobDAO jobDAO;
private final AppDAO appDAO;
private final Map<String, AlarmChannel> channelMap = new HashMap<>();
@Autowired
public AlarmService(List<AlarmChannel> channels, AlertRuleDAO alertRuleDAO, DelayDAO delayDAO, JobDAO jobDAO, AppDAO appDAO) {
this.alertRuleDAO = alertRuleDAO;
this.delayDAO = delayDAO;
this.jobDAO = jobDAO;
this.appDAO = appDAO;
channels.forEach(c -> channelMap.put(c.channel().getType(), c));
}
注入接口AlarmChannel的所有实现类(这种写法的好处是不需要枚举),在类实例化的时候,遍历所有的实现类,保存至Map集合。
private AlarmChannel getChannel(String alertMethod) {
return Optional.ofNullable(this.channelMap.get(alertMethod))
.orElseThrow(() -> new RuntimeException("Alarm method not supported! method=" + alertMethod));
}
根据任务的属性,反查接口的实现类。
AlarmChannel channel = this.getChannel(r.getMethod());
channel.send(alarmDTO);
最后调用实现类的send()方法,这样就实现了告警的灵活配置。
六、总结
当遇到一个接口多种实现的时候,利用jdk的多态性和抽象类,源码实现可以看到设计模式中的策略模式与工厂模式。
通过本文,使用事件驱动机制,降低模块之间的耦合。
顺便说一下,openjob是支持延迟任务的,不过它的实现比较复杂,并没有采用常见的开源方案。
本文就告警模块的源码给出了一个梳理与分析,希望可以帮助到你。