手写线程池

news2025/1/4 6:30:57

为什么要使用线程池?

  • 降低资源的消耗,降低线程创建和销毁的资源消耗;
  • 降低响应速度:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间;
  • 提高线程的可管理性

线程池的核心思想:线程复用,同一个线程可以被重复停用,来处理多个任务。

实现流程

在这里插入图片描述
这个手写线程池的实现也非常简单,只会体现出核心流程:包括:

  1. 有n个一直在运行的线程,相当于我们创建线程池时允许的线程池大小;
  2. 把线程提交给线程池运行;
  3. 如果运行线程池已满,则把线程放入队列中;
  4. 最后当有空闲时,则获取队列中线程进行运行。

代码实现

阻塞队列的实现

  • 阻塞队列主要存放任务,有容量限制
  • 阻塞队列提供添加和删除任务的API,如果超过容量,阻塞不能添加任务,如果没有任务,阻塞无法获取任务
public class BlockingQueue<T> {

    private Logger logger = LoggerFactory.getLogger(BlockingQueue.class);

    // 容量
    private Integer capacity;

    // 双端任务队列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // 尝试添加任务
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 如果队列超过容量
            if (deque.size() > capacity) {
                logger.debug("task too much, do reject");
                rejectPolicy.reject(this, task);
            } else {
                deque.offer(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    // 阻塞的方式添加任务
    public void put(T task) {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.size() >= capacity) {
                logger.debug("wait to add queue");
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.offer(task);
            logger.debug("task add successfully");
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取任务
    public T take() {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.isEmpty()) {
                try {
                    logger.debug("wait to take task");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            T task = deque.poll();
            logger.debug("take task successfully");
            // 从队列中获取元素
            return task;
        } finally {
            lock.unlock();
        }
    }


}
  • put()方法是向阻塞队列中添加任务
  • take()方法是向阻塞队列中获取任务

线程池消费端实现

  1. 定义执行器接口
public interface Executor {

    // 提交任务执行
    void execute(Runnable task);

}
  1. 定义线程池类实现该接口
public class ThreadPool implements Executor {

    private Logger logger = LoggerFactory.getLogger(ThreadPool.class);

    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 核心工作线程数
    private int coreSize;

    // 工作线程集合
    private final Set<Worker> workers = new HashSet<>();

    // 拒绝策略
    private RejectPolicy rejectPolicy;

    public ThreadPool(int coreSize, int capacity, RejectPolicy rejectPolicy) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    // 提交任务执行
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作线程数小于阈值,直接开始任务执行
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超过了阈值,加入到队列中
                // taskQueue.put(task);
                // 调用tryPut的方式
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    // 工作线程,对执行的任务做了一层包装经验
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 如果任务不为空,或者可以从队列中获取任务
            while (Objects.nonNull(task) || Objects.nonNull(task = taskQueue.take())) {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 执行完后,设置任务为空
                    task = null;
                }
            }

            // 移除工作线程
            synchronized (workers) {
                logger.debug("remove worker successfully");
                workers.remove(this);
            }
        }
    }
}
  • Worker类是工作线程类,包装了执行任务,里面实现了从队列获取任务,然后执行任务
  • execute()方法的实现中,如果工作线程数量小于阈值的话,直接创建新的工作线程,否则将任务添加到队列中

获取任务超时设计

目前从队列中获取任务是永久阻塞等待的,可以改成阻塞一段时间没有获取任务,丢弃的策略

public class TimeoutBlockingQueue<T> {

    private Logger logger = LoggerFactory.getLogger(TimeoutBlockingQueue.class);

    // 容量
    private int capacity;
    // 双端任务队列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    public TimeoutBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // 带超时时间的获取
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将timeout统一转换为纳秒
            long nanos = unit.toNanos(timeout);
            while (deque.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    // 返回的是剩余的等待时间,更改nanos的值,使虚假唤醒的时候可以继续等待
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            return deque.getFirst();
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的增加
    public boolean offer(T task, long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将timeout统一转换为纳秒
            long nanos = unit.toNanos(timeout);
            while (deque.size() == capacity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    // 更新剩余需要等待的时间
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            logger.debug("加入任务队列 {}", task);
            deque.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }
}

拒绝策略设计

目前的实现还是有个漏洞,无法自定义任务超出出阈值的一个拒绝策略,我们可以通过函数式编程+策略模式去实现

  1. 定义策略模式的函数式接口
@FunctionalInterface
public interface RejectPolicy<T> {

    // 拒绝策略的接口
    void reject(BlockingQueue<T> queue, T task);

}
  1. 添加函数式接口的调用入口
    我们可以在阻塞队列添加任务新加一个api,添加任务如果超过容量,调用函数式接口
// 尝试添加任务
  public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
      lock.lock();
      try {
          // 如果队列超过容量
          if (deque.size() > capacity) {
              logger.debug("task too much, do reject");
              rejectPolicy.reject(this, task);
          } else {
              deque.offer(task);
              emptyWaitSet.signal();
          }
      } finally {
          lock.unlock();
      }
  }
  1. 演示
public class TestThreadPool1 {

    private static Logger LOGGER = LoggerFactory.getLogger(TestThreadPool1.class);

    public static void main(String[] args) throws InterruptedException {
        Executor executor = new ThreadPool(2, 4, new RejectPolicy() {
            @Override
            public void reject(BlockingQueue queue, Object task) {
                LOGGER.error("task too much");
            }
        });
        // 提交任务
        for (int i = 0; i < 6; i++) {
            final int j = i;
            executor.execute(() -> {
                try {
                    Thread.sleep(10);
                    LOGGER.info("run task {}", j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(10);
        }
        Thread.sleep(10000);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/182838.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【从零带你玩转Linux】权限及相关指令

前言 &#x1f3e0;个人主页&#xff1a;泡泡牛奶 &#x1f335;系列专栏&#xff1a;从零带你玩转Linux 本期将会让大家了解什么是权限&#xff0c;权限该如何理解&#xff0c;以及Linux中一些有关权限设置的指令操作&#xff0c;让你在Linux使用中更加得心应手(&#xff5e;&…

Git详细使用文档

Git 1.项目存在哪些问题 1.项目安全性太低2.项目很难协同开发3.项目无端报错4.项目版本混乱 2.Git概念 Git是一个分布式的版本控制及协同开发工具 3.版本控制工具分类 3.1.集中式版本控制工具 cvs svn ​ 集中式版本控制系统&#xff0c;版本库是集中存放在中央服务器的&am…

LeetCode 刷题系列 -- 143. 重排链表

给定一个单链表 L 的头节点 head &#xff0c;单链表 L 表示为&#xff1a;L0 → L1 → … → Ln - 1 → Ln请将其重新排列后变为&#xff1a;L0 → Ln → L1 → Ln - 1 → L2 → Ln - 2 → …不能只是单纯的改变节点内部的值&#xff0c;而是需要实际的进行节点交换。示例 1&a…

深度卷积神经网络、池化层、为什么使用卷积、残差网络

目录1.深度卷积神经网络(a deep convolutional neural network)输入图像的维度是&#xff0c;如果&#xff0c;计算输出图像维度公式&#xff1a;。s表示步幅&#xff0c;p表示填充的层数。filters的通道数是和输入图像的通道数保持一致的。分析上图案例&#xff1a;第一层卷积…

leetcode刷题记录总结-5.双指针专题

文章目录一、过滤保序27.移除元素题解题解1&#xff1a;暴力解法题解2&#xff1a;双指针法[26. 删除有序数组中的重复项](https://leetcode.cn/problems/remove-duplicates-from-sorted-array/)题解[283. 移动零 ](https://leetcode.cn/problems/move-zeroes/description/)题解…

借助ChatGPT学习ROS2机器人编程

很好用&#xff0c;很方便。简单发布和订阅代码直接能跑的。如下&#xff1a;学习效率指数提升&#xff0c;果然数字生产力之神&#xff01;空洞的问题和回复&#xff1a;如何在一个月时间内掌握ROS2机器人操作系统的全部核心内容&#xff1f;要在一个月时间内掌握ROS2机器人操…

源码启动MeterSphereV2.6版本注意事项(三)

前言 之前写过一篇MeterSphereV2.3版本Mac本地启动详细教程&#xff08;含常见错误&#xff09;本地启动V2.3版本的。时隔3个月&#xff0c;MeterSphere已经到了V2.6 版本了&#xff0c;很多小伙伴私信我让我写一篇V2.6 版本的启动&#xff0c;刚好趁过年有时间&#xff0c;给…

Spring和SpringMvc详细讲解

&#x1f3c6;今日学习目标&#xff1a; &#x1f340;Spring和SpringMvc详细讲解 ✅创作者&#xff1a;林在闪闪发光 ⏰预计时间&#xff1a;30分钟 &#x1f389;个人主页&#xff1a;林在闪闪发光的个人主页 &#x1f341;林在闪闪发光的个人社区&#xff0c;欢迎你的加入: …

金仓数据库单表与多表查询

单表与多表查询 单引号与双引号 针对有空格、特殊字符、数字开头的字段别名必须加双引号 针对标量字符串表达式必须用加单引号 连接运算 字符串的拼接运算 字符串拼接经常用于生成SQL脚本 删除exam模式下所有的表&#xff0c;可以通过拼接生成如下批量的SQL select drop t…

scipy learn sharpen filter

文章目录1. 问题2. 方案2.1 学习一个 5 * 5的滤波核2.2 学习分通道的滤波核 以及 分离卷积3. 分析根据图像对学习滤波核之前研究过根据图像对生成3Dlut, 以及生成颜色变换系数 这里我们利用图像对学习 滤波 1. 问题 遇到的问题是这样的&#xff0c;已知一个图像和经过邻域滤…

爱快软路由对笔记本实现网络唤醒

本人有一台爱快软路由作为动态域名和端口映射&#xff0c;实现通过阿里域名远程访问内网设备。一台X201笔记本连接在软路由上。由于X201电池已经卸下无法实现来电开机&#xff0c;只能通过爱快e云APP手动实现网络唤醒&#xff0c;感觉非常麻烦。爱快云web端也不能实现开机唤醒X…

学长教你学C-day11-C语言结构体、枚举、联合体

“前面我们学习了数组&#xff0c;从数据类型来看&#xff0c;数组就是具有相同数据类型的变量集合&#xff1b;从内存空间来看&#xff0c;数组就是一串由相同大小的数据空间组成的较大的内存空间。那么结构体是什么呢&#xff1f;从内存角度讲&#xff0c;结构体也是一块地址…

PEG化芘衍生物——Pyrene-PEG-Acid,Pyrene-PEG-COOH,芘丁酸-聚乙二醇-羧基

一&#xff1a;产品描述 1、名称 英文&#xff1a;Pyrene-PEG-COOH&#xff0c;Pyrene-PEG-Acid 中文&#xff1a;芘丁酸-聚乙二醇-羧基 2、CAS编号&#xff1a;N/A 3、所属分类&#xff1a; Carboxylic acid PEG Pyrene PEG 4、分子量&#xff1a;可定制2000、1000、340…

2022个人年度总结:别让内心的烦躁和焦虑,占据本就不多的热情。

在从毕业一直到现在&#xff0c;我都会写一篇关于自己的从技术、商业、人情世故以及未来展望的博文&#xff0c;以至于归纳每个时期的自己&#xff0c; 走在互联网开发的边缘&#xff0c;不得不抽出时间鞭策自己学习新知识&#xff0c;未知的知识是 充满好奇的&#xff0c; 就好…

Makefile学习⑧:Makefile中通用部分做公共头文件

Makefile学习⑧&#xff1a;Makefile中通用部分做公共头文件 创建2个文件夹Demo1和Demo2,2个文件夹中的文件完全一样&#xff0c;但是命名不一样。 博主创建的如下&#xff0c;内容沿用前几章的函数文件。 这两个Makefile中的内容除了目标文件名和依赖文件名不一致&#xff0…

轻松实现一个Python+Selenium的自动化测试框架

首先你得知道什么是Selenium&#xff1f; Selenium是一个基于浏览器的自动化测试工具&#xff0c;它提供了一种跨平台、跨浏览器的端到端的web自动化解决方案。Selenium主要包括三部分&#xff1a;Selenium IDE、Selenium WebDriver 和Selenium Grid。 Selenium IDE&#xff…

知识抽取-实体及关系抽取

信息抽取的三个最重要&#xff0c;最受关注的子任务&#xff1a; 实体抽取 命名实体识别&#xff0c;包括实体检测&#xff08;find)和分类&#xff08;classify) 关系抽取。 通常我们所述的三元组抽取&#xff0c; 一个谓词&#xff08;predicate)带2个形参&#xff08;argum…

MySQL学习记录(9)存储引擎

文章目录6、InnoDB存储引擎6.1、逻辑存储结构6.2、架构6.2.1、概述6.2.2、内存结构6.2.3、磁盘结构6.2.4、后台线程6.3、事务原理6.3.1、事务基础6.3.2、redo log日志6.3.3、undo log日志6.4、MVCC6.4.1、基本概念6.4.2、记录中隐藏字段6.4.3、undo log日志6.4.4、readview6.4.…

算法基础(二):数组知识点及题型讲解

算法基础&#xff08;二&#xff09;&#xff1a;数组知识点及题型讲解1 数组定义2 Python数组常用操作2.1 创建数组2.2 添加元素2.3 访问元素2.4 更新元素2.5 删除元素2.6 获取数组长度2.7 遍历数组2.8 查找某个元素2.9 数组排序3 力扣题目训练一些算法基础知识点和leetcode题…

汇编语言1基础知识

机器语言 机器语言是机器指令的集合&#xff0c;即计算机可以执行的指令。 机器指令由一连串二进制数字构成&#xff0c;计算机中用高低电平表示。高电平为1&#xff0c;低电平为0。 早期通过在纸带上打孔输入计算机运算。打孔为1&#xff0c;不打孔为0。 上图出自剧版三体第…