Java ThreadPoolExecutor,Callable,Future,FutureTask 详解

news2024/10/7 6:49:23

目 录

一、ThreadPoolExecutor类讲解

1、线程池状态

五种状态

2、ThreadPoolExecutor构造函数

2.1)线程池工作原理

2.2)KeepAliveTime

2.3)workQueue 任务队列

2.4)threadFactory

2.5)handler 拒绝策略

3、常用方法:

二、线程池相关接口介绍

1、ExecutorService接口:

1.1)submit方法示例:

1.2)ExecutionException

1.3)submit()和execute()方法区别

1.4)ScheduledExecutorService接口

2、Callable接口

3、Future接口

3.1)Future接口方法

3.2)FutureTask类

4、FutureTask,Runable,Future关系​编辑

 三 、AbstractExecutorService介绍

1、submit方法

2、Executors callable 方法

3、invokeAll 方法

4、invokeAny方法

参考文献:​


一、ThreadPoolExecutor类讲解


1、线程池状态


五种状态

线程池 的状态

说明

RUNNING

允许提交并处理任务

SHUTDOWN

不允许提交新的任务,但是会处理完已提交的任务

STOP

不允许提交新的任务,也不会处理阻塞队列中未执行的任务,

并设置正在执行的线程的中断标志位

TIDYING

所有任务执行完毕,池中工作的线程数为0,等待执行terminated()勾子方法

TERMINATED

terminated()勾子方法执行完毕

线程池的shutdown() 方法,将线程池由 RUNNING(运行状态)转换为 SHUTDOWN状态
线程池的shutdownNow()方法,将线程池由RUNNING 或 SHUTDOWN 状态转换为 STOP 状态。
注:SHUTDOWN 状态 和 STOP 状态 先会转变为 TIDYING 状态,最终都会变为 TERMINATED

2、ThreadPoolExecutor构造函数


ThreadPoolExecutor继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。

接下来我们分别讲解这些参数的含义。

2.1)线程池工作原理

corePoolSize :线程池中核心线程数的最大值
maximumPoolSize :线程池中能拥有最多线程数
workQueue:用于缓存任务的阻塞队列
当调用线程池execute() 方法添加一个任务时,线程池会做如下判断:

如果有空闲线程,则直接执行该任务;
如果没有空闲线程,且当前运行的线程数少于corePoolSize,则创建新的线程执行该任务;
如果没有空闲线程,且当前的线程数等于corePoolSize,同时阻塞队列未满,则将任务入队列,而不添加新的线程;
如果没有空闲线程,且阻塞队列已满,同时池中的线程数小于maximumPoolSize ,则创建新的线程执行任务;
如果没有空闲线程,且阻塞队列已满,同时池中的线程数等于maximumPoolSize ,则根据构造函数中的 handler 指定的策略来拒绝新的任务。


2.2)KeepAliveTime

keepAliveTime :表示空闲线程的存活时间
TimeUnit unit :表示keepAliveTime的单位
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

注:如果线程池设置了allowCoreThreadTimeout参数为true(默认false),那么当空闲线程超过keepaliveTime后直接停掉。(不会判断线程数是否大于corePoolSize)即:最终线程数会变为0。

2.3)workQueue 任务队列

workQueue :它决定了缓存任务的排队策略
ThreadPoolExecutor线程池推荐了三种等待队列,它们是:SynchronousQueue 、LinkedBlockingQueue 和 ArrayBlockingQueue。

1)有界队列:

SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于 阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
ArrayBlockingQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
2)无界队列:

LinkedBlockingQueue:基于链表结构的无界阻塞队列,它可以指定容量也可以不指定容量(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE)
PriorityBlockingQueue:是一个按照优先级进行内部元素排序的无界阻塞队列。队列中的元素必须实现 Comparable 接口,这样才能通过实现compareTo()方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue 不会保证优先级一样的元素的排序。
注意:keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。

2.4)threadFactory

threadFactory :指定创建线程的工厂。(可以不指定)
如果不指定线程工厂时,ThreadPoolExecutor 会使用ThreadPoolExecutor.defaultThreadFactory 创建线程。默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY 的优先级,以及名为 “pool-XXX-thread-” 的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。

2.5)handler 拒绝策略

handler :表示当 workQueue 已满,且池中的线程数达到 maximumPoolSize 时,线程池拒绝添加新任务时采取的策略。(可以不指定)
策略

ThreadPoolExecutor.AbortPolicy()

抛出RejectedExecutionException异常。默认策略

ThreadPoolExecutor.CallerRunsPolicy()

由向线程池提交任务的线程来执行该任务

ThreadPoolExecutor.DiscardPolicy()

抛弃当前的任务

ThreadPoolExecutor.DiscardOldestPolicy()

抛弃最旧的任务(最先提交而没有得到执行的任务)

最科学的的还是 AbortPolicy 提供的处理方式:抛出异常,由开发人员进行处理。

3、常用方法:


除了在创建线程池时指定上述参数的值外,还可在线程池创建以后通过如下方法进行设置。

此外,还有一些方法:

getCorePoolSize():返回线程池的核心线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;
getMaximumPoolSize():返回线程池的最大线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;
getLargestPoolSize():记录了曾经出现的最大线程个数(水位线);
getPoolSize():线程池中当前线程的数量;
getActiveCount():Returns the approximate(近似) number of threads that are actively executing tasks;
prestartAllCoreThreads():会启动所有核心线程,无论是否有待执行的任务,线程池都会创建新的线程,直到池中线程数量达到 corePoolSize;
prestartCoreThread():会启动一个核心线程(同上);
allowCoreThreadTimeOut(true):允许核心线程在KeepAliveTime时间后,退出;
4、Executors类:
Executors类的底层实现便是ThreadPoolExecutor! Executors 工厂方法有:

Executors.newCachedThreadPool():无界线程池,可以进行自动线程回收
Executors.newFixedThreadPool(int):固定大小线程池
Executors.newSingleThreadExecutor():单个后台线程
它们均为大多数使用场景预定义了设置。不过在阿里java文档中说明,尽量不要用该类创建线程池。

二、线程池相关接口介绍


1、ExecutorService接口:


该接口是真正的线程池接口。上面的ThreadPoolExecutor以及下面的ScheduledThreadPoolExecutor都是该接口的实现类。改接口常用方法:

Future<?> submit(Runnable task):提交Runnable任务到线程池,返回Future对象,由于Runnable没有返回值,也就是说调用Future对象get()方法返回null;
<T> Future<T> submit(Callable<T> task):提交Callable任务到线程池,返回Future对象,调用Future对象get()方法可以获取Callable的返回值;
<T> Future<T> submit(Runnable task,T result):提交Runnable任务到线程池,返回Future对象,调用Future对象get()方法可以获取Runnable的参数值;
invokeAll(collection of tasks)/invokeAll(collection of tasks, long timeout, TimeUnit unit):invokeAll会按照任务集合中的顺序将所有的Future添加到返回的集合中,该方法是一个阻塞的方法。只有当所有的任务都执行完毕时,或者调用线程被中断,又或者超出指定时限时,invokeAll方法才会返回。当invokeAll返回之后每个任务要么返回,要么取消,此时客户端可以调用get/isCancelled来判断具体是什么情况。
invokeAny(collection of tasks)/invokeAny(collection of tasks, long timeout, TimeUnit unit):阻塞的方法,不会返回 Future 对象,而是返回集合中某一个Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个 Callable,如果一个任务运行完毕或者抛出异常,方法会取消其它的 Callable 的执行。和invokeAll区别是只要有一个任务执行完了,就把结果返回,并取消其他未执行完的任务;同样,也带有超时功能;
shutdown():在完成已提交的任务后关闭服务,不再接受新任;
shutdownNow():停止所有正在执行的任务并关闭服务;
isTerminated():测试是否所有任务都执行完毕了;
isShutdown():测试是否该ExecutorService已被关闭。


1.1)submit方法示例:

我们知道,线程池接口中有以下三个主要方法,接下来我们看一下具体示例:

1)Callable:

public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 50, 300, TimeUnit.SECONDS, 
            new ArrayBlockingQueue<Runnable>(50),  
            new ThreadFactory(){ public Thread newThread(Runnable r) {
                return new Thread(r, "schema_task_pool_" + r.hashCode());
            }}, new ThreadPoolExecutor.DiscardOldestPolicy());
 
public static void callableTest() {
    int a = 1;
    //callable
    Future<Boolean> future = threadPool.submit(new Callable<Boolean>(){
        @Override
        public Boolean call() throws Exception {
            int b = a + 100;
            System.out.println(b);
            return true;
        }
    });
    try {
        System.out.println("feature.get");
        Boolean boolean1 = future.get();
        System.out.println(boolean1);
    } catch (InterruptedException e) {
        System.out.println("InterruptedException...");
        e.printStackTrace();
    } catch (ExecutionException e) {
        System.out.println("execute exception...");
        e.printStackTrace();
    } 
}
2)Runnable:

public static void runnableTest() {
    int a = 1;
    //runnable
    Future<?> future1 = threadPool.submit(new Runnable(){
        @Override
        public void run() {
            int b = a + 100;
            System.out.println(b);
        }
    });
    try {
        System.out.println("feature.get");
        Object x = future1.get(900,TimeUnit.MILLISECONDS);
        System.out.println(x);//null
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        System.out.println("execute exception...");
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}

3)Runnable+result:

class RunnableTask implements Runnable {
    Person p;
    RunnableTask(Person p) {
        this.p = p;
    }
 
    @Override
    public void run() {
        p.setId(1);
        p.setName("Runnable Task...");
    }
}
class Person {
    private Integer id;
    private String name;
    
    public Person(Integer id, String name) {
        super();
        this.id = id;
        this.name = name;
    }
    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + "]";
    }
}
 
public static void runnableTest2() {
    //runnable + result
    Person p = new Person(0,"person");
    Future<Person> future2 = threadPool.submit(new RunnableTask(p),p);
    try {
        System.out.println("feature.get");
        Person person = future2.get();
        System.out.println(person);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}


1.2)ExecutionException

线程池执行时,Callable的call方法(Runnable的run方法)抛出异常后,会出现什么?

在上面的例子中我们可以看到,线程池无论是执行Callable还是Runnable,调用返回的Future对象get()方法时需要处理两种异常(如果是调用get(timeout)方法,需要处理三种异常),如下:

//在线程池上运行
Future<Object> future = threadPool.submit(callable);
try {
    System.out.println("feature.get");
    Object x = future.get(900,TimeUnit.MILLISECONDS);
    System.out.println(x);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    System.out.println("execute exception...");
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}
如果get方法被打断,进入InterruptedException异常;
如果线程执行过程(call、run方法)中抛出异常,进入ExecutionException异常;
如果get方法超时,进入TimeoutException异常;


1.3)submit()和execute()方法区别

ExecutorService、ScheduledExecutorService接口的submit()和execute()方法都是把任务提交到线程池中,但二者的区别是

接收的参数不一样,execute只能接收Runnable类型、submit可以接收Runnable和Callable两种类型;
submit有返回值,而execute没有返回值;submit方便Exception处理;
1)submit方法内部实现:

其实submit方法也没有什么神秘的,就是将我们的任务封装成了RunnableFuture接口(继承了Runnable、Future接口),再调用execute方法,我们看源码:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);  //转成 RunnableFuture,传的result是null
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

2)newTaskFor方法内部实现:

newTaskFor方法是new了一个FutureTask返回,所以三个方法其实都是把task转成FutureTask,如果task是Callable,就直接赋值,如果是Runnable 就转为Callable再赋值。

当submit参数是Callable 时:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;      
    }
当submit参数是Runnable时:

   // 按顺序看,层层调用
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);  //转 runnable 为 callable 
        this.state = NEW; 
    }
   // 以下为Executors中的方法
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {  //适配器
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {   
            task.run();
            return result;
        }
    }

看了源码就揭开了神秘面纱了,就是因为Future需要返回结果,所以内部task必须是Callable,如果task是Runnable 就偷天换日,在Runnable 外面包个Callable马甲,返回的结果在构造时就写好。

参考:搞懂Runnable、Callable、Future、FutureTask 及应用_赶路人儿的博客-CSDN博客

1.4)ScheduledExecutorService接口

继承ExecutorService,并且提供了按时间安排执行任务的功能,它提供的方法主要有:

schedule(task, initDelay): 安排所提交的Callable或Runnable任务在initDelay指定的时间后执行;
scheduleAtFixedRate():安排所提交的Runnable任务按指定的间隔重复执行;
scheduleWithFixedDelay():安排所提交的Runnable任务在每次执行完后,等待delay所指定的时间后重复执行;
注:该接口的实现类是ScheduledThreadPoolExecutor。

2、Callable接口


jdk1.5以后创建线程可以通过一下方式:

继承Thread类,实现void run()方法;
实现Runnable接口,实现void run()方法;
实现Callable接口,实现V call() Throws Exception方法
1)Callable和Runnale接口区别:

Callable可以抛出异常,和Future、FutureTask配合可以用来获取异步执行的结果;
Runnable没有返回结果,异常只能内部消化;
2)执行Callable的线程的方法可以通过以下两种方式:

借助FutureTask,使用Thread的start方法来执行;
加入到线程池中,使用线程池的execute或submit执行;
注:Callable无法直接使用Thread来执行;

我们都知道,Callable带有返回值的,如果我们不需要返回值,却又想用Callable该如何做?

jdk中有个Void类型(大写V),但必须也要return null。

threadpool.submit(new Callable<Void>() {
    @Override
    public Void call() {
        //...
        return null;
    }
});
3)通过Executors工具类可以把Runnable接口转换成Callable接口:

Executors中的callable方法可以将Runnable转成Callable,如下:

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
}
RunnableAdapter类在上面已经看过源码,原理就是将返回值result作为成员变量,通过参数传递进去,进而实现了Runnable可以返回值。

示例:

public static void test5() {
        Person p = new Person(0,"person");
        RunnableTask runnableTask = new RunnableTask(p);//创建runnable
        Callable<Person> callable = Executors.callable(runnableTask,p);//转换
        Future<Person> future1 = threadPool.submit(callable);//在线程池上执行Callable
        try {
            Person person = future1.get();
            System.out.println(person);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        
        Runnable runnable = new Runnable() {//创建Runnable
            @Override
            public void run() {
                
            }
        };
        Callable<Object> callable2 = Executors.callable(runnable);//转换
        Future<Object> future2 = threadPool.submit(callable2);//在线程池上执行Callable
        try {
            Object o = future2.get();
            System.out.println(o);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }


3、Future接口


3.1)Future接口方法

Future是用来获取异步计算结果的接口,常用方法:

boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。
boolean isCancelled():如果在任务正常完成前将其取消,则返回 true。
boolean isDone():如果任务已完成,则返回 true,可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。
V get()throws InterruptedException,ExecutionException:获取异步结果,此方法会一直阻塞等到计算完成;
V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException:获取异步结果,此方法会在指定时间内一直阻塞等到计算完成,超时后会抛出超时异常。
通过方法分析我们也知道实际上Future提供了3种功能:

能够中断执行中的任务;
判断任务是否执行完成;
获取任务执行完成后额结果。
但是Future只是一个接口,我们无法直接创建对象,因此就需要其实现类FutureTask登场啦。

3.2)FutureTask类

1)FutureTask类的实现:

public class FutureTask<V> implements RunnableFuture<V> {
//...
}
 
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
FutureTask实现了Runnable、Future两个接口。由于FutureTask实现了Runnable,因此它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行。并且还可以直接通过get()函数获取执行结果,该函数会阻塞,直到结果返回。因此FutureTask既是Future、Runnable,又是包装了Callable( 如果是Runnable最终也会被转换为Callable ), 它是这两者的合体。

2)FutureTask的构造函数:

public FutureTask(Callable<V> callable) {
 
}
 
public FutureTask(Runnable runnable, V result) {
 
}
3.3)示例:(FutureTask两种构造函数、以及在Thread和线程池上运行)

1)FutureTask包装过的Callable在Thread、线程池上执行:

public static void test3() {
        int a = 1,b = 2;
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return a + b;
            }
        };
        //通过futureTask来执行Callable
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        
        //1.使用Thread执行线程
        new Thread(futureTask).start();
        try {
            Integer integer = futureTask.get();
            System.out.println(integer);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        
        //2.使用线程池执行线程
        Executors.newFixedThreadPool(1).submit(futureTask);
        threadPool.shutdown();
        try {
            Integer integer = futureTask.get();
            System.out.println(integer);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } 
    }
2)FutureTask包装过的Runnable在Thread、线程池上执行:

public static void test4() {
        Person p = new Person(0,"person");
        RunnableTask runnableTask = new RunnableTask(p);
        
        //创建futureTask来执行Runnable
        FutureTask<Person> futureTask = new FutureTask<>(runnableTask,p);
        
        //1.使用Thread执行线程
        new Thread(futureTask).start();
        try {
            Person x = futureTask.get();
            System.out.println(x);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } 
        
        //2.使用线程池执行线程
        threadPool.submit(futureTask);
        threadPool.shutdown();
        try {
            Person y = futureTask.get();
            System.out.println(y);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
Person、RunnableTask类同上面的示例中。

4、FutureTask,Runable,Future关系
在这里插入图片描述

 三 、AbstractExecutorService介绍

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现。这些方法包括submit,invokeAny和InvokeAll。

注意的是来自Executor接口的execute方法在AbstractExecutorService中未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略。这个在分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来。

1、submit方法

首先来看submit方法,它的基本逻辑是这样的:

1). 生成一个任务类型和Future接口的包装接口RunnableFuture的对象

2). 执行任务

3). 返回future。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
因为submit支持Callable和Runnable两种类型的任务,因此newTaskFor方法有两个重载方法:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
 
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
Callable和Runnable的区别在于前者带返回值,也就是说Callable=Runnable+返回值。因此java中提供了一种adapter,把Runnable+返回值转换成Callable类型。这点可以在newTaskFor中的FutureTask类型的构造函数的代码中看到:

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }
 
    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

2、Executors callable 方法

以下是Executors.callable方法的代码:

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
那么RunnableAdapter的代码就很好理解了,它是一个Callable的实现,call方法的实现就是执行Runnable的run方法,然后返回那个value。

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

3、invokeAll 方法

接下来先说说较为简单的invokeAll:

1). 为每个task调用newTaskFor方法生成得到一个既是Task也是Future的包装类对象的List

2). 循环调用execute执行每个任务

3). 再次循环调用每个Future的get方法等待每个task执行完成

4). 最后返回Future的list。

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // 为每个task生成包装对象
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
 
            long lastTime = System.nanoTime();
 
            // 循环调用execute执行每个方法
            // 这里因为设置了超时时间,所以每次执行完成后
            // 检查是否超时,超时了就直接返回future集合
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0)
                    return futures;
            }
 
            // 等待每个任务执行完成
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

4、invokeAny方法

最后说说invokeAny,它的难点在于只要一个任务执行成功就要返回,并且会取消其他任务,也就是说重点在于找到第一个执行成功的任务。

这里我想到了BlockingQueue,当所有的任务被提交后,任务执行返回的Future会被依次添加到一个BlockingQueue中,然后找到第一个执行成功任务的方法就是从BlockingQueue取出第一个元素,这个就是doInvokeAny方法用到的ExecutorCompletionService的基本原理。

因为两个invokeAny方法都是调用doInvokeAny方法,下面是doInvokeAny的代码分析:

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
        // ExecutorCompletionService负责执行任务,后面调用用poll返回第一个执行结果
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
        // 这里出于效率的考虑,每次提交一个任务之后,就检查一下有没有执行完成的任务
 
        try {
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();
 
            // 先提交一个任务
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;
 
            for (;;) {
                // 尝试获取有没有执行结果(这个结果是立刻返回的)
                Future<T> f = ecs.poll();
                // 没有执行结果
                if (f == null) {
                    // 如果还有任务没有被提交执行的,就再提交一个任务
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    // 没有任务在执行了,而且没有拿到一个成功的结果。
                    else if (active == 0)
                        break;
                    // 如果设置了超时情况
                    else if (timed) {
                        // 等待执行结果直到有结果或者超时
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        // 这里的更新不可少,因为这个Future可能是执行失败的情况,那么还需要再次等待下一个结果,超时的设置还是需要用到。
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    // 没有设置超时,并且所有任务都被提交了,则一直等到第一个执行结果出来
                    else
                        f = ecs.take();
                }
                // 有返回结果了,尝试从future中获取结果,如果失败了,那么需要接着等待下一个执行结果
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
 
            // ExecutorCompletionService执行时发生错误返回了全是null的future
            if (ee == null)
                ee = new ExecutionException();
            throw ee;
 
        } finally {
            // 尝试取消所有的任务(对于已经完成的任务没有影响)
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }

 

参考文献:​


​https://blog.csdn.net/liuxiao723846/article/details/108026782
https://blog.csdn.net/xinruyulu/article/details/64453449
https://juejin.cn/post/6844903672736907272
https://blog.csdn.net/wszhongguolujun/article/details/89708668

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/848576.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

递归在树的深度遍历中的运用

树的深度遍历 对于树这种数据结构&#xff0c;之前一直使用的是层次遍历&#xff0c;也就是广度优先搜索的方式&#xff08;BFS&#xff09;&#xff1b;对于树的遍历&#xff0c;还可以进行深度优先搜索&#xff08;DFS&#xff09;。 而结合递归&#xff0c;树的深度优先搜索…

【硬件设计】模拟电子基础三--集成运算放大电路

模拟电子基础三--集成运算放大电路 一、集成运算放大器1.1 定义、组成与性能1.2 电流源电路1.3 差动放大电路1.4 理想运算放大器 二、集成运算放大器的应用2.1 反向比例运算电路2.2 同向比例运算电路2.3 反向加法运算电路2.4 反向减法运算电路2.5 积分运算电路2.6 微分运算电路…

vivado tcl创建工程和Git管理

一、Tcl工程创建 二、Git版本管理 对于创建完成的工程需要Git备份时&#xff0c;不需要上传完整几百或上G的工程&#xff0c;使用tcl指令创建脚本&#xff0c;并只将Tcl脚本上传&#xff0c;克隆时&#xff0c;只需要克隆tcl脚本&#xff0c;使用vivado导入新建工程即可。 优…

最详细,手机APP测试-ADB命令总结大全,你要的都在这...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 adb是什么&#x…

系统集成项目成本管理

在项目中&#xff0c;成本是指项目活动或其组成部分的货币价值或价格&#xff0c;包括为实施、完成或创造该活动或其组成部分所需资源的货币价值。具体的成本一般包括直接工时、其他百接费用、间接工时、其他间接费用以及采购价格。 项目全过程所耗用的各种成本的总和为项目成本…

无数资深果粉称之为 Mac 装机必备软件的 ——CleanMyMac X

它就是被无数资深果粉称之为 Mac 装机必备软件的 ——CleanMyMac X。或许你没用过它&#xff0c;但是大概率你身边一定有它的资深用户&#xff0c;作为 MacPaw 旗下的老牌清理软件&#xff0c;在全球已经拥有超过 2500 万次的下载量。 它有着五大强悍的功能&#xff0c;可以帮…

超融合基础架构 (HCI) 监控

什么是超融合基础架构 &#xff08;HCI&#xff09; 超融合基础架构 &#xff08;HCI&#xff09; 是一种软件定义的基础架构技术&#xff0c;它将计算、虚拟化和网络功能全部整合到一个设备中。超融合基础架构 &#xff08;HCI&#xff09; 解决方案使用软件和 x86 服务器来取…

【Linux命令详解 | less命令】Linux系统中用于分页显示文件内容的命令

文章标题 简介一&#xff0c;参数列表二&#xff0c;使用介绍1. 分页显示文件内容2. 搜索关键词3. 显示行号4. 显示特定内容5. 只显示匹配行6. 忽略大小写搜索7. 输出到文件8. 动态查看文件增长9. 开启对二进制文件的支持10. 显示控制字符11. 忽略键盘输入12. 显示百分比进度条…

深度学习环境安装依赖时常见错误解决

1.pydantic 安装pydantic时报以下错误&#xff1a; ImportError: cannot import name Annotated from pydantic.typing (C:\Users\duole\anaconda3\envs\vrh\lib\site-packages\pydantic\typing.py) 这个是版本错误&#xff0c;删除装好的版本&#xff0c;重新指定版本安装就…

设计模式——设计模式以及六大原则概述

设计模式代表有经验的面向对象软件开发人员使用的最佳实践。 设计模式是软件开发人员在软件开发过程中面临的一般问题的解决方案。 这些解决方案是由许多软件开发人员在相当长的时间内通过试错获得的。 什么是 GOF&#xff08;四人帮&#xff0c;全拼 Gang of Four&#xff09…

微信QQ链接防红屏障源码页面

一、作用&#xff1a;使用跳转到浏览器方式&#xff0c;防止自己链接在微信和QQ等软件无法打开。 微信QQ链接防红屏障.zip - 蓝奏云文件大小&#xff1a;2.8 M|https://wwwf.lanzout.com/iwrWS14sfzcb 二、图片&#xff1a;​ 六、安装与使用&#xff1a;上传到空间即可 使用…

Dubbo是干嘛的,Dubbo原理和机制,Dubbo的核心组件

目录 一、介绍1、Dubbo是什么2、为什么需要Dubbo3、Dubbo的特性 二、 Dubbo的核心概念1、暴露和引用&#xff08;Export and Refer&#xff09;2、服务提供者和服务消费者3、注册中心4、负载均衡5、集群容错 三、Dubbo的架构1、服务提供者和服务消费者之间的通信流程2、Dubbo的…

NR CSI(六) CSI reporting using PUCCH

之前NR CSI(二) the workflow of CSI report有对CSI report的相关流程进行介绍&#xff0c;而这篇主要看下CSI reporting over PUCCH的相关规定。 CSI report在PUCCH上传输的场景如上表红色字体&#xff0c;有三种场景&#xff0c;具体的对应的是Periodic 和Semi-Persistent CS…

sentinel核心流程源码解析

sentinel的处理槽(ProcessorSlot) 可以说&#xff0c;sentinel实现的各种功能就是由各处理槽完成的 ,ProcessorSlot定义了四个方法&#xff1a; 当进入该处理槽时触发该方法 处理完 entry方法之后触发该方法 退出该处理槽时触发该方法 exit方法处理完成时触发该方法 sentinel的…

Gitlab CI/CD笔记-第二天-GitOps的流水线常用关键词(1)

一、常用关键词 在Gitlab项目的根目录需要创建一个 .gitlab-ci.yaml的文件。 这个文件就是定义的流水线。Call :"Pipeline as code" 二、这条流水线怎么写&#xff1f; 一、掌握常用的关键词即可。 1.关键词分类 1.全局关键词 Global Keywards 2.任务关键词…

如何将jar包部署到宝塔

尝试多种方式上传&#xff0c;但启动一直失败&#xff0c;这种方式亲测是好使的 项目内修改位置 在pom.xml文件中将mysql的scope改成provided&#xff0c;如果是固定的版本号会出现问题 之后就可以打包啦&#xff0c;直接点击maven中的package 找到打包文件的位置&#xff…

人工智能贷款公司upstart:从挫折到复出,从喧嚣中崛起

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 总结&#xff1a; &#xff08;1&#xff09;对Upstart&#xff08;UPST&#xff09;所在次级抵押贷款市场来说&#xff0c;最糟糕的时期可能已经过去了&#xff0c;因为情况正在出现好转。 &#xff08;2&#xff09;Upst…

如何选择适合您需求的新闻稿件校对软件

选择适合您需求的新闻稿件校对软件时&#xff0c;可以考虑以下几个因素&#xff1a; 1.校对功能&#xff1a;了解软件的校对功能&#xff0c;包括拼写检查、语法检查、词汇和语义检查等方面。确保软件能够满足您的基本校对需求&#xff0c;并提供准确的建议和改进意见。 2.多语…

Windows11环境下VS2019调用Pytorch语义分割模型(C++版)

语义分割模型在训练时往往采用python脚本进行网络搭建和训练&#xff0c;并获得训练好的模型。为了提高效率方便整个工程项目部署&#xff0c;实际工程应用中通常希望使用C编程语言调用训练好的网络模型。查询大量网络资料并踩过无数坑后&#xff0c;经实际测试实现了在window1…

Java课题笔记~ 使用 Spring 的事务注解管理事务(掌握)

通过Transactional 注解方式&#xff0c;可将事务织入到相应 public 方法中&#xff0c;实现事务管理。 Transactional 的所有可选属性如下所示&#xff1a; propagation&#xff1a;用于设置事务传播属性。该属性类型为 Propagation 枚举&#xff0c; 默认值为 Propagation.R…