【开源项目】Dynamic-Tp核心流程源码解读

news2024/10/2 8:27:38

序.介绍

dynamic-tp 是一款动态线程池组件,可以实现线程池的实时动态调参及监控报警,线程池配置放在配置中心统一管理,达成业务代码零侵入,支持多配置中心的选择和常见的第三方组件的线程池的集成管理。

官网: https://dynamictp.top/

Gitee: https://gitee.com/dromara/dynamic-tp

Github: https://github.com/dromara/dynamic-tp

详细介绍及组件的基本使用,可以访问 dynamic-tp 官网。

快速入门

引入依赖

        <dependency>
            <groupId>cn.dynamictp</groupId>
            <artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
            <version>1.1.0</version>
        </dependency>

本地bootstrap.xml

server:
  port: 8099

spring:
  application:
    name: dynamic-tp-example
  profiles:
    active: dev
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
      config:
        server-addr: ${spring.cloud.nacos.discovery.server-addr}
        file-extension: yml

远程配置

# 动态线程池配置文件,建议单独开一个文件放到配置中心,字段详解看readme介绍
spring:
  dynamic:
    tp:
      enabled: true
      enabledBanner: true           # 是否开启banner打印,默认true
      enabledCollect: true          # 是否开启监控指标采集,默认false
      collectorTypes: micrometer,logging     # 监控数据采集器类型(logging | micrometer | internal_logging),默认micrometer
      logPath: /home/logs           # 监控日志数据路径,默认 ${user.home}/logs
      monitorInterval: 5            # 监控时间间隔(报警判断、指标采集),默认5s
      nacos:                        # nacos配置,不配置有默认值(规则name-dev.yml这样),cloud应用不需要配置
        dataId: dynamic-tp-nacos-demo-dtp-dev.yml
        group: DEFAULT_GROUP
      configType: yml               # 配置文件类型
      platforms:                    # 通知报警平台配置
        - platform: wechat
          urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # 替换
          receivers: test1,test2                   # 接受人企微名称
        - platform: ding
          urlKey: f80dad441fcd655438f4a08dcd6a     # 替换
          secret: SECb5441fa6f375d5b9d21           # 替换,非sign模式可以没有此值
          receivers: 15810119805                   # 钉钉账号手机号
        - platform: lark
          urlKey: 0d944ae7-b24a-40                 # 替换
          receivers: test1,test2                   # 接受人飞书名称/openid
      tomcatTp:                                    # tomcat web server线程池配置
        corePoolSize: 100
        maximumPoolSize: 401
        keepAliveTime: 60
      jettyTp:                                     # jetty web server线程池配置
        corePoolSize: 100
        maximumPoolSize: 400
      undertowTp:                                  # undertow web server线程池配置
        corePoolSize: 100
        maximumPoolSize: 400
        keepAliveTime: 60
      hystrixTp:                                   # hystrix 线程池配置
        - threadPoolName: hystrix1
          corePoolSize: 100
          maximumPoolSize: 400
          keepAliveTime: 60
      dubboTp:                                     # dubbo 线程池配置
        - threadPoolName: dubboTp#20880
          corePoolSize: 100
          maximumPoolSize: 400
          keepAliveTime: 60
      rocketMqTp:                                  # rocketmq 线程池配置
        - threadPoolName: group1#topic1
          corePoolSize: 200
          maximumPoolSize: 400
          keepAliveTime: 60
      executors:                                   # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量
        - threadPoolName: dtpExecutor1
          executorType: common                     # 线程池类型common、eager:适用于io密集型
          corePoolSize: 6
          maximumPoolSize: 8
          queueCapacity: 202
          queueType: VariableLinkedBlockingQueue   # 任务队列,查看源码QueueTypeEnum枚举类
          rejectedHandlerType: CallerRunsPolicy    # 拒绝策略,查看RejectedTypeEnum枚举类
          keepAliveTime: 50
          allowCoreThreadTimeOut: false                  # 是否允许核心线程池超时
          threadNamePrefix: test                         # 线程名前缀
          waitForTasksToCompleteOnShutdown: false        # 参考spring线程池设计,优雅关闭线程池
          awaitTerminationSeconds: 5                     # 单位(s)
          preStartAllCoreThreads: false                  # 是否预热所有核心线程,默认false
          runTimeout: 200                                # 任务执行超时阈值,目前只做告警用,单位(ms)
          queueTimeout: 100                              # 任务在队列等待超时阈值,目前只做告警用,单位(ms)
          taskWrapperNames: ["ttl"]                          # 任务包装器名称,集成TaskWrapper接口
          notifyItems:                     # 报警项,不配置自动会按默认值配置(变更通知、容量报警、活性报警、拒绝报警、任务超时报警)
            - type: capacity               # 报警项类型,查看源码 NotifyTypeEnum枚举类
              enabled: true
              threshold: 80                # 报警阈值
              platforms: [ding,wechat]     # 可选配置,不配置默认拿上层platforms配置的所以平台
              interval: 120                # 报警间隔(单位:s)
            - type: change
              enabled: true
            - type: liveness
              enabled: true
              threshold: 80
            - type: reject
              enabled: true
              threshold: 1
            - type: run_timeout
              enabled: true
              threshold: 1
            - type: queue_timeout
              enabled: true
              threshold: 1

启动类加 @EnableDynamicTp 注解

测试效果

@Slf4j
@RestController
@SuppressWarnings("all")
public class TestController {

    @Resource
    private ThreadPoolExecutor dtpExecutor1;

    @GetMapping("/test")
    public String test() throws InterruptedException {
        dtpExecutor1.execute(() -> {
                log.info("i am dynamic-tp-test-1 task");
            });
        return "success";
    }
}

在这里插入图片描述

源码分析

设计思路

  • 提供一个功能入口可以将配置构造成一个线程池对象,内部维护一个线程池注册表,将配置对应的线程池添加至注册表中。
  • 实例化线程池对象,Spring 环境则注入依赖 Bean,以供 IOC 容器使用。
  • 如果配置指向的是远端配置中心,则注册监听器,当远端注册配置中心刷新时回调,当前系统监听到回调刷新配置,刷新线程池(动态调参),刷新本地线程池注册表。
  • 线程池调参监控,异常报警

DtpBeanDefinitionRegistrar

添加EnableDynamicTp,引入DtpBeanDefinitionRegistrar

@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({DtpBeanDefinitionRegistrar.class})
public @interface EnableDynamicTp {
}

DtpBeanDefinitionRegistrar

  1. 从 Environment 读取配置信息绑定到 DtpProperties
  2. 获取配置文件中配置的线程池,如果没有则结束
  3. 遍历线程池,绑定配置构造线程池所需要的属性,根据配置中的 executorType 注册不同类型的线程池 Bean
  4. BeanUtil#registerIfAbsent() 注册 Bean
public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {
    private static final Logger log = LoggerFactory.getLogger(DtpBeanDefinitionRegistrar.class);
    private Environment environment;

    public DtpBeanDefinitionRegistrar() {
    }

    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        DtpProperties dtpProperties = new DtpProperties();
        PropertiesBinder.bindDtpProperties(this.environment, dtpProperties);
        List<ThreadPoolProperties> executors = dtpProperties.getExecutors();
        if (CollectionUtils.isEmpty(executors)) {
            log.warn("DynamicTp registrar, no executors are configured.");
        } else {
            executors.forEach((x) -> {
                Class<?> executorTypeClass = ExecutorType.getClass(x.getExecutorType());
                Map<String, Object> properties = this.buildPropertyValues(x);
                Object[] args = this.buildConstructorArgs(executorTypeClass, x);
                BeanUtil.registerIfAbsent(registry, x.getThreadPoolName(), executorTypeClass, properties, args);
            });
        }
    }

    private Map<String, Object> buildPropertyValues(ThreadPoolProperties tpp) {
        Map<String, Object> properties = Maps.newHashMap();
        properties.put("threadPoolName", tpp.getThreadPoolName());
        properties.put("threadPoolAliasName", tpp.getThreadPoolAliasName());
        properties.put("allowCoreThreadTimeOut", tpp.isAllowCoreThreadTimeOut());
        properties.put("waitForTasksToCompleteOnShutdown", tpp.isWaitForTasksToCompleteOnShutdown());
        properties.put("awaitTerminationSeconds", tpp.getAwaitTerminationSeconds());
        properties.put("preStartAllCoreThreads", tpp.isPreStartAllCoreThreads());
        properties.put("runTimeout", tpp.getRunTimeout());
        properties.put("queueTimeout", tpp.getQueueTimeout());
        List<NotifyItem> notifyItems = NotifyItem.mergeAllNotifyItems(tpp.getNotifyItems());
        properties.put("notifyItems", notifyItems);
        properties.put("notifyEnabled", tpp.isNotifyEnabled());
        List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(tpp.getTaskWrapperNames());
        properties.put("taskWrappers", taskWrappers);
        return properties;
    }

    private Object[] buildConstructorArgs(Class<?> clazz, ThreadPoolProperties tpp) {
        Object taskQueue;
        if (clazz.equals(EagerDtpExecutor.class)) {
            taskQueue = new TaskQueue(tpp.getQueueCapacity());
        } else {
            taskQueue = QueueTypeEnum.buildLbq(tpp.getQueueType(), tpp.getQueueCapacity(), tpp.isFair(), tpp.getMaxFreeMemory());
        }

        return new Object[]{tpp.getCorePoolSize(), tpp.getMaximumPoolSize(), tpp.getKeepAliveTime(), tpp.getUnit(), taskQueue, new NamedThreadFactory(tpp.getThreadNamePrefix()), RejectHandlerGetter.buildRejectedHandler(tpp.getRejectedHandlerType())};
    }
}

DtpPostProcessor

DtpPostProcessor 利用了 Spring 容器启动 BeanPostProcessor 机制增强机制,在 bean 初始化的时候调用 postProcessAfterInitialization,它实现了获取被 IOC 容器托管的线程池 bean 然后注册到本地的注册表中。

public class DtpPostProcessor implements BeanPostProcessor {
    private static final Logger log = LoggerFactory.getLogger(DtpPostProcessor.class);

    public DtpPostProcessor() {
    }

    public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {
        if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor)) {
            return bean;
        } else if (bean instanceof DtpExecutor) {
            DtpExecutor dtpExecutor = (DtpExecutor)bean;
            if (bean instanceof EagerDtpExecutor) {
                ((TaskQueue)dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor)dtpExecutor);
            }

            this.registerDtp(dtpExecutor);
            return dtpExecutor;
        } else {
            ApplicationContext applicationContext = ApplicationContextHolder.getInstance();

            String dtpAnnotationVal;
            try {
                DynamicTp dynamicTp = (DynamicTp)applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);
                if (Objects.nonNull(dynamicTp)) {
                    dtpAnnotationVal = dynamicTp.value();
                } else {
                    BeanDefinitionRegistry registry = (BeanDefinitionRegistry)applicationContext;
                    AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition)registry.getBeanDefinition(beanName);
                    MethodMetadata methodMetadata = (MethodMetadata)annotatedBeanDefinition.getSource();
                    if (Objects.isNull(methodMetadata) || !methodMetadata.isAnnotated(DynamicTp.class.getName())) {
                        return bean;
                    }

                    dtpAnnotationVal = ((Map)Optional.ofNullable(methodMetadata.getAnnotationAttributes(DynamicTp.class.getName())).orElse(Collections.emptyMap())).getOrDefault("value", "").toString();
                }
            } catch (NoSuchBeanDefinitionException var9) {
                log.error("There is no bean with the given name {}", beanName, var9);
                return bean;
            }

            String poolName = StringUtils.isNotBlank(dtpAnnotationVal) ? dtpAnnotationVal : beanName;
            if (bean instanceof ThreadPoolTaskExecutor) {
                ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor)bean;
                this.registerCommon(poolName, taskExecutor.getThreadPoolExecutor());
            } else {
                this.registerCommon(poolName, (ThreadPoolExecutor)bean);
            }

            return bean;
        }
    }

    private void registerDtp(DtpExecutor executor) {
        DtpRegistry.registerDtp(executor, "beanPostProcessor");
    }

    private void registerCommon(String poolName, ThreadPoolExecutor executor) {
        ExecutorWrapper wrapper = new ExecutorWrapper(poolName, executor);
        DtpRegistry.registerCommon(wrapper, "beanPostProcessor");
    }
}

  1. 获取到 bean 后,如果是非线程池类型则结束。
  2. 如果是 DtpExecutor 则注册到 DTP_REGISTRY 注册表中
  3. 如果是 非动态线程池且标注了 @DynamicTp 注解则注册到 COMMON_REGISTRY 注册表中
  4. 如果是 非动态线程池且未标注 @DynamicTp 注解则结束不做增强

CloudNacosRefresher

CloudNacosRefresher实现了AbstractRefresher,当接收到RefreshScopeRefreshedEvent,进行刷新

public class CloudNacosRefresher extends AbstractRefresher implements SmartApplicationListener {
    private static final Logger log = LoggerFactory.getLogger(CloudNacosRefresher.class);

    public CloudNacosRefresher() {
    }

    public boolean supportsEventType(@NonNull Class<? extends ApplicationEvent> eventType) {
        return RefreshScopeRefreshedEvent.class.isAssignableFrom(eventType);
    }

    public void onApplicationEvent(@NonNull ApplicationEvent event) {
        if (event instanceof RefreshScopeRefreshedEvent) {
            this.doRefresh(this.dtpProperties);
        }

    }
}

AbstractRefresher#doRefresh(),刷新DtpRegistry

    protected void doRefresh(DtpProperties dtpProperties) {
        DtpRegistry.refresh(dtpProperties);
        this.publishEvent(dtpProperties);
    }

    private void publishEvent(DtpProperties dtpProperties) {
        RefreshEvent event = new RefreshEvent(this, dtpProperties);
        ApplicationContextHolder.publishEvent(event);
    }

DtpRegistry#refresh(),根据配置刷新DtpExecutor,执行消息变更通知, NoticeManager.doNoticeAsync(new ExecutorWrapper(executor), oldProp, diffKeys);

    public static void refresh(DtpProperties properties) {
        if (!Objects.isNull(properties) && !CollectionUtils.isEmpty(properties.getExecutors())) {
            properties.getExecutors().forEach((x) -> {
                if (StringUtils.isBlank(x.getThreadPoolName())) {
                    log.warn("DynamicTp refresh, threadPoolName must not be empty.");
                } else {
                    DtpExecutor dtpExecutor = (DtpExecutor)DTP_REGISTRY.get(x.getThreadPoolName());
                    if (Objects.isNull(dtpExecutor)) {
                        log.warn("DynamicTp refresh, cannot find specified dtpExecutor, name: {}.", x.getThreadPoolName());
                    } else {
                        refresh(dtpExecutor, x);
                    }
                }
            });
        } else {
            log.warn("DynamicTp refresh, empty threadPoolProperties.");
        }
    }

    private static void refresh(DtpExecutor executor, ThreadPoolProperties properties) {
        if (properties.getCorePoolSize() >= 0 && properties.getMaximumPoolSize() > 0 && properties.getMaximumPoolSize() >= properties.getCorePoolSize() && properties.getKeepAliveTime() >= 0L) {
            DtpMainProp oldProp = ExecutorConverter.convert(executor);
            doRefresh(executor, properties);
            DtpMainProp newProp = ExecutorConverter.convert(executor);
            if (oldProp.equals(newProp)) {
                log.warn("DynamicTp refresh, main properties of [{}] have not changed.", executor.getThreadPoolName());
            } else {
                List<FieldInfo> diffFields = EQUATOR.getDiffFields(oldProp, newProp);
                List<String> diffKeys = (List)diffFields.stream().map(FieldInfo::getFieldName).collect(Collectors.toList());
                NoticeManager.doNoticeAsync(new ExecutorWrapper(executor), oldProp, diffKeys);
                log.info("DynamicTp refresh, name: [{}], changed keys: {}, corePoolSize: [{}], maxPoolSize: [{}], queueType: [{}], queueCapacity: [{}], keepAliveTime: [{}], rejectedType: [{}], allowsCoreThreadTimeOut: [{}]", new Object[]{executor.getThreadPoolName(), diffKeys, String.format("%s => %s", oldProp.getCorePoolSize(), newProp.getCorePoolSize()), String.format("%s => %s", oldProp.getMaxPoolSize(), newProp.getMaxPoolSize()), String.format("%s => %s", oldProp.getQueueType(), newProp.getQueueType()), String.format("%s => %s", oldProp.getQueueCapacity(), newProp.getQueueCapacity()), String.format("%ss => %ss", oldProp.getKeepAliveTime(), newProp.getKeepAliveTime()), String.format("%s => %s", oldProp.getRejectType(), newProp.getRejectType()), String.format("%s => %s", oldProp.isAllowCoreThreadTimeOut(), newProp.isAllowCoreThreadTimeOut())});
            }
        } else {
            log.error("DynamicTp refresh, invalid parameters exist, properties: {}", properties);
        }
    }

DtpAdapterListener

DtpAdapterListener 处于 adapter 模块,该模块主要是对些三方组件中的线程池进行管理(例如 Tomcat,Jetty 等),通过 spring 的事件发布监听机制来实现与核心流程解耦。

如果需要对Tomcat进行管理,需要加入依赖

        <dependency>
            <groupId>cn.dynamictp</groupId>
            <artifactId>dynamic-tp-spring-boot-starter-adapter-webserver</artifactId>
            <version>1.0.8</version>
        </dependency>

该jar包会注入WebServerTpAutoConfiguration

@Configuration
@ConditionalOnWebApplication
@AutoConfigureAfter({BaseBeanAutoConfiguration.class})
public class WebServerTpAutoConfiguration {
    public WebServerTpAutoConfiguration() {
    }

    @Bean
    @ConditionalOnTomcatWebServer
    public TomcatDtpAdapter tomcatTpHandler() {
        return new TomcatDtpAdapter();
    }

    @Bean
    @ConditionalOnJettyWebServer
    public JettyDtpAdapter jettyTpHandler() {
        return new JettyDtpAdapter();
    }

    @Bean
    @ConditionalOnUndertowWebServer
    public UndertowDtpAdapter undertowTpHandler() {
        return new UndertowDtpAdapter();
    }
}

TomcatDtpAdapter继承了AbstractWebServerDtpAdapter,而AbstractWebServerDtpAdapter实现了ApplicationListener,在容器启动的时候会重写onApplicationEvent

    public void onApplicationEvent(WebServerInitializedEvent event) {
        try {
            DtpProperties dtpProperties = (DtpProperties)ApplicationContextHolder.getBean(DtpProperties.class);
            this.initialize();
            this.refresh(dtpProperties);
        } catch (Exception var3) {
            log.error("Init web server thread pool failed.", var3);
        }

    }

    protected void initialize() {
        if (this.executorWrapper == null) {
            ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
            WebServer webServer = ((WebServerApplicationContext)applicationContext).getWebServer();
            this.executorWrapper = this.doGetExecutorWrapper(webServer);
            log.info("DynamicTp adapter, web server executor init end, executor: {}", this.executorWrapper.getExecutor());
        }

    }

TomcatDtpAdapter#refresh,会根据配置文件中的数据设置线程池大小。

public class TomcatDtpAdapter extends AbstractWebServerDtpAdapter {
    private static final Logger log = LoggerFactory.getLogger(TomcatDtpAdapter.class);
    private static final String POOL_NAME = "tomcatTp";

    public TomcatDtpAdapter() {
    }

    public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {
        TomcatWebServer tomcatWebServer = (TomcatWebServer)webServer;
        return new ExecutorWrapper("tomcatTp", tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor());
    }

    public ThreadPoolStats getPoolStats() {
        ThreadPoolExecutor executor = (ThreadPoolExecutor)this.getWrapper().getExecutor();
        return ThreadPoolStats.builder().corePoolSize(executor.getCorePoolSize()).maximumPoolSize(executor.getMaximumPoolSize()).queueType(executor.getQueue().getClass().getSimpleName()).queueCapacity(executor.getQueue().size() + executor.getQueue().remainingCapacity()).queueSize(executor.getQueue().size()).queueRemainingCapacity(executor.getQueue().remainingCapacity()).activeCount(executor.getActiveCount()).taskCount(executor.getTaskCount()).completedTaskCount(executor.getCompletedTaskCount()).largestPoolSize(executor.getLargestPoolSize()).poolSize(executor.getPoolSize()).waitTaskCount(executor.getQueue().size()).poolName("tomcatTp").build();
    }

    public void refresh(DtpProperties dtpProperties) {
        SimpleTpProperties properties = dtpProperties.getTomcatTp();
        if (!Objects.isNull(properties)) {
            ExecutorWrapper executorWrapper = this.getWrapper();
            ThreadPoolExecutor executor = (ThreadPoolExecutor)executorWrapper.getExecutor();
            this.checkParams(executor.getMaximumPoolSize(), properties);
            DtpMainProp oldProp = ExecutorConverter.ofSimple("tomcatTp", executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor.getKeepAliveTime(properties.getUnit()));
            this.doRefresh(executor, properties);
            DtpMainProp newProp = ExecutorConverter.ofSimple("tomcatTp", executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor.getKeepAliveTime(properties.getUnit()));
            if (oldProp.equals(newProp)) {
                log.warn("DynamicTp adapter refresh, main properties of [{}] have not changed.", "tomcatTp");
            } else {
                log.info("DynamicTp adapter [{}] refreshed end, corePoolSize: [{}], maxPoolSize: [{}], keepAliveTime: [{}]", new Object[]{"tomcatTp", String.format("%s => %s", oldProp.getCorePoolSize(), newProp.getCorePoolSize()), String.format("%s => %s", oldProp.getMaxPoolSize(), newProp.getMaxPoolSize()), String.format("%s => %s", oldProp.getKeepAliveTime(), newProp.getKeepAliveTime())});
            }
        }
    }

    private void doRefresh(ThreadPoolExecutor executor, SimpleTpProperties properties) {
        if (!Objects.equals(executor.getKeepAliveTime(properties.getUnit()), properties.getKeepAliveTime())) {
            executor.setKeepAliveTime((long)properties.getKeepAliveTime(), properties.getUnit());
        }

        if (!Objects.equals(executor.getCorePoolSize(), properties.getCorePoolSize())) {
            executor.setCorePoolSize(properties.getCorePoolSize());
        }

        if (!Objects.equals(executor.getMaximumPoolSize(), properties.getMaximumPoolSize())) {
            executor.setMaximumPoolSize(properties.getMaximumPoolSize());
        }

    }

    private ExecutorWrapper getWrapper() {
        ExecutorWrapper executorWrapper = this.getExecutorWrapper();
        if (!Objects.isNull(executorWrapper) && !Objects.isNull(executorWrapper.getExecutor())) {
            return executorWrapper;
        } else {
            log.warn("Tomcat web server threadPool is null.");
            throw new DtpException("Tomcat web server threadPool is null.");
        }
    }
}

当配置修改的时候,DtpAdapterListener会监听到对应的修改事件。

@Slf4j
public class DtpAdapterListener implements GenericApplicationListener {

    @Override
    public boolean supportsEventType(ResolvableType resolvableType) {
        Class<?> type = resolvableType.getRawClass();
        if (type != null) {
            return RefreshEvent.class.isAssignableFrom(type)
                    || CollectEvent.class.isAssignableFrom(type)
                    || AlarmCheckEvent.class.isAssignableFrom(type);
        }
        return false;
    }

    @Override
    public void onApplicationEvent(@NonNull ApplicationEvent event) {
        try {
            if (event instanceof RefreshEvent) {
                doRefresh(((RefreshEvent) event).getDtpProperties());
            } else if (event instanceof CollectEvent) {
                doCollect(((CollectEvent) event).getDtpProperties());
            } else if (event instanceof AlarmCheckEvent) {
                doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
            }
        } catch (Exception e) {
            log.error("DynamicTp adapter, event handle failed.", e);
        }
    }
    
    protected void doRefresh(DtpProperties dtpProperties) {
        val handlerMap = ApplicationContextHolder.getBeansOfType(DtpAdapter.class);
        if (CollectionUtils.isEmpty(handlerMap)) {
            return;
        }
        handlerMap.forEach((k, v) -> v.refresh(dtpProperties));
    }
}

AlarmManager

AlarmManager静态方法块中构造了责任链,在需要报警通知的时候,调用责任链执行

@Slf4j
public class AlarmManager {

    private static final ExecutorService ALARM_EXECUTOR = ThreadPoolBuilder.newBuilder()
            .threadPoolName("dtp-alarm")
            .threadFactory("dtp-alarm")
            .corePoolSize(1)
            .maximumPoolSize(2)
            .workQueue(LINKED_BLOCKING_QUEUE.getName(), 2000, false, null)
            .rejectedExecutionHandler(RejectedTypeEnum.DISCARD_OLDEST_POLICY.getName())
            .taskWrappers(TaskWrappers.getInstance().getByNames(Sets.newHashSet("mdc")))
            .buildDynamic();

    private static final InvokerChain<BaseNotifyCtx> ALARM_INVOKER_CHAIN;

    static {
        ALARM_INVOKER_CHAIN = NotifyFilterBuilder.getAlarmInvokerChain();
    }

    private AlarmManager() { }
}

获取容器中NotifyFilter的Bean。

public class NotifyFilterBuilder {

    private NotifyFilterBuilder() { }

    public static InvokerChain<BaseNotifyCtx> getAlarmInvokerChain() {
        val filters = ApplicationContextHolder.getBeansOfType(NotifyFilter.class);
        Collection<NotifyFilter> alarmFilters = Lists.newArrayList(filters.values());
        alarmFilters.add(new AlarmBaseFilter());
        alarmFilters = alarmFilters.stream()
                .filter(x -> x.supports(NotifyTypeEnum.ALARM))
                .sorted(Comparator.comparing(Filter::getOrder))
                .collect(Collectors.toList());
        return InvokerChainFactory.buildInvokerChain(new AlarmInvoker(), alarmFilters.toArray(new NotifyFilter[0]));
    }
}

AlarmManager#doAlarm,根据告警类型获取告警项配置,一个线程池可以配置多个NotifyItem,执行责任链。

    public static void doAlarm(ExecutorWrapper executorWrapper, NotifyItemEnum notifyItemEnum) {
        NotifyHelper.getNotifyItem(executorWrapper, notifyItemEnum).ifPresent(notifyItem -> {
            val alarmCtx = new AlarmCtx(executorWrapper, notifyItem);
            ALARM_INVOKER_CHAIN.proceed(alarmCtx);
        });
    }

执行责任链

public class AlarmInvoker implements Invoker<BaseNotifyCtx> {

    @Override
    public void invoke(BaseNotifyCtx context) {

        val alarmCtx = (AlarmCtx) context;
        val executorWrapper = alarmCtx.getExecutorWrapper();
        val notifyItem = alarmCtx.getNotifyItem();
        val alarmInfo = AlarmCounter.getAlarmInfo(executorWrapper.getThreadPoolName(), notifyItem.getType());
        alarmCtx.setAlarmInfo(alarmInfo);

        try {
            DtpNotifyCtxHolder.set(context);
            NotifierHandler.getInstance().sendAlarm(NotifyItemEnum.of(notifyItem.getType()));
            AlarmCounter.reset(executorWrapper.getThreadPoolName(), notifyItem.getType());
        } finally {
            DtpNotifyCtxHolder.remove();
        }
    }
}

NotifierHandler#sendAlarm,获取不同的DtpNotifier,发送告警消息

@Slf4j
public final class NotifierHandler {

    private static final Map<String, DtpNotifier> NOTIFIERS = new HashMap<>();

    private NotifierHandler() {
        // 适配SPI
        ServiceLoader<DtpNotifier> loader = ServiceLoader.load(DtpNotifier.class);
        for (DtpNotifier notifier : loader) {
            NOTIFIERS.put(notifier.platform(), notifier);
        }
        DtpNotifier dingNotifier = new DtpDingNotifier(new DingNotifier());
        DtpNotifier wechatNotifier = new DtpWechatNotifier(new WechatNotifier());
        DtpNotifier larkNotifier = new DtpLarkNotifier(new LarkNotifier());
        NOTIFIERS.put(dingNotifier.platform(), dingNotifier);
        NOTIFIERS.put(wechatNotifier.platform(), wechatNotifier);
        NOTIFIERS.put(larkNotifier.platform(), larkNotifier);
    }

	public void sendAlarm(NotifyItemEnum notifyItemEnum) {
        NotifyItem notifyItem = DtpNotifyCtxHolder.get().getNotifyItem();
        for (String platformId : notifyItem.getPlatformIds()) {
            NotifyHelper.getPlatform(platformId).ifPresent(p -> {
                DtpNotifier notifier = NOTIFIERS.get(p.getPlatform().toLowerCase());
                if (notifier != null) {
                    notifier.sendAlarmMsg(p, notifyItemEnum);
                }
            });
        }
    }
}

AbstractDtpNotifier#sendAlarmMsg,发送消息。NotifierDingNotifierEmailNotifierWechatNotifier等实现类

    @Override
    public void sendAlarmMsg(NotifyPlatform notifyPlatform, NotifyItemEnum notifyItemEnum) {
        String content = buildAlarmContent(notifyPlatform, notifyItemEnum);
        if (StringUtils.isBlank(content)) {
            log.debug("Alarm content is empty, ignore send alarm message.");
            return;
        }
        notifier.send(notifyPlatform, content);
    }

DtpMonitor

对系统进行监控

@Slf4j
public class DtpMonitor implements ApplicationRunner, Ordered {

    private static final ScheduledExecutorService MONITOR_EXECUTOR = new ScheduledThreadPoolExecutor(
            1, new NamedThreadFactory("dtp-monitor", true));

    @Resource
    private DtpProperties dtpProperties;

    @Override
    public void run(ApplicationArguments args) {
        MONITOR_EXECUTOR.scheduleWithFixedDelay(this::run,
                0, dtpProperties.getMonitorInterval(), TimeUnit.SECONDS);
    }

    private void run() {
        List<String> dtpNames = DtpRegistry.listAllDtpNames();
        List<String> commonNames = DtpRegistry.listAllCommonNames();
        checkAlarm(dtpNames);
        collect(dtpNames, commonNames);
    }

    private void collect(List<String> dtpNames, List<String> commonNames) {
        if (!dtpProperties.isEnabledCollect()) {
            return;
        }

        dtpNames.forEach(x -> {
            DtpExecutor executor = DtpRegistry.getDtpExecutor(x);
            ThreadPoolStats poolStats = MetricsConverter.convert(executor);
            doCollect(poolStats);
        });
        commonNames.forEach(x -> {
            ExecutorWrapper wrapper = DtpRegistry.getCommonExecutor(x);
            ThreadPoolStats poolStats = MetricsConverter.convert(wrapper);
            doCollect(poolStats);
        });
        publishCollectEvent();
    }

    private void checkAlarm(List<String> dtpNames) {
        dtpNames.forEach(x -> {
            DtpExecutor executor = DtpRegistry.getDtpExecutor(x);
            AlarmManager.doAlarmAsync(executor, SCHEDULE_NOTIFY_ITEMS);
        });
        publishAlarmCheckEvent();
    }

    private void doCollect(ThreadPoolStats threadPoolStats) {
        try {
            CollectorHandler.getInstance().collect(threadPoolStats, dtpProperties.getCollectorTypes());
        } catch (Exception e) {
            log.error("DynamicTp monitor, metrics collect error.", e);
        }
    }

    private void publishCollectEvent() {
        CollectEvent event = new CollectEvent(this, dtpProperties);
        ApplicationContextHolder.publishEvent(event);
    }

    private void publishAlarmCheckEvent() {
        AlarmCheckEvent event = new AlarmCheckEvent(this, dtpProperties);
        ApplicationContextHolder.publishEvent(event);
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 2;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/484734.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++学习day--01 C生万物

1、C/C学习中遇到的问题&#xff1a; 1. 大部分初学者&#xff0c;学习 C/C 都是从入门到放弃。 C/C太难吗&#xff1f; 2. 90% 以上的初学者&#xff0c;学完 C/C 以后&#xff0c;考试完了&#xff0c;书看完了&#xff0c; 但还是不会做项目 是学的不够好吗&#xff1…

基于KZG多项式承诺方案的RLN

1. 引言 RLN——Rate-Limiting Nullifier为PSE团队主导的项目&#xff0c;源自&#xff1a; Barry White Hat 2019年博客 Semaphore RLN, rate limiting nullifier for spam prevention in anonymous p2p setting RLN&#xff08;Rate-Limiting Nullifier&#xff09;是一种…

Servlet原理

什么是Servlet? Servlet是JavaWeb应用程序中的一种Java类&#xff0c;用于接收和处理来自客户端的请求&#xff0c;并将生成的响应发送回客户端。 Servlet是按照Java Servlet规范开发的&#xff0c;可以通过Servlet容器&#xff08;如Tomcat&#xff09;来管理和运行。Servl…

二十二、SQL 数据分析实战(案例1~案例10)

文章目录 案例1&#xff1a;用户信息表 stu_table案例2&#xff1a;员工绩效表 score_table案例3&#xff1a;销售冠军信息表 month_table案例4&#xff1a;月销售额记录表 sale_table案例5&#xff1a;每季度员工绩效得分表 score_info_table案例6&#xff1a;员工信息表 stu_…

【大数据】Hadoop总结

本文对于Hadoop中的HDFS和MapReduce的相关面试重点进行了总结&#xff0c;下篇将介绍调优、数据倾斜等进阶知识。 Hadoop总结 一、概述1. Hadoop特性2. HDFS结构HDFS 架构 二、HDFS分布式文件系统1 概述2. HDFS存储数据架构图NameNodeDataNode 3 HDFS优点4 HDFS缺点&#xff08…

利用 Delte-Sigma ADC简化电路设计

很多时候在电路中选择合适的 ADC可以很大程度上简化前端的电路。这里我们一起来看一个电阻电桥的例子&#xff1a; 这里用到了一只仪表放大器和一只运算放大器&#xff0c;他们实际上主要完成了三个功能&#xff1a; 1. 抑制了 2.5V的共模信号&#xff1b; 2. 将-1…

「业务架构」波特的五力分析教程介绍

波特五力分析模型最早出现在哈佛商学院教授迈克尔E波特1979年发表在《哈佛商业评论》上的文章中。这篇论文的发表在历史上改变了企业、组织甚至国家对战略的理解。自《哈佛商业评论》创刊以来&#xff0c;它被评为十大最具影响力的论文之一。 五力分析可以帮助公司评估行业吸引…

Baumer工业相机堡盟工业相机如何联合BGAPISDK和Halcon实现图像的对数Log变换算法增强(C#)

Baumer工业相机堡盟工业相机如何联合BGAPISDK和Halcon实现图像的对数Log变换算法增强&#xff08;C#&#xff09; Baumer工业相机Baumer工业相机使用图像算法增加图像的技术背景Baumer工业相机通过BGAPI SDK联合Halcon使用Log图像增强算法1.引用合适的类文件2.BGAPI SDK在图像回…

【ChatGLM】本地版ChatGPT ?6G显存即可轻松使用 !ChatGLM-6B 清华开源模型本地部署教程

目录 感谢B站秋葉aaaki大佬 前言 部署资源 部署流程 实机演示 ChatGML微调&#xff08;人格炼成&#xff09;&#xff08;个人感觉蛮有趣的地方&#xff09; 分享有趣の微调人格 实机演示&#xff08;潘金莲人格&#xff09; 感谢B站秋葉aaaki大佬 秋葉aaaki的个人空间…

《可穿戴监测中的数据质量评估》阅读笔记

目录 一、论文摘要 二、论文十问 三、论文亮点与不足之处 四、与其他研究的比较 五、实际应用与影响 六、个人思考与启示 参考文献 一、论文摘要 从手腕捕获的神经生理信号的可穿戴记录为癫痫监测提供了巨大的潜力。然而&#xff0c;数据质量仍然是影响数据可靠性的最具…

康耐视Visionpro常见问题汇总-视觉人机器视觉粉丝-千问之六十五解答

(2023年5月2日更,下次更新2023年10月1日-10月7日) Question0: 康耐视visionpro9.8/9.9-BeadInspect工具详细使用流程 原因分析或解决办法 康耐视visionpro9.8-BeadInspect工具详细使用流程 (qq.com) Question1: C#与visisionpro联合开发exe文件开机启动设置 原因分析…

Java 基础进阶篇(八)—— 匿名内部类与 Lambda 表达式

文章目录 一、内部类概述二、需要了解的内部类2.1 静态内部类2.2 成员内部类2.3 局部内部类2.4 面试笔试题 三、匿名内部类 ★四、Lambda表达式 ★4.1 Lambda 表达式的概述4.2 Lambda 表达式的省略规则4.3 Lambda 的使用 一、内部类概述 内部类就是定义在一个类里面的类&#…

SPSS如何管理数据之案例实训?

文章目录 0.引言1.数据文件的分解2.数据文件的横向合并3.数据文件的纵向合并4.数据文件的变换5.观测量的加权6.根据已存在的变量建立新变量7.产生计数变量8.对变量自身重新赋值9.赋值生成新的变量10.变量取值的求等级11.缺失数据的处理12.数据的汇总13.由变量组到观测量组的重组…

hd debug - DAPLink的资料

文章目录 DAPLink的资料概述笔记库迁出的技巧END DAPLink的资料 概述 查资料时, 看到有DAPLink的资料, 记录一下. 笔记 DAPLink项目分为软件和硬件2部分, 不在一个库中. 总览 : https://daplink.io/ 这个页面上说了软件和硬件项目的库地址. 软件库地址 : https://github.…

余弦相似度算法进行客户流失分类预测

余弦相似性是一种用于计算两个向量之间相似度的方法&#xff0c;常被用于文本分类和信息检索领域。具体来说&#xff0c;假设有两个向量A和B&#xff0c;它们的余弦相似度可以通过以下公式计算&#xff1a; 其中&#xff0c;dot_product(A, B)表示向量A和B的点积&#xff0c;no…

什么是链接库 | 动态库与静态库

欢迎关注博主 Mindtechnist 或加入【Linux C/C/Python社区】一起学习和分享Linux、C、C、Python、Matlab&#xff0c;机器人运动控制、多机器人协作&#xff0c;智能优化算法&#xff0c;滤波估计、多传感器信息融合&#xff0c;机器学习&#xff0c;人工智能等相关领域的知识和…

SPSS如何进行基本统计分析之案例实训?

文章目录 0.引言1.描述性分析2.频数分析3.探索分析4.列联表分析5.比率分析 0.引言 因科研等多场景需要进行绘图处理&#xff0c;笔者对SPSS进行了学习&#xff0c;本文通过《SPSS统计分析从入门到精通》及其配套素材结合网上相关资料进行学习笔记总结&#xff0c;本文对基本统计…

深度学习卷积神经网络学习小结2

简介 经过大约两周左右的学习&#xff0c;对深度学习有了一个初步的了解&#xff0c;最近的任务主要是精读深度学习方向的文献&#xff0c;由于搭建caffe平台失败而且比较耗费时间就没有再尝试&#xff0c;所以并没有做实践方面的工作&#xff0c;本文只介绍了阅读文献学到的知…

JdbcTemplate常用语句代码示例

目录 JdbcTemplate 需求 官方文档 JdbcTemplate-基本介绍 JdbcTemplate 使用实例 需求说明 创建数据库 spring 和表 monster 创建配置文件 src/jdbc.properties 创建配置文件 src/JdbcTemplate_ioc.xml 创建类JdbcTemplateTest测试是否可以正确得到数据源 配置 J…

《程序员面试金典(第6版)面试题 16.10. 生存人数(前缀和思想)

题目描述 给定 N 个人的出生年份和死亡年份&#xff0c;第 i 个人的出生年份为 birth[i]&#xff0c;死亡年份为 death[i]&#xff0c;实现一个方法以计算生存人数最多的年份。 你可以假设所有人都出生于 1900 年至 2000 年&#xff08;含 1900 和 2000 &#xff09;之间。如果…