【Java项目】使用Nacos实现动态线程池技术以及Nacos配置文件更新监听事件

news2025/1/11 21:07:46

文章目录

  • 项目源码
  • 线程池概念
  • ThreadPoolExecutor介绍
  • Nacos
  • 实现对Nacos配置文件更新的事件监听机制
  • 定时通知功能
  • 邮件发送通知功能
  • 开始测试

项目源码

真诚的希望能给我项目一个stars!!!
项目源码

线程池概念

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如Tomcat。

线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

使用线程池的好处如下:

降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。

提高响应速度:任务到达时,无需等待线程创建即可立即执行。

提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

过多的概念以及线程池中的核心参数这些问题我就不讲解了,可以直接看下面这篇文章
线程池概念以及核心参数讲解

ThreadPoolExecutor介绍

先看一下ThreadPoolExecutor的类图。
在这里插入图片描述
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor的执行机制如下图。
(来源网络)
在这里插入图片描述
上图表明了当一个任务过来的时候,三种处理情况,直接拒绝,直接执行,或者是放入缓冲区。

线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。**线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。**阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
上文中也提到了几个BlockingQueue了,这里不做赘述。
这里也不在讲解线程池的任务申请流程了。

Nacos

我们知道,使用Nacos作为注册中心和配置中心的好处在于,如果我们的配置修改了,我们的项目是可以实时监控到的并且进行配置的同步更新。
在这里插入图片描述
我们可以按照如上的方式来配置线程池的参数,这是完全没问题的。
我们也知道,我们把这里的参数修改之后,我们的代码里面也会同步的修改这些值为新值。
但是有一个问题,我们知道,我们创建线程池的时候都是new一个线程池出来,那么这里有一个问题就在于,我们使用的是spring项目,我们不会去说当nacos的配置文件修改之后,然后去通知项目然后new一个线程池出来,而是希望在原有的线程池的参数上进行修改。
恰巧,ThreadPoolExecutor也提供了这些方法给我们来让我们运行时修改线程池的参数。
在这里插入图片描述
那么现在问题就变成了,我们如何做到nacos的配置更新之后,我们如何对已经创建的线程池进行参数的修改呢?
肯定能最快想到的就是事件通知机制了,我们猜测nacos能实现本地代码配置的实时更新,大概率也是使用了通知功能,所以我们翻看nacos的源码可以发现nacos有一个方法可以添加一个监听器来监听配置文件的更新。
在这里插入图片描述

实现对Nacos配置文件更新的事件监听机制

那么接下来的代码来实现Nacos的事件监听机制

spring:
  application:
    name: dynamic-thread-pool
  profiles:
    active: dev
  cloud:
    nacos:
      discovery:
        namespace: test
        group: DYNAMIC_THREADPOOL
        server-addr: xxxx # 填写nacos地址
      config:
        server-addr: xxxx # 填写nacos地址
        group: DYNAMIC_THREADPOOL
        namespace: test
        file-extension: properties
        shared-configs:
          - data-id: dynamic-thread-pool-dev.properties
            refresh: true
server:
  port: 8080

然后我们编写一个配置类,当然,这个配置类其实可有可无

package zhang.blossom.dynamic.threadpool.config;
 
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
 
import java.util.Properties;
 
/**
 * nacos配置
 *
 * @author 张锦标
 */
@Slf4j
@Configuration
public class NacosConfig {
    @Value("${spring.cloud.nacos.config.server-addr}")
    private String serverAddr;
    @Value("${spring.cloud.nacos.config.namespace}")
    private String namespace;
 
    @Bean
    @Primary
    public ConfigService configService() {
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        properties.put("namespace", namespace);
        try {
            return NacosFactory.createConfigService(properties);
        } catch (NacosException e) {
            log.error(e.toString(), e);
        }
 
        return null;
    }
}

之后我们创建一个监听器,有多种方法,我们直接@Bean创建一个我们配置好的ConfigService也可以,当然,我这里选择的是实现ApplicationRunner接口,这样子可以做到项目启动成功后会自动执行run方法。
代码如下

package zhang.blossom.dynamic.threadpool.listener;
 
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import zhang.blossom.dynamic.threadpool.core.ResizableCapacityLinkedBlockIngQueue;

import javax.annotation.Resource;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 项目启动后添加监听,实现ApplicationRunner,在项目成功后会自动执行run方法
 *
 * @author wangfenglei
 */
@Slf4j
@Component
public class NacosListener implements ApplicationRunner {
    @Resource
    private ConfigService configService;
    @Value("${spring.cloud.nacos.config.group}")
    private String groupId;

    public static final String DATA_ID = "dynamic-thread-pool-dev.properties";
    @Autowired
    @Qualifier("commonThreadPool")
    private ThreadPoolExecutor threadPoolExecutor;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        //添加nacos配置文件监听
        listenerNacosConfig();
    }
 
    /**
     * 监听数据源变化
     *
     * @throws Exception 异常
     */
    private void listenerNacosConfig() throws Exception {
        configService.addListener(DATA_ID, groupId, new Listener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                //configInfo是一个字符串,它的内容就是你配置文件里所有的内容
                //这里推荐配置文件使用properties方式,不然后续不好处理
            }
 
            @Override
            public Executor getExecutor() {
                return null;
            }
        });
    }
 
    /**
     * 向nacos发布内容
     * 会直接覆写原本的配置文件,请谨慎使用
     * @param content 内容
     * @throws Exception 异常
     */
    private void publishConfig(String content) throws Exception {
        //发布内容
        configService.publishConfig(DATA_ID, groupId, content);
    }
}

ok,然后我现在创建一个配置类,来读取nacos上面对应的参数。

package zhang.blossom.dynamic.threadpool.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * @author: 张锦标
 * @date: 2023/6/15 11:25
 * ThreadPoolProperty类
 */
//@Configuration
@RefreshScope
@Component
@ConfigurationProperties("dynamic.threadpool")
public class ThreadPoolProperty {
    //@Value("${dynamic.threadpool.corePoolSize}")
    private Integer corePoolSize;
    //@Value("${dynamic.threadpool.maximumPoolSize}")
    private Integer maximumPoolSize;
    //@Value("${dynamic.threadpool.queueCapacity}")
    private Integer queueCapacity;

    public Integer getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(Integer corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public Integer getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public void setMaximumPoolSize(Integer maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
    }

    public Integer getQueueCapacity() {
        return queueCapacity;
    }

    public void setQueueCapacity(Integer queueCapacity) {
        this.queueCapacity = queueCapacity;
    }
}

在这里插入图片描述
在这里插入图片描述
之后, 启动项目就可以得到我们在nacos上配置的参数的值了。
下面是一个测试:

package zhang.blossom.dynamic.threadpool.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import zhang.blossom.dynamic.threadpool.config.ThreadPoolProperty;

/**
 * @author: 张锦标
 * @date: 2023/6/15 11:59
 * TestController类
 */
@RestController
public class TestController {
    @Autowired
    private ThreadPoolProperty threadPoolProperty;
    @GetMapping("/get")
    public String getValue(){
        return threadPoolProperty.getMaximumPoolSize() +"  "+
        threadPoolProperty.getCorePoolSize()+"  "+
                threadPoolProperty.getQueueCapacity();
    }
}

在这里插入图片描述

定时通知功能

 <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.3.1</version>
</dependency>

使用xxl-job来实现定时通知功能
在这里插入图片描述
在这里插入图片描述

package zhang.blossom.dynamic.threadpool.handler;


import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import zhang.blossom.dynamic.threadpool.service.SendMailService;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author 张锦标
 * @version 1.0
 */
@Component
public class SimpleXxxJob {
    @Autowired
    private SendMailService sendMailService;

    @Autowired
    @Qualifier("commonThreadPool")
    private ThreadPoolExecutor threadPoolExecutor;
    //使用XXLJOB注解定义一个job
    @XxlJob(value = "sendMailHandler", init = "initHandler", destroy = "destroyHandler")
    public void sendMailHandler() {
        sendMailService.sendMailToWarn(threadPoolExecutor);
        System.out.println("发送短信成功。。。。。。。。。。。");
    }

    //任务初始化方法
    public void initHandler() {
        System.out.println("任务调用初始化方法执行");
    }

    public void destroyHandler() {
        System.out.println("任务执行器被销毁");
    }


}

邮件发送通知功能

package zhang.blossom.dynamic.threadpool.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: 张锦标
 * @date: 2023/6/15 15:24
 * SendMailToWarn类
 */
@Service
public class SendMailService{

    @Autowired
    private JavaMailSender javaMailSender;

    @Value("${warn.recipient}")
    public String recipient;

    @Value("${warn.addresser}")
    public String addresser;


    public boolean sendMailToWarn(ThreadPoolExecutor threadPoolExecutor){
        SimpleMailMessage simpleMailMessage = new SimpleMailMessage();
        simpleMailMessage.setTo(recipient);
        simpleMailMessage.setFrom(addresser);
        simpleMailMessage.setSubject("线程池情况汇报");
        String s = "CorePoolSize="+threadPoolExecutor.getCorePoolSize()+"   "+
                "LargestPoolSize="+threadPoolExecutor.getLargestPoolSize()+"   "+
                "MaximumPoolSize="+threadPoolExecutor.getMaximumPoolSize();
        simpleMailMessage.setText(s);
        javaMailSender.send(simpleMailMessage);
        return true;
    }
}

开始测试

这里特别需要先强调一个点,JDK提供的线程池虽然有获得工作队列的方法,但是目前目前实现的消息队列都是不能动态修改容量的,所以你需要自己实现一个工作队列。
具体可以看我项目中的代码实现。
(!!!!求求动动你发财的小手给我的项目点一个stars吧!!!!)
源码地址—希望能给我一个stars作为鼓励!!!
这是项目一开始启动的时候的线程池的参数情况。
在这里插入图片描述
ok,然后我们修改nacos的配置文件,使其发生监听事件
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
然后,我们的xxl-job也会定时的给我们进行消息提示
在这里插入图片描述
到此为止,整个项目完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/649814.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Megatron-LM GPT2

内容 使用原始 Megatron-LM 训练 GPT-2 训练数据设置运行未修改的 Megatron-LM GPT2 模型启用 DeepSpeed 参数解析初始化和训练 初始化使用训练 API 前向传播反向传播更新模型参数损失缩放检查点保存和加载DeepSpeed 激活检查点&#xff08;可选&#xff09;训练脚本使用 GPT-…

拆分代码 + 预加载,减少首屏资源,提升首屏性能及应用体验

route-resource-preload 组件懒加载可以极大地提升我们的应用首屏加载体验, 但我们在进行组件资源拆包后&#xff0c;一旦加载某个组件资源出现以上时间过长的情况&#xff0c;则会出现视图无法正常渲染的问题&#xff08;导致页面白屏/组件出不来/交互卡顿等情况&#xff09;。…

Pytest教程__钩子方法setup、teardown、setup_class、teardown_class(8)

pytest跳过用例执行的用法与unittest跳过用例大致相同。 pytest跳过用例的方法如下&#xff1a; pytest.mark.skip(reason)&#xff1a;无条件用例。reason是跳过原因&#xff0c;下同。pytest.mark.skipIf(condition, reason)&#xff1a;condition为True时跳过用例。 pyte…

代码随想录算法训练营第五十八天|739. 每日温度|496.下一个更大元素 I

LeetCode739. 每日温度 基本思路&#xff1a;什么时候使用单调栈呢&#xff1f;通常是一维数组&#xff0c;要寻找任一个元素的右边或者左边第一个比自己大或者小的元素的位置&#xff0c;此时我们就要想到可以用单调栈了。时间复杂度为O(n)。例如本题其实就是找找到一个元素右…

【发电厂】发电厂模型验证应用于电网事件在线性能监测【相量测量单元 (PMU) 数据对电网事件的在线性能监控】研究(MatlabSimulink实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

(一文详解!)wrk性能测试

目录 一、简介 二、格式及用法 三、简单压测及结果分析 四、使用lua脚本进行压测 一、简介 wrk 是一款针对 Http 协议的基准测试工具&#xff0c;它能够在单机多核 CPU 的条件下&#xff0c;使用系统自带的高性能 I/O 机制&#xff0c;如 epoll&#xff0c;kqueue 等&#xff0…

华为OD机试真题 JavaScript 实现【报数游戏】【2022Q4 100分】

一、题目描述 100个人围成一圈&#xff0c;每个人有一个编码&#xff0c;编号从1开始到100。他们从1开始依次报数&#xff0c;报到为M的人自动退出圈圈&#xff0c;然后下一个人接着从1开始报数&#xff0c;直到剩余的人数小于M。请问最后剩余的人在原先的编号为多少&#xff…

Ubuntu 更新 CMake 版本

项目中有时候会出现CMake版本小于最低要求的情况,实际上没有有必要这么高的要求,但是在不能改对方代码的情况下,只能去升级自身的版本了. 尝试了网上说的直接update之后再次安装的方式,结果版本号没有改变 sudo apt-get update sudo apt-get remove cmake sudo apt-get ins…

Rust语言从入门到入坑——(3)小谈Cargo工具以及VScode配置Rust

文章目录 0 引入1、Cargo 是什么2、常用Cargo指令3、VScode配置Rust环境3.1.tasks.json3.2.launch.json 4、总结 0 引入 上一节我们谈了在编译工程的时候用到Cargo&#xff08;卡狗&#xff09; 1、Cargo 是什么 Cargo 是 Rust 的构建系统和包管理器。 Rust 开发者常用 Carg…

Metricbeat安装下载,nginx模块使用

目录 MetricbeatMetricbeat组成下载启动Metricbeat Modulesystem module配置内容 Nginx Module开启Nginx Module 配置nginx module测试 Metricbeat 定期收集操作系统或应用服务的指标数据存储到Elasticsearch中&#xff0c;进行实时分析 Metricbeat组成 Metricbeat有2部分组成…

Jmeter性能测试步骤入门

目录 前言&#xff1a; 一、Jmeter简介 1 概述 2 JMeter支持的协议 二、Jmeter实现性能测试 第一步&#xff1a;配置jdk环境 第二步&#xff1a;下载jmeter 第三步&#xff1a;启动Jmeter 四、一个http还除了上述http请求中的请求行和请求体&#xff0c;还包含请求头&#xff…

Android使用AspectJ拦截点击事件

介绍&#xff1a; AspectJ是一种AOP框架&#xff0c;它可以在编译时或运行时拦截指定的方法。在Android开发中&#xff0c;我们可以使用AspectJ来拦截UI事件并执行自己的业务逻辑。本文将说明如何使用AspectJ来拦截Android应用程序中的点击事件&#xff0c;并附有详细的代码注…

存储池和LUN

存储池是什么意思 存储池是为集体存储共享置备的存储卷的集合。这些池建立在物理存储设备的聚合上&#xff0c;根据资源需求进行隔离&#xff0c;然后由共享环境中的各种计算机或系统使用。存储池可以分为三类&#xff0c;每类都有不同的用途;主存储池、副本存储池和活动数据池…

618来了!看图技术如何在物流管理系统大显身手!

导读 近日&#xff0c;随着电商“618”购物节的临近&#xff0c;各大商家纷纷推出各类补贴活动刺激消费者热情。下单后&#xff0c;消费者的心理活动如何呢&#xff1f;蹲点抢到优惠券&#xff0c;精打细算的凑单后&#xff0c;终于完成付款。焦急的等待待发货的小红点跳至待收…

分享三个java低代码开发平台,每个都很能打,建议收藏!

来讲讲近期比较流行的低代码开发平台一词。低代码的目的是将可重复性的编程工作通过平台实现&#xff0c;将开发人员从没有技术含量的增删改查开发中解放出来&#xff0c;让其专注于更有价值的开发工作&#xff0c;例如业务建模、数据库设计、流程设计、API核心开发、业务逻辑开…

如何调用百度地图API

前言 要调用百度地图API&#xff0c;步骤操作如下 注册并创建一个API密钥。您可以在百度地图API控制台上创建您的密钥。选择要使用的API服务。百度地图API提供了多种服务&#xff0c;包括地图展示、路线规划、地点搜索、实时交通等。您可以在百度地图API控制台上查看所有可用…

大数据hadoop生态技术简介

Hadoop 生态是指围绕 Hadoop 大数据处理平台形成的一系列开源软件和工具&#xff0c;用于支持大规模数据处理、存储、管理、分析和可视化等应用场景。暂时将其核心技术分为9类&#xff1a; 数据采集技术框架&#xff1a; Flume、Logstash、FileBeat&#xff1b;Sqoop和Datax&…

【Spring】循环依赖

一、什么情况下会出现循环依赖&#xff1f; 二、解决方案 &#xff08;一&#xff09;一级缓存&#xff1a;存放完整的Bean实例对象 缺点&#xff1a;一级缓存的方式无法保证多线程下的一级缓存Bean的完整性&#xff0c;可以用加锁的方式来解决此问题。 &#xff08;二&#…

Nerf如何制作自己的llff数据集

Nerf三维重建使用Pycharm运行自己的数据集 ------------------------------------20230427更新---------------------------------------------- Nerf代码讲解&#xff0c;从零简单复现论文代码 Nerf环境配置教程 你好&#xff01; 这里是“出门吃三碗饭”本人&#xff0c;本…

数字化时代,数据治理中有哪些误区?

数据治理也不是什么简单的事情&#xff0c;或者说企业想要利用数据资产、数字化、数字化转型等加速企业的发展&#xff0c;就势必会在路途中遇到很多困难&#xff0c;数据治理只是用来解决数据问题的一种方案。所以当数据的价值开始被不断挖掘&#xff0c;企业数据资产的地位越…