序言
继上一篇《【开源项目】Dynamic-Tp核心流程源码解读》之后,笔者觉得DynamicTp
这个开源项目的告警机制同样十分精彩:只有把它学会并用到自己的项目中,才算真正掌握了DynamicTp
源码的精髓。所以接下来看看DynamicTp
是如何初始化告警机制的,又是如何发送告警信息的。
线程池初始化
DtpExecutor#initialize
,线程池初始化。
/**
 * Initializes this executor: first hands the executor to
 * {@code NotifyHelper.initNotify} and then, if {@code preStartAllCoreThreads}
 * is set, calls {@code prestartAllCoreThreads()}.
 */
@Override
protected void initialize() {
    NotifyHelper.initNotify(this);
    if (!preStartAllCoreThreads) {
        return;
    }
    prestartAllCoreThreads();
}
NotifyHelper#initNotify
,会初始化提醒的平台数据和AlarmManager
/**
 * Prepares the notify configuration of the given executor and initializes its alarms.
 *
 * <p>Returns early (after logging a warning) when no {@code DtpProperties} bean is found,
 * when no platforms are configured (in that case the executor's notify items and platform
 * ids are replaced by empty lists), or when the executor has no notify items.
 *
 * @param executor the executor whose notify items are completed and registered
 */
public static void initNotify(DtpExecutor executor) {
    val properties = ApplicationContextHolder.getBean(DtpProperties.class);
    if (properties == null) {
        log.warn("DynamicTp notify, cannot find a DtpProperties instance for [{}].",
                executor.getThreadPoolName());
        return;
    }

    val configuredPlatforms = properties.getPlatforms();
    if (CollectionUtils.isEmpty(configuredPlatforms)) {
        // Without any platform there is nowhere to send to: clear the executor's notify config.
        executor.setNotifyItems(Lists.newArrayList());
        executor.setPlatformIds(Lists.newArrayList());
        log.warn("DynamicTp notify, no notify platforms configured for [{}]", executor.getThreadPoolName());
        return;
    }

    if (CollectionUtils.isEmpty(executor.getNotifyItems())) {
        log.warn("DynamicTp notify, no notify items configured for [{}]", executor.getThreadPoolName());
        return;
    }

    fillPlatforms(executor.getPlatformIds(), configuredPlatforms, executor.getNotifyItems());
    AlarmManager.initAlarm(executor.getThreadPoolName(), executor.getNotifyItems());
}
NotifyHelper#fillPlatforms
,按 notifyItem > executor > global 的优先级,为每个NotifyItem
确定最终生效的平台ID(前两级会与全局配置的NotifyPlatform
取交集),并写回NotifyItem。
/**
 * Resolves the platform ids of every notify item, using the precedence
 * notifyItem &gt; executor &gt; global.
 *
 * <p>Ids configured on the item or on the executor are intersected with the globally
 * configured platform ids. If neither is configured, the item either goes through
 * {@code setPlatformIds(platforms, item)} (when it still carries platforms, kept for
 * compatibility with configurations that predate platform ids) or receives all global ids.
 *
 * @param platformIds platform ids configured on the executor level, may be empty
 * @param platforms   globally configured platforms
 * @param notifyItems items to update in place
 */
public static void fillPlatforms(List<String> platformIds,
                                 List<NotifyPlatform> platforms,
                                 List<NotifyItem> notifyItems) {
    if (CollectionUtils.isEmpty(platforms) || CollectionUtils.isEmpty(notifyItems)) {
        log.warn("DynamicTp notify, no notify platforms or items configured.");
        return;
    }

    List<String> globalIds = StreamUtil.fetchProperty(platforms, NotifyPlatform::getPlatformId);
    for (NotifyItem item : notifyItems) {
        // 1) ids on the item itself win: keep only those that exist globally
        if (CollectionUtils.isNotEmpty(item.getPlatformIds())) {
            item.setPlatformIds((List<String>) CollectionUtils.intersection(globalIds, item.getPlatformIds()));
            continue;
        }
        // 2) otherwise fall back to the executor level ids, again restricted to global ones
        if (CollectionUtils.isNotEmpty(platformIds)) {
            item.setPlatformIds((List<String>) CollectionUtils.intersection(globalIds, platformIds));
            continue;
        }
        // 3) nothing configured: legacy items with platforms, or simply everything global
        if (CollectionUtils.isNotEmpty(item.getPlatforms())) {
            setPlatformIds(platforms, item);
        } else {
            item.setPlatformIds(globalIds);
        }
    }
}
初始化告警器
AlarmManager#initAlarm(String, List<NotifyItem>)
,初始化AlarmLimiter
和AlarmCounter
/**
 * Initializes the alarm state of every given notify item for one pool.
 *
 * @param poolName    name of the thread pool
 * @param notifyItems items to initialize, processed in list order
 */
public static void initAlarm(String poolName, List<NotifyItem> notifyItems) {
    for (NotifyItem item : notifyItems) {
        initAlarm(poolName, item);
    }
}
/**
 * Initializes the limiter and the counter for a single notify item of one pool.
 *
 * @param poolName   name of the thread pool
 * @param notifyItem the item to initialize
 */
public static void initAlarm(String poolName, NotifyItem notifyItem) {
    // limiter first, then the counter keyed by the item's type
    AlarmLimiter.initAlarmLimiter(poolName, notifyItem);
    AlarmCounter.init(poolName, notifyItem.getType());
}
AlarmLimiter#initAlarmLimiter
,初始化限流器。
/**
 * Registers a limiter cache for the given pool and notify item.
 *
 * <p>Items whose type equals (ignoring case) the value of {@code NotifyItemEnum.CHANGE}
 * are skipped. For all others a cache whose entries expire {@code notifyItem.getInterval()}
 * seconds after being written is stored in {@code ALARM_LIMITER} under the generated key.
 *
 * @param threadPoolName name of the thread pool
 * @param notifyItem     the notify item providing type and interval
 */
public static void initAlarmLimiter(String threadPoolName, NotifyItem notifyItem) {
    boolean changeItem = NotifyItemEnum.CHANGE.getValue().equalsIgnoreCase(notifyItem.getType());
    if (changeItem) {
        return;
    }

    String limiterKey = genKey(threadPoolName, notifyItem.getType());
    Cache<String, String> limiter = CacheBuilder.newBuilder()
            .expireAfterWrite(notifyItem.getInterval(), TimeUnit.SECONDS)
            .build();
    ALARM_LIMITER.put(limiterKey, limiter);
}
AlarmCounter#init
,初始化计数器。
/**
 * Registers an {@code AlarmInfo} for the given pool and notify item type unless one is
 * already present under the same key.
 *
 * @param threadPoolName name of the thread pool
 * @param notifyItemType notify item type, resolved through {@code NotifyItemEnum.of}
 */
public static void init(String threadPoolName, String notifyItemType) {
    String cacheKey = buildKey(threadPoolName, notifyItemType);
    AlarmInfo initialInfo = AlarmInfo.builder()
            .notifyItem(NotifyItemEnum.of(notifyItemType))
            .build();
    // putIfAbsent keeps an existing entry untouched
    ALARM_INFO_CACHE.putIfAbsent(cacheKey, initialInfo);
}
告警
AlarmManager#doAlarmAsync(DtpExecutor, NotifyItemEnum, Runnable)
,先增加对应告警类型的计数,再把告警责任链提交到告警线程池中异步执行
/**
 * Counts one alarm occurrence for the executor and submits the alarm check to
 * {@code ALARM_EXECUTOR}.
 *
 * <p>When the runnable is a {@code DtpRunnable}, its trace id is put into the MDC under
 * {@code TRACE_ID} before the alarm is submitted.
 *
 * @param executor     the executor the alarm belongs to
 * @param notifyType   the alarm type to count and check
 * @param currRunnable the task that triggered the alarm; may be any {@code Runnable}
 */
public static void doAlarmAsync(DtpExecutor executor, NotifyItemEnum notifyType, Runnable currRunnable) {
    // Only DtpRunnable carries a trace id. An unconditional cast would throw
    // ClassCastException (or NPE for null) and abort the alarm before it is even counted.
    if (currRunnable instanceof DtpRunnable) {
        MDC.put(TRACE_ID, ((DtpRunnable) currRunnable).getTraceId());
    }
    AlarmCounter.incAlarmCounter(executor.getThreadPoolName(), notifyType.getValue());
    ALARM_EXECUTOR.execute(() -> doAlarm(ExecutorWrapper.of(executor), notifyType));
}
/**
 * Runs the alarm filter chain for the given executor and alarm type.
 *
 * <p>Nothing happens when {@code NotifyHelper.getNotifyItem} yields no item.
 *
 * @param executorWrapper wrapper of the executor being checked
 * @param notifyItemEnum  the alarm type
 */
public static void doAlarm(ExecutorWrapper executorWrapper, NotifyItemEnum notifyItemEnum) {
    NotifyHelper.getNotifyItem(executorWrapper, notifyItemEnum)
            .ifPresent(item -> ALARM_INVOKER_CHAIN.proceed(new AlarmCtx(executorWrapper, item)));
}
告警责任链初始化
AlarmManager
在类初始化时(静态代码块)会构建InvokerChain
责任链。
private static final InvokerChain<BaseNotifyCtx> ALARM_INVOKER_CHAIN;
static {
ALARM_INVOKER_CHAIN = NotifyFilterBuilder.getAlarmInvokerChain();
}
NotifyFilterBuilder#getAlarmInvokerChain
,获取到容器中的NotifyFilter
,使用工厂类构造责任链
/**
 * Builds the invoker chain used for alarms.
 *
 * <p>Collects all {@code NotifyFilter} beans, adds an {@code AlarmBaseFilter}, keeps the
 * filters that support {@code NotifyTypeEnum.ALARM}, sorts them by {@code getOrder()} and
 * wraps them around a new {@code AlarmInvoker}.
 *
 * @return the assembled chain
 */
public static InvokerChain<BaseNotifyCtx> getAlarmInvokerChain() {
    val filterBeans = ApplicationContextHolder.getBeansOfType(NotifyFilter.class);
    Collection<NotifyFilter> candidates = Lists.newArrayList(filterBeans.values());
    candidates.add(new AlarmBaseFilter());

    NotifyFilter[] orderedFilters = candidates.stream()
            .filter(candidate -> candidate.supports(NotifyTypeEnum.ALARM))
            .sorted(Comparator.comparing(Filter::getOrder))
            .toArray(NotifyFilter[]::new);
    return InvokerChainFactory.buildInvokerChain(new AlarmInvoker(), orderedFilters);
}
InvokerChainFactory
用于构造InvokerChain
。
/**
 * Static factory that assembles an {@code InvokerChain} from a target invoker and filters.
 */
public final class InvokerChainFactory {

    private InvokerChainFactory() { }

    /**
     * Wraps {@code target} with the given filters and returns a chain whose head is the
     * outermost wrapper.
     *
     * <p>Filters are wrapped from the last to the first, so {@code filters[0]} is invoked
     * first and {@code target} last. With no filters the head is {@code target} itself.
     *
     * @param target  the invoker at the end of the chain
     * @param filters filters in invocation order
     * @param <T>     context type passed along the chain
     * @return a chain with its head set
     */
    @SafeVarargs
    public static <T> InvokerChain<T> buildInvokerChain(Invoker<T> target, Filter<T>... filters) {
        InvokerChain<T> chain = new InvokerChain<>();
        Invoker<T> head = target;
        int index = filters.length;
        while (index-- > 0) {
            // capture the current head so the lambda delegates to what was built so far
            final Invoker<T> downstream = head;
            final Filter<T> current = filters[index];
            head = ctx -> current.doFilter(ctx, downstream);
        }
        chain.setHead(head);
        return chain;
    }
}
AlarmBaseFilter
,依次判断基础条件(是否开启通知、是否配置了平台)、是否仍处于告警间隔内(限流),以及容量、活跃度等是否达到阈值。AlarmLimiter
基于带过期时间的缓存实现,记录写入后经过NotifyItem 配置的interval 秒会自动过期,之后才允许再次告警。
/**
 * Base alarm filter: lets an alarm pass to the next invoker only if the basic conditions
 * hold, the alarm limiter allows it and the configured threshold is reached.
 */
@Slf4j
public class AlarmBaseFilter implements NotifyFilter {

    /** Guards the limiter re-check and the limiter write in {@link #doFilter}. */
    private static final Object SEND_LOCK = new Object();

    /**
     * Applies the checks in this order: basic conditions, alarm limiter, threshold, and
     * finally a second limiter check under {@code SEND_LOCK} before marking the limiter.
     * The next invoker is called outside the lock.
     *
     * @param context     the notify context carrying executor wrapper and notify item
     * @param nextInvoker the invoker to call when all checks pass
     */
    @Override
    public void doFilter(BaseNotifyCtx context, Invoker<BaseNotifyCtx> nextInvoker) {
        val wrapper = context.getExecutorWrapper();
        val item = context.getNotifyItem();
        if (item == null || !satisfyBaseCondition(item, wrapper)) {
            return;
        }

        if (!AlarmLimiter.ifAlarm(wrapper.getThreadPoolName(), item.getType())) {
            log.debug("DynamicTp notify, alarm limit, threadPoolName: {}, notifyItem: {}",
                    wrapper.getThreadPoolName(), item.getType());
            return;
        }

        if (!AlarmManager.checkThreshold(wrapper, context.getNotifyItemEnum(), item)) {
            return;
        }

        synchronized (SEND_LOCK) {
            // Check the limiter again: another thread may have sent in the meantime.
            boolean stillAllowed = AlarmLimiter.ifAlarm(wrapper.getThreadPoolName(), item.getType());
            if (!stillAllowed) {
                log.warn("DynamicTp notify, concurrent send, alarm limit, threadPoolName: {}, notifyItem: {}",
                        wrapper.getThreadPoolName(), item.getType());
                return;
            }
            AlarmLimiter.putVal(wrapper.getThreadPoolName(), item.getType());
        }
        nextInvoker.invoke(context);
    }

    /**
     * @return {@code true} only if notification is enabled on the executor, the item is
     *         enabled and the item has at least one platform id
     */
    private boolean satisfyBaseCondition(NotifyItem notifyItem, ExecutorWrapper executor) {
        if (!executor.isNotifyEnabled()) {
            return false;
        }
        if (!notifyItem.isEnabled()) {
            return false;
        }
        return CollectionUtils.isNotEmpty(notifyItem.getPlatformIds());
    }

    @Override
    public int getOrder() {
        return 0;
    }
}
AlarmManager#checkThreshold
,按告警类型检查是否达到阈值:队列容量使用率(CAPACITY)、线程池活跃度(LIVENESS),以及拒绝、执行超时、排队超时的计数(REJECT、RUN_TIMEOUT、QUEUE_TIMEOUT)。
/**
 * Dispatches to the threshold check that belongs to the given alarm type.
 *
 * @param executor   wrapper of the executor being checked
 * @param itemEnum   the alarm type
 * @param notifyItem the notify item holding the threshold
 * @return the result of the type specific check; {@code false} (after logging an error)
 *         for any type not handled here
 */
public static boolean checkThreshold(ExecutorWrapper executor, NotifyItemEnum itemEnum, NotifyItem notifyItem) {
    boolean reached;
    switch (itemEnum) {
        case CAPACITY:
            reached = checkCapacity(executor, notifyItem);
            break;
        case LIVENESS:
            reached = checkLiveness(executor, notifyItem);
            break;
        case REJECT:
        case RUN_TIMEOUT:
        case QUEUE_TIMEOUT:
            // these three types are all judged by their alarm counter
            reached = checkWithAlarmInfo(executor, notifyItem);
            break;
        default:
            log.error("Unsupported alarm type, type: {}", itemEnum);
            reached = false;
            break;
    }
    return reached;
}
/**
 * Checks whether the share of active threads relative to the maximum pool size, expressed
 * as a percentage, has reached the item's threshold.
 *
 * @param executorWrapper wrapper whose executor must be a {@code ThreadPoolExecutor}
 * @param notifyItem      the notify item holding the threshold
 * @return {@code true} if the percentage is greater than or equal to the threshold
 */
private static boolean checkLiveness(ExecutorWrapper executorWrapper, NotifyItem notifyItem) {
    val pool = (ThreadPoolExecutor) executorWrapper.getExecutor();
    int maxSize = pool.getMaximumPoolSize();
    int activeThreads = pool.getActiveCount();
    // ratio with scale 2, then scaled to a percentage
    double livenessPercent = NumberUtil.div(activeThreads, maxSize, 2) * 100;
    return livenessPercent >= notifyItem.getThreshold();
}
/**
 * Checks whether the work queue usage, expressed as a percentage of its capacity, has
 * reached the item's threshold.
 *
 * <p>The capacity is derived as {@code size + remainingCapacity}. The queue size is read
 * once and reused for both the capacity and the ratio, so a concurrent change of the queue
 * cannot make numerator and denominator disagree. The sum is computed in {@code long} and
 * clamped: for an unbounded queue a removal between the two reads could otherwise push the
 * {@code int} sum past {@code Integer.MAX_VALUE} and yield a negative capacity.
 *
 * @param executorWrapper wrapper whose executor must be a {@code ThreadPoolExecutor}
 * @param notifyItem      the notify item holding the threshold
 * @return {@code false} for an empty queue, otherwise whether the usage percentage is
 *         greater than or equal to the threshold
 */
private static boolean checkCapacity(ExecutorWrapper executorWrapper, NotifyItem notifyItem) {
    val executor = (ThreadPoolExecutor) executorWrapper.getExecutor();
    BlockingQueue<Runnable> workQueue = executor.getQueue();
    if (CollectionUtils.isEmpty(workQueue)) {
        return false;
    }
    // single snapshot of the size, shared by capacity and ratio
    int queueSize = workQueue.size();
    long capacity = (long) queueSize + workQueue.remainingCapacity();
    int queueCapacity = (int) Math.min(capacity, Integer.MAX_VALUE);
    double div = NumberUtil.div(queueSize, queueCapacity, 2) * 100;
    return div >= notifyItem.getThreshold();
}
/**
 * Checks whether the alarm counter of the item's type has reached the item's threshold.
 *
 * @param executorWrapper wrapper providing the thread pool name
 * @param notifyItem      the notify item providing type and threshold
 * @return {@code true} if the counted value is greater than or equal to the threshold
 */
private static boolean checkWithAlarmInfo(ExecutorWrapper executorWrapper, NotifyItem notifyItem) {
    // NOTE(review): assumes an AlarmInfo was registered for this pool/type beforehand - confirm
    AlarmInfo counted = AlarmCounter.getAlarmInfo(
            executorWrapper.getThreadPoolName(), notifyItem.getType());
    return counted.getCount() >= notifyItem.getThreshold();
}
AlarmInvoker
,责任链的终点:通过NotifierHandler 向各通知平台发送告警(并不限于短信),发送完成后重置对应的告警计数。
/**
 * Terminal invoker of the alarm chain: sends the alarm and resets the alarm counter.
 */
public class AlarmInvoker implements Invoker<BaseNotifyCtx> {

    /**
     * Attaches the current {@code AlarmInfo} to the context, publishes the context through
     * {@code DtpNotifyCtxHolder} while the alarm is sent, and resets the counter once
     * sending has returned. The holder is always cleared afterwards.
     *
     * @param context the notify context; must be an {@code AlarmCtx}
     */
    @Override
    public void invoke(BaseNotifyCtx context) {
        AlarmCtx ctx = (AlarmCtx) context;
        ExecutorWrapper wrapper = ctx.getExecutorWrapper();
        NotifyItem item = ctx.getNotifyItem();
        AlarmInfo currentInfo = AlarmCounter.getAlarmInfo(wrapper.getThreadPoolName(), item.getType());
        ctx.setAlarmInfo(currentInfo);

        try {
            DtpNotifyCtxHolder.set(context);
            NotifierHandler.getInstance().sendAlarm(NotifyItemEnum.of(item.getType()));
            // only reached when sendAlarm did not throw
            AlarmCounter.reset(wrapper.getThreadPoolName(), item.getType());
        } finally {
            DtpNotifyCtxHolder.remove();
        }
    }
}
NotifierHandler#sendAlarm
,发送告警。获取所有的平台,根据平台获取DtpNotifier
,发送告警信息。
/**
 * Sends the alarm of the given type to every platform configured on the current notify item.
 *
 * <p>The notify item is taken from {@code DtpNotifyCtxHolder}. Platform ids that cannot be
 * resolved, and platforms without a registered notifier, are skipped silently.
 *
 * @param notifyItemEnum the alarm type to send
 */
public void sendAlarm(NotifyItemEnum notifyItemEnum) {
    NotifyItem currentItem = DtpNotifyCtxHolder.get().getNotifyItem();
    for (String platformId : currentItem.getPlatformIds()) {
        NotifyHelper.getPlatform(platformId).ifPresent(platform -> {
            // notifiers are looked up by the lower-cased platform name
            DtpNotifier target = NOTIFIERS.get(platform.getPlatform().toLowerCase());
            if (target == null) {
                return;
            }
            target.sendAlarmMsg(platform, notifyItemEnum);
        });
    }
}
AbstractDtpNotifier#sendAlarmMsg
,构建告警内容
/**
 * Builds the alarm content for the platform and hands it to the underlying notifier.
 * Blank content is not sent; a debug message is logged instead.
 *
 * @param notifyPlatform the platform to send to
 * @param notifyItemEnum the alarm type
 */
@Override
public void sendAlarmMsg(NotifyPlatform notifyPlatform, NotifyItemEnum notifyItemEnum) {
    String alarmText = buildAlarmContent(notifyPlatform, notifyItemEnum);
    if (!StringUtils.isBlank(alarmText)) {
        notifier.send(notifyPlatform, alarmText);
        return;
    }
    log.debug("Alarm content is empty, ignore send alarm message.");
}
/**
 * Builds the alarm message text for one notify platform.
 *
 * <p>Reads the current context from {@code DtpNotifyCtxHolder}, fills the template returned
 * by {@code getAlarmTemplate()} through {@code String.format} with service, pool, queue and
 * alarm counter values, and passes the result through {@code highlightAlarmContent}.
 * The arguments are passed positionally, so their order has to stay in sync with the
 * placeholders of the template.
 *
 * @param platform       the platform the message is built for; its platform name and
 *                       receivers are used to render the receiver string
 * @param notifyItemEnum the alarm type being reported
 * @return the formatted content as returned by {@code highlightAlarmContent}
 */
protected String buildAlarmContent(NotifyPlatform platform, NotifyItemEnum notifyItemEnum) {
// NOTE(review): assumes the holder contains an AlarmCtx; the cast fails for any other context type
AlarmCtx context = (AlarmCtx) DtpNotifyCtxHolder.get();
ExecutorWrapper executorWrapper = context.getExecutorWrapper();
String threadPoolName = executorWrapper.getThreadPoolName();
val executor = (ThreadPoolExecutor) context.getExecutorWrapper().getExecutor();
NotifyItem notifyItem = context.getNotifyItem();
AlarmInfo alarmInfo = context.getAlarmInfo();
// triple of reject / run-timeout / queue-timeout count strings
val alarmCounter = AlarmCounter.countStrRrq(threadPoolName, executor);
String receivesStr = getReceives(platform.getPlatform(), platform.getReceivers());
String content = String.format(
getAlarmTemplate(),
CommonUtil.getInstance().getServiceName(),
CommonUtil.getInstance().getIp() + ":" + CommonUtil.getInstance().getPort(),
CommonUtil.getInstance().getEnv(),
populatePoolName(executorWrapper),
notifyItemEnum.getValue(),
notifyItem.getThreshold(),
executor.getCorePoolSize(),
executor.getMaximumPoolSize(),
executor.getPoolSize(),
executor.getActiveCount(),
executor.getLargestPoolSize(),
executor.getTaskCount(),
executor.getCompletedTaskCount(),
// the queue size is passed twice (here and below); presumably the template shows it in
// two places - confirm against the template
executor.getQueue().size(),
executor.getQueue().getClass().getSimpleName(),
getQueueCapacity(executor),
executor.getQueue().size(),
executor.getQueue().remainingCapacity(),
getRejectHandlerName(executor),
alarmCounter.getLeft(),
alarmCounter.getMiddle(),
alarmCounter.getRight(),
// falls back to UNKNOWN when no last alarm time is recorded
Optional.ofNullable(alarmInfo.getLastAlarmTime()).orElse(UNKNOWN),
DateUtil.now(),
receivesStr,
// trace id is read from the MDC of the current thread, UNKNOWN when absent
Optional.ofNullable(MDC.get(TRACE_ID)).orElse(UNKNOWN),
notifyItem.getInterval()
);
return highlightAlarmContent(content, notifyItemEnum);
}
AlarmCounter#countStrRrq
,告警信息中就包含当前线程池REJECT
,RUN_TIMEOUT
,QUEUE_TIMEOUT
的次数(不是线程数量),每一项的格式为“AlarmCounter 中的计数 / DtpExecutor 上的累计计数”。
/**
 * Renders the reject, run-timeout and queue-timeout counts of a pool as display strings.
 *
 * <p>For a {@code DtpExecutor} each string has the form
 * {@code "<alarm counter value> / <value reported by the executor>"}. For any other
 * executor all three strings are {@code DEFAULT_COUNT_STR}.
 *
 * @param threadPoolName name of the thread pool used to look up the alarm counters
 * @param executor       the executor to read the executor-side counts from
 * @return triple of (reject, run timeout, queue timeout) strings
 */
public static Triple<String, String, String> countStrRrq(String threadPoolName, ThreadPoolExecutor executor) {
    if (executor instanceof DtpExecutor) {
        DtpExecutor dtp = (DtpExecutor) executor;
        String reject = getCount(threadPoolName, REJECT.getValue()) + " / " + dtp.getRejectCount();
        String runTimeout = getCount(threadPoolName, RUN_TIMEOUT.getValue()) + " / "
                + dtp.getRunTimeoutCount();
        String queueTimeout = getCount(threadPoolName, QUEUE_TIMEOUT.getValue()) + " / "
                + dtp.getQueueTimeoutCount();
        return new ImmutableTriple<>(reject, runTimeout, queueTimeout);
    }
    return new ImmutableTriple<>(DEFAULT_COUNT_STR, DEFAULT_COUNT_STR, DEFAULT_COUNT_STR);
}
提醒工具类
EmailNotifier
实现了Notifier
,会进行邮件发送。
/**
 * {@code Notifier} implementation that delivers notifications as HTML e-mails through
 * Spring's {@code JavaMailSender}.
 */
@Slf4j
public class EmailNotifier implements Notifier {

    /** Sender address, taken from {@code spring.mail.username}. */
    @Value("${spring.mail.username}")
    private String sendFrom;

    /** Mail subject, taken from {@code spring.mail.title}; defaults to "ThreadPool Notify". */
    @Value("${spring.mail.title:ThreadPool Notify}")
    private String title;

    @Resource
    private JavaMailSender javaMailSender;

    @Resource
    private TemplateEngine templateEngine;

    /**
     * @return the lower-cased name of {@code NotifyPlatformEnum.EMAIL}
     */
    @Override
    public String platform() {
        // Locale.ROOT keeps the key stable: with the default locale, e.g. Turkish, "EMAIL"
        // would lower-case to "emaıl" (dotless i) and no longer match "email".
        return NotifyPlatformEnum.EMAIL.name().toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * Sends {@code content} as an HTML mail to the receivers of the platform.
     *
     * <p>The receivers string is split on commas. Any exception raised while building or
     * sending the mail is logged and not propagated.
     *
     * @param platform the platform whose receivers are addressed
     * @param content  the mail body, sent as HTML
     */
    @Override
    public void send(NotifyPlatform platform, String content) {
        try {
            MimeMessage mimeMessage = javaMailSender.createMimeMessage();
            MimeMessageHelper messageHelper = new MimeMessageHelper(mimeMessage, true, "UTF-8");
            messageHelper.setSubject(title);
            messageHelper.setFrom(sendFrom);
            messageHelper.setTo(platform.getReceivers().split(","));
            messageHelper.setSentDate(new Date());
            // second argument marks the text as HTML
            messageHelper.setText(content, true);
            javaMailSender.send(mimeMessage);
            log.info("DynamicTp notify, email send success.");
        } catch (Exception e) {
            // best effort: a failed notification must not break the caller
            log.error("DynamicTp notify, email send failed...", e);
        }
    }

    /**
     * Renders the named template with the given context through the template engine.
     *
     * @param template the template to process
     * @param context  the variables made available to the template
     * @return the rendered text
     */
    public String processTemplateContent(String template, Context context) {
        return templateEngine.process(template, context);
    }
}