必备知识:
三种创建线程的方式
java线程池
CompletionService 是 Java 并发库中的一个接口,用于简化一组异步任务的执行和结果收集。它结合了 Executor 和 BlockingQueue 的功能,帮助管理任务的提交和完成,并按任务完成的先后顺序返回结果。CompletionService 的主要实现类是 ExecutorCompletionService。
ExecutorCompletionService
例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
 * Demonstrates {@link CompletionService}: several tasks of different durations are submitted to a
 * fixed-size pool and their results are printed in the order the tasks finish.
 */
public class CompletionServiceExample {
    /** Number of worker threads in the pool. */
    private static final int POOL_SIZE = 3;

    /** Number of tasks submitted; task {@code i} sleeps {@code i} seconds and returns {@code i}. */
    private static final int TASK_COUNT = 5;

    /**
     * Submits the tasks, then takes and prints one result per submitted task.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for a result
     * @throws ExecutionException if a task failed with an exception
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(POOL_SIZE);
        try {
            CompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(executorService);

            // Submit the group of tasks.
            for (int i = 0; i < TASK_COUNT; i++) {
                final int index = i;
                completionService.submit(() -> {
                    TimeUnit.SECONDS.sleep(index);
                    return index;
                });
            }

            // Collect exactly as many results as tasks were submitted; take() blocks until the
            // next task has completed.
            for (int i = 0; i < TASK_COUNT; i++) {
                Future<Integer> future = completionService.take();
                Integer result = future.get();
                System.out.println("Task completed with result: " + result);
            }
        } finally {
            // Shut the pool down even when take()/get() throws; otherwise the pool's worker
            // threads would keep the JVM alive after main has failed.
            executorService.shutdown();
        }
    }
}
/**
 * Simplified {@link CompletionService} that runs tasks on a supplied {@link Executor} and places
 * each task's {@link Future} on a queue once the task has finished, so that callers can retrieve
 * results in completion order via {@link #take()} and {@link #poll()}.
 *
 * @param <V> the result type of the submitted tasks
 */
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * Creates a service that uses an unbounded {@link LinkedBlockingQueue} as completion queue.
     *
     * @param executor the executor that runs the submitted tasks
     * @throws NullPointerException if {@code executor} is null
     */
    public ExecutorCompletionService(Executor executor) {
        this(executor, new LinkedBlockingQueue<Future<V>>());
    }

    /**
     * Creates a service that uses the given queue as completion queue.
     *
     * @param executor the executor that runs the submitted tasks
     * @param completionQueue the queue that receives the futures of finished tasks
     * @throws NullPointerException if {@code executor} or {@code completionQueue} is null
     */
    public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {
        // Fail at construction time rather than on the first submit()/take().
        if (executor == null) {
            throw new NullPointerException("executor");
        }
        if (completionQueue == null) {
            throw new NullPointerException("completionQueue");
        }
        this.executor = executor;
        this.completionQueue = completionQueue;
    }

    /**
     * Submits a value-returning task for execution.
     *
     * @param task the task to run
     * @return a future for the task's result; the same future is later placed on the completion
     *     queue
     */
    @Override
    public Future<V> submit(Callable<V> task) {
        RunnableFuture<V> future = new FutureTask<>(task);
        executor.execute(new QueueingFuture(future));
        return future;
    }

    /**
     * Submits a {@link Runnable} for execution, using {@code result} as its outcome.
     *
     * @param task the task to run
     * @param result the value the returned future yields once the task has finished
     * @return a future for the task; the same future is later placed on the completion queue
     */
    @Override
    public Future<V> submit(Runnable task, V result) {
        RunnableFuture<V> future = new FutureTask<>(task, result);
        executor.execute(new QueueingFuture(future));
        return future;
    }

    /**
     * Retrieves and removes the future of the next finished task, waiting if none is available.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    /**
     * Retrieves and removes the future of the next finished task, or returns {@code null} if no
     * finished task is queued.
     */
    @Override
    public Future<V> poll() {
        return completionQueue.poll();
    }

    /**
     * Retrieves and removes the future of the next finished task, waiting up to the given time.
     *
     * @param timeout how long to wait before giving up
     * @param unit the unit of {@code timeout}
     * @return the future of a finished task, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

    /**
     * Wrapper that runs the real task and, once it is done, enqueues the real task's future.
     * The wrapper itself carries no result, hence {@code FutureTask<Void>}.
     */
    private class QueueingFuture extends FutureTask<Void> {
        /** The future handed back to the caller of {@code submit}; holds the actual result. */
        private final Future<V> task;

        QueueingFuture(RunnableFuture<V> task) {
            // FutureTask has no single-argument Runnable constructor; the wrapper's own result
            // is never read, so null is used as a placeholder.
            super(task, null);
            this.task = task;
        }

        @Override
        protected void done() {
            // Enqueue the wrapped future, not this wrapper: only the wrapped future's get()
            // returns the task's value.
            completionQueue.add(task);
        }
    }
}