手撕线程池

news2025/1/10 20:25:52

1.手撕线程池原理图

在这里插入图片描述

2.代码实现

// 手撕线程池
public class Main {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,(queue, task) -> {
            queue.putByTime(task,1500,TimeUnit.MILLISECONDS);
        });
        for (int i = 0; i < 4; i++) {
            int j=i;
            threadPool.execute(()->{
                try{
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            });
        }
    }
}

// 自定义任务队列
class BlockQueue<T>{
    // 任务队列
    private Deque<T> queue=new ArrayDeque<>();

    // 锁
    private ReentrantLock lock=new ReentrantLock();

    // 生产者条件变量
    private Condition fullWaitSet=lock.newCondition();

    // 消费者条件变量
    private Condition emptyWaitSet=lock.newCondition();

    // 容量大小
    private int capacity;

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    // 阻塞获取
    public T take(){
        lock.lock();
        try{
            while(queue.isEmpty()){
                try{
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t=queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    // 阻塞添加
    public void put(T task){
        lock.lock();
        try {
            while(queue.size()>=capacity){
                try{
                    System.out.println("等待加入任务队列:"+task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任务队列:"+task);
            queue.addLast(task);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }

    // 带超时时间的阻塞获取
    public T takeByTime(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 将 timeout 统一转换为 纳秒
            long nanos=unit.toNanos(timeout);
            while(queue.isEmpty()){
                try{
                    if(nanos<=0){
                        return null;
                    }
                    nanos=emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t=queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    // 带超时时间阻塞添加
    public boolean putByTime(T task,long timeout, TimeUnit unit){
        lock.lock();
        try {
            long nanos=unit.toNanos(timeout);
            while(queue.size()>=capacity){
                try{
                    if(nanos<=0){
                        return false;
                    }
                    System.out.println("等待加入任务队列:"+task);
                    nanos=fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任务队列:"+task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

    // 返回队列大小
    public int size(){
        lock.lock();
        try{
            return queue.size();
        }finally {
            lock.unlock();
        }
    }

    // 根据策略添加
    public void tryPut(RejectPolicy<T> rejectPolicy,T task){
        lock.lock();
        try{
            // 判断队列是否满
            if(queue.size()>=capacity){
                rejectPolicy.reject(this,task);
            }else{
                System.out.println("加入任务队列:"+task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

// 拒绝策略
// 采用策略模式
@FunctionalInterface
interface RejectPolicy<T>{
    void reject(BlockQueue<T> queue,T task);
}

// 自定义线程池
class ThreadPool{
    // 任务队列
    private BlockQueue<Runnable> taskQueue;

    // 线程集合
    // Worker为内部类
    private HashSet<Worker> workers=new HashSet<Worker>();

    // 核心线程数
    private int coreSize;

    // 获取任务时的超时时间
    private long timeout;
    private TimeUnit timeUnit;

    // 拒绝策略
    private RejectPolicy<Runnable> policy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity, RejectPolicy<Runnable> policy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.policy = policy;
        this.taskQueue=new BlockQueue<>(queueCapacity);
    }

    // 执行任务
    public void execute(Runnable task){
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers){
            if(workers.size()<coreSize){
                Worker worker = new Worker(task);
                System.out.println("新增worker:"+worker+" ,task:"+task);
                workers.add(worker);
                worker.start();
            }else{
                taskQueue.tryPut(policy,task);
            }
        }
    }

    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // (1) 当 task 不为空,执行任务
            // (2) 当 task 执行完毕,再接着从任务队列获取任务并执行
            while(task!=null||(task= taskQueue.takeByTime(timeout,timeUnit))!=null){
                try{
                    System.out.println("正在执行:"+task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    task=null;
                }
            }
            synchronized (workers){
                System.out.println("worker 被移除:"+this);
                workers.remove(this);
            }
        }
    }
}

{
task=null;
}
}
synchronized (workers){
System.out.println(“worker 被移除:”+this);
workers.remove(this);
}
}
}
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2050711.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LangChain 实战演练:借助 LangChain SQL Agent 与 GPT 实现文档智能分析及交互

LangChain实战&#xff1a;利用LangChain SQL Agent和GPT进行文档分析和交互 我最近接触到一个非常有趣的挑战&#xff0c;涉及到人工智能数字化大量文件的能力&#xff0c;并使用户可以在这些文件上提出复杂的与数据相关的问题&#xff0c;比如&#xff1a; 数据检索问题&…

【qt】基于tcp的消息发送

我们需要实现客户端发消息&#xff0c;服务端接收消息 服务端界面新增接收消息 实现客户端发送和清空 发送数据需要将发送栏的信息转化为QByteArray,然后使用socket的write发送过去 实现服务端的接收 效果演示 20240818_111603 代码展示 server Widget.h #ifndef WIDGET_H …

Java的File类与IO流

目录 1. java.io.File类的使用 1.1 概述 1.2 构造器 1.3 常用方法 1、获取文件和目录基本信息 2、列出目录的下一级 3、File类的重命名功能 4、判断功能的方法 5、创建、删除功能 1.4 练习 2. IO流原理及流的分类 2.1 Java IO原理 2.2 流的分类 2.3 流的API 3. …

如何在 Windows/Mac/在线/iPhone/Android 上将 PDF 转换为 Word

PDF&#xff08;便携式文档格式&#xff09;是一种流行的格式&#xff0c;广泛用于在数字电子设备中呈现文档。输出文件小且兼容性强&#xff0c;使 PDF 如此受欢迎。但是&#xff0c;编辑 PDF 文件并非免费。您无需购买 PDF 编辑器&#xff0c;而是可以将 PDF 转换为 Word 进行…

「OC」NSPredicate —— 使用谓词过滤元素

「OC」NSPredicate —— 使用谓词过滤元素 文章目录 「OC」NSPredicate —— 使用谓词过滤元素前言介绍常见用法**比较运算符****逻辑运算符****字符串比较运算符****聚合运算符****用于字典或者类当中****格式说明符&#xff08;占位符&#xff09;** 实际运用总结参考文章 前…

05创建型设计模式——原型模式

一、原型模式简介 原型模式&#xff08;Prototype Pattern&#xff09;模式是一种对象创建型模式&#xff0c;它采取复制原型对象的方法来创建对象的实例。使用原型模式创建的实例&#xff0c;具有与原型一样的数据。 1&#xff09;由原型对象自身创建目标对象。换句话说&…

python基础语法 010 类和对象-3 方法

1.3 方法 属性表示是一个类当中的成员或类的特征&#xff0c;而方法是&#xff1f;&#xff1f; 方法&#xff1a;表示类、对象的行为&#xff0c;方法本质上是函数&#xff0c;是一个特殊的函数 属性名称一般为名词&#xff0c;方法名称一般为动词 1.3.1 方法 VS 属性 1、…

24/8/17算法笔记 DDPG算法

深度确定性策略梯度&#xff08;DDPG&#xff09;算法是一种用于解决连续动作空间强化学习问题的算法。它结合了确定性策略梯度&#xff08;DPG&#xff09;和深度学习技术的优点&#xff0c;通过Actor-Critic框架进行策略和价值函数的近似表示。DDPG算法的关键组成部分包括经验…

【RAG综述】北京大学检索增强技术综述

RAG for AIGC ​ 图 1 描述了一个典型的 RAG 过程。给定一个输入查询&#xff0c;检索器识别相关的数据源&#xff0c;检索到的信息与生成器交互以改进生成过程。根据检索结果如何增强生成&#xff0c;有几种基础范式&#xff08;简称基础&#xff09;&#xff1a;它们可以作为…

STM32的蜂鸣器

蜂鸣器分为有源蜂鸣器和无源蜂鸣器。 有源蜂鸣器&#xff1a;内部有震荡源&#xff0c;只要通电即可自动发出固定频率的声音。&#xff08;频率固定无 法控制音色&#xff09; 。 无源蜂鸣器&#xff1a;内部无震荡源&#xff0c;需要外部脉冲信号驱动发声&#xff0c;声音频…

《机器学习》 线性回归 一元、多元 推导 No.3

一、什么是线性回归 线性回归是一种用于预测连续数值的机器学习算法。它基于输入特征与目标变量之间的线性关系建立了一个线性模型。线性回归的目标是找到最佳拟合直线&#xff0c;以最小化预测值与实际值之间的误差。这个线性模型可以用来进行预测和推断。 线性回归的模型可以…

SpringBoot Profile多环境配置及配置优先级

【SpringBoot学习笔记 三】Profile多环境配置及配置优先级_profiles队列中的优先值-CSDN博客 Profile激活方式 但是我们发现一个问题&#xff0c;就是每次切换环境还需要去配置里指定&#xff0c;然后通过修改dev为test或prod来切换项目环境 , 这样做的话每次切换环境都要重新改…

前端面试——如何判断对象和数组

给你一个值&#xff0c;如何判断其是对象还是数组&#xff1f;&#xff1f;&#xff1f; 我们先给出数据 var lists [1,2,3,4,5]var objs {length:5 } 我们分别尝试如下五种方法 console.log((✘)使用length,lists.length,objs.length); console.log((✔)使用isArray,Arr…

【已成功EI检索】第三届机电一体化技术与航空航天工程国际学术会议(ICMTAE 2023)

重要信息 大会官网&#xff1a;www.icmtae.org 大会时间&#xff1a;2023年9月15-17日 大会地点&#xff1a;中国-江西南昌理工学院&#xff08;南昌市青山湖区经济技术开发区英雄大道901号&#xff09; 接受/拒稿通知&#xff1a;投稿后1周内 收录检索&#xff1a;EI 和 …

Vulkan 学习(4)---- Vulkan 逻辑设备

目录 Vulkan Logical Device OverView逻辑设备创建VkDeviceQueueCreateInfoDeviceExtension获取DeviceQueue参考代码 Vulkan Logical Device OverView 在 Vulkan 中&#xff0c;逻辑设备(Logical Device)是与物理设备(Physical Device)交互的接口,它抽象了对特定GPU(物理设备)…

CDD数据库文件制作(八)——服务配置(0x85)

目录 1.子功能创建2.会话切换配置/安全等级配置2.1.根据诊断调查表进行信息提取2.2.会话转换配置/安全等级配置3.寻址方式信息提取/禁止肯定响应位(SPRMIB)信息3.1.寻址方式/禁止肯定响应位(SPRMIB)配置4.否定响应码信息提取4.1.否定响应码配置按照诊断调查表中对0x85服务的…

PX30 Android8.1适配AIC8800 wifi

wifi驱动生成ko文件 生成后 通过wpa_supplicant加载参数 external/wpa_supplicant_8/wpa_supplicant/main.c int main(int argc, char *argv[]) {int ret -1;char module_type[20]{0};wpa_printf(MSG_INFO,"argc %d\n",argc);if(argc 2) {if (wifi_type[0] 0) …

【MySQL】数据库基础(表的操作)

目录 一、创建表 二、查看表结构 三、修改表 3.1 添加新列 3.2 修改列属性 3.3 删除列属性 3.4 修改表名 3.5 向表中插入 3.6 修改列名 四、删除表 一、创建表 语法&#xff1a; CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) …

docker容器安全加固参考建议——筑梦之路

这里主要是rootless的方案。 在以 root 用户身份运行 Docker 会带来一些潜在的危害和安全风险&#xff0c;这些风险包括&#xff1a; 容器逃逸&#xff1a;如果一个容器以 root 权限运行&#xff0c;并且它包含了漏洞或者被攻击者滥用&#xff0c;那么攻击者可能会成功逃出容器…

车载camera avm框图

一、关键词介绍: POC: power on coax LVDS: Low-Voltage Differential Signaling GMSL:Gigabit Multimedia Serial Link AVM: Around View Monitor Serdes:DeSerializer、Serializer DVP:Interface with ISP and Sensor: DVP(Digital Video Port) 二、车载camera avm…