Java JUC(四) 自定义线程池实现与原理分析

news2024/11/23 15:17:43

目录

一. 阻塞队列 BlockingQue

二. 拒绝策略 RejectPolicy

三. 线程池 ThreadPool 

四. 模拟运行


在 Java基础(二) 多线程编程 中,我们简单介绍了线程池 ThreadPoolExecutor 的核心概念与基本使用。在本文中,我们将基于前面学习的各种锁与同步工具来实现自定义的线程池,同时来探究和分析 Java 线程池的基本原理。

一. 阻塞队列 BlockingQue

在线程池的生态中,阻塞队列是至关重要的一环,其用于实现任务与工作线程之间的平衡(类似于生产者/消费者模式)。 在此处,我们实现了一个自定义的阻塞队列 BlockingQue,其代码如下:

// 阻塞队列实现
public class BlockingQue<T> {
    // 1. 任务队列
    private Deque<T> queue;
    // 2. 锁
    private ReentrantLock lock;
    // 3. 生产者条件变量
    private Condition fullWaitSet;
    // 4. 消费者条件变量
    private Condition emptyWaitSet;
    // 5. 容量
    private int capacity;

    public BlockingQue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.capacity = capacity;
        // ArrayDeque: 基于 Object[] 实现,可以自动扩容
        this.queue = new ArrayDeque<>();
        this.lock = new ReentrantLock(fair);
        // 读写共用一把锁
        this.fullWaitSet = lock.newCondition();
        this.emptyWaitSet = lock.newCondition();
    }

    public BlockingQue(int capacity) {
        this(capacity, false);
    }

    // 阻塞添加
    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                fullWaitSet.await();
            }
            queue.addLast(element);
            // 唤醒消费线程
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 非阻塞添加
    public boolean offer(T element) {
        lock.lock();
        try {
            if (queue.size() == capacity)
                return false;
            queue.addLast(element);
            // 唤醒消费线程
            emptyWaitSet.signal();
            return  true;
        } finally {
            lock.unlock();
        }
    }

    // 超时阻塞添加
    public boolean offer(T element, long timeout, TimeUnit unit) throws InterruptedException {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.size() == capacity) {
                // 已经超时则返回 false
                if (nanos <= 0)
                    return false;
                nanos = fullWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)
            }
            queue.addLast(element);
            // 唤醒消费线程
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

    // 阻塞获取
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                emptyWaitSet.await();
            }
            T element = queue.removeFirst();
            // 唤醒生产线程
            fullWaitSet.signal();
            return element;
        } finally {
            lock.unlock();
        }
    }

    // 超时阻塞获取
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lock();
        try {
            // 将 timeout 统一转换为纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                // 已经超时则返回 null
                if(nanos <= 0){
                    return null;
                }
                nanos = emptyWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)
            }
            T element = queue.removeFirst();
            // 唤醒生产线程
            fullWaitSet.signal();
            return element;
        }finally {
            lock.unlock();
        }
    }

    //获取大小
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}

可以看出,上述代码使用了 Deque 作为元素存储容器,但若将 Deque 换成 Object[] 数组,则其基本就是 ArrayBlockingQueue 的实现源码。在实际工作中,若要实现自定义阻塞队列,我们只需要实现 BlockingQueue<E> 接口及其抽象方法即可。

package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;

public interface BlockingQueue<E> extends Queue<E> {

    boolean add(E e);


    boolean offer(E e);


    void put(E e) throws InterruptedException;


    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;


    E take() throws InterruptedException;


    E poll(long timeout, TimeUnit unit) throws InterruptedException;


    int remainingCapacity();


    boolean remove(Object o);


    public boolean contains(Object o);


    int drainTo(Collection<? super E> c);


    int drainTo(Collection<? super E> c, int maxElements);
}

二. 拒绝策略 RejectPolicy

在线程数量已满且阻塞队列已满的情况下,主线程则会因为无法放置任务而一直阻塞等待,因此我们需要拒绝策略来处理这种溢出情况。拒绝策略一般定义为接口,并允许我们自定义策略,其代码如下:

// 拒绝策略
@FunctionalInterface
public interface RejectPolicy<T> {
    void reject(BlockingQue<T> queue, T task);
}

一般接口方法需要提供阻塞队列以及当前任务两个参数,并支持函数式编程;常见的拒绝策略包括:阻塞等待、放弃执行、抛出异常、由调用线程执行等(后续会实现)。在实际工作中,Java已经为我们提供了拒绝策略的顶层设计,若想自定义拒绝策略,我们只需实现 RejectedExecutionHandler 接口并实现其 rejectedExecution 抽象方法即可。

package java.util.concurrent;

public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

三. 线程池 ThreadPool 

在本节,我们将实现一个简单的自定义线程池,其只包含核心线程数,并且规定线程池的运行规则如下:

1.若当前线程数 < corePoolSize,则新建线程处理任务;

2.若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待;

3.若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略;

/**
 * 自定义线程池实现:
 *  1. 若当前线程数 < corePoolSize,则新建线程处理任务
 *  2. 若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待
 *  3. 若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略
 */
public class ThreadPool {
    // 任务队列
    private BlockingQue<Runnable> taskQueue;
    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();
    // 锁
    private ReentrantLock mainLock = new ReentrantLock();
    // 核心线程数
    private int coreSize;
    // 获取任务的超时时间(allowThreadTimeOut=true时有效)
    private long timeOut;
    // 时间单位(allowThreadTimeOut=true时有效)
    private TimeUnit timeUnit;
    // 是否允许线程超时等待(默认允许)
    private boolean allowThreadTimeOut = true;
    // 拒绝策略
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }
    // 设置 allowThreadTimeOut 参数
    public void setAllowThreadTimeOut(boolean allowThreadTimeOut) {
        this.allowThreadTimeOut = allowThreadTimeOut;
    }

    // 执行任务 task
    public void execute(Runnable task){
        mainLock.lock();
        try{
            if(workers.size() < coreSize){
                // 添加核心线程
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            }else if(!taskQueue.offer(task)){
                // 执行拒绝策略
                rejectPolicy.reject(taskQueue, task);
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    // 工作线程类
    private class Worker extends Thread{
        // 执行任务
        private Runnable task;

        public Worker(Runnable task){
            this.task = task;
        }

        @Override
        public void run() {
            // 获取任务
            while(task != null || (task = getTask()) != null){
                try{
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            // worker 线程终止
            synchronized (workers){
                // 移除 worker
                workers.remove(this);
            }
        }
    }

    // 从阻塞队列中获取等待任务(提供给Worker的钩子方法)
    private Runnable getTask(){
        for(;;){
            try {
                Runnable r = allowThreadTimeOut ? taskQueue.poll(timeOut, timeUnit) : taskQueue.take();
                return r;
            } catch (InterruptedException e) {
                // 若被中断则重新等待
                e.printStackTrace();
            }
        }
    }
}

Java ThreadPoolExecutor 的实现相比我们自定义的线程池更加复杂和安全(增加了线程池状态的维护、最大线程数的逻辑、线程池终止方法等),但在核心思想的实现上基本一致,因此这段自定义代码的实现可以帮助我们更加方便的理解 ThreadPoolExecutor 的源码。

四. 模拟运行

public class Main {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(3,
                10000, TimeUnit.MILLISECONDS, 5,
                (queue, task) -> {
                    // 1. 死等
                    //try {
                    //    queue.put(task);
                    //} catch (InterruptedException e) {
                    //    e.printStackTrace();
                    //}
                    // 2. 放弃任务执行
                    // do nothing...
                    System.out.println("do discard policy...");
                    // 3. 抛出异常
                    //throw new RuntimeException("task run fail" + task);
                    // 4. 调用线程执行任务
                    //task.run();
                });

        for (int i = 0; i < 10; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j + "is running...");
            });
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2230857.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

金华迪加 现场大屏互动系统 mobile.do.php 任意文件上传漏洞复现

0x01 产品简介 金华迪加现场大屏互动系统是一种集成了先进技术和创意设计的互动展示解决方案,旨在通过大屏幕和多种交互方式,为观众提供沉浸式的互动体验。该系统广泛应用于各类活动、展览、会议等场合,能够显著提升现场氛围和参与者的体验感。 0x02 漏洞概述 金华迪加 现…

2024年系统架构师---下午题目真题

1. 数据仓库架构风格的优缺点&#xff1a; 优点&#xff1a; 1&#xff09;数据统一保存在中央数据仓库&#xff0c;数据处理流程相对独立&#xff0c;支持交互式处理。 缺点&#xff1a; 1&#xff09;仓库风格不支持并行&#xff0c;效率低。 2&#xff09;仓库风格容错性和健…

JVM、JRE、JDK区别和联系

JVM(java virtual machine)&#xff1a;Java虚拟机主要包括类加载器、执行引擎、本地接口和运行时数据区&#xff0c;其中运行时数据区是JVM的主要部分。JVM的主要作用是将class文件中的二进制数据加载到运行时数据区的方法区&#xff0c;在堆区生成相应的java.lang.Class对象&…

Vue中ref、reactive、toRef、toRefs的区别

一、ref、reactive setup 函数中默认定义的变量并不是响应式的&#xff08;即数据变了以后页面不会跟着变&#xff09;&#xff0c;如果想让变量变为响应式的变量&#xff0c;需要使用 ref 和 reactive 函数修饰变量。 ref 函数可以把基本类型变量变为响应式引用reactive 函数…

Linux安装es和kibana

安装Elasticsearch 参考文档&#xff1a;https://www.elastic.co/guide/en/elasticsearch/reference/current/targz.html#targz-enable-indices 基本步骤下载包&#xff0c;解压&#xff0c;官网提示&#xff1a; wget https://artifacts.elastic.co/downloads/elasticsearc…

spreadjs实现类似于企业微信的协同提示

核心代码 import * as GC from "grapecity-software/spread-sheets";function HighlightLayout(name:string){this.name name;this._eventNs ".HighlightLayout" name || "";this._sheetRangesInfo {} } HighlightLayout.prototype.bind f…

Linux云计算 |【第五阶段】PROJECT3-DAY1

主要内容&#xff1a; 跳板机&#xff08;堡垒机&#xff09;的概念、部署JumpeServer 一、跳板机&#xff08;堡垒机&#xff09;的概念 跳板机&#xff08;Jump Server 或 Bastion Host&#xff09;是一种网络安全设备或服务器&#xff0c;也称堡垒机&#xff0c;是一类可作…

PAT甲级-1133 Splitting A Linked List

题目 题目大意 给定一个链表的首节点地址和节点个数&#xff0c;以及一个数k。要求重新排列该链表&#xff0c;使其按<0 &#xff0c;> 0 && < k&#xff0c;>k 的顺序排序。但是不改变原有顺序&#xff0c;比如-4 -> -6 -> -2&#xff0c;不需要再…

重新回顾反向传播与梯度下降:训练神经网络的基石

有关反向传播与梯度下降&#xff1a;流程与公式推导 背景前向传播反向传播 背景 反向传播则是一种训练神经网络的算法&#xff0c;目前我们使用的深度学习模型大都是通过这种方法训练的。它的核心思想是通过计算损失函数相对于每个参数的导数&#xff0c;来更新神经网络中的权重…

Java | Leetcode Java题解之第524题通过删除字母匹配到字典里最长单词

题目&#xff1a; 题解&#xff1a; class Solution {public String findLongestWord(String s, List<String> dictionary) {int m s.length();int[][] f new int[m 1][26];Arrays.fill(f[m], m);for (int i m - 1; i > 0; --i) {for (int j 0; j < 26; j) {…

PHP合成图片,生成海报图,poster-editor使用说明

之前写过一篇使用Grafika插件生成海报图的文章&#xff0c;但是当我再次使用时&#xff0c;却发生了错误&#xff0c;回看Grafika文档&#xff0c;发现很久没更新了&#xff0c;不兼容新版的GD&#xff0c;所以改用了intervention/image插件来生成海报图。 但是后来需要对海报…

机器人领域中的scaling law:通过复现斯坦福机器人UMI——探讨数据规模化定律(含UMI的复现关键)

前言 在24年10.26/10.27两天&#xff0c;我司七月在线举办的七月大模型机器人线下营时&#xff0c;我们带着大家一步步复现UMI「关于什么是UMI&#xff0c;详见此文&#xff1a;UMI——斯坦福刷盘机器人&#xff1a;从手持夹持器到动作预测Diffusion Policy(含代码解读)」&…

丝杆支撑座的更换与细节注意事项

丝杆支撑座是支撑连接丝杆和电机的轴承支撑座&#xff0c;分固定侧和支撑侧&#xff0c;它们都有用预压调整的JIS5级的交界处球轴承。在自动化设备中是常用的传动装置&#xff0c;作为核心部件&#xff0c;对设备精度、稳定性和生产效率产生直接影响。在长时间运行中&#xff0…

行业深耕+全球拓展双轮驱动,用友U9 cloud加速中国制造全球布局

竞争加剧、供应链动荡、出海挑战……在日益激烈的市场竞争和新的全球化格局中&#xff0c;中国制造业的数智化转型已经步入深水区。 作为面向中型和中大型制造业的云ERP&#xff0c;用友U9 cloud一直是中国制造业转型升级的参与者和见证者。自2021年发布以来&#xff0c;用友U…

C#实现word和pdf格式互转

1、word转pdf 使用nuget&#xff1a; Microsoft.Office.Interop.Word winform页面&#xff1a; 后端代码&#xff1a; //using Spire.Doc; //using Spire.Pdf; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using Sy…

Spring MVC 完整生命周期和异常处理流程图

先要明白 // 1. 用户发来请求: localhost:8080/user/1// 2. 处理器映射器(HandlerMapping)的工作 // 它会找到对应的Controller和方法 GetMapping("/user/{id}") public User getUser(PathVariable Long id) {return userService.getById(id); }// 3. 处理器适配…

wps宏代码学习

推荐学习视频&#xff1a;https://space.bilibili.com/363834767/channel/collectiondetail?sid1139008&spm_id_from333.788.0.0 打开宏编辑器和JS代码调试 工具-》开发工具-》WPS宏编辑器 左边是工程区&#xff0c;当打开多个excel时会有多个&#xff0c;要注意不要把…

vscode | 开发神器vscode快捷键删除和恢复

目录 快捷键不好使了删除快捷键恢复删除的快捷键 在vscode使用的过程中&#xff0c;随着我们自身需求的不断变化&#xff0c;安装的插件将会持续增长&#xff0c;那么随之而来的就会带来一个问题&#xff1a;插件的快捷键重复。快捷键重复导致的问题就是快捷键不好使了&#xf…

Java-02

笔试算法&#xff1a; 41. 回文串 我们称一个字符串为回文串&#xff0c;当且仅当这个串从左往右和从右往左读是一样的。例如&#xff0c;aabbaa、a、abcba 是回文串&#xff0c;而 ab、ba、abc 不是回文串。注意单个字符也算是回文串。 现在&#xff0c;给你一个长度为n的…

《数字图像处理基础》学习05-数字图像的灰度直方图

目录 一&#xff0c;数字图像的数值描述 &#xff11;&#xff0c;二值图像 &#xff12;&#xff0c;灰度图像 3&#xff0c;彩色图像 二&#xff0c;数字图像的灰度直方图 一&#xff0c;数字图像的数值描述 在之前的学习中&#xff0c;我知道了图像都是二维信息&…