Java自定义一个线程池

news2025/1/11 14:10:51

线程池图解 

线程池与主线程之间通过一个阻塞队列来平衡任务分配,阻塞队列中既可以满足线程等待,又要接收主线程的任务。

线程池实现

使用一个双向链表实现任务队列

 创建任务队列

//阻塞队列
public class BlockingQueue<T> {
    //双线链表
    private Deque<T> queue = new ArrayDeque();
    //锁
    private ReentrantLock lock =new ReentrantLock();
    //生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    //消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    //容器容量大小
    private int capacity;

    //阻塞获取
    public T pull(long timeOut, TimeUnit unit){
        lock.lock();
        //判断链表中是否存在任务待处理
        try {
            //将尝试时间转化为纳秒
            long nanos = unit.toNanos(timeOut);
            while (queue.isEmpty()){
                try {
                    if (nanos<0){
                        return null;
                    }
                    //awaitNanos返回结果是最大等待时间减去睡眠时间的剩余时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    //阻塞添加
    public void put(T element){
        lock.lock();
        try{
            while(queue.size()==capacity){
                //说明满了,暂时无法添加新的任务
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(element);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }

    //获取队列任务数量
    public int size(){
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }
}

创建线程池 

public class ThreadPool {
    //任务队列
    private BlockingQueue<Runnable> blockingQueue;
    //线程集合
    private HashSet<Worker> workers = new HashSet();
    //核心线程数
    private int coreNum;
    //超时时间
    private long timeOut;
    private TimeUnit unit;

    public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity) {
        System.out.println("初始化线程池");
        this.coreNum = coreNum;
        this.timeOut = timeOut;
        this.unit = unit;
        this.blockingQueue = new BlockingQueue<>(queueCapacity);
    }

    //线程执行任务
    public void execute(Runnable task) {
        //当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueue
        synchronized (workers) {
            if (workers.size() < coreNum) {
                Worker worker = new Worker(task);
                System.out.println("新增worker"+worker);
                workers.add(worker);
                worker.start();
            } else {
                System.out.println("从消息队列中获取task");
                blockingQueue.put(task);
            }
        }
    }

    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = blockingQueue.pull(timeOut, unit)) != null) {
                try {
                    System.out.println("Worker执行任务");
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers){
                System.out.println("Worker执行完毕"+this);
                workers.remove(this);
            }
        }
    }
}

测试 

public class Test {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,3000, TimeUnit.MILLISECONDS,5);
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(()->{
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("生产任务:"+j);
            });
        }
    }
}

初始化线程池

新增workerThread[Thread-0,5,main]

新增workerThread[Thread-1,5,main]

新增workerThread[Thread-2,5,main]

Worker执行任务

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@7ba4f24f

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@3b9a45b3

加入任务队列TheadPool.Test$$Lambda$1/1078694789@7699a589

加入任务队列TheadPool.Test$$Lambda$1/1078694789@58372a00

加入任务队列TheadPool.Test$$Lambda$1/1078694789@4dd8dc3

等待加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736

生产任务:2

生产任务:1

生产任务:0

Worker执行任务

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@6d03e736

Worker执行任务

加入任务队列TheadPool.Test$$Lambda$1/1078694789@378bf509

生产任务:3

生产任务:4

生产任务:5

Worker执行任务

Worker执行任务

Worker执行任务

生产任务:6

生产任务:8

生产任务:7

Worker执行任务

Worker执行完毕Thread[Thread-1,5,main]

Worker执行完毕Thread[Thread-0,5,main]

生产任务:9

Worker执行完毕Thread[Thread-2,5,main]

添加拒绝策略

上面测试中,有一点不友好的是,当任务队列满了之后,再向其中添加任务时,主线程会死等任务添加成功。

对此我们可以选择多种解决方案

  • 死等
  • 添加超时时间
  • 让调用者方式执行
  • 让调用者抛出异常
  • 让调用者自己执行

创建拒绝策略

@FunctionalInterface
public interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue,T task);
}

修改线程池的执行方法 

	//添加属性
	private RejectPolicy rejectPolicy;
	//构造方法
	public ThreadPool(int coreNum, long timeOut, TimeUnit unit, int queueCapacity, RejectPolicy rejectPolicy) {
        System.out.println("初始化线程池");
        this.coreNum = coreNum;
        this.timeOut = timeOut;
        this.unit = unit;
        this.blockingQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

	//线程执行任务
    public void execute(Runnable task) {
        //当线程数没有超过coreNum时,直接交给Worker对象执行,如果超过了coreNum数,则加入BlockingQueue
        synchronized (workers) {
            if (workers.size() < coreNum) {
                Worker worker = new Worker(task);
                System.out.println("新增worker" + worker);
                workers.add(worker);
                worker.start();
            } else {
//                System.out.println("从消息队列中获取task");
//                blockingQueue.put(task);
                blockingQueue.tryPut(rejectPolicy,task);
            }
        }
    }

 任务队列添加方法

    public void tryPut(RejectPolicy rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                //如果满了,需要调用拒绝策略
                rejectPolicy.reject(this,task);
            } else {
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }

测试 

public class Test {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,3000,
                TimeUnit.MILLISECONDS,5,
                (queue,task)->{
                    //由调用者决定任务队列满了之后如何处理后续任务
                    queue.put(task);//死等
                    queue.offer(task,1000,TimeUnit.MILLISECONDS);//超时返回
                                    //啥也不干,直接丢弃任务
                    task.run();//调用者自己执行
                    throw new RuntimeException("任务秩序异常");//抛出异常
                });
        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(()->{
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("生产任务:"+j);
            });
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1260127.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【一周AI简讯】OpenAI奥特曼王者归来,马斯克AI模型Grok下周开放测试,ChatGPT语音对话功能向所有用户免费开放

OpenAI奥特曼王者归来&#xff0c;董事会改组 终于&#xff0c;经历大约5天的极限拉扯&#xff0c;年底AI界吃瓜大戏落下帷幕&#xff0c;奥特曼确认回归。 ChatGPT语音对话功能向所有用户免费开放 ChatGPT 语音输入最初于 9 月份推出&#xff0c;标题是“ChatGPT 现在可以看…

[Spring ~必知必会] Bean 基础常识汇总

文章目录 Bean 相关到底什么是beanFactorybeanFactory能干啥ApplicationContext是什么ApplicationContext的功能比 BeanFactory多了什么 容器的实现BeanFactory的实现ApplicationContext的实现xml 配置配置类配置 Bean 的生命周期3.1 Bean 的常见的后处理器测试代码总结 3.2 工…

【中间件】配置中心中间件intro

中间件middleware 内容管理 why use 配置中心配置中心feature配置中心develop主流配置中心Apollo浅谈 本文从理论上介绍一下服务化中服务治理系统中的配置中心的理论并浅析Apllo 配置在生产中是很关键的一个部分&#xff0c;cfeng在work中遇到几次配置问题引发的问题&#xff0…

为何要隐藏IP地址?网络上哪些行为需要隐藏IP和更换IP?

网络已经成为现代人生活的重要组成部分&#xff0c;人们在网络上交流、学习、娱乐、购物等。但是&#xff0c;在享受网络带来的便利时&#xff0c;我们也需要时刻保护自己的隐私和安全。其中&#xff0c;IP地址作为网络通信中的重要标识&#xff0c;如何隐藏以及在哪些情况下需…

电商数据采集中如何采集1688平台商品详情SKU数据

一、背景介绍 1688.com是阿里旗下国内最大的B2B批发采购平台&#xff0c;1688分销客是依托此平台的官方营销平台&#xff0c;通过此平台API接口的接入推广平台商家的商品&#xff0c;按照商品成交金额的一定比例获得佣金。可以调用1688平台上的商品详情&#xff0c;SKU数据&…

西南科技大学电路分析基础实验A1(一阶电路的设计)

目录 一、实验目的 二、实验设备 三、预习内容(如:基本原理、电路图、计算值等) 四、实验数据及结果分析(预习写必要实验步骤和表格) 1. 观测一阶电

万字解析设计模式之观察者模式、中介者模式、访问者模式

一、观察者模式 1.1概述 观察者模式是一种行为型设计模式&#xff0c;它允许一个对象&#xff08;称为主题或可观察者&#xff09;在其状态发生改变时&#xff0c;通知它的所有依赖对象&#xff08;称为观察者&#xff09;并自动更新它们。这种模式提供了一种松耦合的方式&…

什么是高性能计算岗位

最近有小伙伴咨询什么是高性能计算岗位。 1、什么是高性能计算 高性能计算&#xff0c;在很多招聘信息中也会被标注为 HPC&#xff0c;是 High Performance Computing 的缩写。 目前很多 AI 公司或者从事 AI 的部门招聘都有这个岗位需求&#xff0c;我从某聘上截取了几个有代…

VBA高级应用30例:Ribbon(功能区)的介绍

《VBA高级应用30例》&#xff08;版权10178985&#xff09;&#xff0c;是我推出的第十套教程&#xff0c;教程是专门针对高级学员在学习VBA过程中提高路途上的案例展开&#xff0c;这套教程案例与理论结合&#xff0c;紧贴“实战”&#xff0c;并做“战术总结”&#xff0c;以…

【Proteus仿真】【STM32单片机】感应水龙头设计

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真STM32单片机控制器&#xff0c;使用LCD1602液晶模块、HCSR04超声波等。 主要功能&#xff1a; 系统运行后&#xff0c;LCD1602显示超声波模块检测的距离&#xff0c;若检测距离小…

二十九、微服务案例完善(数据聚合、自动补全、数据同步)

目录 一、定义 二、分类 1、桶(Bucket)聚合: 2、度量(Metric&#xff09;聚合: 3、管道聚合&#xff08;Pipeline Aggregation&#xff09;&#xff1a; 4、注意&#xff1a; 参与聚合的字段类型必须是: 三、使用DSL实现聚合 聚合所必须的三要素&#xff1a; 聚合可配…

数字系列——数字经济

数字经济是全球经济未来发展方向&#xff0c;正在成为重组全球要素资源、重塑全球经济结构、改变全球竞争格局的关键力量。都知道数字经确实很重要&#xff0c;但有些人还傻傻搞不懂数字经济到底是什么&#xff1f;小编今天就给大家捋一捋。 什么是数字经济&#xff1f; 数字经…

tabs切换,组件库framework7

IOS和安卓兼容的背景下&#xff0c; 可以使用&#xff1a;framework7.io文档 效果展示&#xff1a; 代码&#xff1a; <!-- Top Tabs --> <div class"tabs tabs-top"><div class"tab tab1 active">...</div><div class"…

高级/进阶”算法和数据结构书籍推荐

“高级/进阶”算法和数据结构书籍推荐《高级算法和数据结构》 高级算法和数据结构 为什么要选择本书 谈及为什么需要花时间学算法&#xff0c;我至少可以列举出三个很好的理由。 (1)性能&#xff1a;选择正确的算法可以显著提升应用程序的速度。仅就搜索来说&#xff0c;用二…

jQuery_07 函数的使用

在jQuery中&#xff0c;如何使用函数呢&#xff1f; 1.基本函数 函数(常用的) 其实有很多函数&#xff0c;但是我们只需要掌握常用的函数即可 1.val 操作dom对象的value val() 没有参数 获取dom数组中第一个dom对象的value值 val(value) 有参数 设置dom数组中所有dom对象的…

毫米波雷达DOA角度计算-----DBF算法

DBF算法实现程序如下&#xff1a; 输入&#xff1a; parameter 是 毫米波雷达的参数设置。 antVec 是 目标点的8个虚拟天线的非相参积累数据。 function [angle,doa_abs] dbfMethod(parameter,antVec)txAntenna parameter.txAntenna; % 发射天线 [1 1]rxAntenna para…

使用功能点估算法进行估算,5大注意事项

功能点估算法在软件项目管理中起着重要的作用&#xff0c;其有助于项目的早期估算&#xff0c;更准确地预测项目成本和进度&#xff0c;有助于更好地理解项目规模&#xff0c;并做出相应的资源分配和进度安排。如果不使用此估算方法&#xff0c;可能会导致项目范围不清晰&#…

R语言期末考试复习二

上篇文章的后续&#xff01;&#xff01;&#xff01;&#xff01; http://t.csdnimg.cn/sqvYD 1.给向量vec1设置名为"A","B","C","D","E","F","G"。 2.将矩阵mat1的行名设置为"Row1"&#…

Unreal Engine 学习笔记 (4)—— 多方向动画

1.创建混合空间 1.设置水平方向命名为Direction表示行进方向 -45,300表示向左前方45度方向行走-90,300表示向正左方90度方向行走-135,300表示向左后方45度方向行走-180,300表示向正后方行走右侧方向动画与上述左侧使用同样方法设置Run动画与Walk动画使用同样方法设置 2. 设置…

【网安AIGC专题】46篇前沿代码大模型论文、24篇论文阅读笔记汇总

网安AIGC专题 写在最前面一些碎碎念课程简介 0、课程导论1、应用 - 代码生成2、应用 - 漏洞检测3、应用 - 程序修复4、应用 - 生成测试5、应用 - 其他6、模型介绍7、模型增强8、数据集9、模型安全 写在最前面 本文为邹德清教授的《网络安全专题》课堂笔记系列的文章&#xff0c…