【并发编程】手写线程池阻塞队列

news2025/1/7 9:11:55

       📝个人主页:五敷有你      
 🔥系列专栏:并发编程
⛺️稳重求进,晒太阳

示意图 

步骤1:自定义任务队列

变量定义

  1. 用Deque双端队列来承接任务
  2. 用ReentrantLock 来做锁
  3. 并声明两个条件变量 Condition fullWaitSet emptyWaitSet
  4. 最后定义容量 capcity

方法:

  1. 添加任务
    1. 注意点:
      1. 任务容量慢了 用await
      2. 每个添加都进行一个emptyWaitSet.signalAll 唤醒沉睡的线程
      3. 考虑万一死等的情况,加入时间的判断
  2. 取出任务
    1. 注意点:
      1. 任务空了 用await
      2. 每个任务取出来都进行一个fullWaitSet.signAll来唤醒沉睡的线程
      3. 考虑超时的情况,加入时间的判断
public class MyBlockQueue<T> {
    //1.任务队列
    private Deque<T> deque=new ArrayDeque();
    //2.锁
    private ReentrantLock lock=new ReentrantLock();

    //3.生产者条件变量
    private Condition fullWaitSet=lock.newCondition();
    //4.消费者条件变量
    private Condition emptyWaitSet=lock.newCondition();

    //5.容量
    private int capcity;

    public MyBlockQueue(int capcity) {
        this.capcity = capcity;
    }

    //带超时的阻塞获取
    public T poll(long timeOut, TimeUnit unit){

        lock.lock();
        try {
            //将timeOUt转换成统一转换为ns
            long nanos = unit.toNanos(timeOut);

            while (deque.isEmpty()) {
                try {
                    //返回值=等待时间-经过的时间
                    if(nanos<=0){
                        return null;
                    }
                   nanos= emptyWaitSet.awaitNanos(nanos);
                }catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T t = deque.removeFirst();
            fullWaitSet.signalAll();
            return t;
        }finally {
            lock.unlock();
        }



    }

    //6. 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                try {
                    emptyWaitSet.await();
                }catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
         T t = deque.removeFirst();
            fullWaitSet.signalAll();
            return t;
        }finally {
            lock.unlock();
        }




    }





    //阻塞添加
    public void put(T element){
        lock.lock();
        try {
            while (deque.size()==capcity){
                try {
                    fullWaitSet.await();
                }catch (Exception e){

                }
             }
            deque.addLast(element);
            emptyWaitSet.signalAll();

        } finally {
            lock.unlock();
        }
    }
    public int size(){
        lock.lock();
        try {
            return deque.size();
        }finally {
            lock.unlock();
        }
    }
    }

步骤2:自定义线程池

  1. 定义变量:
    1. 任务队列 taskQueue
    2. 队列的容量
    3. 线程的集合
    4. 核心线程数
    5. 获取任务的超时时间
    6. 时间单位
  2. 方法
    1. 构造方法 初始化一些核心的参数
    2. 执行方法 execute(task) 里面处理任务
      1. 每执行一个任务就放入一个worker中,并开启线程执行 同时放入workers集合中
      2. 当任务数量>核心数量时,就加入到阻塞队列中
  3. 自定义的类worker
    1. 继承Thread 重写Run方法
      1. 执行传递的任务,每次任务执行完毕,不回收,
      2. 去队列中拿任务 当队列也空了之后 workers集合中移除线程,线程停止。
package com.aqiuo.juc;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    //任务队列
    private MyBlockQueue<Runnable> taskQueue;
    //队列容量
    int queueCapcity;
    //线程集合
    private HashSet<Worker> workers=new HashSet();

    //线程池的核心线程
    private int coreSize;

    //获取任务的超时时间
    private long timeOut;

    //时间单位
    private TimeUnit timeUnit;

    public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity) {
        this.coreSize = coreSize;
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        taskQueue=new MyBlockQueue<>(queueCapcity);
    }

    public void exectue(Runnable task){
            //当任务数没有超过coreSize时,直接交给work对象执行
            //如果任务超过coreSize时,加入任务队列
      synchronized (workers){
          if(workers.size()<coreSize){
              Worker worker=new Worker(task);
              System.out.println("新增worker");
              workers.add(worker);
              worker.start();
              //任务数超过了核心数
          }else{
              System.out.println(task+"加入任务队列");
              taskQueue.put(task);
          }
      }


    }

    class Worker extends Thread{

        private Runnable task;
        public Worker(Runnable task){
            this.task=task;
        }

        @Override
        public void run() {
            //执行任务
            //1)当task不为空,执行任务
            //2)当task执行完毕,再接着从任务队列中获取任务

            while (task!=null||(task=taskQueue.take())!=null){
                try {
                    System.out.println("正在执行worker"+this);
                    sleep(10000);
                    task.run();
                } catch (Exception e) {

                }finally {
                    task=null;
                }

            }

            //执行完任务后销毁线程
            synchronized (workers){
                workers.remove(this);
            }

        }
    }

    


}

测试

开启15个线程测试

public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
        for (int i=0;i<15;i++){
            int j=i;
            threadPool.exectue(()->{
                System.out.println(j);
            });
        }
    }

        执行过程中,超过了队列容量之后,就会发生fullWaitSet阻塞。这个阻塞的线程就开始等待,当有队列不满之后,唤醒fullWaitSet阻塞的队列,

        同理,当队列为空,emptyWaitSet小黑屋阻塞,当有任务被放入,EmptyWaitSet唤醒所有的线程。

这就有一个执行完毕之后,线程不会停止,他会一定等待拿去任务,线程阻塞了EmptyWaitSet

改进

获取任务的超时结束

获取任务take的增强 超时

  //带超时的阻塞获取
    public T poll(long timeOut, TimeUnit unit){

        lock.lock();
        try {
            //将timeOUt转换成统一转换为ns
            long nanos = unit.toNanos(timeOut);

            while (deque.isEmpty()) {
                try {
                    //返回值=等待时间-经过的时间
                    if(nanos<=0){
                        return null;
                    }
                   nanos= emptyWaitSet.awaitNanos(nanos);
                }catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T t = deque.removeFirst();
            fullWaitSet.signalAll();
            return t;
        }finally {
            lock.unlock();
        }



    }

修改worker的run函数

      public void run() {
            //执行任务
            //1)当task不为空,执行任务
            //2)当task执行完毕,再接着从任务队列中获取任务
//            while (task!=null||(task=taskQueue.take())!=null){
            //修改如下
            while (task!=null||(task=taskQueue.poll(timeOut,timeUnit))!=null){
                try {
                    System.out.println("正在执行worker"+this);
                    sleep(1000);
                    task.run();
                } catch (Exception e) {

                }finally {
                    task=null;
                }

            }

正常结束了

放入任务的超时结束offer()

那么有装入任务 的增强 ,就再提供一个超时装入入offer()吧 ,当放入一个满的队列时,超时后返回false不再放入

//带有超时的队列添加
public Boolean offer(T element,long timeOut, TimeUnit unit){
    lock.lock();
    long nanos = unit.toNanos(timeOut);
    try {

        while (deque.size()==capcity){
            try {
                long l = fullWaitSet.awaitNanos(nanos);
                if(l<=0){
                    return false;
                }

            }catch (Exception e){

            }
        }
        deque.addLast(element);
        emptyWaitSet.signalAll();
        return true;
    } finally {
        lock.unlock();
    }
}

拒绝策略

函数式接口

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
    void reject(MyBlockQueue<T> queue, T task);
}

代码改进

如下部分代码是存入任务的部分

public void exectue(Runnable task){
            //当任务数没有超过coreSize时,直接交给work对象执行
            //如果任务超过coreSize时,加入任务队列
      synchronized (workers){
          if(workers.size()<coreSize){
              Worker worker=new Worker(task);
              System.out.println("新增worker");
              workers.add(worker);
              worker.start();
              //任务数超过了核心数
          }else{
              //存入任务
              //taskQueue.put(task);
              //当队列满了之后 执行的策略
              //1) 死等
              //2)带有超时的等待
              //3)当调用者放弃任务执行
              //4)让调用者抛出异常
              //5)让调用者自己执行任务...
              //为了增加灵活性,这里不写死,交给调用者
              //重新写了一个放入任务的方法
              taskQueue.tryPut(rejectPolicy,task);
          }
      }

    }

阻塞队列里的tryPut

public void tryPut(ThreadPool.RejectPolicy<T> rejectPolicy, T task) {

    lock.lock();

    try {
        //如果队列容量满了,就开始执行拒绝策略
        if(capcity>= deque.size()){
            rejectPolicy.reject(this,task);

        }else{
            //不满就正常加入到队列中
            System.out.println(task+"正常加入到队列");
            deque.addLast(task);
        }
    }finally {
        lock.unlock();
    }

}

//1) 死等

//2)带有超时的等待

//3)当调用者放弃任务执行

//4)让调用者抛出异常

//5)让调用者自己执行任务...

谁调用方法,谁写拒绝策略

为了传入策略,就再构造函数里面加入一个方法的参数传入

//部分代码...
//拒绝策略
RejectPolicy<Runnable> rejectPolicy;

public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
    this.coreSize = coreSize;
    this.timeOut = timeOut;
    this.timeUnit = timeUnit;
    taskQueue=new MyBlockQueue<>(queueCapcity);
    this.rejectPolicy=rejectPolicy;
}

主函数编写拒绝的策略,就lamda表达式会把...

public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{
            //死等
//            queue.put(task);
            //超时添加
//            System.out.println(queue.offer(task, 100, TimeUnit.NANOSECONDS));
            //放弃执行
//            System.out.print("我放弃");
            //调用者抛出异常
//            throw new RuntimeException("任务执行失败");
            //调用者执行
//            task.run();


        });
        for (int i=0;i<5;i++){
            int j=i;
            threadPool.exectue(()->{
                System.out.println(j);
            });
        }
    }

五种拒绝策略的结果(我不会用slog4j)

1.死等的结果

2.超时拒绝的结果(每个false都是时间到了,每加进去)

3.不作为,调用者放弃任务

4.抛出异常,停止

5.调用者线程执行了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1436342.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AE2023 After Effects 2023

After Effects 2023是一款非常强大的视频编辑软件&#xff0c;提供了许多新功能和改进&#xff0c;使得视频编辑和合成更加高效和灵活。以下是一些After Effects 2023的特色功能&#xff1a; 新合成预设列表&#xff1a;After Effects 2023彻底修改了预设列表&#xff0c;使其…

《权力的游戏》AI创作大电影 震撼来袭

《权力的游戏》AI创作大电影 震撼来袭 Westeros, a world of power, intrigue, and dark magic. In the frozen North, the army of the dead marches towards Winterfell. The Seven Kingdoms are divided by war and bloodshed, as the Great Houses battle for supremacy.…

必看!嵌入式基于UART的通信协议-RS232、RS485协议解析

这两种都是串口通讯的变种&#xff0c;为了提升串口通信的距离和稳定性。通常来说&#xff0c;正常的串口通信使用的是TTL电平&#xff0c;即高电平为2.4-5V&#xff0c;低电平为0-0.4V。高低电平之间的范围很小&#xff0c;如果有静电或者其他外界的干扰&#xff0c;很快会将低…

了解这六种最佳移动自动化测试工具吗?

最好的移动自动化测试工具 在本文章关于移动应用程序测试的这一部分中&#xff0c;我们将研究 2023 年 6 种最佳移动自动化测试工具。 1、Appium Appium 是一个非常流行的开源自动化测试框架&#xff0c;支持各种操作系统的自动化。它可以与本机、混合和移动 Web 应用程序一…

春运开始,北斗卫星助力盲区来车预警提示

春运开始&#xff0c;北斗卫星助力盲区来车预警提示 近期春运开始&#xff0c;高德地图启动了2024年的“温暖回家路”服务计划&#xff0c;通过数字化服务创新保障春运出行。除了具备自学习能力的新能源导航首发亮相外&#xff0c;还重点升级了盲区会车预警服务。在山区弯道、…

docker复习笔记01(小滴课堂)安装+部署mysql

查看内核版本。 关闭防火墙&#xff1a; 查看docker版本&#xff1a; 下载阿里yum源&#xff1a; 再看一下yum版本都有哪些&#xff1a; 我们可以看的docker-ce了。 安装它&#xff1a; 设置docker服务开机启动&#xff1a; 更新日志文件&#xff1a; 启动docker&#xff1a; …

ETL是什么,有哪些ETL工具?就业前景如何?

ETL是什么 ETL&#xff08;Extract-Transform-Load&#xff09;&#xff0c;用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目标端的过程。ETL一词较常用在数据仓库&#xff0c;但其对象并不限于数据仓库。它可以自动化数据处理过程&#xff0c;减少…

2024/2/5总结

微信小程序 监听对象中所有属性的变化 如果某个对象中需要被监听的属性太多&#xff0c;为了方便&#xff0c;可以使用 通配符 ** 来监听 对象中所有属性的变化 什么是纯数字字段 概念&#xff1a;纯数字字段指的是那些不用于界面渲染的 data 字段。 好处&#xff1a;提升界面…

2024.02 国内认知大模型汇总

概述 大模型&#xff0c;又称为大规模机器学习模型&#xff0c;是一种基于大数据的人工智能技术。它通过深度学习和机器学习的方法&#xff0c;对大量数据进行训练&#xff0c;以实现对复杂问题的高效解决。大模型技术在语音识别、图像识别、自然语言处理等领域有着广泛的应用…

sqli.bypass靶场本地小皮环境(1-5关)

1、第一关 http://sqli.bypass/index1.php 单引号报错id1 双引号正常id1&#xff0c;应该是单引号闭合 id1--注释符用不了&#xff0c;%20和都用不了 %0a可以用 没有报错&#xff0c;用布尔盲注&#xff0c;POC&#xff1a;id1%0aand%0asubstr(ss,1,1)s%0aand%0a11 脚本跑数…

JavaScript流程控制详解之顺序结构和选择结构

流程控制 流程控制&#xff0c;指的是控制程序按照怎样的顺序执行 在JavaScript中&#xff0c;共有3种流程控制方式 顺序结构选择结构循环结构 顺序结构 在JavaScript中&#xff0c;顺序结构是最基本的结构&#xff0c;所谓的顺序结构&#xff0c;指的是代码按照从上到下、…

【python数据分析基础】—dataframe中index的相关操作(添加、修改index的列名、修改index索引值等)

文章目录 前言一、添加、修改index的列名二、修改index索引值 前言 本文主要讲dataframe结构中index的相关操作&#xff0c;index相当于是数据表的行。 一、添加、修改index的列名 新建一个dataframe表&#xff0c;我们可以自定义index的值&#xff0c;如下&#xff1a; imp…

Webpack源码浅析

webpack启动方式 webpack有两种启动方式&#xff1a; 通过webpack-cli脚手架来启动&#xff0c;即可以在Terminal终端直接运行&#xff1b; webpack ./debug/index.js --config ./debug/webpack.config.js通过require(webpack)引入包的方式执行&#xff1b;其实第一种方式最终…

sqli-labs-master靶场训练笔记(38-53|boss战)

2024.2.4 level-38 &#xff08;堆叠注入&#xff09; 这题乍一看感觉又是来卖萌的&#xff0c;这不是和level-1一模一样吗 然后仔细看了一下源代码&#xff0c;根据 mysqli_multi_query 猜测这题的本意应该是堆叠注入 mysqli_multi_query() 是 PHP 中用于执行多个 SQL 查…

Sysbench 性能测试(小白快速上手)

文末有惊喜哦 &#xff01; Sysbench 介绍 Sysbench 是一个在Linux系统上进行性能测试和基准测试的工具。它可以用于评估计算机系统的各种性能指标&#xff0c;如 CPU 性能、内存性能、文件 I/O性 能和数据库性能等。Sysbench 提供了多种测试模式和选项&#xff0c;可以帮助用户…

感悟笔记——2024年2月5日

今日阅读了一篇挺有深度的文章&#xff0c;主要阐述进入职场后的大部分人&#xff0c;是怎么逐渐沦为螺丝钉的?即使起点巨高的优等生&#xff0c;也不可避免。文章指路&#xff1a; 「优等生思维」正在将你变成「螺丝钉」和「老黄牛」从小到大&#xff0c;我一直都是那个「别…

EMC测试报告怎么看 PK、QP、AV

EMC测试报告怎么看 报告中的字母辐射报告1辐射报告2 测试条件 报告中的字母 1.PK.PEAK&#xff0c;是指峰值&#xff08;单位时间内的最高值&#xff09;&#xff1b; 2.QP&#xff08;QUASI-PEAK)是指准峰值&#xff08;单位时间内的平均值&#xff09;&#xff1b; 3.AV(AVE…

数据采集接口分类:数据采集、数据的采集有哪些?

中国的人工智能会面临着前所未有的发展机遇&#xff0c;她也将会以真正解决人类钢需载入史册&#xff0c;我们也期待着在天津跟在座的各位合作伙伴共同努力&#xff0c;真正的用人工智能建设美好世界。 API接口数据采集 主流电商数据采集 一、 什么是数据采集 确立一个算法模…

算法-2-异或运算

按位异或&#xff1a;相同为0&#xff0c;不同为1 异或运算性质 1&#xff09;异或运算就是无进位相加&#xff08;ab写二进制形式每位相加时不进位&#xff09; 2&#xff09;异或运算满足交换律、结合律&#xff0c;也就是同一批数字&#xff0c;不管异或顺序是什么&#…

【精选】java继承进阶,子类继承父类(内存图、内存分析工具)

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…