【并发编程】Executor线程池

news2025/1/18 4:40:40

一、线程

1.线程

线程是调度CPU资源的最小单位。java线程与OS线程保持1:1映射关系,也就是说,一个Java线程也会在操作系统里有一个对应线程。

2.线程的生命周期

NEW,新建
RUNNABLE,运行
BLOCKED,阻塞
WAITING,等待
TIMED_WAITING,超时等待
TERMINATED,终结
在这里插入图片描述

二、线程池

1.为什么使用线程池

通过减少频繁创建和销毁线程来降低性能损耗。
例如:当并发请求量比较大的时候,单个任务的执行时间很短,频繁创建和销毁线程会大大降低系统的效率。

2.线程池的使用场景

1.单个任务执行时间比较短
2.任务数量比较多

3.Executor线程池

3-1 结构

Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。

package java.util.concurrent;

public interface Executor {
    void execute(Runnable var1);
}

在这里插入图片描述
Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为:
1、excute(Runnable command) : 履行Runnable类型任务。
2、submit(task) :可以用来提交Callable或Runnable任务,并返回代表此任务的Future对象。
3、shutdown():在完成已提交的任务后封闭,不再接管新任务。
4、shutdownNow() : 停止所有正在履行的任务并封闭。
5、isTerminated() : 测试是否所有任务都履行完毕了。
6、isShutdown() : 测试是否该ExecutorService已被关闭。

public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;

    <T> Future<T> submit(Callable<T> var1);

    <T> Future<T> submit(Runnable var1, T var2);

    Future<?> submit(Runnable var1);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService实现了ExecutorService中的方法:

package java.util.concurrent;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

public abstract class AbstractExecutorService implements ExecutorService {
    public AbstractExecutorService() {
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable var1, T var2) {
        return new FutureTask(var1, var2);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> var1) {
        return new FutureTask(var1);
    }

    public Future<?> submit(Runnable var1) {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            RunnableFuture var2 = this.newTaskFor(var1, (Object)null);
            this.execute(var2);
            return var2;
        }
    }

    public <T> Future<T> submit(Runnable var1, T var2) {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            RunnableFuture var3 = this.newTaskFor(var1, var2);
            this.execute(var3);
            return var3;
        }
    }

    public <T> Future<T> submit(Callable<T> var1) {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            RunnableFuture var2 = this.newTaskFor(var1);
            this.execute(var2);
            return var2;
        }
    }

    private <T> T doInvokeAny(Collection<? extends Callable<T>> var1, boolean var2, long var3) throws InterruptedException, ExecutionException, TimeoutException {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            int var5 = var1.size();
            if (var5 == 0) {
                throw new IllegalArgumentException();
            } else {
                ArrayList var6 = new ArrayList(var5);
                ExecutorCompletionService var7 = new ExecutorCompletionService(this);
                boolean var23 = false;

                Object var14;
                try {
                    var23 = true;
                    ExecutionException var8 = null;
                    long var9 = var2 ? System.nanoTime() + var3 : 0L;
                    Iterator var11 = var1.iterator();
                    var6.add(var7.submit((Callable)var11.next()));
                    --var5;
                    int var12 = 1;

                    while(true) {
                        Future var13 = var7.poll();
                        if (var13 == null) {
                            if (var5 > 0) {
                                --var5;
                                var6.add(var7.submit((Callable)var11.next()));
                                ++var12;
                            } else {
                                if (var12 == 0) {
                                    if (var8 == null) {
                                        var8 = new ExecutionException();
                                    }

                                    throw var8;
                                }

                                if (var2) {
                                    var13 = var7.poll(var3, TimeUnit.NANOSECONDS);
                                    if (var13 == null) {
                                        throw new TimeoutException();
                                    }

                                    var3 = var9 - System.nanoTime();
                                } else {
                                    var13 = var7.take();
                                }
                            }
                        }

                        if (var13 != null) {
                            --var12;

                            try {
                                var14 = var13.get();
                                var23 = false;
                                break;
                            } catch (ExecutionException var24) {
                                var8 = var24;
                            } catch (RuntimeException var25) {
                                var8 = new ExecutionException(var25);
                            }
                        }
                    }
                } finally {
                    if (var23) {
                        int var18 = 0;

                        for(int var19 = var6.size(); var18 < var19; ++var18) {
                            ((Future)var6.get(var18)).cancel(true);
                        }

                    }
                }

                int var15 = 0;

                for(int var16 = var6.size(); var15 < var16; ++var15) {
                    ((Future)var6.get(var15)).cancel(true);
                }

                return var14;
            }
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedExcepti

3-2 简单使用

 public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 5, 5000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));

        for (int i=0;i<5;i++){
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("i m task :"+Thread.currentThread().getName());
                }
            },i);
        }

    }

在这里插入图片描述

阿里巴巴开发规范:在这里插入图片描述

3-3 线程池属性

public class ThreadPoolExecutor extends AbstractExecutorService {
     // 新建ThreadPoolExecutor的时候会对ctl进行初始化,-536870912 | 0
     // ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分信息:线程池的运行状态(runState) 和线程池内有效线程的数量(workerCount),使用了Integer类型保存,高3位保存runState,低29位保存workerCount,大约是5亿。
     private final AtomicInteger ctl = new AtomicInteger(ctlOf(-536870912, 0)); // 1110 0000 0000 0000 0000 0000 0000 0000
     private static final int COUNT_BITS = 29; // 0001 1101
     private static final int CAPACITY = 536870911; // 0001 1111 1111 1111 1111 1111 1111 1111
     // 线程池状态
     // 线程池处在RUNNING状态时,能够接受新任务,以及对已添加的任务进行处理。
     private static final int RUNNING = -536870912; // 1110 0000 0000 0000 0000 0000 0000 0000
     // 线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
     private static final int SHUTDOWN = 0; //0000 0000 0000 0000 0000 0000 0000 0000
     // 线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
     private static final int STOP = 536870912; // 0010 0000 0000 0000 0000 0000 0000 0000
     // 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
     private static final int TIDYING = 1073741824; // 0100 0000 0000 0000 0000 0000 0000 0000
     // 线程池彻底终止
     private static final int TERMINATED = 1610612736; // 0110 0000 0000 0000 0000 0000 0000 0000
}

在这里插入图片描述

3-4 源码解析

3-4-1 线程池的具体实现

ThreadPoolExecutor 默认线程池
ScheduledThreadPoolExecutor 定时线程池

3-4-2 线程池的创建

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

参数解释:
1>corePoolSize:核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池提前创建并启动所有核心线程。
2>maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。
3>keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
4>unit:keepAliveTime时间的单位;
5>workQueue:用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
6>handler:线程池的饱和(拒绝)策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
AbortPolicy:直接抛出异常,默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

3-4-3 ThreadPoolExecutor执行全流程

在这里插入图片描述
1) 当前运行的线程数 < corePoolSize,则创建新线程来执行任务;
2)当前运行的线程数 >= corePoolSize,则任务加入阻塞BlockingQueue;
3)阻塞队列BlockingQueue已满,则创建新的线程来处理任务;
4)如果创建新线程将使当前运行的线程 > 最大线程数,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

3-4-3-1 execute执行全流程

![在这里插入图片描述](https://img-blog.csdnimg.cn/68547633343f442fa1afaca14700ddb1.png

public void execute(Runnable var1) {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            int var2 = this.ctl.get();
            if (workerCountOf(var2) < this.corePoolSize) {
                if (this.addWorker(var1, true)) {
                    return;
                }

                var2 = this.ctl.get();
            }

            if (isRunning(var2) && this.workQueue.offer(var1)) {
                int var3 = this.ctl.get();
                if (!isRunning(var3) && this.remove(var1)) {
                    this.reject(var1);
                } else if (workerCountOf(var3) == 0) {
                   // 为了保证线程池在RUNNING状态下必须要有一个线程来执行任务
                    this.addWorker((Runnable)null, false);
                }
            } else if (!this.addWorker(var1, false)) {
                this.reject(var1);
            }

        }
    }
3-4-3-2 addWorker执行全流程

在这里插入图片描述

private boolean addWorker(Runnable var1, boolean var2) {
        while(true) {
            int var3 = this.ctl.get();
            int var4 = runStateOf(var3);
            if (var4 >= 0 && (var4 != 0 || var1 != null || this.workQueue.isEmpty())) {
                return false;
            }

            while(true) {
                int var5 = workerCountOf(var3);
                if (var5 >= 536870911 || var5 >= (var2 ? this.corePoolSize : this.maximumPoolSize)) {
                    return false;
                }

                if (this.compareAndIncrementWorkerCount(var3)) {
                    boolean var18 = false;
                    boolean var19 = false;
                    ThreadPoolExecutor.Worker var20 = null;

                    try {
                        // 根据firstTask来创建Worker对象
                        var20 = new ThreadPoolExecutor.Worker(var1);
                        // 每一个Worker对象都会创建一个线程
                        Thread var6 = var20.thread;
                        if (var6 != null) {
                            ReentrantLock var7 = this.mainLock;
                            var7.lock();

                            try {
                                int var8 = runStateOf(this.ctl.get());
                                // rs < 0表示是RUNNING状态;
                                // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                                // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                                if (var8 < 0 || var8 == 0 && var1 == null) {
                                    if (var6.isAlive()) {
                                        throw new IllegalThreadStateException();
                                    }

                                    this.workers.add(var20);
                                    int var9 = this.workers.size();
                                    // largestPoolSize记录着线程池中出现过的最大线程数量
                                    if (var9 > this.largestPoolSize) {
                                        this.largestPoolSize = var9;
                                    }

                                    var19 = true;
                                }
                            } finally {
                                var7.unlock();
                            }

                            if (var19) {
                                // 启动线程
                                var6.start();
                                var18 = true;
                            }
                        }
                    } finally {
                        if (!var18) {
                            this.addWorkerFailed(var20);
                        }

                    }

                    return var18;
                }

                var3 = this.ctl.get();
                // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
                if (runStateOf(var3) != var4) {
                    break;
                }
            }
        }
    }
3-4-3-3 runWorker执行全流程

在这里插入图片描述

 // 使用AQS来实现独占锁
 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
        final Thread thread; // 用来处理任务的线程
        Runnable firstTask; // 传入的任务
        volatile long completedTasks;

        Worker(Runnable var2) {
            //tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断
            this.setState(-1);
            this.firstTask = var2;
            // newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
            this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
        }

        public void run() {
            ThreadPoolExecutor.this.runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return this.getState() != 0;
        }

        protected boolean tryAcquire(int var1) {
            if (this.compareAndSetState(0, 1)) {
                this.setExclusiveOwnerThread(Thread.currentThread());
                return true;
            } else {
                return false;
            }
        }

        protected boolean tryRelease(int var1) {
            this.setExclusiveOwnerThread((Thread)null);
            this.setState(0);
            return true;
        }

        public void lock() {
            this.acquire(1);
        }

        public boolean tryLock() {
            return this.tryAcquire(1);
        }

        public void unlock() {
            this.release(1);
        }

        public boolean isLocked() {
            return this.isHeldExclusively();
        }

        void interruptIfStarted() {
            Thread var1;
            if (this.getState() >= 0 && (var1 = this.thread) != null && !var1.isInterrupted()) {
                try {
                    var1.interrupt();
                } catch (SecurityException var3) {
                }
            }

        }
    }
 final void runWorker(ThreadPoolExecutor.Worker var1) {
        Thread var2 = Thread.currentThread();
        // 获取第一个任务
        Runnable var3 = var1.firstTask;
        var1.firstTask = null;
        // 允许中断
        var1.unlock();
        boolean var4 = true;

        try {
            // // 如果task为空,则通过getTask来获取任务
            while(var3 != null || (var3 = this.getTask()) != null) {
                var1.lock();
                if ((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() && runStateAtLeast(this.ctl.get(), 536870912)) && !var2.isInterrupted()) {
                    var2.interrupt();
                }

                try {
                    this.beforeExecute(var2, var3);
                    Object var5 = null;

                    try {
                        var3.run();
                    } catch (RuntimeException var28) {
                        var5 = var28;
                        throw var28;
                    } catch (Error var29) {
                        var5 = var29;
                        throw var29;
                    } catch (Throwable var30) {
                        var5 = var30;
                        throw new Error(var30);
                    } finally {
                        this.afterExecute(var3, (Throwable)var5);
                    }
                } finally {
                    var3 = null;
                    ++var1.completedTasks;
                    var1.unlock();
                }
            }

            var4 = false;
        } finally {
            this.processWorkerExit(var1, var4);
        }

    }
  private void processWorkerExit(ThreadPoolExecutor.Worker var1, boolean var2) {
        if (var2) {
            this.decrementWorkerCount();
        }

        ReentrantLock var3 = this.mainLock;
        var3.lock();

        try {
            //统计完成的任务数
            this.completedTaskCount += var1.completedTasks;
            // 从workers中移除,也就表示着从线程池中移除了一个工作线程
            this.workers.remove(var1);
        } finally {
            var3.unlock();
        }
        // 根据线程池状态进行判断是否结束线程池
        this.tryTerminate();
        int var4 = this.ctl.get();
        // 536870912 -> STOP的取值,小于STOP的状态是运行状态或者SHUTDOWN
        if (runStateLessThan(var4, 536870912)) {
           // 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
            if (!var2) {
                 // 如果允许核心线程超时,最小值为0,否则为corePoolSize
                int var5 = this.allowCoreThreadTimeOut ? 0 : this.corePoolSize;
                 // 如果最小值为0,同时任务队列不空,则更新最小值为1
                if (var5 == 0 && !this.workQueue.isEmpty()) {
                    var5 = 1;
                }
                // 工作线程数大于等于最小值,直接返回不新增非核心线程
                if (workerCountOf(var4) >= var5) {
                    return;
                }
            }

            this.addWorker((Runnable)null, false);
        }

    }

    private Runnable getTask() {
        boolean var1 = false;

        while(true) {
            int var2 = this.ctl.get();
            int var3 = runStateOf(var2);
            /** 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
               1. rs >= STOP,线程池是否正在stop;
               2. 阻塞队列是否为空。
              如果以上条件满足,则将workerCount减1并返回null。
               因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
            */
            if (var3 >= 0 && (var3 >= 536870912 || this.workQueue.isEmpty())) {
                this.decrementWorkerCount();
                return null;
            }

            int var4 = workerCountOf(var2);
            boolean var5 = this.allowCoreThreadTimeOut || var4 > this.corePoolSize;
            if (var4 <= this.maximumPoolSize && (!var5 || !var1) || var4 <= 1 && !this.workQueue.isEmpty()) {
                try {
                    // 根据var5来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null(其实就是,在keepAliveTime时间内能否获取到任务)
                    // 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空
                    Runnable var6 = var5 ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) : (Runnable)this.workQueue.take();
                    if (var6 != null) {
                        return var6;
                    }

                    var1 = true;
                } catch (InterruptedException var7) {
                    var1 = false;
                }
            } else if (this.compareAndDecrementWorkerCount(var2)) {
                return null;
            }
        }
    }

工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束。
在这里插入图片描述
线程池监控:

public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量

3-5 定时线程池(ScheduledThreadPoolExecutor)

3-5-1 类结构图

在这里插入图片描述

3-5-2 简单使用

public static void main(String[] args) {

        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        long cur = System.currentTimeMillis();
        System.out.println("执行开始时间:" + cur);
        // 延时2s执行任务
        ScheduledFuture<?> future = scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("Hello World");
            return 1;
        }, 2000, TimeUnit.MILLISECONDS);
        System.out.println("执行完成时间:" + (System.currentTimeMillis() - cur));

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
}

在这里插入图片描述
但是,这个只能执行一次,如果想定时执行该怎么办呢?答:使用scheduleAtFixedRate方法,scheduleAtFixedRate(task,time,period,unit)
task-所要安排的任务,time-首次执行任务的时间,period-执行一次task的时间间隔,unit-单位
作用:时间等于或超过time首次执行task,之后每隔period后重复执行task

public class ThreadPool {

    private  static Integer count =1;
    MyTimerTask myTimerTask = new MyTimerTask();
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);

    public void start(){
        try {
            //从第一次任务开始时延时0s,一秒执行一次
            scheduledThreadPoolExecutor.scheduleAtFixedRate(myTimerTask, 0,1, TimeUnit.SECONDS);
            while (!scheduledThreadPoolExecutor.isTerminated()){
                lock.readLock().lock();
                if (count >20){
                    scheduledThreadPoolExecutor.shutdown();
                }
                lock.readLock().unlock();

            }
        }catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("Finished all threads");
    }
    private class MyTimerTask implements Runnable {
        @Override
        public void run(){
            lock.writeLock().lock();
            System.out.println("第 "+count+ " 次执行任务,count="+count);
            count ++;
            lock.writeLock().unlock();
        }

    }

    public static void main(String[] args) {
        new ThreadPool().start();
    }
}    

在这里插入图片描述
当任务的执行时间大于下一任务开始执行的时间,会出现什么情况呢?

    public static void main(String[] args) {

        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
            System.out.println(new Date() + ": 任务执行完成。。。");
        },1000,2000,TimeUnit.MILLISECONDS);
}        

在这里插入图片描述

  public static void main(String[] args) {

        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
            long startTime = System.currentTimeMillis();
            System.out.println(new Date(startTime) + ": 任务开始执行。。。");
            long nowTime = startTime;
            while ((nowTime - startTime) < 5000) {
                nowTime = System.currentTimeMillis();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(new Date(nowTime) + ": 任务执行完成。。。");
        },1000,2000,TimeUnit.MILLISECONDS);
}        

在这里插入图片描述
当任务的执行时间大于下一任务开始执行时间,会导致后续任务进行积压,当前任务执行完成后再继续执行后面的任务。当任务数量过多时,积压这么多也不是办法,有什么方法可以保证上一任务执行完成后 延时指定的时间后 再继续执行下一任务呢? 答:scheduleWithFixedDelay方法,
scheduleWithFixedDelay(task,time,delay,unit)
task-所要安排的任务, time-首次执行任务的时间 ,delay-每次执行任务的延迟时间,unit-单位
作用:时间等于或超过time首次执行task,之后每隔delay后重复执行task

 public static void main(String[] args) {

        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
            long startTime = System.currentTimeMillis();
            System.out.println(new Date(startTime) + ": 任务开始执行。。。");
            long nowTime = startTime;
            while ((nowTime - startTime) < 5000) {
                nowTime = System.currentTimeMillis();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(new Date(nowTime) + ": 任务执行完成。。。");
        },1000,2000,TimeUnit.MILLISECONDS);
 }       

在这里插入图片描述
当执行任务抛出异常,系统会报错吗?答:不会,因为线程池中try…catcn了异常

    public static void main(String[] args) {

        System.out.println(111);
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
            System.out.println(new Date() + ": 任务执行完成。。。");
            throw new RuntimeException("error...");
        },1000,2000,TimeUnit.MILLISECONDS);

        System.out.println(222);
}        

在这里插入图片描述

3-5-3 使用场景

3-5-3-1 分布式锁

在这里插入图片描述

3-5-3-2 SpringCloud-服务注册与发现中心

在这里插入图片描述

3-5-3 源码解析

在这里插入图片描述

它接收SchduledFutureTask类型的任务,是线程池调度任务的最小单位,有三种提交任务
的方式:
1.schedule
2.scheduledAtFixedRate
3.scheduledWithFixedDelay

它采用DelayQueue存储等待的任务
1.DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若
time相同则根据sequenceNumber排序;
2.DelayQueue也是一个无界队列;

基本方法与ThreadPoolExecutor中的一样,接下来就介绍一下它独有的处理方式。

3-5-3-1 构造方法
    public ScheduledThreadPoolExecutor(int var1) {
        super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
    }

    public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2) {
        super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
    }

    public ScheduledThreadPoolExecutor(int var1, RejectedExecutionHandler var2) {
        super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2);
    }

    public ScheduledThreadPoolExecutor(int var1, ThreadFactory var2, RejectedExecutionHandler var3) {
        super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue(), var2, var3);
    }

调ThreadPoolExecutor类中的构造方法,

    public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6) {
        this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7) {
        this(var1, var2, var3, var5, var6, var7, defaultHandler);
    }

    public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, RejectedExecutionHandler var7) {
        this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), var7);
    }

    public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7, RejectedExecutionHandler var8) {
        this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
        this.mainLock = new ReentrantLock();
        this.workers = new HashSet();
        this.termination = this.mainLock.newCondition();
        if (var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
            if (var6 != null && var7 != null && var8 != null) {
                this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
                this.corePoolSize = var1;
                this.maximumPoolSize = var2;
                this.workQueue = var6;
                this.keepAliveTime = var5.toNanos(var3);
                this.threadFactory = var7;
                this.handler = var8;
            } else {
                throw new NullPointerException();
            }
        } else {
            throw new IllegalArgumentException();
        }
    }
3-5-3-2 schedule/scheduleAtFixedRate/scheduleWithFixedDelay方法
 public <V> ScheduledFuture<V> schedule(Callable<V> var1, long var2, TimeUnit var4) {
        if (var1 != null && var4 != null) {
            RunnableScheduledFuture var5 = this.decorateTask((Callable)var1, new ScheduledThreadPoolExecutor.ScheduledFutureTask(var1, this.triggerTime(var2, var4)));
            this.delayedExecute(var5);
            return var5;
        } else {
            throw new NullPointerException();
        }
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable var1, long var2, long var4, TimeUnit var6) {
        if (var1 != null && var6 != null) {
            if (var4 <= 0L) {
                throw new IllegalArgumentException();
            } else {
                ScheduledThreadPoolExecutor.ScheduledFutureTask var7 = new ScheduledThreadPoolExecutor.ScheduledFutureTask(var1, (Object)null, this.triggerTime(var2, var6), var6.toNanos(var4));
                RunnableScheduledFuture var8 = this.decorateTask((Runnable)var1, var7);
                var7.outerTask = var8;
                this.delayedExecute(var8);
                return var8;
            }
        } else {
            throw new NullPointerException();
        }
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable var1, long var2, long var4, TimeUnit var6) {
        if (var1 != null && var6 != null) {
            if (var4 <= 0L) {
                throw new IllegalArgumentException();
            } else {
                ScheduledThreadPoolExecutor.ScheduledFutureTask var7 = new ScheduledThreadPoolExecutor.ScheduledFutureTask(var1, (Object)null, this.triggerTime(var2, var6), var6.toNanos(-var4));
                RunnableScheduledFuture var8 = this.decorateTask((Runnable)var1, var7);
                var7.outerTask = var8;
                this.delayedExecute(var8);
                return var8;
            }
        } else {
            throw new NullPointerException();
        }
    }

      private void delayedExecute(RunnableScheduledFuture<?> var1) {
        //如果线程池已经关闭,则使用拒绝策略把提交任务拒绝掉
        if (this.isShutdown()) {
            this.reject(var1);
        } else {
            //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列(使用的是DelayedWorkQueue)
            super.getQueue().add(var1);
            //如果当前状态无法执行任务,则取消
            if (this.isShutdown() && !this.canRunInCurrentRunState(var1.isPeriodic()) && this.remove(var1)) {
                var1.cancel(false);
            } else {
                //这里是增加一个worker线程,避免提交的任务没有worker去执行。原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列
                this.ensurePrestart();
            }
        }

    }

    // 由用户自己来实现特殊逻辑
    protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> var1, RunnableScheduledFuture<V> var2) {
        return var2;
    }

     void ensurePrestart() {
        int var1 = workerCountOf(this.ctl.get());
        if (var1 < this.corePoolSize) {
            this.addWorker((Runnable)null, true);
        } else if (var1 == 0) {
            this.addWorker((Runnable)null, false);
        }

    }
3-5-3-3 ScheduledFutureTask
  private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
        private final long sequenceNumber; // 任务的序号
        private long time; // 任务开始的时间
        private final long period; // 任务执行的时间间隔
        RunnableScheduledFuture<V> outerTask = this;
        int heapIndex;

        ScheduledFutureTask(Runnable var2, V var3, long var4) {
            super(var2, var3);
            this.time = var4;
            this.period = 0L;
            this.sequenceNumber = ScheduledThreadPoolExecutor.sequencer.getAndIncrement();
        }

        ScheduledFutureTask(Runnable var2, V var3, long var4, long var6) {
            super(var2, var3);
            this.time = var4;
            this.period = var6;
            this.sequenceNumber = ScheduledThreadPoolExecutor.sequencer.getAndIncrement();
        }

        ScheduledFutureTask(Callable<V> var2, long var3) {
            super(var2);
            this.time = var3;
            this.period = 0L;
            this.sequenceNumber = ScheduledThreadPoolExecutor.sequencer.getAndIncrement();
        }

        public long getDelay(TimeUnit var1) {
            return var1.convert(this.time - ScheduledThreadPoolExecutor.this.now(), TimeUnit.NANOSECONDS);
        }

        // ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序
        public int compareTo(Delayed var1) {
            if (var1 == this) {
                return 0;
            } else if (var1 instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
                ScheduledThreadPoolExecutor.ScheduledFutureTask var5 = (ScheduledThreadPoolExecutor.ScheduledFutureTask)var1;
                long var3 = this.time - var5.time;
                if (var3 < 0L) {
                    return -1;
                } else if (var3 > 0L) {
                    return 1;
                } else {
                    return this.sequenceNumber < var5.sequenceNumber ? -1 : 1;
                }
            } else {
                long var2 = this.getDelay(TimeUnit.NANOSECONDS) - var1.getDelay(TimeUnit.NANOSECONDS);
                return var2 < 0L ? -1 : (var2 > 0L ? 1 : 0);
            }
        }

        public boolean isPeriodic() {
            return this.period != 0L;
        }

        private void setNextRunTime() {
            long var1 = this.period;
            if (var1 > 0L) {
                this.time += var1;
            } else {
                this.time = ScheduledThreadPoolExecutor.this.triggerTime(-var1);
            }

        }

        public boolean cancel(boolean var1) {
            boolean var2 = super.cancel(var1);
            if (var2 && ScheduledThreadPoolExecutor.this.removeOnCancel && this.heapIndex >= 0) {
                ScheduledThreadPoolExecutor.this.remove(this);
            }

            return var2;
        }

        public void run() {
            boolean var1 = this.isPeriodic();
            //如果当前线程池已经不支持执行任务,则取消
            if (!ScheduledThreadPoolExecutor.this.canRunInCurrentRunState(var1)) {
                this.cancel(false);
            } else if (!var1) { //如果不需要周期性执行,则直接执行run方法然后结束
                ScheduledFutureTask.super.run();
            } else if (ScheduledFutureTask.super.runAndReset()) { //如果需要周期执行,则在执行完任务以后,设置下一次执行时间
                // 计算下次执行该任务的时间
                this.setNextRunTime();
                //重复执行任务
                ScheduledThreadPoolExecutor.this.reExecutePeriodic(this.outerTask);
            }

        }
    }

  void reExecutePeriodic(RunnableScheduledFuture<?> var1) {
        if (this.canRunInCurrentRunState(true)) {
            super.getQueue().add(var1);
            if (!this.canRunInCurrentRunState(true) && this.remove(var1)) {
                var1.cancel(false);
            } else {
                this.ensurePrestart();
            }
        }

    }
3-5-3-4 DelayedWorkQueue

为什么要使用DelayedWorkQueue呢?
答:定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当
前队列中执行时间最靠前的,所以自然要使用优先级队列。
DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中
执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时
间复杂度是 O(logN)。

在这里插入图片描述
1)线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务 是指ScheduledFutureTask的time大于等于当前时间。
2)线程1执行这个ScheduledFutureTask。
3)线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
4)线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。

    static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture[16];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
        private Thread leader = null;
        private final Condition available;

        DelayedWorkQueue() {
            this.available = this.lock.newCondition();
        }

        private void setIndex(RunnableScheduledFuture<?> var1, int var2) {
            if (var1 instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
                ((ScheduledThreadPoolExecutor.ScheduledFutureTask)var1).heapIndex = var2;
            }

        }

        private void siftUp(int var1, RunnableScheduledFuture<?> var2) {
            while(true) {
                if (var1 > 0) {
                    int var3 = var1 - 1 >>> 1;
                    RunnableScheduledFuture var4 = this.queue[var3];
                    if (var2.compareTo(var4) < 0) {
                        this.queue[var1] = var4;
                        this.setIndex(var4, var1);
                        var1 = var3;
                        continue;
                    }
                }

                this.queue[var1] = var2;
                this.setIndex(var2, var1);
                return;
            }
        }

        private void siftDown(int var1, RunnableScheduledFuture<?> var2) {
            int var4;
            for(int var3 = this.size >>> 1; var1 < var3; var1 = var4) {
                var4 = (var1 << 1) + 1;
                RunnableScheduledFuture var5 = this.queue[var4];
                int var6 = var4 + 1;
                if (var6 < this.size && var5.compareTo(this.queue[var6]) > 0) {
                    var4 = var6;
                    var5 = this.queue[var6];
                }

                if (var2.compareTo(var5) <= 0) {
                    break;
                }

                this.queue[var1] = var5;
                this.setIndex(var5, var1);
            }

            this.queue[var1] = var2;
            this.setIndex(var2, var1);
        }

        private void grow() {
            int var1 = this.queue.length;
            int var2 = var1 + (var1 >> 1);
            if (var2 < 0) {
                var2 = 2147483647;
            }

            this.queue = (RunnableScheduledFuture[])Arrays.copyOf(this.queue, var2);
        }

        private int indexOf(Object var1) {
            if (var1 != null) {
                int var2;
                if (var1 instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
                    var2 = ((ScheduledThreadPoolExecutor.ScheduledFutureTask)var1).heapIndex;
                    if (var2 >= 0 && var2 < this.size && this.queue[var2] == var1) {
                        return var2;
                    }
                } else {
                    for(var2 = 0; var2 < this.size; ++var2) {
                        if (var1.equals(this.queue[var2])) {
                            return var2;
                        }
                    }
                }
            }

            return -1;
        }

        public boolean contains(Object var1) {
            ReentrantLock var2 = this.lock;
            var2.lock();

            boolean var3;
            try {
                var3 = this.indexOf(var1) != -1;
            } finally {
                var2.unlock();
            }

            return var3;
        }

        public boolean remove(Object var1) {
            ReentrantLock var2 = this.lock;
            var2.lock();

            boolean var4;
            try {
                int var3 = this.indexOf(var1);
                if (var3 >= 0) {
                    this.setIndex(this.queue[var3], -1);
                    int var10 = --this.size;
                    RunnableScheduledFuture var5 = this.queue[var10];
                    this.queue[var10] = null;
                    if (var10 != var3) {
                        this.siftDown(var3, var5);
                        if (this.queue[var3] == var5) {
                            this.siftUp(var3, var5);
                        }
                    }

                    boolean var6 = true;
                    return var6;
                }

                var4 = false;
            } finally {
                var2.unlock();
            }

            return var4;
        }

        public int size() {
            ReentrantLock var1 = this.lock;
            var1.lock();

            int var2;
            try {
                var2 = this.size;
            } finally {
                var1.unlock();
            }

            return var2;
        }

        public boolean isEmpty() {
            return this.size() == 0;
        }

        public int remainingCapacity() {
            return 2147483647;
        }

        public RunnableScheduledFuture<?> peek() {
            ReentrantLock var1 = this.lock;
            var1.lock();

            RunnableScheduledFuture var2;
            try {
                var2 = this.queue[0];
            } finally {
                var1.unlock();
            }

            return var2;
        }

        public boolean offer(Runnable var1) {
            if (var1 == null) {
                throw new NullPointerException();
            } else {
                RunnableScheduledFuture var2 = (RunnableScheduledFuture)var1;
                ReentrantLock var3 = this.lock;
                var3.lock();

                try {
                    int var4 = this.size;
                    if (var4 >= this.queue.length) {
                        this.grow();
                    }

                    this.size = var4 + 1;
                    if (var4 == 0) {
                        this.queue[0] = var2;
                        this.setIndex(var2, 0);
                    } else {
                        this.siftUp(var4, var2);
                    }

                    if (this.queue[0] == var2) {
                        this.leader = null;
                        this.available.signal();
                    }
                } finally {
                    var3.unlock();
                }

                return true;
            }
        }

        public void put(Runnable var1) {
            this.offer(var1);
        }

        public boolean add(Runnable var1) {
            return this.offer(var1);
        }

        public boolean offer(Runnable var1, long var2, TimeUnit var4) {
            return this.offer(var1);
        }

        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> var1) {
            int var2 = --this.size;
            RunnableScheduledFuture var3 = this.queue[var2];
            this.queue[var2] = null;
            if (var2 != 0) {
                this.siftDown(0, var3);
            }

            this.setIndex(var1, -1);
            return var1;
        }

        public RunnableScheduledFuture<?> poll() {
            ReentrantLock var1 = this.lock;
            var1.lock();

            RunnableScheduledFuture var3;
            try {
                RunnableScheduledFuture var2 = this.queue[0];
                if (var2 != null && var2.getDelay(TimeUnit.NANOSECONDS) <= 0L) {
                    var3 = this.finishPoll(var2);
                    return var3;
                }

                var3 = null;
            } finally {
                var1.unlock();
            }

            return var3;
        }

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            ReentrantLock var1 = this.lock;
            var1.lockInterruptibly();

            try {
                while(true) {
                    while(true) {
                        RunnableScheduledFuture var2 = this.queue[0];
                        if (var2 != null) {
                            long var3 = var2.getDelay(TimeUnit.NANOSECONDS);
                            if (var3 <= 0L) {
                                RunnableScheduledFuture var14 = this.finishPoll(var2);
                                return var14;
                            }

                            var2 = null;
                            if (this.leader != null) {
                                this.available.await();
                            } else {
                                Thread var5 = Thread.currentThread();
                                this.leader = var5;

                                try {
                                    this.available.awaitNanos(var3);
                                } finally {
                                    if (this.leader == var5) {
                                        this.leader = null;
                                    }

                                }
                            }
                        } else {
                            this.available.await();
                        }
                    }
                }
            } finally {
                if (this.leader == null && this.queue[0] != null) {
                    this.available.signal();
                }

                var1.unlock();
            }
        }

        public RunnableScheduledFuture<?> poll(long var1, TimeUnit var3) throws InterruptedException {
            long var4 = var3.toNanos(var1);
            ReentrantLock var6 = this.lock;
            var6.lockInterruptibly();

            try {
                while(true) {
                    RunnableScheduledFuture var7 = this.queue[0];
                    if (var7 != null) {
                        long var22 = var7.getDelay(TimeUnit.NANOSECONDS);
                        if (var22 <= 0L) {
                            RunnableScheduledFuture var21 = this.finishPoll(var7);
                            return var21;
                        }

                        Thread var10;
                        if (var4 <= 0L) {
                            var10 = null;
                            return var10;
                        }

                        var7 = null;
                        if (var4 >= var22 && this.leader == null) {
                            var10 = Thread.currentThread();
                            this.leader = var10;

                            try {
                                long var11 = this.available.awaitNanos(var22);
                                var4 -= var22 - var11;
                            } finally {
                                if (this.leader == var10) {
                                    this.leader = null;
                                }

                            }
                        } else {
                            var4 = this.available.awaitNanos(var4);
                        }
                    } else {
                        if (var4 <= 0L) {
                            Object var8 = null;
                            return (RunnableScheduledFuture)var8;
                        }

                        var4 = this.available.awaitNanos(var4);
                    }
                }
            } finally {
                if (this.leader == null && this.queue[0] != null) {
                    this.available.signal();
                }

                var6.unlock();
            }
        }

        public void clear() {
            ReentrantLock var1 = this.lock;
            var1.lock();

            try {
                for(int var2 = 0; var2 < this.size; ++var2) {
                    RunnableScheduledFuture var3 = this.queue[var2];
                    if (var3 != null) {
                        this.queue[var2] = null;
                        this.setIndex(var3, -1);
                    }
                }

                this.size = 0;
            } finally {
                var1.unlock();
            }

        }

        private RunnableScheduledFuture<?> peekExpired() {
            RunnableScheduledFuture var1 = this.queue[0];
            return var1 != null && var1.getDelay(TimeUnit.NANOSECONDS) <= 0L ? var1 : null;
        }

        public int drainTo(Collection<? super Runnable> var1) {
            if (var1 == null) {
                throw new NullPointerException();
            } else if (var1 == this) {
                throw new IllegalArgumentException();
            } else {
                ReentrantLock var2 = this.lock;
                var2.lock();

                try {
                    RunnableScheduledFuture var3;
                    int var4;
                    for(var4 = 0; (var3 = this.peekExpired()) != null; ++var4) {
                        var1.add(var3);
                        this.finishPoll(var3);
                    }

                    int var5 = var4;
                    return var5;
                } finally {
                    var2.unlock();
                }
            }
        }

        public int drainTo(Collection<? super Runnable> var1, int var2) {
            if (var1 == null) {
                throw new NullPointerException();
            } else if (var1 == this) {
                throw new IllegalArgumentException();
            } else if (var2 <= 0) {
                return 0;
            } else {
                ReentrantLock var3 = this.lock;
                var3.lock();

                try {
                    RunnableScheduledFuture var4;
                    int var5;
                    for(var5 = 0; var5 < var2 && (var4 = this.peekExpired()) != null; ++var5) {
                        var1.add(var4);
                        this.finishPoll(var4);
                    }

                    int var6 = var5;
                    return var6;
                } finally {
                    var3.unlock();
                }
            }
        }

        public Object[] toArray() {
            ReentrantLock var1 = this.lock;
            var1.lock();

            Object[] var2;
            try {
                var2 = Arrays.copyOf(this.queue, this.size, Object[].class);
            } finally {
                var1.unlock();
            }

            return var2;
        }

        public <T> T[] toArray(T[] var1) {
            ReentrantLock var2 = this.lock;
            var2.lock();

            Object[] var3;
            try {
                if (var1.length < this.size) {
                    var3 = (Object[])Arrays.copyOf(this.queue, this.size, var1.getClass());
                    return var3;
                }

                System.arraycopy(this.queue, 0, var1, 0, this.size);
                if (var1.length > this.size) {
                    var1[this.size] = null;
                }

                var3 = var1;
            } finally {
                var2.unlock();
            }

            return var3;
        }

        public Iterator<Runnable> iterator() {
            return new ScheduledThreadPoolExecutor.DelayedWorkQueue.Itr((RunnableScheduledFuture[])Arrays.copyOf(this.queue, this.size));
        }

        private class Itr implements Iterator<Runnable> {
            final RunnableScheduledFuture<?>[] array;
            int cursor = 0;
            int lastRet = -1;

            Itr(RunnableScheduledFuture<?>[] var2) {
                this.array = var2;
            }

            public boolean hasNext() {
                return this.cursor < this.array.length;
            }

            public Runnable next() {
                if (this.cursor >= this.array.length) {
                    throw new NoSuchElementException();
                } else {
                    this.lastRet = this.cursor;
                    return this.array[this.cursor++];
                }
            }

            public void remove() {
                if (this.lastRet < 0) {
                    throw new IllegalStateException();
                } else {
                    DelayedWorkQueue.this.remove(this.array[this.lastRet]);
                    this.lastRet = -1;
                }
            }
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/175952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

超级完整的 Git 下载、安装与配置

Git的下载、安装与配置 一、git下载安装 1、访问git官方下载网址&#xff0c;点击这里&#xff0c;然后根据自己的电脑系统&#xff0c;下载对应的安装包&#xff1a; 2、在淘宝镜像网站 下载对应的安装包&#xff1a; 注&#xff1a; 如果由于官网下载速度过于缓慢&#xff…

String 有趣简单的编程题

String 有趣简单的编程题 每博一文案 师父说: 世上没有真正的感同身受&#xff0c;也没有谁能完全做到将心比心&#xff0c;我们一路走来。 慢慢的学会了收敛情绪&#xff0c;越成熟越沉默&#xff0c;有些人&#xff0c;背负沉重的压力&#xff0c;却从来不敢说累&#xff0c…

[python刷题模板] 树的直径/换根DP

[python刷题模板] 树的直径/换根DP 一、 算法&数据结构1. 描述2. 复杂度分析3. 常见应用4. 常用优化二、 模板代码1. 单纯询问树的直径值2. 求出树的直径两端搞事情3. 换根DP求树的直径(大炮打蚊子&#xff0c;别这么做&#xff0c;只是用来帮助理解换根DP)4. 换根dp求特定…

UDS诊断系列介绍14-2F服务

本文框架1. 系列介绍1.1 2F服务概述2. 2F服务请求与应答2.1 2F服务请求2.2 2F服务正响应2.3 2F服务否定响应3. 2F诊断使用示例4. Autosar系列文章快速链接1. 系列介绍 UDS&#xff08;Unified Diagnostic Services&#xff09;协议&#xff0c;即统一的诊断服务&#xff0c;是…

学习记录667@项目管理之项目人力资源管理

什么是项目人力资源管理 项目人力资源管理包括编制人力资源管理计划、组建项目团队、建设项目团队与管理项目团队的各个过程&#xff0c;不但要求充分发挥参与项目的个人的作用&#xff0c;还包括充分发挥所有与项目有关的人员-----项目负责人、客户、为项目做出贡献的个人及其…

[QMT]04-在QMT之外调用xtquant直接编写策略

背景希望不用在QMT软件里面憋屈地写代码&#xff0c;想使用pychar、vscode、notepad等IDE编写python代码&#xff0c;因为有代码提示、补全。这完全没问题&#xff01;QMT简直是为个人量化交易者量身打造的神器&#xff0c;它支持以上想法。QMT这个东东基本是由两部分组成的&am…

1. 深度学习简介|计算机视觉简介|得分函数|损失函数作用|前向传播整体流程

文章目录深度学习简介计算机视觉简介k近邻算法得分函数损失函数作用前向传播整体流程机器学习是一个大块&#xff0c;其中就包含着深度学习&#xff0c;计算机视觉等 机器学习的流程&#xff1a; 数据获取特征工程建立模型评估与应用 深度学习简介 深度学习通过特征学习进行…

力扣sql简单篇练习(四)

力扣sql简单篇练习(四) 1 超过五名学生的课 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出 1.2 示例sql语句 SELECT class FROM Courses GROUP BY class HAVING count(student)>51.3 运行截图 2 超过经理收入的员工 2.1 题目内容 2.1.1 基本题目信息 2.1.2 示例…

零信任-发展历程及概念(1)

零信任发展历程 2010 Forrester约翰金德维格正式提出零信任概念 2013 CSA成立软件定义边界SDP工作组,次年发布SDP标准规范1.0 2017 Gartner正式提出“CARTA”零信任模型 2018 Forrester发表零信任扩展模型ZTX 2019 Gartner发布零信任网络&#xff08;ZTNA&#xff09;云安…

String 与 StringBuffer 与 StringBuilder 各自的妙用

String 与 StringBuffer 与 StringBuilder 各自的妙用 每博一文案 我从未见过&#xff0c;一个早起&#xff0c;勤奋&#xff0c;谨慎&#xff0c;诚实的人&#xff0c;抱怨命运不好的。 最完美的状态&#xff0c;不是你从不失误&#xff0c;而是你从没放弃成长。没人能把你变…

【SpringMVC】看完这篇简单理解并入门SpringMVC:通过入门案例举例子的方式快速理解

SpringMVC简介1.什么是MVC2.什么是SpringMVC3.SpringMVC的特点4.入门案例1.准备工作2.配置web.xml3.创建请求控制器4.创建SpringMVC的配置文件5.测试HelloWorld6.总结1.什么是MVC MVC是一种软件架构的思想&#xff0c;将软件按照模型、视图、控制器来划分 M&#xff1a;Model&…

【Java】比较器 Comparator Comparable

一、背景 我们在使用 Collections.sort() 对链表进行排序&#xff08;或者使用 Arrays.sort() 对数组进行排序&#xff09;时&#xff0c;常常需要根据不同情况自定义排序规则。比如&#xff1a;当我们存储学生对象时&#xff0c;我们需要按照学生年龄进行排序&#xff0c;这时…

树状数组(Binary Indexed Tree (B.I.T))

树状数组 树状数组 (Binary Indexed Tree(B.I.T), Fenwick Tree) 是一个查询和修改复杂度都为 log(n) 的数据结构。 「前缀和查询」与「单点更新」 直接前驱&#xff1a;c[i] 的直接前驱为 c[i - lowbid(i)]&#xff0c;即 c[i] 左侧紧邻的子树的根。 直接后继&#xff1a;c[i…

财务精度:BigInteger 与 BigDecimal

财务精度&#xff1a;BigInteger 与 BigDecimal 每博一文案 师父说: 人这一辈子&#xff0c;真地好难。 有些人&#xff0c;好着好着&#xff0c;忽然就变陌生了&#xff0c;有些手&#xff0c;牵着牵着&#xff0c;瞬间就放开了&#xff0c;有些路&#xff0c;走着走着&#…

算法练习笔记——栈的常用方法以及算法练习

栈学习常用方法介绍力扣练习力扣 20. 有效的括号力扣 32. 最长有效括号常用方法介绍 Stack<Character> characters new Stack<>();//判断栈是否为空boolean empty characters.empty();//将a压入栈底&#xff0c;同时也返回aCharacter push characters.push(a);/…

MYSQL中的常见知识问题(一)

1、MYSQL中redolog、binlog 、undolog的区别与作用。redolog&#xff1a;即重做日志&#xff0c;用来实现事物的一个持久性&#xff0c;由radiobuff和radiolog两部分组成。其中 radiobuff是一个缓冲&#xff0c;存放在内存里面&#xff1b;radiolog是文件&#xff0c;存放在磁盘…

基于粒子群优化和引力搜索混合优化算法改进的前馈神经网络(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【3-神经网络八股】北京大学TensorFlow2.0

课程地址&#xff1a;【北京大学】Tensorflow2.0_哔哩哔哩_bilibiliPython3.7和TensorFlow2.1六讲&#xff1a;神经网络计算&#xff1a;神经网络的计算过程&#xff0c;搭建第一个神经网络模型神经网络优化&#xff1a;神经网络的优化方法&#xff0c;掌握学习率、激活函数、损…

走进 HTML

文章目录01 什么是HTML&#xff1f;02 HTML的基本结构03 网页基本标签04 图像标签05 链接标签06 块元素和行内元素07 列表07 表格08 视频和音频09 页面结构10 iframe内联框架11 表单语法&#x1f449; 表单元素格式&#x1f449; 表单的应用&#x1f449; 表单初级验证01 什么是…

【Mysql】 数据库用户管理

【Mysql】 数据库用户管理 DCL:英文全称是Data Control Language(数据控制语言)&#xff0c;用来管理数据库用户、控制数据库的访问权限。 1. 管理用户 想要对数据库用户进行操作&#xff0c;我们首先得进入 mysql 数据库 use mysql1.1 查询用户 select * from user;该条命…