JUC:手写实现一个简易的线程池(Java)

news2024/11/15 15:54:42

目录

​编辑

先上完整代码:

解析:

任务队列:

线程池类:

拒绝策略:


先上完整代码:

public class MyThreadPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {
            // 1.死等
            queue.put(task);
            // 2.带超时时间等待加入等待队列
            // queue.offer(task, 500, TimeUnit.MICROSECONDS);
            // 3.放弃任务
            // 队列满了,没做人任何事情
            // 4.抛出异常
            // throw new RuntimeException("任务执行失败" + task);
            // 5.让调用者自己执行
            // task.run();
        });
        for (int i = 0; i < 15; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            });
        }
    }
}

// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockQueue queue, T task) ;
}
class ThreadPool {
    // 任务队列
    private BlockQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet();

    // 线程数
    private int coreSize;

    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;
    // 构造方法
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueSize);
        this.rejectPolicy = rejectPolicy;
    }

    public void execute(Runnable task) {
        // 当任务数没有超过核心数时,直接交给woker对象执行
        // 如果超过,放入任务队列中存起来
        synchronized (workers) { // workers不安全,把他锁起来
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println("新增worker");
                workers.add(worker); // 加入线程集合
                worker.start();
            } else {
                // taskQueue.put(task); // 任务添加进入
                // 1.死等
                // 2.带超时时间等待
                // 3.放弃任务
                // 4.抛出异常
                // 5.让调用者自己执行
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }
    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 当task任务不为空,执行
            // 当任务为空,去任务队列中去取
            //  while (task != null || (task = taskQueue.take()) != null) 一直等待获取
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    System.out.println("正在执行" + task);
                    task.run();
                } catch (Exception e) {

                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println("worker被移除" + this);
                workers.remove(this); // 移除当前集合对象
            }
        }
    }
}

// 阻塞队列
class BlockQueue<T> {
    // 任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 锁
    private ReentrantLock lock = new ReentrantLock();

    // 满了等待,生产者
    private Condition fullWaitSet = lock.newCondition();

    // 空的等待,消费者
    private Condition emptyWaitSet = lock.newCondition();

    // 容量
    private int capacity;

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    // 阻塞队列中获取任务
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await(); // 进入等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal(); // 唤醒
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞队列中添加任务
    public void put(T t) {
         lock.lock();
         try {
             while (queue.size() == capacity) { // 如果满了,进入等待
                 try {
                     System.out.println("等待加入任务队列" +  t);
                     fullWaitSet.await();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
             System.out.println("加入任务队列" + t);
             queue.addLast(t);
             emptyWaitSet.signal(); // 唤醒
         }finally {
             lock.unlock();
         }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock(); // 就算return也会执行
        }
    }

    // 带超时时间的获取,无需永久的等待了
    public T poll (long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout); // 时间转换为ns
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) return null; // 超时了,直接返回吧
                    nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal(); // 唤醒
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的添加, return 添加成功 or 失败
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) { // 如果满了,进入等待
                try {
                    System.out.println("等待加入任务队列" +  task);
                    if (nanos <= 0) return false;
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任务队列" + task);
            queue.addLast(task);
            emptyWaitSet.signal(); // 唤醒
            return true;
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否已满
            if (queue.size() == capacity) { // 有空闲
                rejectPolicy.reject(this, task); // 拒绝策略
            } else { // 有空闲
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

解析:

任务队列:

// 阻塞队列
class BlockQueue<T> {
    // 任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 锁
    private ReentrantLock lock = new ReentrantLock();

    // 满了等待,生产者
    private Condition fullWaitSet = lock.newCondition();

    // 空的等待,消费者
    private Condition emptyWaitSet = lock.newCondition();

    // 容量
    private int capacity;

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    // 阻塞队列中获取任务
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await(); // 进入等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal(); // 唤醒
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞队列中添加任务
    public void put(T t) {
         lock.lock();
         try {
             while (queue.size() == capacity) { // 如果满了,进入等待
                 try {
                     System.out.println("等待加入任务队列" +  t);
                     fullWaitSet.await();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
             System.out.println("加入任务队列" + t);
             queue.addLast(t);
             emptyWaitSet.signal(); // 唤醒
         }finally {
             lock.unlock();
         }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock(); // 就算return也会执行
        }
    }

    // 带超时时间的获取,无需永久的等待了
    public T poll (long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout); // 时间转换为ns
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) return null; // 超时了,直接返回吧
                    nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal(); // 唤醒
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的添加, return 添加成功 or 失败
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) { // 如果满了,进入等待
                try {
                    System.out.println("等待加入任务队列" +  task);
                    if (nanos <= 0) return false;
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任务队列" + task);
            queue.addLast(task);
            emptyWaitSet.signal(); // 唤醒
            return true;
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否已满
            if (queue.size() == capacity) { // 有空闲
                rejectPolicy.reject(this, task); // 拒绝策略
            } else { // 有空闲
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}
  1.  ArrayDeque 作为底层数据结构存储队列元素。
  2.  ReentrantLock 实现了线程安全。
  3. Condition 来实现阻塞等待机制,当队列为空时,消费者线程等待;当队列满时,生产者线程等待。
  4. 常规的入队 put()、出队 take() 操作。
  5. 带有超时的入队 offer() 和出队 poll() 操作。
  6. tryPut() 方法,该方法接受一个 RejectPolicy 接口,用于指定当队列已满时的拒绝策略

方法:

  • take(): 当队列为空时,消费者线程调用该方法将进入等待状态,直到队列中有元素可取。
  • put(T t): 当队列已满时,生产者线程调用该方法将进入等待状态,直到队列有空位可添加元素。
  • poll(long timeout, TimeUnit unit): 带有超时的出队操作,当队列为空时,会等待一段时间,如果在指定时间内仍未有元素可取,则返回 null。
  • offer(T task, long timeout, TimeUnit timeUnit): 带有超时的入队操作,当队列已满时,会等待一段时间,如果在指定时间内仍未有空位可添加元素,则返回 false。
  • tryPut(RejectPolicy<T> rejectPolicy, T task): 尝试添加元素,当队列已满时,根据拒绝策略 RejectPolicy 进行处理。

单看其实就是一个生产者消费者模式而已。

线程池类:

class ThreadPool {
    // 任务队列
    private BlockQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet();

    // 线程数
    private int coreSize;

    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;
    // 构造方法
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueSize);
        this.rejectPolicy = rejectPolicy;
    }

    public void execute(Runnable task) {
        // 当任务数没有超过核心数时,直接交给woker对象执行
        // 如果超过,放入任务队列中存起来
        synchronized (workers) { // workers不安全,把他锁起来
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println("新增worker");
                workers.add(worker); // 加入线程集合
                worker.start();
            } else {
                // taskQueue.put(task); // 任务添加进入
                // 1.死等
                // 2.带超时时间等待
                // 3.放弃任务
                // 4.抛出异常
                // 5.让调用者自己执行
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }
    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 当task任务不为空,执行
            // 当任务为空,去任务队列中去取
            //  while (task != null || (task = taskQueue.take()) != null) 一直等待获取
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    System.out.println("正在执行" + task);
                    task.run();
                } catch (Exception e) {

                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println("worker被移除" + this);
                workers.remove(this); // 移除当前集合对象
            }
        }
    }
}
  1. BlockQueue<Runnable> 来存储待执行的任务。
  2. HashSet<Worker> 来存储线程集合。
  3. 提供构造方法来初始化线程池的核心线程数、超时时间、任务队列大小和拒绝策略。
  4. execute(Runnable task) 方法来提交任务到线程池中执行。
  5. 内部定义了 Worker 内部类,用于执行任务的线程。

方法:

  • execute(Runnable task): 提交任务到线程池中执行。如果当前线程数小于核心线程数,则直接创建新的 Worker 线程执行任务;如果当前线程数已达到核心线程数,则尝试将任务放入任务队列中,根据拒绝策略 rejectPolicy 进行处理。
  • Worker: 内部类实现了线程执行任务的逻辑。在 run() 方法中,线程会不断从任务队列中取出任务执行,如果队列为空则会等待一段时间,超时时间由 timeouttimeUnit 决定。

拒绝策略:

函数式接口,由使用者提供实现。

// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockQueue queue, T task) ;
}
```java
public class MyThreadPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {
            // 1.死等
            queue.put(task);
            // 2.带超时时间等待加入等待队列
            // queue.offer(task, 500, TimeUnit.MICROSECONDS);
            // 3.放弃任务
            // 队列满了,没做人任何事情
            // 4.抛出异常
            // throw new RuntimeException("任务执行失败" + task);
            // 5.让调用者自己执行
            // task.run();
        });
        for (int i = 0; i < 15; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            });
        }
    }
}

几种拒绝策略实现:

  1. 死等(Blocking): 当任务队列已满时,线程池会一直等待直到有空位。这里使用了 queue.put(task),该方法会阻塞当前线程直到队列有空位可用。

  2. 带超时时间等待(Timeout Blocking): 当任务队列已满时,线程池会等待一段时间,如果在指定时间内仍未有空位可用,则放弃当前任务。这里使用了 queue.offer(task, 500, TimeUnit.MICROSECONDS),该方法会在指定时间内等待,如果超时则返回 false。

  3. 放弃任务(Discard): 当任务队列已满时,线程池会放弃当前任务,不做任何处理。

  4. 抛出异常(Throw Exception): 当任务队列已满时,线程池会抛出异常,通知调用者任务执行失败。

  5. 让调用者自己执行(Caller Runs): 当任务队列已满时,不在线程池内执行任务,而是由调用者自己执行任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1575344.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于javassm实现的旅游景点线路网站

开发语言&#xff1a;Java 框架&#xff1a;ssm 技术&#xff1a;JSP JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.…

Q1公测手游复盘:大作很多,爆款没有

2024年第一季度过去&#xff0c;整个国内手游市场爆冷连连。 据眸娱不完全统计&#xff0c;一季度累计上新22部颇具影响力的手游&#xff08;买断制手游仅留鹰角网络《来自星尘》作为代表&#xff09;。许多备受期待的大作在一季度抢先完成了上线或测试&#xff0c;却在市场端…

IDEA中的Debug功能介绍

说明&#xff1a;本文介绍IDEA中的Debug功能&#xff0c;基于2023.2&#xff08;Ultimate Edition&#xff09;版本 简单介绍 首先&#xff0c;在程序需要停止的所在行号上&#xff0c;鼠标左键&#xff0c;可设置一个断点&#xff0c;是一个红色圆点标志&#xff0c;表示程序…

python入门到精通

本文基于《python3从入门到精通》进行编写 python是什么 是一种简单易学的计算机编程语言&#xff0c;有配套的软件工具和库。 是一种开源的语言。因其有许多强大的开源库使得python对与计算、大数据、人工智能都有很强的支持能力。 是一种解释型语言。其代码不需要编译就可…

基于 OpenHarmony ActiveOhos_sqllite 组件开发指南

1. ActiveOhos 功能介绍 1.1.组件介绍 基于鸿蒙系统连接数据库进行 sqlite 数据库操作的时候&#xff0c;创建连接的时候是有些繁琐的&#xff0c;本组件就是为了简化 sqlite 的连接&#xff0c;并且对鸿蒙原生的 API 进行封装加强&#xff0c;使得读写 sqlite 数据库的时候更…

Java8新特性 (jdk1.8)

目录 一、Lamdba表达式&#xff1f; 二、函数式接口 三、方法引用和构造引用 四、Stream API流 五、接口中的新增 默认方法和静态方法 六、新时间日期API 七、Optional 八、其他特性 一、Lamdba表达式&#xff1f; 为什么使用Lambda表达式&#xff1f; Lambda 是一个 匿…

[挖坟]如何安装Shizuku和LSPatch并安装模块(不需要Root,非Magisk)

2023年12月13日&#xff0c;LSPatch 停止维护 2024年1月8日&#xff0c;LSPosed 停止维护 2024年1月8日&#xff0c;ZygiskNext 停止维护 2024年1月9日&#xff0c;KernelSU 停止维护 这里使用 ColorOS 14 演示&#xff0c;其他品牌手机类似 安装 Shizuku 官网: https://shiz…

JQuery(二)---【使用JQuery对HTML、CSS进行操作】

零.前言 JQuery(一)---【JQuery简介、安装、初步使用、各种事件】-CSDN博客 一.使用JQuery对HTML操作 1.1获取元素内容、属性 使用JQ可以操作元素的“内容” text()&#xff1a;设置或返回元素的文本内容html()&#xff1a;设置或返回元素的内容(包括HTML标记)val()&#…

每天一个注解之@DataSource、 @DS

在Java中&#xff0c;DataSource 注解通常用于标记数据源&#xff08;DataSource&#xff09;相关的信息。数据源是一个用于获取数据库连接的对象&#xff0c;它通常用于与数据库进行交互。DataSource 注解的详细说明可能会因不同的框架或库而有所不同&#xff0c;但通常用于配…

2024-04-07 作业

作业要求&#xff1a; 1> 思维导图 2> 自由发挥应用场景实现一个登录窗口界面。 【可以是QQ登录界面、也可以是自己发挥的登录界面】 要求&#xff1a;尽量每行代码都有注释 作业1&#xff1a; 作业2&#xff1a; 运行代码&#xff1a; #include "myqwidget.h&quo…

部署安装ElasticSearch、Kibana、IK

文章目录 1、部署单点es1.1、创建网络1.2、加载镜像1.3、运行 2、部署kibana2.1、部署2.2、DevTools 3、IK分词器3.1、在线安装3.2、离线安装1&#xff09;查看数据卷目录2&#xff09;解压缩分词器安装包3&#xff09;上传到es容器的插件数据卷中4&#xff09;重启容器5&#…

2024.4.7

1. 2列火车 #include<myhead.h>pthread_mutex_t m1; pthread_mutex_t m2;void* run(void* arg) {while(1){pthread_mutex_lock(&m1);printf("火车B进入\n");printf("A请等待\n");pthread_mutex_unlock(&m2);sleep(2);} }int main(in…

火山方舟大模型服务平台调用Demo测试(豆包)

豆包得后台大模型支持为字节得火山方舟&#xff0c;所以想使用豆包的API&#xff0c;直接从这里就可以。 一、首先注册账号&#xff1a; 火山引擎-云上增长新动力 注册完成之后&#xff0c;控制台-账户-API访问密钥 二、找到API测试用例&#xff1a; Skylark-chat API调用…

白盒测试-语句覆盖

​ 语句覆盖法是指设计适当数量的测试用例&#xff0c;使被测程序中的每条语句至少被执行一次。语句覆盖率的计算方法为&#xff1a; ​ 至少被执行一次的语句数量 / 程序中可执行的语句总数。 案例 ​ 为了清晰地比较几种逻辑覆盖法设计测试用例的异同&#xff0c;逻辑覆盖…

LeetCode热题100:哈希

1.两数之和 题目链接&#xff1a;两数之和 题目描述&#xff1a;给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数…

11.2 浏览器调试常用技巧

目录 1、开发者工具介绍 2、查看节点事件 3、断点调试 4、观察调用栈 5、恢复 JavaScript 执行 6、Ajax 断点 7、改写 JavaScript 文件 1、开发者工具介绍 由于需要学习 JavaScript 逆向&#xff0c;所以此文主要介绍与 JavaScript 逆向有帮助的功能。 以下链接为例&a…

国内:深圳交通流量数据集

数据来源&#xff1a;深圳政府数据开放平台&#xff08;深圳市政府数据开放平台&#xff09;&#xff0c;这个官网上还有其他类数据集&#xff0c;值得收藏&#xff01;&#xff01;&#xff01; 数据集介绍&#xff1a;宝安区-G4高速西乡大道入口车流量统计 第一行每列的标题…

记一次Debug与Release版程序输出不一致的问题解决

问题叙述&#xff1a; 在x86平台下无论Debug还是Release都没问题&#xff0c;而在arm平台下Debug版本程序无问题&#xff0c;Release版本程序&#xff08;-O3编译&#xff09;发现输出值不正确&#xff0c;怀疑值被篡改&#xff0c;于是在调用前后分别使用printf打印出参数值&…

vitepress系列-04-规整sideBar左侧菜单导航

规整左侧菜单导航 新建navConfig.ts 文件用来管理左侧导航菜单&#xff1a; 将于其他的配置分开&#xff0c;避免config.mts太大 在config目录下&#xff0c;新建 sidebarModules文件目录用来左侧导航菜单 按模块进行分类&#xff1a; 在config下新建sidebarConfig.ts文件&…

3dmax经常染失败?优化方法提升染质量!

在三维建模和渲染的过程中&#xff0c;优化模型和场景的效率是至关重要的。以下是一些提升效率的方法&#xff1a; 模型简化&#xff1a;在创建模型时&#xff0c;应尽量减少使用的命令和修改器的数量。这是因为命令和修改器越多&#xff0c;消耗的内存和CPU资源也就越多&…