(十四)线程池

news2024/11/18 12:17:48

线程池

线程池是一组线程的集合。线程池维护一个队列,调用者向这个队列中添加任务,而线程池中的线程则不停地从队列中取出任务执行。

线程池的继承关系如下图,其中 ThreadPoolExecutorScheduledThreadPoolExecutor是具体的实现。

ThreadPoolExecutor

基本的线程池,可以提交无返回值和有返回值的任务执行。重载的构造方法最长有七个参数,分比是 核心线程(常驻线程)个数,最大线程个数(当核心线程已满,任务队列也满,最大扩充线程的个数),非核心线程空闲的时间,时间单位,阻塞任务队列,线程生产工厂,饱和策略(用来处理核心线程已满,最大线程已满,任务队列满时的新到来任务的策略,也叫拒绝策略)。
其中,饱和策略共四种:
1.DiscardOldestPolicy,抛弃最早的任务,将新任务加入队列。
2.AbortPolicy,拒绝执行新任务,并抛出异常。
3.CallerRunsPolicy,交由调用者线程执行新任务,如果调用者线程已关闭,则抛弃任务。
4.DiscardPolicy,直接抛弃新任务。
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

使用

一般在使用线程池时需要自行定制,一般来说普遍的理论是:
如果是CPU密集型任务:那么核心线程数 = CPU核心数 + 1;加1是为了防止突发情况导致某个线程不可执行,最大化利用CPU。
如果是IO密集型任务:那么核心线程数 = CPU核心数 * 2;

实际在应用过程中许多任务也分不出是IO密集还是CPU密集,上述理论也不一定是最好的,如果对性能追求极致,需要自行探索,也可以通过线程池的一些set方法动态修改核心线程数,空闲时间等参数。
下面是一个简单的例子:
package com.example.demo;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MyThreadPool {
    private static final ThreadPoolExecutor executor;
    //线程所属的线程组
    private static final ThreadGroup group = new ThreadGroup("my-group");
    //线程名字的前缀
    private static final String prefix = "my-thread-";
    //线程计数器
    private static final AtomicInteger atomicInteger=new AtomicInteger(0);

    static {
        int core = Runtime.getRuntime().availableProcessors();
        ThreadFactory factory= r -> new Thread(group, r, prefix+atomicInteger.incrementAndGet());
        executor = new ThreadPoolExecutor(
                core,
                core*2,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(32),
                factory,
                //一共四种策略,这里采用调用者执行策略
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        //虚拟机退出时关闭线程池
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                executor.shutdown();
                try {
                    boolean flag;
                    do {
                        //一直等待线程池关闭完成
                        flag = executor.awaitTermination(1,TimeUnit.SECONDS);
                    } while (!flag);
                } catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
        
    }

    public static ThreadPoolExecutor getExecutor(){
        return executor;
    }
}
测试自定义的线程池
package com.example.demo;

import org.junit.Test;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestClass {
    Random random = new Random();
    @Test
    public void test() throws InterruptedException {
        //采用闭锁等待所有线程池线程执行完成
        CountDownLatch latch=new CountDownLatch(50);
        ThreadPoolExecutor executor = MyThreadPool.getExecutor();
        for (int i=0;i<50;i++) {
            final int j = i;
            executor.execute(()->{
                try {
                    System.out.println(String.format("我是线程:%s,执行第%d个任务",
                            Thread.currentThread().getName(),j));
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();
    }
}

实现原理

基本结构

部分源码如下,包含了主要的属性
public class ThreadPoolExecutor extends AbstractExecutorService {
    //线程池状态控制变量,一个int包含两部分,低29位为workCount,高3位为runState,初始为RUNNING状态,workCount=0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //workCount最大2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    //线程池运行的状态,共5种
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    //任务队列
    private final BlockingQueue<Runnable> workQueue;
    //访问变量用的互斥锁
    private final ReentrantLock mainLock = new ReentrantLock();
    //任务线程的集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    //awaitTermination()方法的条件变量
    private final Condition termination = mainLock.newCondition();

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null : AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

execute方法

分3步进行:
1、当前工作线程数与corePoolSize比较,如果小于则尝试启动一个新线程,使用给定的参数command作为其第一个任务,否则执行2步骤
2、如果任务能够成功入队,则再次检查运行状态,如果不是运行状态则从队列中移除任务;如果不能入队则执行3步骤
3、如果不能将任务入队,则尝试添加一个新的非核心线程。如果增加失败了,则执行拒绝策略。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    //判断线程个数是否小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果是运行状态,则入队,任务队列没满则入队成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //再检查状态是不是运行状态,不是则尝试从队列中移除任务,移除成功则执行饱和策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //启动一个线程,什么都不做
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //核心线程、任务队列都满了,增加非核心线程,如果也增加失败,则执行饱和策略
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        //获取运行状态
        int rs = runStateOf(c);

        // 运行状态>= SHUTDOWN,说明线程池进入关闭状态
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //获取workCount
            int wc = workerCountOf(c);
            //线程超出个数,返回失败
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //CAS操作,将workCount+1,成功则退出循环,否则重新读取控制变量
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //CAS失败,如果状态不变继续进行内部for循环,如果变了则跳到外部for循环重新开始
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    //标识任务已启动
    boolean workerStarted = false;
    //标识任务已新增
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {

                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    //线程是活跃状态
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    //更新最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //如果新增成功,则启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //线程没启动成功,当做增加失败处理,addWorkerFailed内部把workCount减1
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

submit方法

submit方法定义在 AbstractExecutorService中:
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    //封装成RunnableFuture执行,RunnableFuture实现了Runnable
    RunnableFuture<T> ftask = newTaskFor(task);
    //执行
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
Callable
在线程池中执行的任务有两种类型,一种是无参无返回值的,例如Runnable类型的,还有一种是无参有返回值的,即为Callable类型的。Callable接口如下定义:
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Worker

任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。
private final class Worker extends AbstractQueuedSynchronizer 
implements Runnable {
    //.......
    /** 运行worker的线程. */
    final Thread thread;
    /** 接收的第一个任务 */
    Runnable firstTask;
    /** 执行完成的任务个数 */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); //初始为RUNNING状态
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
    //.......
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //不停地从队列中取出任务
        while (task != null || (task = getTask()) != null) {
            w.lock();//执行任务之前先上锁,AQS实现
            //检测线程池状态,如果状态>=STOP,则线程给自己发中断信号
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //钩子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //钩子函数
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //完成的任务数+1
                w.completedTasks++;
                w.unlock();
            }
        }
        //可根据此变量判断Worker以何种方式退出(正常,中断,其他异常)
        completedAbruptly = false;
    } finally {
        //清理worker
        processWorkerExit(w, completedAbruptly);
    }
}

钩子方法

ThreadPoolExecutor提供了下列的钩子方法,如果要继承ThreadPoolExecutor实现自己的线程池,可以考虑实现下列方法。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

关闭线程池

当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。
状态转换过程
线程池有两个关闭函数,shutdown()和shutdownNow(),这两个函数会让线程池切换到不同的状态。但无论是哪个状态,在队列为空且线程池也为空之后,会切换为TIDYING 状态;最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才算真正关闭。
状态迁移是从小到大,按照-1,0,1,2,3的顺序,只会正向迁移,不能颠倒。
两种shutdown的区别
shutdown()方法不会清空任务队列,会等所有任务执行完成,而shutdownNow()会清空任务队列。
shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程。

tryTerminate()不会强行终止线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把状态切换到TIDYING,然后调用钩子函数terminated()。当钩子函数执行完成时,把状态从TIDYING 改为TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。

部分源码如下:
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //检查是否有关闭线程池的权限
        checkShutdownAccess();
        //CAS操作,将线程池状态迁移为SHUTDOWN
        advanceRunState(SHUTDOWN);
        //关闭空闲的线程
        interruptIdleWorkers();
        //给ScheduledThreadPoolExecutor留的钩子方法
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    //迁移到TERMINATED状态
    tryTerminate();
}
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //检查是否有关闭线程池的权限
        checkShutdownAccess();
        //CAS操作,将线程池状态迁移为STOP
        advanceRunState(STOP);
        //关闭所有线程
        interruptWorkers();
        //移除任务队列的元素并返回它们
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //迁移到TERMINATED状态
    tryTerminate();
    return tasks;
}

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //线程未中断且Worker尝试加锁成功,说明线程处于空闲状态,如果加锁失败,说明线程当前持有锁,正在执行任务
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //中断所有线程
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //任务队列为空,workCount=0会走到这里
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //迁移到TIDYING状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //调用钩子函数
                    terminated();
                } finally {
                    //迁移到TERMINATED状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    //通知等待的awaitTermination()方法
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // 迁移状态失败则重新循环
    }
}
推荐的关闭方式
executor.shutdown();
try {
    boolean flag;
    do {
        //一直等待线程池关闭完成
        flag = executor.awaitTermination(1,TimeUnit.SECONDS);
    } while (!flag);
} catch (Exception e){
    e.printStackTrace();
}

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,除了基本的线程池功能,还实现了
ScheduledExecutorService,可以延迟执行任务或按频率进行任务调度,具体体现在以下两方面:
1、延迟执行任务
2、周期执行任务
对应的方法如下:
//延迟执行任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit);
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit);

//周期执行任务,按固定频率执行,与任务本身执行时间无关,但是任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,
TimeUnit unit);
//周期执行任务,按固定间隔执行,与任务本身执行时间有关。假如任务本身执行时间是10s,间隔2s,则下一次开始执行的时间就是12s。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
long delay,TimeUnit unit);

使用

调度线程池的构建比较简单,重载的构造方法最多需要3个参数,分别是 核心线程数,线程工厂和饱和策略。下面简单演示了几种延迟调度的方式:
public class TestClass {
    private static final ThreadGroup group = new ThreadGroup("schedule-group");
    private static final String prefix = "schedule-thread-";
    private static final AtomicInteger atomicInteger=new AtomicInteger(0);
    private static final ThreadLocal<DateTimeFormatter> timeFormatters=new ThreadLocal();

    private DateTimeFormatter getFormatter(){
        DateTimeFormatter formatter = timeFormatters.get();
        if (Objects.isNull(formatter)){
            DateTimeFormatter formatter1=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
            timeFormatters.set(formatter1);
            return formatter1;
        }
        return formatter;
    }
    @Test
    public void schedule(){
        ThreadFactory factory= r -> new Thread(group, r ,prefix+atomicInteger.incrementAndGet());
        int i = Runtime.getRuntime().availableProcessors();
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(i, factory, new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("启动调度线程池:"+getFormatter().format(LocalDateTime.now()));
        //延迟执行,执行一次
        ScheduledFuture<String> delay = executor.schedule(
                () -> "单次延迟执行开始时间:"+getFormatter().format(LocalDateTime.now()),
                3, TimeUnit.SECONDS);
        //固定频率执行
        executor.scheduleAtFixedRate(() ->
                System.out.println("固定频率开始执行:"+getFormatter().format(LocalDateTime.now())),
                1,3, TimeUnit.SECONDS);
        //固定延迟执行
        executor.scheduleWithFixedDelay(()->
                System.out.println("固定延迟开始执行:"+getFormatter().format(LocalDateTime.now())),
                        1,3,TimeUnit.SECONDS);
        while (!delay.isDone()) {
            try {
                String s = delay.get();
                System.out.println("单次延迟执行结果:"+s);
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
            break;
        }
        while (true){}
    }
}

实现原理

构造方法调用了 ThreadPoolExecutor的构造方法,最大线程个数设置为了int的最大值,空闲时间也设为0,任务队列也固定为一个自定义延迟队列。
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

基本结构

DelayedWorkQueue
ScheduledThreadPoolExecutor的任务队列使用了一个自定义的延迟队列 DelayedWorkQueue
跟普通的延迟队列相差不大,这个静态内部类定义如下:
static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

        private static final int INITIAL_CAPACITY = 16;
        //RunnableScheduledFuture的唯一实现是ScheduledFutureTask
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
        //队头等待的线程
        private Thread leader = null;
        //当队列头部有新的任务可用或新线程可能需要成为leader时发出的条件
        private final Condition available = lock.newCondition();
        //......
}
ScheduledFutureTask
ScheduledThreadPoolExecutor还有一个内部类 ScheduledFutureTask,实现如下:
private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    //任务序号
    private final long sequenceNumber;
    //以纳秒为单位,启动任务的时间(周期任务启动时间一直会累加)
    private long time;
    //周期(以纳秒为单位)用于重复任务。正值表示固定速率执行。负值表示固定延迟执行。0表示非重复任务
    private final long period;
    //通过reExecutePeriodic()方法重新加入队列的实际任务
    RunnableScheduledFuture<V> outerTask = this;
    //当前任务在延迟队列中的索引,用于更快的取消任务
    int heapIndex;

    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Callable<V> callable, long ns) {
        super(callable);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }

    public int compareTo(Delayed other) {
        if (other == this) 
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    public boolean isPeriodic() {
        return period != 0;
    }

    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && removeOnCancel && heapIndex >= 0)
            remove(this);
        return cancelled;
    }

    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();//设置下一次启动的时间
            reExecutePeriodic(outerTask);//重新入队
        }
    }
}

schedule方法(Runnable)

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    //把Runnable包装成RunnableScheduledFuture,ScheduledFutureTask是其实现
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //线程池关闭则执行拒绝策略
    if (isShutdown())
        reject(task);
    else {
        //把任务放入延迟队列
        super.getQueue().add(task);
        //线程池关闭或者 当前线程池状态不能运行任务,且从队列中移除任务成功,则取消任务
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    //当小于核心线程数只开启新线程(不运行任务)
    if (wc < corePoolSize)
        addWorker(null, true);
    //即使corePoolSize=0,当第一次运行时,也启动一个新线程
    else if (wc == 0)
        addWorker(null, false);
}

scheduleWithFixedDelay和scheduleAtFixedRate方法

两个方法实现几乎一模一样,唯一的区别是ScheduledFutureTask的构造方法一个是正数,一个是负数,在 ScheduledFutureTask中,正值表示固定速率执行。负值表示固定延迟执行。0表示非重复任务。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));//负数
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));//正数
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

Future

Future的继承关系如下图所示

RunnableFuture

接口定义如下,继承了Runnable和Future两个接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    
    void run();
}

FutureTask

这个类是RunnableFuture的实现,内部存有一个Callable,作为一个适配器,将Callable转换为Runnable执行。

基本结构

public class FutureTask<V> implements RunnableFuture<V> {
    //任务状态
    private volatile int state;
    //初始状态
    private static final int NEW          = 0;
    //中间状态
    private static final int COMPLETING   = 1;
    //以下都是最终状态
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    //中间状态
    private static final int INTERRUPTING = 5;
    //最终状态
    private static final int INTERRUPTED  = 6;

    /** 实际要执行的Callable,通过构造函数传入 */
    private Callable<V> callable;
    /** 存放执行结果或者抛出的异常 */
    private Object outcome;
    /** 运行Callable的线程 */
    private volatile Thread runner;
    /** 在get方法等待的线程链表 */
    private volatile WaitNode waiters;

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       
    }

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
}

run方法

public void run() {
    //任务状态不是NEW 或者 其他线程在执行任务,结束方法
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //执行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //异常存入outcome变量
                setException(ex);
            }
            if (ran)
                //返回值存入outcome变量
                set(result);
        }
    } finally {
        //置空
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)//处理中断
            handlePossibleCancellationInterrupt(s);
    }
}
//状态 NEW->COMPLETING->EXCEPTIONAL
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        //通知等待的线程
        finishCompletion();
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        //通知等待的线程
        finishCompletion();
    }
}

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //唤醒
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; 
                q = next;
            }
            break;
        }
    }
    //空实现
    done();
    callable = null;
}

get方法

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)//不是正在完成状态,无限期等待
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//等待指定时间
        throw new TimeoutException();
    return report(s);
}
//返回结果
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {//线程中断了则从链表中移除
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // 正处于中间状态
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)//入队
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else//阻塞
            LockSupport.park(this);
    }
}

Executors

这个类是Executor框架的辅助工具类,用来创建预定义的线程池和一些任务(Callable)。在实际环境下,不建议直接使用此工具创建的线程池,特别是无界阻塞队列和不限制线程个数的实现,容易造成OOM,资源耗尽等严重问题。
//每来一个任务,就创建一个线程
ExecutorService executorService1 = Executors.newCachedThreadPool();
//固定线程数的线程池
ExecutorService executorService2 = Executors.newFixedThreadPool(6);
//多线程的调度线程池
ExecutorService executorService3 = Executors.newScheduledThreadPool(8);
//单线程线程池
ExecutorService executorService4 = Executors.newSingleThreadExecutor();
//单线程调度线程池
ExecutorService executorService5 = Executors.newSingleThreadScheduledExecutor();
//forkjoin线程池
ExecutorService executorService6 = Executors.newWorkStealingPool();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/172806.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode刷题模版:141 - 150

目录 简介141. 环形链表142. 环形链表 II143. 重排链表144. 二叉树的前序遍历145. 二叉树的后序遍历146. LRU 缓存【未实现】147. 对链表进行插入排序148. 排序链表149. 直线上最多的点数150. 逆波兰表达式求值结语简介 Hello! 非常感谢您阅读海轰的文章,倘若文中有错误的地方…

多轮对话(三):Spoken Language Understanding 进展和前沿

本篇博客基于哈工大发表在IJCAI上的论文&#xff1a;A Survey on Spoken Language Understanding - Recent Advances and New Frontiers。 论文链接 github链接 口语理解&#xff08;SLU&#xff09;旨在提取用户查询的语义框架&#xff0c;是面向任务的对话系统的核心组件。本…

excel函数技巧:两个查询函数的用法比较 上篇

EXCEL函数江湖烽烟再起&#xff0c;函数大擂台迎来两位重量级选手。守擂者是号称全民偶像、人见人爱车见车载的巨星级函数VLOOKUP&#xff0c;挑战者则是名气不大实力强劲高手的LOOKUP函数&#xff01;这对与生俱来的对手&#xff0c;究竟会在函数擂台上擦出怎样的火花&#xf…

Nginx原理

一、master和worker二、worker当客户端发送请求&#xff0c;先到达master,master通知所有的worker,然后所有的worker开始竞争任务。三、一个master和多个worker有什么好处&#xff08;1&#xff09;可以使用nginx -s reload热部署&#xff0c;利用nginx进行热部署&#xff08;2…

8、MariaDB11数据库安装初始化密码Navicat连接

MariaDB11安装 安装前准备 下载安装包 点我去MariaDB官网下载安装包 查看相关文档 Mariadb Server官方文档 使用zip安装 解压缩zip 将下载到的zip解压缩到想安装的位置。 生成data目录 打开cmd并进入到刚才解压后的bin目录&#xff0c; 执行mysql_install_db.exe程序生…

Python异步编程Future对象详解

今天继续给大家介绍Python相关知识&#xff0c;本文主要内容是Python异步编程Future对象详解。 一、Python Future对象简介 在上文Python Task对象详解中&#xff0c;我们介绍到了Task对象&#xff0c;而Future对象是Task对象的基类&#xff0c;比Task更加底层。一个Future是…

英方软件在科创板上市:总市值89亿元,胡军擎、江俊夫妇为实控人

1月19日&#xff0c;上海英方软件股份有限公司&#xff08;下称“英方软件”&#xff0c;SH:688435&#xff09;在上海证券交易所科创板上市。本次上市&#xff0c;英方软件的发行价为38.66元/股&#xff0c;发行2094.6737万股&#xff0c;募资总额约为8.10亿元&#xff0c;募资…

linux的工具(yum,vim)

前言 linux工具的意义Linux已经成为工作、娱乐和个人生活等多个领域的支柱&#xff0c;人们已经越来越离不开它。在 Linux 的帮助下&#xff0c;技术的变革速度超出了人们的想象&#xff0c;Linux 开发的速度也以指数规模增长。因此&#xff0c;越来越多的开发者也不断地加入开…

pycharm远程链接服务器配置

拿到gpu的节点以后开始下面的配置 1. 下载专业版pycharm&#xff0c;一定是专业版&#xff0c;community版本没有远程连接ssh的功能。 2. python编译器->添加ssh编译器->新创建服务器配置 3. 输入host名&#xff0c;用户名。 host名字如&#xff1a;vpcc-gpu032&#xf…

【自学Docker】Docker wait命令

Docker wait命令 大纲 docker wait教程 docker wait 命令可以用于阻塞一个或多个 Docker容器 直到容器停止&#xff0c;然后打印退出代码。 docker wait命令后面的 CONTAINER 可以是容器Id&#xff0c;或者是容器名。 docker wait语法 haicoder(www.haicoder.net)# docker…

基于蒙特卡洛随机潮流研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

代码审计-8 ThinkPHP框架代码审计 2

文章目录熟悉网站结构确定网站的路由1.通过分析url直接得出路由2.查看app/route.php了解参数过滤情况SQL注入举例任意文件下载与删除任意文件下载代码分析任意文件删除代码分析熟悉网站结构 首先对系统的功能点进行大致的了解&#xff0c;对系统的目录情况进行大致了解&#x…

9. 列表list类型详解

python3 list类型的使用 1. 基本知识 List&#xff08;列表&#xff09; 是 Python 中使用最频繁的数据类型。 列表可以完成大多数集合类的数据结构实现。列表中元素的类型可以不相同&#xff0c;它支持数字&#xff0c;字符串甚至可以包含列表&#xff08;所谓嵌套&#xff…

全球化商家平台技术探索与演进

作者&#xff1a;马金金 阿里全球化业务平台团队 全球化业务高速发展给平台技术带来了极大的挑战。如何差异化支撑全局业务的高效迭代&#xff1f;如何轻量化支撑创新业务的快速建站&#xff1f;本文将聚焦全球化商家平台技术架构演进&#xff0c;为大家分享背后的技术思考。 一…

Understanding LSTM Networks

文章目录Recurrent Neural NetworksThe Problem of Long-Term DependenciesLSTM NetworksThe Core Idea Behind LSTMs.Step-by-Step LSTM Walk Through本篇文章记述了自己对“Understanding LSTM Networks”的理解 Recurrent Neural Networks Humans don’t start their thin…

springmvc统一日志打印request和response内容

在web项目中&#xff0c;有不少场景需要统一处理一些和实际业务基本不相关的逻辑&#xff0c;比如rest接口的监控、出入参日志、操作记录、统一异常处理(避免将错误堆栈等信息直接打到web端)。如果你觉得日志打印rest接口出入参非常简单&#xff0c;直接getParameter()就好了&a…

Redis6学习笔记【part4】Jedis-API与手机验证码功能实现

1.连接 Jedis 第一步&#xff0c;修改 redis 的配置&#xff0c;以允许外网 ip 访问 redis。 在 redis.conf 中注释掉 bind 127.0.0.1 &#xff0c;并修改 protected-mode no 。 第二步&#xff0c;导入依赖。 <dependency><groupId>redis.clients</groupId…

Unity 进阶 之 资源文件夹下资源名的重名检查,并简单生产资源表的方法整理

Unity 进阶 之 资源文件夹下资源名的重名检查,并简单生产资源表的方法整理 目录 Unity 进阶 之 资源文件夹下资源名的重名检查,并简单生产资源表的方法整理 一、简单介绍 二、简单实现过程 三、关键代码 一、简单介绍 Unity中的一些知识点整理。 本节简单介绍在Unity开发…

python使用sentinelsat库下载sentinel影像数据

GIS遥感不分家&#xff0c;最近开始找一些影像的下载脚本了&#xff0c;这两天搞定了哨兵和modis的&#xff0c;分别贴一下 鉴于《Python中使用sentinelsat包自动下载Sentinel系列数据》这篇文章已经写得非常全乎&#xff0c;这里就简单补充一下&#xff0c;放个最简单的下载脚…

Vue CLI(Vue.js 开发的标准工具)

Vue CLI&#xff08;Vue.js 开发的标准工具&#xff09;参考描述Vue CLI获取检测项目创建项目Please pick a presetCheck the features needed for your projectChoose a version of Vue.jsPrefer placing configSave this as a preset for future projects?Save preset asFin…