1.Future和Callable接口
Future 是一个表示异步计算结果的接口;
接口Callable与线程功能密不可分,但和Runnable的主要区别为:
1)Callable接口的call()方法可以有返回值,而Runnable接口的run()方法没有返回值。
2)Callable接口的call()方法可以声明抛出异常,而Runnable接口的run()方法不可以声明抛出异常。
执行完Callable接口中的任务后,返回值是通过Future接口进行获得的。
• 接口Future中的方法:
• cancel(boolean mayInterruptIfRunning): 尝试取消任务。
• isCancelled(): 检查任务是否被取消。
• isDone(): 用于判断该计算是否已经完成。
• get(): 获取任务的结果(会阻塞直到任务完成)。
• get(long timeout, TimeUnit unit): 在指定的时间内等待并获取任务结果。
1.1 Future get()+ Callable 示例:
方法submit(Callable<T>)可以执行参数为Callable的任务。
方法get()用于获得返回值。
创建类MyCallable.java代码如下:
package mycallable;
import java.util.concurrent.Callable;
public class MyCallable implements Callable<String> {
private int age;
public MyCallable(int age) {
super();
this.age = age;
}
public String call() throws Exception {
Thread.sleep(8000);
return "返回值 年龄是:" + age;
}
}
创建类Run.java代码如下:
package test.run;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import mycallable.MyCallable;
public class Run {
public static void main(String[] args) throws InterruptedException {
try {
MyCallable callable = new MyCallable(100);
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 5L,
TimeUnit.SECONDS, new LinkedBlockingDeque());
Future<String> future = executor.submit(callable);
System.out.println("main A " + System.currentTimeMillis());
System.out.println(future.get());
System.out.println("main B " + System.currentTimeMillis());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
1.2 Future isDone()+ Callable示例
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureIsDoneExample {
public static void main(String[] args) {
// 创建一个线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 创建一个Callable任务
Callable<String> callableTask = () -> {
Thread.sleep(2000); // 模拟一个耗时任务
return "Task's execution";
};
// 提交任务并获得Future对象
Future<String> future = executorService.submit(callableTask);
// 使用isDone()方法来检查任务是否完成
while (!future.isDone()) {
System.out.println("Task is still not done...");
try {
Thread.sleep(500); // 让主线程等待一段时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 任务完成后,获取结果
try {
String result = future.get(); // 这将阻塞直到结果可用
System.out.println("Task completed! Result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 关闭线程池
executorService.shutdown();
}
}
代码解释:
1. 创建线程池: 使用 Executors.newSingleThreadExecutor() 创建一个单线程的线程池。
2. 创建任务: 使用 Callable 接口创建一个任务,它将在 2 秒后返回一个字符串。
3. 提交任务并获取 Future 对象: 使用 executorService.submit(callableTask) 提交任务,并获得一个表示该任务的 Future 对象。
4. 检查任务是否完成: 使用 isDone() 方法检查任务是否完成。如果任务没有完成,程序会每 500 毫秒打印一次提示信息。
5. 获取任务结果: 一旦任务完成,使用 future.get() 方法获取任务结果,并打印出来。
6. 关闭线程池: 最后,使用 executorService.shutdown() 关闭线程池。
结果输出:
• 在任务执行过程中,控制台会打印多次 “Task is still not done…”,表示任务尚未完成。
• 一旦任务完成,程序将打印任务的结果,如 “Task completed! Result: Task’s execution”。
1.3 Future cancel() 和 isCancelled()示例
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureCancelExample {
public static void main(String[] args) {
// 创建一个线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 创建一个Callable任务
Callable<String> callableTask = () -> {
try {
Thread.sleep(5000); // 模拟一个耗时任务
return "Task's execution completed";
} catch (InterruptedException e) {
return "Task was interrupted";
}
};
// 提交任务并获得Future对象
Future<String> future = executorService.submit(callableTask);
// 等待一段时间然后取消任务
try {
Thread.sleep(2000); // 主线程等待2秒
boolean isCancelled = future.cancel(true); // 尝试取消任务
// 检查任务是否被取消
if (isCancelled) {
System.out.println("Task was cancelled successfully.");
} else {
System.out.println("Failed to cancel the task.");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 使用isCancelled()检查任务是否已被取消
if (future.isCancelled()) {
System.out.println("The task was cancelled and did not complete.");
} else {
try {
// 获取任务的结果
String result = future.get(); // 如果任务未被取消,则此方法会阻塞直到任务完成
System.out.println("Task completed! Result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 关闭线程池
executorService.shutdown();
}
}
代码解释:
1. 创建线程池: 使用 Executors.newSingleThreadExecutor() 创建一个单线程的线程池。
2. 创建任务: 使用 Callable 接口创建一个任务,它将在 5 秒后返回一个字符串。如果任务被中断,则返回 “Task was interrupted”。
3. 提交任务并获取 Future 对象: 使用 executorService.submit(callableTask) 提交任务,并获得一个表示该任务的 Future 对象。
4. 取消任务: 主线程等待 2 秒后,调用 future.cancel(true) 取消任务。true 参数表示如果任务正在运行,则尝试中断它。
5. 检查任务是否被取消: 使用 isCancelled() 方法检查任务是否已被取消。如果任务已被取消,程序会输出相应的消息。
6. 获取任务结果: 如果任务未被取消,使用 future.get() 方法获取任务结果。如果任务已被取消,任务不会返回结果。
7. 关闭线程池: 最后,使用 executorService.shutdown() 关闭线程池。
结果输出:
• 如果任务被成功取消,控制台会输出 “Task was cancelled successfully.” 和 “The task was cancelled and did not complete.”
• 如果任务未被取消(可能因为任务已经完成),程序会输出 “Failed to cancel the task.”,并打印任务的结果。
这个示例展示了如何使用 cancel() 方法来尝试取消正在执行的任务,以及如何使用 isCancelled() 方法来检查任务是否已经取消。
1.4 Future get(long timeout,TimeUnit unit)示例
Future 接口的 get(long timeout, TimeUnit unit) 方法用于在指定的超时时间内获取异步任务的结果。如果在超时时间内任务没有完成,则抛出 TimeoutException。以下是一个示例,展示了如何使用该方法:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureGetWithTimeoutExample {
public static void main(String[] args) {
// 创建一个线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 创建一个Callable任务
Callable<String> callableTask = () -> {
try {
Thread.sleep(3000); // 模拟一个耗时任务(3秒)
return "Task's execution completed";
} catch (InterruptedException e) {
return "Task was interrupted";
}
};
// 提交任务并获得Future对象
Future<String> future = executorService.submit(callableTask);
try {
// 尝试在2秒内获取任务的结果
String result = future.get(2, TimeUnit.SECONDS);
System.out.println("Task completed within timeout! Result: " + result);
} catch (TimeoutException e) {
System.out.println("Task did not complete within the timeout period.");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
// 关闭线程池
executorService.shutdown();
}
}
}
代码解释:
1. 创建线程池: 使用 Executors.newSingleThreadExecutor() 创建一个单线程的线程池。
2. 创建任务: 使用 Callable 接口创建一个任务,该任务将模拟一个耗时 3 秒的操作。如果任务被中断,则返回 “Task was interrupted”。
3. 提交任务并获取 Future 对象: 使用 executorService.submit(callableTask) 提交任务,并获得一个表示该任务的 Future 对象。
4. 使用 get(long timeout, TimeUnit unit) 获取任务结果:
• 尝试在 2 秒内获取任务的结果。如果任务在 2 秒内完成,程序将打印任务结果。
• 如果任务没有在 2 秒内完成,将抛出 TimeoutException,并打印 “Task did not complete within the timeout period.”
5. 关闭线程池: 最后,使用 executorService.shutdown() 关闭线程池。
结果输出:
• 由于任务需要 3 秒才能完成,而超时时间为 2 秒,因此控制台会输出 “Task did not complete within the timeout period.”
• 如果将任务的睡眠时间改为小于或等于 2 秒,则程序将成功获取任务结果,并打印 “Task completed within timeout! Result: Task’s execution completed”。
这个示例展示了如何使用 get(long timeout, TimeUnit unit) 方法来设定获取任务结果的超时时间,以及如何处理超时情况。
2.RejectedExecutionHandler 被拒绝的任务处理
ejectedExecutionHandler 是 Java 中 java.util.concurrent 包的一部分,用于处理当任务无法被执行时的情况。这个接口通常与 ThreadPoolExecutor 一起使用,当线程池无法接受新的任务时,线程池会调用 RejectedExecutionHandler 的方法来处理这些被拒绝的任务。
主要使用场景:
当一个 ThreadPoolExecutor 已经达到了其最大线程数并且其任务队列也已经满了的情况下,新的任务将无法被接受。这时,就会触发 RejectedExecutionHandler 来处理这些被拒绝的任务。你可以通过实现这个接口来定义自己的拒绝策略。
方法:
RejectedExecutionHandler 接口只有一个方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
• r:这是被拒绝的任务,通常是一个 Runnable 对象。
• executor:这个参数是触发拒绝的 ThreadPoolExecutor 对象。
内置的拒绝策略:
Java 提供了一些内置的 RejectedExecutionHandler 实现,常见的有以下几种:
1. AbortPolicy(默认策略):抛出 RejectedExecutionException,阻止系统继续运行。
2. CallerRunsPolicy:由提交任务的线程来执行该任务,这样可以减缓任务提交的速度。
3. DiscardPolicy:直接丢弃任务,不做任何处理。
4. DiscardOldestPolicy:丢弃队列中最老的未处理任务,然后重新尝试提交新任务。
自定义 RejectedExecutionHandler 示例:
你可以实现 RejectedExecutionHandler 接口来自定义拒绝策略。例如,以下代码展示了如何自定义一个拒绝策略,将被拒绝的任务记录日志:
import java.util.concurrent.*;
public class CustomRejectedExecutionHandlerExample {
public static void main(String[] args) {
// 创建一个自定义的RejectedExecutionHandler
RejectedExecutionHandler customHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Task " + r.toString() + " rejected. Logging this information.");
// 这里可以实现其他的处理逻辑,比如记录日志或者发送通知
}
};
// 创建一个线程池,并设置自定义的RejectedExecutionHandler
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(2),
customHandler);
// 提交任务
for (int i = 0; i < 10; i++) {
executor.submit(new Task());
}
// 关闭线程池
executor.shutdown();
}
static class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("Task completed: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public String toString() {
return "Task@" + hashCode();
}
}
}
代码解释:
1. 自定义 RejectedExecutionHandler: 实现了 RejectedExecutionHandler 接口,定义了一个简单的处理逻辑,当任务被拒绝时,会输出一条日志信息。
2. 创建 ThreadPoolExecutor: 使用自定义的 RejectedExecutionHandler 初始化 ThreadPoolExecutor,并设置核心线程数和最大线程数均为 2,任务队列的容量为 2。
3. 提交任务: 提交 10 个任务,由于线程池和队列容量有限,多余的任务将被拒绝,并触发自定义的拒绝处理逻辑。
输出:
由于线程池和任务队列总共只能处理 4 个任务,其余 6 个任务将被拒绝,输出类似于以下内容:
Task Task@XXXXXX rejected. Logging this information.
Task completed: pool-1-thread-1
Task completed: pool-1-thread-2
3. execute()和 submit()区别
execute() 和 submit() 是 Java 中用于将任务提交给线程池的两种方法,二者在功能和用法上有一些重要区别。下面详细解释它们的区别:
execute() 方法
execute() 是 Executor 接口中定义的方法,用于提交一个 Runnable 任务给线程池执行。
主要特点:
1. 无返回值: execute() 方法没有返回值。它只负责将任务提交给线程池执行,不提供任务执行完成后的任何反馈。
2. 适用于 Runnable: execute() 方法只能接受实现了 Runnable 接口的任务,因此不支持返回执行结果的任务。
3. 异常处理: 如果 Runnable 任务在执行过程中抛出异常且未被捕获,异常将直接传播,可能导致线程池中的线程终止。
示例:
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> {
System.out.println("Task executed using execute()");
});
executor.shutdown();
submit() 方法
submit() 是 ExecutorService 接口中定义的方法,用于提交一个 Runnable 或 Callable 任务,并返回一个 Future 对象。
主要特点:
1. 返回 Future 对象: submit() 方法返回一个 Future 对象,通过它可以获取任务的执行结果、检查任务是否完成、取消任务等。
2. 支持 Runnable 和 Callable: submit() 方法可以接受 Runnable 和 Callable 两种类型的任务:
• 对于 Runnable 任务,submit() 返回一个 Future,但是 Future.get() 方法返回的结果是 null,因为 Runnable 不会返回执行结果。
• 对于 Callable 任务,submit() 返回一个 Future,可以通过 Future.get() 获取 Callable 的返回值。
3. 异常处理: 通过 submit() 提交的任务,如果在执行过程中抛出异常,异常不会直接传播到调用线程。相反,异常会被捕获并存储在 Future 对象中,可以通过调用 Future.get() 来获取该异常。
ExecutorService executor = Executors.newFixedThreadPool(2);
// 使用submit提交Runnable任务
Future<?> future1 = executor.submit(() -> {
System.out.println("Task executed using submit() with Runnable");
});
// 使用submit提交Callable任务
Future<String> future2 = executor.submit(() -> {
return "Task executed using submit() with Callable";
});
// 获取Callable任务的结果
try {
String result = future2.get();
System.out.println("Callable task result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
总结区别:
• 返回值: execute() 不返回结果,submit() 返回一个 Future 对象,可以通过 Future 获取任务的执行结果。
• 任务类型: execute() 只接受 Runnable 任务,submit() 可以接受 Runnable 和 Callable 任务。
• 异常处理: execute() 提交的任务如果抛出异常,异常会直接传播。方法execute()在默认的情况下异常直接抛出,不能捕获,但可以通过自定义Thread-Factory的方式进行捕获,而submit()方法在默认的情况下,可以catch Execution-Exception捕获异常并可以通过 Future.get() 进行处理。
4.验证Future的缺点
接口Future的实现类是FutureTask.java,而且在使用线程池时,默认的情况下也是使用FutureTask.java类作为接口Future的实现类,也就是说,如果在使用Future与Callable的情况下,使用Future接口也就是在使用FutureTask.java类。
Callable接口与Runnable接口在对比时主要的优点是,Callable接口可以通过Future取得返回值。但需要注意的是,Future接口调用get()方法取得处理的结果值时是阻塞性的,也就是如果调用Future对象的get()方法时,任务尚未执行完成,则调用get()方法时一直阻塞到此任务完成时为止。如果是这样的效果,则前面先执行的任务一旦耗时很多,则后面的任务调用get()方法就呈阻塞状态,也就是排队进行等待,大大影响运行效率。也就是主线程并不能保证首先获得的是最先完成任务的返回值,这就是Future的缺点,影响效率。
创建验证用的项目,名称为futureLast,类MyCallable.java代码如下:
package mycallable;
import java.util.concurrent.Callable;
public class MyCallable implements Callable<String> {
private String username;
private long sleepValue;
public MyCallable(String username, long sleepValue) {
super();
this.username = username;
this.sleepValue = sleepValue;
}
@Override
public String call() throws Exception {
System.out.println(username);
Thread.sleep(sleepValue);
return "return " + username;
}
}
类Test.java代码如下:
package test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import mycallable.MyCallable;
public class Test {
public static void main(String[] args) {
try {
MyCallable callable1 = new MyCallable("username1", 5000);
MyCallable callable2 = new MyCallable("username2", 4000);
MyCallable callable3 = new MyCallable("username3", 3000);
MyCallable callable4 = new MyCallable("username4", 2000);
MyCallable callable5 = new MyCallable("username5", 1000);
List<Callable> callableList = new ArrayList<Callable>();
callableList.add(callable1);
callableList.add(callable2);
callableList.add(callable3);
callableList.add(callable4);
callableList.add(callable5);
List<Future> futureList = new ArrayList<Future>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 5,
TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
for (int i = 0; i < 5; i++) {
futureList.add(executor.submit(callableList.get(i)));
}
System.out
.println("run first time= " + System.currentTimeMillis());
for (int i = 0; i < 5; i++) {
System.out.println(futureList.get(i).get() + " "
+ System.currentTimeMillis());
}
// 按顺序打印的效果
// 说明一个Future对应指定的一个Callable
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
程序运行结果如图:
方法get()呈阻塞状态,这就是Future接口的缺点,根据这个特性,JDK1.5提供了CompletionService接口可以解决这个问题.