类图:定义了一些重要的接口和实现类
线程池的几种状态:
ThreadPoolExecutor构造方法
1.救急线程
线程池中会有核心线程和救急线程;救急线程数=最大线程数-核心线程数。而救急线程会在阻塞队列已经占满的情况下,执行下一个即将要被拒绝策略执行任务。且救急线程在执行完任务后的KeepAliveTime时间内,如果没有执行新的任务,那么就会从线程池remove这个线程。不同于核心线程是一直存在于线程池中的。
救急线程是懒惰创建的,只有当有界阻塞队列满的时候才会创建救急线程执行任务。如果阻塞队列是无界的,那么永远不会创建救急线程。
2.核心线程
会优先使用核心线程来执行任务,且核心线程是懒惰创建。当核心线程执行完任务后,并不会主动结束自己,而是还在运行中。需要特殊处理使之结束。
3.拒绝策略
当救急线程被占用完后再来新的任务会由拒绝策略完成。
4.JDK提供线程池执行过程
4.JDK提供的拒绝策略
NewFixedThreadPool 固定大小线程池
1.介绍
2.代码示例
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j(topic = "TC41")
public class TC41 {
public static void main(String[] args) throws InterruptedException{
ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
private AtomicInteger threadPoolNumber = new AtomicInteger(1);
//使用ThreadFactory自定义线程名
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"Pool_t"+threadPoolNumber.getAndIncrement());
}
});
pool.execute(()->{
log.debug("1");
});
pool.execute(()->{
log.debug("2");
});
pool.execute(()->{
log.debug("3");
});
//result: 且程序不会结束,因为核心线程只是执行完任务,但并没有结束
//11:10:02.144 [Pool_t2] DEBUG TC41 - 2
//11:10:02.144 [Pool_t1] DEBUG TC41 - 1
//11:10:02.147 [Pool_t2] DEBUG TC41 - 3
}
}
NewCachedThreadPool 带缓冲的线程池
1.介绍
若每个任务执行时间都很长的话会创建太多线程,消耗CPU影响性能。
2.SynchronousQueue
SynchronousQueue指t1线程在Queue里放任务时放不进去,只能阻塞在外面,当t2线程来取的时候,就可以把任务取走了。意味着:该队列就是为了阻塞一个任务且不用拒绝策略,只等待线程来执行。当SynchronousQueue配合NewCachedThreadPool执行时,就是每当阻塞一个任务就创建一个救急线程。
SynchronousQueue使用代码示例
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.SynchronousQueue;
@Slf4j(topic = "TC42")
public class TC42 {
public static void main(String[] args) {
SynchronousQueue queue = new SynchronousQueue();
new Thread(()->{
try {
log.debug("put 1....");
//当执行put()后被阻塞住,当take()执行完后,put()才执行完毕
queue.put(1);
log.debug("putted 1....");
log.debug("put 2....");
queue.put(2);
log.debug("putted 2....");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
log.debug("get 1...");
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
log.debug("get 2...");
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t3").start();
}
}
NewSingleThreadExecutor 单线程执行器
线程异常后,线程池会重新创建新的线程
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j(topic = "TC43")
public class TC43 {
public static void main(String[] args) throws InterruptedException {
test1();
//result: 线程1抛出异常停止后,线程池又创建了一个线程2去执行后面的codes
//14:15:44.489 [pool-1-thread-1] DEBUG TC43 - 1
//Exception in thread "pool-1-thread-1" 14:15:44.493 [pool-1-thread-2] DEBUG TC43 - 2
//14:15:44.493 [pool-1-thread-2] DEBUG TC43 - 3
}
public static void test1() throws InterruptedException{
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(()->{
log.debug("1");
int i=1/0;
});
service.execute(()->{
log.debug("2");
});
service.execute(()->{
log.debug("3");
});
}
}
提交任务
Future<T> Submit
有一个Future类型的返回值
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j(topic = "TC44")
public class TC44 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future = pool.submit(()->{
log.debug("running");
Thread.sleep(1000);
return "ok";
});
log.debug("{}",future.get());
}
}
List<Future<T>> invokeAll()
执行集合中所有线程,并返回所有返回值到一个集合里。
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j(topic = "TC45")
public class TC45 {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
List<Future<Object>> futures = pool.invokeAll(Arrays.asList(
()->{
log.debug("begin...");
Thread.sleep(1000);
return "1";
},
()->{
log.debug("begin...");
Thread.sleep(500);
return "2";
},
()->{
log.debug("begin...");
Thread.sleep(2000);
return "3";
}
));
futures.forEach(f->{
try {
log.debug("{}",f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
Object invokeAny()
集合中哪个线程结束最早返回哪个线程,其余线程停止执行任务。
0.5S后打印结果,因为第二个callable线程结束最早只等待了0.5S.
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j(topic = "TC46")
public class TC46 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(3);
Object o = pool.invokeAny(Arrays.asList(
()->{
log.debug("begin 1....");
Thread.sleep(1000);
log.debug("end 1....");
return "1";
},
()->{
log.debug("begin 2....");
Thread.sleep(500);
log.debug("end 2....");
return "2";
},
()->{
log.debug("begin 3....");
Thread.sleep(800);
log.debug("end 3....");
return "3";
}
));
log.debug("{}",o.toString());
}
}
关闭线程池
shutdown()
不会等待正在运行的线程,等它们运行结束,就会自己停止
shutdownNow()
其他终结方法