共享模型之工具(二)

news2025/1/12 17:56:45

1.自定义线程池

1>.在实际开发过程中建议不要使用JDK提供的方式创建线程池,因为底层不方便优化,在请求量非常大的情况下可能会出现OOM,我们需要手动实现一个线程池;
在这里插入图片描述
2>.代码实现:

@Slf4j
public class TestThreadPoolDemo1 {
    public static void main(String[] args) {
        //创建线程池对象
        CustomThreadPool customThreadPool = new CustomThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
            //queue.put(task); //拒绝策略①:添加任务失败,一直等待;
            //queue.offer(3000,TimeUnit.MILLISECONDS,task); //拒绝策略②:添加任务失败,等待指定的时间,返回结果;
            //log.info("放弃任务{}",task); //拒绝策略③:任务添加失败,直接放弃任务,后面的其他任务也会放弃;
            //throw new RuntimeException(); //拒绝策略④:任务添加失败,当前任务不执行,抛出异常,后面的其他任务都不执行;
            //task.run();//拒绝策略④:任务添加失败,调用者线程自己去执行任务(当前是main线程);
        });
        //提交请求任务
        for (int i = 0; i < 4; i++) {
            int j = i;
            customThreadPool.executor(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //请求任务,打印信息
                log.info("{}", j);
            });
        }
    }
}

//阻塞队列,存放请求任务
@Slf4j
class CustomBlockQueue<T> {
    //(有界)队列
    private Deque<T> queue = new ArrayDeque<T>();

    //锁对象(多个线程同时操作同一个位置上的元素)
    private ReentrantLock lock = new ReentrantLock();

    //生产者条件变量,队列满了让生产者线程等待
    private Condition fullWaitSetCondition = lock.newCondition();

    //消费者条件变量,队列空了让消费者线程等待
    private Condition emptyWaitSetCondition = lock.newCondition();

    //队列容量
    private int capcity;

    public CustomBlockQueue(int capcity) {
        this.capcity = capcity;
    }

    //带超时的等待获取
    public T poll(long timeOut, TimeUnit timeUnit) {
        lock.lock();
        try {
            //将超时时间统一转换为nanos
            long nanos = timeUnit.toNanos(timeOut);

            while (queue.isEmpty()) {
                //队列为空,当前线程等待
                try {
                    //当前线程只等待指定的时间,如果提前唤醒,则提前结束等待;如果超时未唤醒,则自动唤醒;
                    //该等待方法的返回值是剩余等待时间,即剩余等待时长=等待总时长-已经等待的时长;
                    //为了防止虚假唤醒,需要将这个方法返回的剩余等待时间重新赋值给nanos变量,
                    //下一轮循环就按照这个剩余时间进行等待,而不是再等待nanos时长
                    if (nanos <= 0) {
                        //等待时间结束仍然没有获取到元素,当前线程运行结束
                        return null;
                    }
                    nanos = emptyWaitSetCondition.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //正常获取元素(弹出元素)
            T t = queue.removeFirst();

            //取出元素,队列中有空位,唤醒等待中的生产线程
            fullWaitSetCondition.signalAll();

            return t;
        } finally {
            lock.unlock();
        }
    }

    //从队列中获取元素
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                //队列为空,当前线程等待
                try {
                    emptyWaitSetCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //正常获取元素(弹出元素)
            T t = queue.removeFirst();

            //取出元素,队列中有空位,唤醒等待中的生产线程
            fullWaitSetCondition.signalAll();

            return t;
        } finally {
            lock.unlock();
        }
    }

    //带超时时间的等待添加
    public boolean offer(long timeOut, TimeUnit timeUnit, T task) {
        lock.lock();
        try {
            //将超时时间转换成Nanos
            long nanos = timeUnit.toNanos(timeOut);

            while (queue.size() >= capcity) {
                //队列已满,当前线程等待(等待指定的时间)
                try {
                    log.info("任务队列已满,等待加入任务队列...{}",nanos);
                    //如果等待时间用完了任务还是没有添加到任务队列中,结束等待
                    //返回一个false,表示本次添加任务失败!
                    if (nanos <= 0) {
                        log.info("等待结束,返回结果{}",false);
                        return false;
                    }
                    //线程提前被虚假唤醒,返回一个剩余等待时间,这个时间也是下一轮循环当前线程要等待的时间
                    //把它重新赋值给nanos变量,作为下一轮循环等待时间
                    nanos = fullWaitSetCondition.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //正常添加元素
            log.info("添加任务到任务队列中:{}", task);
            queue.addLast(task);

            //队列中有元素,唤醒处于等待状态中的消费线程
            emptyWaitSetCondition.signalAll();

            //添加成功,返回true
            return true;
        } finally {
            lock.unlock();
        }
    }

    //添加元素到队列中
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                //队列已满,当前线程等待(一直等待)
                try {
                    log.info("任务队列已满,等待加入任务队列...");
                    fullWaitSetCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //正常添加元素
            log.info("添加任务到任务队列中:{}", task);
            queue.addLast(task);

            //队列中有元素,唤醒处于等待状态中的消费线程
            emptyWaitSetCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    //获取队列大小
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    //包含拒绝策略的队列添加元素添加方法
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            //判读队列是否已满
            if (queue.size() == capcity) {
                //队列已满,根据指定的拒绝策略处理当前这个请求任务
                rejectPolicy.reject(this, task);
            } else {
                //正常添加元素
                log.info("添加任务到任务队列中:{}", task);
                queue.addLast(task);

                //队列中有元素,唤醒处于等待状态中的消费线程
                emptyWaitSetCondition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}

//自定义线程池
@Slf4j
class CustomThreadPool {

    //任务队列(线程队列)
    private CustomBlockQueue<Runnable> taskQueue;

    //工作线程集合
    private HashSet<Worker> works = new HashSet<Worker>();

    //核心线程数
    private int coreSize;

    //工作线程从任务队列中获取任务的最大等待时间
    private long timeOut;

    //拒绝策略
    private RejectPolicy<Runnable> rejectPolicy;

    private TimeUnit timeUnit;

    public CustomThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        this.taskQueue = new CustomBlockQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    //提交任务
    public void executor(Runnable task) {
        synchronized (works) {
            //当任务数量(工作线程数量)没有超过coreSize时,直接交给worker执行;
            if (works.size() < coreSize) {
                //创建一个worker线程
                Worker worker = new Worker(task);

                log.info("新创建worker工作线程:{}", worker);

                //将新创建的线程加入到线程集合中
                works.add(worker);
                //线程启动
                worker.start();
            } else {
                //如果任务数量(工作线程数量)超过了coreSize,将任务加入到任务队列中暂存;
                //带拒绝策略的添加
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    //工作线程,真正执行任务
    class Worker extends Thread {
        //要执行的任务
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            //(循环)任务执行
            //当task不为空,直接执行任务
            //当task执行完毕,再接着从任务队列中获取任务执行,如果超过指定的等待时间还是没有获取到新的任务,
            //那么直接返回null,而不是一直等待!!!
            //while (this.task != null || (this.task = taskQueue.take()) != null) {
            while (this.task != null || (this.task = taskQueue.poll(timeOut, timeUnit)) != null) {
                try {
                    log.info("正在执行的请求任务:{}", this.task);
                    this.task.run();
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    this.task = null;
                }
            }

            //当前线程执行完毕,从线程集合works中移除
            synchronized (works) {
                log.info("即将被移除的worker工作线程:{}", this);
                works.remove(this);
            }
        }
    }
}

//创建线程池拒绝策略接口
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(CustomBlockQueue<T> queue, T task);
}

2.异步模式之工作线程模式

2.1.定义

1>.让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务.也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式;

例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message);

***注意:不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率;

例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工;

2.2.饥饿现象(线程池中的线程不足)

***注意:固定大小的线程池会有饥饿现象!

例如

①.两个工人是同一个线程池中的两个线程;
②.他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作;

  • 1).客人点餐: 必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待;
  • 2).后厨做菜: 没啥说的,做就是了;

③.比如工人A 处理了点餐任务,接下来它要等着工人B把菜做好,然后上菜,他俩也配合的蛮好;
④.但现在同时来了两个客人,这个时候工人A 和工人B都去处理点餐了,这时没人做饭了,饥饿;

代码实现:

@Slf4j
public class TestDeadLock {
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();

    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
        //针对不同的任务类型创建不同的线程池
        ExecutorService waiterPool = Executors.newFixedThreadPool(1);
        ExecutorService cookingPool = Executors.newFixedThreadPool(1);

        waiterPool.execute(() -> {
            log.info("处理点餐...");
            Future<String> f = cookingPool.submit(() -> {
                log.info("做菜");
                return cooking();
            });

            try {
                log.info("上菜{}", f.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        waiterPool.execute(() -> {
            log.info("处理点餐...");
            Future<String> f = cookingPool.submit(() -> {
                log.info("做菜");
                return cooking();
            });

            try {
                log.info("上菜{}",f.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

在这里插入图片描述

3.线程池中合理的线程数

问题:

①.过小会导致程序不能充分地利用系统资源、容易导致饥饿;
②.过大会导致更多的线程上下文切换,占用更多内存;

1>.CPU密集型运算

通常采用cpu 核数 + 1能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费;

2>.I/O密集型

CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行I/O操作时、远程RPC 调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率;
经验公式如下:

  • 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
    例如4核CPU计算时间是50%,其它等待时间是50%,期望cpu被100%利用,套用公式: 4 * 100% * 100% / 50% = 8

4.Tomcat线程池

4.1.Tomcat中哪里用到了线程池?

连接器和container容器!

在这里插入图片描述
组件说明:

①.LimitLatch用来限流,可以控制最大连接个数,类似J.U.C中的Semaphore;
②.Acceptor只负责"接收新的socket连接";
③.Poller只负责监听socket channel是否有"可读的I/O事件"
④.一旦可读,封装一个任务对象(socketProcessor),提交给Executor线程池处理;
⑤.Executor线程池中的工作线程最终负责"处理请求";

Tomcat线程池扩展了ThreadPoolExecutor,行为稍有不同:

如果总线程数达到maximumPoolSize,这时不会立刻抛RejectedExecutionException异常,而是再次尝试将任务放入队列,如果还失败,才抛出RejectedExecutionException异常;

Tomcat-7.0.42源码:

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
      super.execute(command);
    } catch (RejectedExecutionException rx) {
  
    if (super.getQueue() instanceof TaskQueue) {
          final TaskQueue queue = (TaskQueue)super.getQueue();
          try {
            if (!queue.force(command, timeout, unit)) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException("Queue capacity is full.");
            }
          } catch (InterruptedException x) {
  
          submittedCount.decrementAndGet();
          Thread.interrupted();
          throw new RejectedExecutionException(x);
        }
    }else {
        submittedCount.decrementAndGet();
        throw rx;
    }
  }
}

TaskQueue.java

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
   if ( parent.isShutdown() ) 
      throw new RejectedExecutionException(
        "Executor not running, can't force a command into the queue"
   );
   
   return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}

4.2.Tomcat线程池配置

***注意:在Tomcat的配置文件"server.xml"中配置!

1>.connetor标签的线程相关配置
在这里插入图片描述
2>.Executor标签相关的线程配置
在这里插入图片描述
3>.线程池执行流程:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/351613.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

容器安全风险and容器逃逸漏洞实践

本文博客地址&#xff1a;https://security.blog.csdn.net/article/details/128966455 一、Docker存在的安全风险 1.1、Docker镜像存在的风险 不安全的第三方组件&#xff1a;用户自己的代码依赖若干开源组件&#xff0c;这些开源组件本身又有着复杂的依赖树&#xff0c;甚至…

在 Python 中只接受数字作为用户输入

只接受数字作为用户输入&#xff1a; 使用 while True 循环进行循环&#xff0c;直到用户输入一个数字。使用 float() 类尝试将值转换为浮点数。如果用户输入了一个数字&#xff0c;请使用 break 语句跳出循环。 while True:try:# &#x1f447;️ use int() instead of floa…

宝马项目化流程标准(BMW ABC flyer requirement)

ABC flyer/ BMWQMT build phase requirement宝马的项目流程标准叫做ABC flyer,也叫QMT build phase requirement.为什么叫这么名字&#xff0c;是因为宝马项目的产品零件分为几个阶段&#xff1a;A-samples, B-samples,C-samples, initial-samples.1、PVL/ VS0:100% ok parts i…

高通平台开发系列讲解(Android篇)AudioTrack音频流数据传输

文章目录 一、音频流数据传输通道创建1.1、流程描述1.2、流程图解二、音频数据传输2.1、流程描述2.2、流程图解沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇章主要图解AudioTrack音频流数据传输 。 一、音频流数据传输通道创建 1.1、流程描述 AudioTrack在set函…

项目自动化构建工具make/Makefile

目录 make/Makefile概念和关系 make/Makefie的使用 一个工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefile定义了一系列的规则来指定&#xff0c;哪些文件需要先编译&#xff0c;哪些文件需要后编译&#xff0c;哪些文件需要重…

33岁测试宝妈,在家带娃一年半,入职新公司居然年薪30W+

疫情3年&#xff0c;每一个行业的危机&#xff0c;每一个企业的倒下&#xff0c;背后都是无数人的降薪、降职和失业。这也暴露了人生的残酷真相&#xff1a;人活一辈子&#xff0c;总有“丰年”和“荒年” 优秀的测试既过得了丰年&#xff0c;也受得住荒年。 我之前认识的一个…

零信任-Zscaler零信任介绍(7)

​Zscaler零信任介绍 Zscaler是一家专注于网络安全的公司&#xff0c;他们提供了一种名为Zscaler Zero Trust Exchange (ZTX)的零信任解决方案。这种解决方案旨在帮助企业提高网络安全&#xff0c;并确保只有授权的用户&#xff0c;设备和应用程序才能访问敏感信息。ZTX采用多…

畅购电商项目

1. 电商项目架构图技术框架/技术选型1.1 系统架构该项目是一个B2C的电商项目&#xff08;类似小米商城、京东商城、天猫商城&#xff09;允许客户通过网络购买商品该项目主要完成的是电商项目前台的开发。采用前后端分离的方式进行开发的前端&#xff1a;vue全家桶&#xff08;…

QT入门Containers之Widget、Frame

目录 一、QWidget界面相关 1、布局介绍 2、基本界面属性 3、特殊属性 二、QFrame 三、Demo展示 此文为作者原创&#xff0c;创作不易&#xff0c;转载请标明出处&#xff01; 一、QWidget界面相关 1、布局介绍 为什么将QWidget容器放在第一个&#xff0c;因为目前使用过…

前端缓存知识-强缓存与协商缓存

缓存的作用 减少了冗余的数据传输&#xff0c;节省了网费。减少了服务器的负担&#xff0c; 大大提高了网站的性能加快了客户端加载网页的速度 缓存分类 强制缓存如果生效&#xff0c;不需要再和服务器发生交互&#xff0c;而对比缓存不管是否生效&#xff0c;都需要与服务端…

查询蓝牙适配器版本

台式机不支持蓝牙&#xff0c;装了个蓝牙适配器&#xff0c;结果换来换去又忘记自己这个是啥版本了&#xff0c;适配器自己也不写。好在微软官方也给了说明如何查询我的电脑运行哪个蓝牙版本&#xff1f; - Microsoft 支持https://support.microsoft.com/zh-cn/windows/%E6%88%…

day44【代码随想录】动态规划之零钱兑换II、组合总和 Ⅳ、零钱兑换

文章目录前言一、零钱兑换II&#xff08;力扣518&#xff09;二、组合总和 Ⅳ&#xff08;力扣377&#xff09;三、零钱兑换&#xff08;力扣322&#xff09;总结前言 1、零钱兑换II 2、组合总和 Ⅳ 3、零钱兑换 一、零钱兑换II&#xff08;力扣518&#xff09; 给你一个整数…

教你如何实现一个页面自动打字效果

前言&#xff1a; 最近在写一个类似于 windows 启动页的项目&#xff0c;不知道大家是否还记的 windows 很经典的小光标加载页面&#xff0c;我又稍微改造了一下效果如下&#xff1a; 一. 光标闪烁效果的实现 tips&#xff1a; 在这里我们使用了 UnoCSS&#xff0c;如果你不清…

金三银四,如果没准备这些面试题,找工作还是缓一缓吧

前言: 最近金三银四跳槽季&#xff0c;相信很多小伙伴都在面试找工作&#xff0c;怎样才能拿到大厂的offer&#xff0c;没有掌握绝对的技术&#xff0c;那么就要不断的学习… 如何拿下阿里等大厂的offer的呢&#xff0c;今天分享一个秘密武器&#xff0c;资深测试开发师整理的…

【面试问题-java内存模型JMM】

今天面试&#xff0c;我把运行时数据区域答成了java内存模型&#xff0c;回来把这方面的问题给纠正一下。 以下内容阅读自《深入理解Java虚拟机》第12章 下面小段只做了解即可。重点是Java内存模型。 多任务处理在现代计算机操作系统中是必备的功能。 计算机运行速度与它的存储…

【MySQL】数据库基础

目录 1、什么是数据库 2、 数据库基本操作 2.1 查看当前数据库 2.2 创建一个数据库 2.3 选中数据库 2.4 删除数据库 3、常见的数据类型 3.1 数值类型 3.2 字符串类型 3.3 日期类型 4、表的操作 4.1 创建表 4.2 查看指定数据库下的所有表 4.3 查看表的结构 4.…

java常见的异常

异常分类 Throwable 是java异常的顶级类&#xff0c;所有异常都继承于这个类。 Error,Exception是异常类的两个大分类。 Error Error是非程序异常&#xff0c;即程序不能捕获的异常&#xff0c;一般是编译或者系统性的错误&#xff0c;如OutOfMemorry内存溢出异常等。 Exc…

环境变量和进程地址空间

目录 环境变量&#xff1a; env&#xff1a;显示所有的环境变量&#xff1a; echo $环境变量名表示查看环境变量的值 理解环境变量&#xff1a; getenv&#xff1a;显示环境变量的值 export set命令&#xff1a;显示所有变量 unset取消变量&#xff1a; pwd&#xff1a;当…

Django框架之模型查询-关联查询

关联查询 查询书籍为1的所有人物信息 查询人物为1的书籍信息由一到多的访问语法&#xff1a; 一对应的模型类对象.多对应的模型类名小写_set 例&#xff1a; >>> book BookInfo.objects.get(id1) >>> book.peopleinfo_set.all() <QuerySet [<Peopl…

buntu18 安装 openpose(GPU)环境

openpose环境 搭建 很费劲&#xff0c; 需要装软件也多&#xff0c; 还必须要考虑版本的问题。我主要是参考链接 ubuntu18安装openpose详细步骤_litbo的博客-CSDN博客_ubuntu安装openpose 其中&#xff0c;我的实验中 有如下需要更改。 1、我的是 cuda-10.2 2、gcc 和g 必…