自定义线程池
1. 简介
1.1 引入原因
1 . 一个任务过来,一个线程去做。如果每次过来都创建新线程,性能低且比较耗费内存
2 . 线程数多于cpu核心,线程切换,要保存原来线程的状态,运行现在的线程,势必会更加耗费资源
线程数少于cpu核心,不能很好的利用多线程的性能
3 . 充分利用已有线程,去处理原来的任务
1.2. 线程池组件
1 . 消费者( 线程池) : 保存一定数量线程来处理任务
2 . 生产者: 客户端源源不断产生的新任务
3 . 阻塞队列( blocking queue) : 平衡消费者和生产者之间,用来保存任务 的一个等待队列
- 生产任务速度较快,多余的任务要等
- 生产任务速度慢,那么线程池中存活的线程等
2. 自定义线程池
2.1 不带超时
阻塞队列
package com. erick. multithread. d6 ;
import java. util. ArrayDeque ;
import java. util. Deque ;
import java. util. concurrent. locks. Condition ;
import java. util. concurrent. locks. ReentrantLock ;
/**
 * A bounded FIFO queue protected by a fair {@link ReentrantLock}.
 *
 * <p>Elements are inserted at the head of the deque and removed from the tail, so they
 * leave in insertion order. Producers block while the queue holds {@code capacity}
 * elements; consumers block while it is empty.
 *
 * @param <T> element type
 */
public class BlockingQueue<T> {
    /** Number of elements at which producers start to block. */
    private final int capacity;
    private final Deque<T> blockingQueue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Producers wait here while the queue is full. */
    private final Condition producerRoom = lock.newCondition();
    /** Consumers wait here while the queue is empty. */
    private final Condition consumerRoom = lock.newCondition();

    /**
     * @param capacity number of elements the queue holds before producers block
     */
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Removes and returns the oldest element, waiting as long as the queue is empty.
     *
     * @return the oldest element
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is set again before throwing
     */
    public T getTask() {
        // Acquire outside the try so a failed lock() never reaches unlock().
        lock.lock();
        try {
            while (blockingQueue.isEmpty()) {
                System.out.println("阻塞队列为空,消费者等待");
                try {
                    consumerRoom.await();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as the queue is full.
     *
     * @param t element to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is set again before throwing
     */
    public void addTask(T t) {
        lock.lock();
        try {
            while (blockingQueue.size() == capacity) {
                System.out.println("阻塞队列已满,生产者等待");
                try {
                    producerRoom.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            blockingQueue.addFirst(t);
            consumerRoom.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the number of elements currently queued
     */
    public int getSize() {
        lock.lock();
        try {
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }
}
线程池
package com. erick. multithread. d6 ;
import java. util. HashSet ;
import java. util. Set ;
/**
 * A minimal thread pool: up to {@code coreThreadSize} worker threads, with further
 * tasks parked in a bounded {@link BlockingQueue}.
 *
 * <p>All access to the worker set is guarded by the monitor of {@code pool}.
 *
 * @param <T> element type of the backing queue; submitted {@link Runnable}s are cast to it
 */
public class ErickThreadPool<T> {
    private final BlockingQueue<T> blockingQueue;
    private final Set<Worker> pool = new HashSet<>();
    private final int coreThreadSize;

    /**
     * @param blockQueueCapacity capacity of the backing task queue
     * @param coreThreadSize maximum number of worker threads
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
        blockingQueue = new BlockingQueue<>(blockQueueCapacity);
        this.coreThreadSize = coreThreadSize;
    }

    /**
     * Runs the task on a new worker if fewer than {@code coreThreadSize} workers exist,
     * otherwise enqueues it, blocking the caller while the queue is full.
     *
     * @param task task to execute
     */
    public void executeTask(Runnable task) {
        // Same monitor the workers use when they remove themselves from the set.
        synchronized (pool) {
            if (pool.size() < coreThreadSize) {
                Worker worker = new Worker(task);
                pool.add(worker);
                System.out.println("创建新的线程来执行任务");
                worker.start();
                return;
            }
        }
        System.out.println("线程池已满,生产者暂时等待");
        // Enqueue outside the monitor so a blocked producer does not stall other callers.
        @SuppressWarnings("unchecked")
        T queued = (T) task;
        blockingQueue.addTask(queued);
    }

    /** Worker thread: runs its first task, then keeps pulling tasks from the queue. */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // getTask() blocks until a task is available, so this loop only ends if it returns null.
            while (task != null || (task = (Runnable) blockingQueue.getTask()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    // Report the cause instead of dropping it; the worker keeps serving tasks.
                    System.out.println("线程执行任务出错" + ": " + e);
                } finally {
                    task = null;
                }
            }
            synchronized (pool) {
                System.out.println("线程销毁");
                pool.remove(this);
            }
        }
    }
}
测试代码
package com. erick. multithread. d6 ;
import java. util. Date ;
/** Submits five timestamp-printing tasks to a pool of three workers with a queue of ten. */
public class Test {
    public static void main(String[] args) {
        ErickThreadPool<Runnable> pool = new ErickThreadPool<>(10, 3);
        int submitted = 0;
        while (submitted < 5) {
            // Each task prints the name of the thread running it and the current time.
            pool.executeTask(() ->
                    System.out.println(Thread.currentThread().getName() + ":" + new Date()));
            submitted++;
        }
    }
}
2.2 超时等待
上面线程池中的worker线程获取blockingqueue的时候,即使阻塞队列中没有任务,也会一直死等,并不会结束
阻塞队列
package com. erick. multithread. d6 ;
import java. util. ArrayDeque ;
import java. util. Deque ;
import java. util. concurrent. TimeUnit ;
import java. util. concurrent. locks. Condition ;
import java. util. concurrent. locks. ReentrantLock ;
/**
 * A bounded FIFO queue protected by a fair {@link ReentrantLock}, with an optional
 * timeout on the consumer side.
 *
 * <p>Elements are inserted at the head of the deque and removed from the tail, so they
 * leave in insertion order.
 *
 * @param <T> element type
 */
public class BlockingQueue<T> {
    /** Number of elements at which producers start to block. */
    private final int capacity;
    private final Deque<T> blockingQueue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Producers wait here while the queue is full. */
    private final Condition producerRoom = lock.newCondition();
    /** Consumers wait here while the queue is empty. */
    private final Condition consumerRoom = lock.newCondition();

    /**
     * @param capacity number of elements the queue holds before producers block
     */
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Removes and returns the oldest element, waiting as long as the queue is empty.
     *
     * @return the oldest element
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is set again before throwing
     */
    public T getTask() {
        // Acquire outside the try so a failed lock() never reaches unlock().
        lock.lock();
        try {
            while (blockingQueue.isEmpty()) {
                System.out.println("阻塞队列为空,消费者等待");
                try {
                    consumerRoom.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting at most the given time for one to arrive.
     *
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the oldest element, or {@code null} if the queue stayed empty for the whole timeout
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is set again before throwing
     */
    public T getTask(long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (blockingQueue.isEmpty()) {
                // awaitNanos reports "no time left" as any value <= 0, including exactly 0.
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = consumerRoom.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as the queue is full.
     *
     * @param t element to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is set again before throwing
     */
    public void addTask(T t) {
        lock.lock();
        try {
            while (blockingQueue.size() == capacity) {
                System.out.println("阻塞队列已满,生产者等待");
                try {
                    producerRoom.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            blockingQueue.addFirst(t);
            consumerRoom.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the number of elements currently queued
     */
    public int getSize() {
        lock.lock();
        try {
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }
}
线程池
package com. erick. multithread. d6 ;
import java. util. HashSet ;
import java. util. Set ;
import java. util. concurrent. TimeUnit ;
/**
 * A minimal thread pool whose idle workers give up after a configurable timeout.
 *
 * <p>Up to {@code coreThreadSize} workers are created on demand; further tasks are parked
 * in a bounded {@link BlockingQueue}. All access to the worker set is guarded by the
 * monitor of {@code pool}.
 *
 * @param <T> element type of the backing queue; submitted {@link Runnable}s are cast to it
 */
public class ErickThreadPool<T> {
    private final BlockingQueue<T> blockingQueue;
    private final Set<Worker> pool = new HashSet<>();
    private final int coreThreadSize;
    private final long timeout;
    /** {@code null} means workers wait for tasks without a timeout. */
    private final TimeUnit timeUnit;

    /**
     * Creates a pool whose workers wait indefinitely for new tasks.
     *
     * @param blockQueueCapacity capacity of the backing task queue
     * @param coreThreadSize maximum number of worker threads
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
        this(blockQueueCapacity, coreThreadSize, 0, null);
    }

    /**
     * @param blockQueueCapacity capacity of the backing task queue
     * @param coreThreadSize maximum number of worker threads
     * @param timeout how long an idle worker waits for a task before exiting
     * @param timeUnit unit of {@code timeout}; {@code null} disables the timeout
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize, long timeout, TimeUnit timeUnit) {
        this.blockingQueue = new BlockingQueue<>(blockQueueCapacity);
        this.coreThreadSize = coreThreadSize;
        this.timeUnit = timeUnit;
        this.timeout = timeout;
    }

    /**
     * Runs the task on a new worker if fewer than {@code coreThreadSize} workers exist,
     * otherwise enqueues it, blocking the caller while the queue is full.
     *
     * @param task task to execute
     */
    public void executeTask(Runnable task) {
        // Same monitor the workers use when they remove themselves from the set.
        synchronized (pool) {
            if (pool.size() < coreThreadSize) {
                startWorker(task);
                return;
            }
        }
        System.out.println("线程池已满,生产者暂时等待");
        // Enqueue outside the monitor so a blocked producer cannot stall exiting workers.
        @SuppressWarnings("unchecked")
        T queued = (T) task;
        blockingQueue.addTask(queued);
        synchronized (pool) {
            // A worker may have timed out and left while we were enqueueing; make sure
            // somebody is around to drain the queue.
            if (pool.size() < coreThreadSize && blockingQueue.getSize() > 0) {
                startWorker(null);
            }
        }
    }

    /** Registers and starts a worker; the caller must hold the monitor of {@code pool}. */
    private void startWorker(Runnable firstTask) {
        Worker worker = new Worker(firstTask);
        pool.add(worker);
        System.out.println("创建新的线程来执行任务");
        worker.start();
    }

    /** Worker thread: runs its first task (if any), then pulls tasks until the queue stays empty. */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        /** Fetches the next task, honouring the pool's timeout when one is configured. */
        private Runnable nextTask() {
            T next = (timeUnit == null)
                    ? blockingQueue.getTask()
                    : blockingQueue.getTask(timeout, timeUnit);
            return (Runnable) next;
        }

        @Override
        public void run() {
            while (true) {
                while (task != null || (task = nextTask()) != null) {
                    try {
                        task.run();
                    } catch (Exception e) {
                        // Report the cause instead of dropping it; the worker keeps serving tasks.
                        System.out.println("线程执行任务出错" + ": " + e);
                    } finally {
                        task = null;
                    }
                }
                synchronized (pool) {
                    // Only leave if nothing was queued between the timeout and this check.
                    if (blockingQueue.getSize() == 0) {
                        System.out.println("线程销毁");
                        pool.remove(this);
                        return;
                    }
                }
            }
        }
    }
}
测试代码
package com. erick. multithread. d6 ;
import java. util. Date ;
import java. util. concurrent. TimeUnit ;
/** Submits five timestamp-printing tasks to a pool whose idle workers exit after five seconds. */
public class Test {
    public static void main(String[] args) {
        ErickThreadPool<Runnable> pool = new ErickThreadPool<>(10, 3, 5, TimeUnit.SECONDS);
        int submitted = 0;
        while (submitted < 5) {
            // Each task prints the name of the thread running it and the current time.
            pool.executeTask(() ->
                    System.out.println(Thread.currentThread().getName() + ":" + new Date()));
            submitted++;
        }
    }
}
2.3 生产者-超时设置
当阻塞队列中已满,并且核心线程都在工作的时候,生产者线程提供的任务就会进行等待 让任务生产者自己决定该如何执行
- 死等
- 带超时等待
- 让调用者放弃执行任务
- 让调用者抛出异常
- 让调用者自己执行任务
阻塞队列
package com. erick. multithread. d6 ;
import java. util. ArrayDeque ;
import java. util. Deque ;
import java. util. concurrent. TimeUnit ;
import java. util. concurrent. locks. Condition ;
import java. util. concurrent. locks. ReentrantLock ;
/**
 * A bounded FIFO queue protected by a fair {@link ReentrantLock}, with blocking, timed
 * and policy-driven insertion.
 *
 * <p>Elements are inserted at the head of the deque and removed from the tail, so they
 * leave in insertion order.
 *
 * @param <T> element type
 */
public class BlockingQueue<T> {
    /** Number of elements at which the queue counts as full. */
    private final int capacity;
    private final Deque<T> blockingQueue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Producers wait here while the queue is full. */
    private final Condition producerRoom = lock.newCondition();
    /** Consumers wait here while the queue is empty. */
    private final Condition consumerRoom = lock.newCondition();

    /**
     * @param capacity number of elements the queue holds before it counts as full
     */
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Removes and returns the oldest element, waiting as long as the queue is empty.
     *
     * @return the oldest element
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is set again before throwing
     */
    public T getTask() {
        // Acquire outside the try so a failed lock() never reaches unlock().
        lock.lock();
        try {
            while (blockingQueue.isEmpty()) {
                System.out.println("阻塞队列为空,消费者等待");
                try {
                    consumerRoom.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting at most the given time for one to arrive.
     *
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the oldest element, or {@code null} if the queue stayed empty for the whole timeout
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is set again before throwing
     */
    public T getTask(long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (blockingQueue.isEmpty()) {
                // awaitNanos reports "no time left" as any value <= 0, including exactly 0.
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = consumerRoom.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as the queue is full.
     *
     * @param t element to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is set again before throwing
     */
    public void addTask(T t) {
        lock.lock();
        try {
            while (blockingQueue.size() == capacity) {
                System.out.println("阻塞队列已满,生产者等待");
                try {
                    producerRoom.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            blockingQueue.addFirst(t);
            consumerRoom.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting at most the given time for space.
     *
     * @param t element to enqueue
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if the element was enqueued, {@code false} if the queue stayed full
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *     interrupted; the thread's interrupt flag is set again before throwing
     */
    public boolean addTask(T t, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (blockingQueue.size() == capacity) {
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = producerRoom.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            blockingQueue.addFirst(t);
            consumerRoom.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Enqueues the task if there is room; otherwise hands it to the rejection policy.
     *
     * <p>The policy is invoked after the queue lock has been released, so a policy that
     * runs or waits for a long time does not block consumers of this queue.
     *
     * @param rejectPolicy strategy applied when the queue is full
     * @param task task to enqueue; cast to {@link Runnable} when passed to the policy
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (blockingQueue.size() != capacity) {
                System.out.println("加入阻塞队列");
                blockingQueue.addFirst(task);
                consumerRoom.signal();
                return;
            }
        } finally {
            lock.unlock();
        }
        // Queue was full: let the policy decide, without holding the lock.
        rejectPolicy.reject(this, (Runnable) task);
    }

    /**
     * @return the number of elements currently queued
     */
    public int getSize() {
        lock.lock();
        try {
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * Strategy invoked by {@link BlockingQueue#tryPut} when the queue is full.
 *
 * @param <T> element type of the queue
 */
@FunctionalInterface
interface RejectPolicy<T> {
    /**
     * Decides what happens to a task that did not fit into the queue.
     *
     * @param blockingQueue the full queue
     * @param task the task that could not be enqueued
     */
    void reject(BlockingQueue<T> blockingQueue, Runnable task);
}
线程池
package com. erick. multithread. d6 ;
import java. util. HashSet ;
import java. util. Set ;
import java. util. concurrent. TimeUnit ;
/**
 * A minimal thread pool with an idle timeout for workers and a pluggable policy for
 * tasks that do not fit into the queue.
 *
 * <p>Up to {@code coreThreadSize} workers are created on demand; further tasks are
 * offered to a bounded {@link BlockingQueue}. All access to the worker set is guarded by
 * the monitor of {@code pool}.
 *
 * @param <T> element type of the backing queue; submitted {@link Runnable}s are cast to it
 */
public class ErickThreadPool<T> {
    private final BlockingQueue<T> blockingQueue;
    private final Set<Worker> pool = new HashSet<>();
    private final int coreThreadSize;
    private final long timeout;
    /** {@code null} means workers wait for tasks without a timeout. */
    private final TimeUnit timeUnit;
    /** {@code null} means producers block until the queue has room. */
    private final RejectPolicy<T> rejectPolicy;

    /**
     * Creates a pool whose workers wait indefinitely and whose producers block on a full queue.
     *
     * @param blockQueueCapacity capacity of the backing task queue
     * @param coreThreadSize maximum number of worker threads
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
        this(blockQueueCapacity, coreThreadSize, 0, null, null);
    }

    /**
     * @param blockQueueCapacity capacity of the backing task queue
     * @param coreThreadSize maximum number of worker threads
     * @param timeout how long an idle worker waits for a task before exiting
     * @param timeUnit unit of {@code timeout}; {@code null} disables the timeout
     * @param rejectPolicy strategy for tasks arriving while the queue is full;
     *     {@code null} makes the producer block instead
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize, long timeout, TimeUnit timeUnit, RejectPolicy<T> rejectPolicy) {
        this.blockingQueue = new BlockingQueue<>(blockQueueCapacity);
        this.coreThreadSize = coreThreadSize;
        this.timeUnit = timeUnit;
        this.timeout = timeout;
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * Runs the task on a new worker if fewer than {@code coreThreadSize} workers exist,
     * otherwise offers it to the queue, applying the rejection policy when the queue is full.
     *
     * @param task task to execute
     */
    public void executeTask(Runnable task) {
        // Same monitor the workers use when they remove themselves from the set.
        synchronized (pool) {
            if (pool.size() < coreThreadSize) {
                startWorker(task);
                return;
            }
        }
        System.out.println("线程池已满,生产者???");
        // Enqueue outside the monitor so a slow policy cannot stall exiting workers.
        @SuppressWarnings("unchecked")
        T queued = (T) task;
        if (rejectPolicy == null) {
            blockingQueue.addTask(queued);
        } else {
            blockingQueue.tryPut(rejectPolicy, queued);
        }
        synchronized (pool) {
            // A worker may have timed out and left while we were enqueueing; make sure
            // somebody is around to drain the queue.
            if (pool.size() < coreThreadSize && blockingQueue.getSize() > 0) {
                startWorker(null);
            }
        }
    }

    /** Registers and starts a worker; the caller must hold the monitor of {@code pool}. */
    private void startWorker(Runnable firstTask) {
        Worker worker = new Worker(firstTask);
        pool.add(worker);
        System.out.println("创建新的线程来执行任务");
        worker.start();
    }

    /** Worker thread: runs its first task (if any), then pulls tasks until the queue stays empty. */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        /** Fetches the next task, honouring the pool's timeout when one is configured. */
        private Runnable nextTask() {
            T next = (timeUnit == null)
                    ? blockingQueue.getTask()
                    : blockingQueue.getTask(timeout, timeUnit);
            return (Runnable) next;
        }

        @Override
        public void run() {
            while (true) {
                while (task != null || (task = nextTask()) != null) {
                    try {
                        task.run();
                    } catch (Exception e) {
                        // Report the cause instead of dropping it; the worker keeps serving tasks.
                        System.out.println("线程执行任务出错" + ": " + e);
                    } finally {
                        task = null;
                    }
                }
                synchronized (pool) {
                    // Only leave if nothing was queued between the timeout and this check.
                    if (blockingQueue.getSize() == 0) {
                        System.out.println("线程销毁");
                        pool.remove(this);
                        return;
                    }
                }
            }
        }
    }
}
测试代码
package com. erick. multithread. d6 ;
import java. util. Date ;
import java. util. concurrent. TimeUnit ;
/**
 * Submits ten five-second tasks to a pool of two workers with a queue of one, using the
 * exception-throwing rejection policy.
 */
public class Test {
    public static void main(String[] args) {
        ErickThreadPool pool = new ErickThreadPool(1, 2, 5, TimeUnit.SECONDS, new ProducerException());
        for (int submitted = 0; submitted < 10; submitted++) {
            pool.executeTask(Test::printThenSleep);
        }
    }

    /** Prints the current thread name and time, then sleeps for five seconds. */
    private static void printThenSleep() {
        System.out.println(Thread.currentThread().getName() + ":" + new Date());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
/** Rejection policy that makes the producer wait, without a time limit, until the queue has room. */
class StillWait implements RejectPolicy<Runnable> {
    /**
     * Enqueues the task with the blocking {@code addTask}.
     *
     * @param blockingQueue the full queue
     * @param task the task that could not be enqueued
     */
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        blockingQueue.addTask(task);
    }
}
/** Rejection policy that makes the producer wait up to one second for room in the queue. */
class WaitWithTimeOut implements RejectPolicy<Runnable> {
    /**
     * Tries to enqueue the task for at most one second and reports when it had to be dropped.
     *
     * @param blockingQueue the full queue
     * @param task the task that could not be enqueued
     */
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        boolean queued = blockingQueue.addTask(task, 1, TimeUnit.SECONDS);
        if (!queued) {
            // Make the drop visible instead of silently discarding the result.
            System.out.println("等待超时,任务被放弃");
        }
    }
}
/** Rejection policy that drops the task and only logs the fact. */
class ProducerGiveUp implements RejectPolicy<Runnable> {
    /**
     * Prints a notice; the task is neither enqueued nor run.
     *
     * @param blockingQueue the full queue (unused)
     * @param task the task being dropped (unused)
     */
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        System.out.println("调用者抛弃任务");
    }
}
/** Rejection policy that runs the task on the thread that tried to submit it. */
class ProducerExecute implements RejectPolicy<Runnable> {
    /**
     * Runs the task synchronously on the calling thread.
     *
     * @param blockingQueue the full queue (unused)
     * @param task the task to run
     */
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        System.out.println("调用者自己执行任务");
        // Run in the caller itself; spawning a new thread would bypass the pool's limit.
        task.run();
    }
}
/** Rejection policy that fails the submission with an exception. */
class ProducerException implements RejectPolicy<Runnable> {
    /**
     * Always throws; the task is neither enqueued nor run.
     *
     * @param blockingQueue the full queue (unused)
     * @param task the rejected task (unused)
     * @throws RuntimeException always
     */
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        throw new RuntimeException("核心线程已在工作,阻塞队列已满");
    }
}
JDK线程池
1. 类图
2. 线程状态
ThreadPoolExecutor 使用int的高3位来表示线程池状态,低29位表示线程数量
3. ThreadPoolExecutor
3.1 构造方法
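int corePoolSize: 核心线程数(池中长期保留的线程数量)
int maximumPoolSize: 最大线程数(核心线程数 + 救急线程数)
long keepAliveTime: 救急线程空闲后的最大存活时间
TimeUnit unit: keepAliveTime 的时间单位
BlockingQueue < Runnable > workQueue: 存放等待执行任务的阻塞队列
ThreadFactory threadFactory: 创建线程的工厂,可用于给线程起名
RejectedExecutionHandler handler: 队列已满且线程数达到最大值时使用的拒绝策略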
public ThreadPoolExecutor ( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
3.2 核心线程和救急线程
1 . 核心线程: 执行完任务后,会继续保留在线程池中
2 . 救急线程:如果阻塞队列已满,并且没有空余的核心线程。那么会创建救急线程来执行任务
2.1 任务执行完毕后,如果空闲时间超过 keepAliveTime,这个线程就会被销毁( 临时工)
2.2 必须是有界阻塞队列;如果是无界队列,任务永远可以入队,不会创建救急线程
3 . 拒绝策略: 有界队列,核心线程满负荷,阻塞队列已满,无空余救急线程,才会执行拒绝
3.3 JDK拒绝策略
如果线程达到最大线程数,救急线程也满负荷,且有界队列也满了,JDK 提供了4种拒绝策略
AbortPolicy: 调用者抛出RejectedExecutionException, 默认策略
CallerRunsPolicy: 调用者运行任务
DiscardPolicy: 放弃本次任务
DiscardOldestPolicy: 放弃阻塞队列中最早的任务,本任务取而代之
- Dubbo: 在抛出异常之前,记录日志,并dump线程栈信息,方便定位问题
- Netty: 创建一个新的线程来执行任务
- ActiveMQ: 带超时等待( 60s) , 尝试放入阻塞队列
4. Executors类工厂方法
默认的构造方法来创建线程池,参数过多,JDK提供了工厂方法,来创建线程池
4.1 固定大小
核心线程数 = 最大线程数,救急线程数为0 阻塞队列:无界,可以存放任意数量的任务
任务量已知,但是线程执行时间较长
执行任务后,线程并不会结束
public static ExecutorService newFixedThreadPool ( int nThreads) {
public static ExecutorService newFixedThreadPool ( int nThreads) {
return new ThreadPoolExecutor ( nThreads, nThreads,
0L , TimeUnit . MILLISECONDS ,
new LinkedBlockingQueue < Runnable > ( ) ) ;
}
package com. erick. multithread. d7 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. ThreadFactory ;
import java. util. concurrent. atomic. AtomicInteger ;
/** Runs two tasks on a fixed pool of two threads that are named by a custom thread factory. */
public class Demo01 {
    public static void main(String[] args) {
        // Counter used to give every created thread a distinct "erick-pool<n>" name.
        AtomicInteger num = new AtomicInteger(0);
        ThreadFactory namingFactory = r -> new Thread(r, "erick-pool" + num.getAndIncrement());
        ExecutorService pool = Executors.newFixedThreadPool(2, namingFactory);

        Runnable report = () -> System.out.println(Thread.currentThread().getName() + " running");
        pool.execute(report);
        pool.execute(report);
    }
}
4.2 带缓冲
核心线程数为0, 最大线程数为Integer的无限大 全部是救急线程,等待时间是60s,60s后就会消亡 SynchronousQueue: 没有容量,没有线程来取的时候是放不进去的 整个线程池数会随着任务数目增长,1分钟后没有其他活动会消亡
1 . 时间较短的线程
2 . 数量大,任务执行时间长,会造成 OutOfMemory问题
/**
 * JDK excerpt: builds a pool with zero core threads, a maximum of Integer.MAX_VALUE
 * threads, a 60-second keep-alive time and a SynchronousQueue as work queue.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool ( ) {
// core size 0, maximum size Integer.MAX_VALUE
return new ThreadPoolExecutor ( 0 , Integer . MAX_VALUE ,
// keep-alive time: 60 seconds
60L , TimeUnit . SECONDS ,
new SynchronousQueue < Runnable > ( ) ) ;
}
4.3. 单线程
线程池大小始终为1个,不能改变线程数 相比自定义一个线程来执行,线程池可以保证前面任务的失败,不会影响到后续任务
自定义线程: 执行多个任务时,一个出错,后续都不能执行了
单线程池: 一个任务失败后,会结束出错线程。重新new一个线程来执行下面的任务
单线程池: 保证所有任务都是串行
newFixedThreadPool: 初始化后,还可以修改线程大小
newSingleThreadExecutor: 不可以修改
/**
 * JDK excerpt: builds a ThreadPoolExecutor with core and maximum size 1, a zero
 * keep-alive time and a default-constructed LinkedBlockingQueue, and returns it wrapped
 * in a FinalizableDelegatedExecutorService.
 *
 * @return the wrapped single-thread executor
 */
public static ExecutorService newSingleThreadExecutor ( ) {
// NOTE(review): the wrapper is presumably what prevents callers from reconfiguring the
// pool size, as the surrounding notes state; confirm against the JDK source.
return new FinalizableDelegatedExecutorService
( new ThreadPoolExecutor ( 1 , 1 ,
0L , TimeUnit . MILLISECONDS ,
new LinkedBlockingQueue < Runnable > ( ) ) ) ;
}
package com. nike. erick. d07 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
/** Compares fixed, cached and single-thread executors; {@code main} runs the single-thread case. */
public class Demo01 {
    public static void main(String[] args) {
        method03();
    }

    /** Three tasks on a fixed pool of two threads. */
    private static void method01() {
        submitThreeWorkingTasks(Executors.newFixedThreadPool(2));
    }

    /** Three tasks on a cached pool. */
    private static void method02() {
        submitThreeWorkingTasks(Executors.newCachedThreadPool());
    }

    /** A failing task followed by two normal ones on a single-thread executor. */
    private static void method03() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable report = () -> {
            System.out.println(Thread.currentThread().getName() + " running");
        };
        pool.execute(() -> {
            // Deliberate ArithmeticException: the first task fails before printing.
            int i = 1 / 0;
            System.out.println(Thread.currentThread().getName() + " running");
        });
        pool.execute(report);
        pool.execute(report);
    }

    /** Submits three tasks that each print the executing thread's name. */
    private static void submitThreeWorkingTasks(ExecutorService pool) {
        for (int n = 0; n < 3; n++) {
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
        }
    }
}
5. 提交任务
5.1. execute
void execute( Runnable command )
5.2. submit
Future < ? > submit ( Runnable task) ;
< T > Future < T > submit ( Runnable task, T result) ;
< T > Future < T > submit ( Callable < T > task) ;
5.3. invokeAll
< T > List < Future < T > > invokeAll ( Collection < ? extends Callable < T > > tasks)
throws InterruptedException ;
5.4. invokeAny
package com. nike. erick. d07 ;
import java. util. ArrayList ;
import java. util. Collection ;
import java. util. List ;
import java. util. concurrent. Callable ;
import java. util. concurrent. ExecutionException ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. Future ;
import java. util. concurrent. TimeUnit ;
/** Demonstrates the task-submission methods of {@link ExecutorService}. */
public class Demo02 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            method05(pool);
        } finally {
            // Let the worker threads finish so the JVM can exit after the demo.
            pool.shutdown();
        }
    }

    /** {@code execute}: fire-and-forget submission of a Runnable. */
    public static void method01(ExecutorService pool) {
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
    }

    /**
     * {@code submit(Runnable)}: returns a Future that only reports completion.
     *
     * @throws InterruptedException if the one-second pause is interrupted
     */
    public static void method02(ExecutorService pool) throws InterruptedException {
        // Pass the Runnable directly; wrapping it in a Thread object adds nothing.
        Runnable report = () -> System.out.println(Thread.currentThread().getName() + " running");
        Future<?> result = pool.submit(report);
        TimeUnit.SECONDS.sleep(1);
        System.out.println(result.isDone());
        System.out.println(result.isCancelled());
    }

    /**
     * {@code submit(Callable)}: returns a Future carrying the task's result.
     *
     * @throws InterruptedException if the pause or the wait for the result is interrupted
     * @throws ExecutionException if the task fails
     */
    public static void method03(ExecutorService pool) throws InterruptedException, ExecutionException {
        Callable<String> job = () -> "success";
        Future<String> submit = pool.submit(job);
        TimeUnit.SECONDS.sleep(1);
        System.out.println(submit.isDone());
        System.out.println(submit.isCancelled());
        System.out.println(submit.get());
    }

    /**
     * {@code invokeAll}: submits ten tasks at once and prints the returned futures.
     *
     * @throws InterruptedException if waiting for the tasks or the pause is interrupted
     */
    public static void method04(ExecutorService pool) throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int round = i;
            tasks.add(() -> {
                System.out.println(Thread.currentThread().getName() + " running");
                return "success:" + round;
            });
        }
        List<Future<String>> results = pool.invokeAll(tasks);
        TimeUnit.SECONDS.sleep(1);
        System.out.println(results);
    }

    /**
     * {@code invokeAny}: submits three tasks of different duration and prints the one
     * result that {@code invokeAny} returns.
     *
     * @throws InterruptedException if waiting for a result is interrupted
     * @throws ExecutionException if no task completes successfully
     */
    public static void method05(ExecutorService pool) throws InterruptedException, ExecutionException {
        Collection<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> {
            System.out.println("first task");
            TimeUnit.SECONDS.sleep(1);
            return "success";
        });
        tasks.add(() -> {
            System.out.println("second task");
            TimeUnit.SECONDS.sleep(2);
            return "success";
        });
        tasks.add(() -> {
            System.out.println("third task");
            TimeUnit.SECONDS.sleep(3);
            return "success";
        });
        String result = pool.invokeAny(tasks);
        System.out.println(result);
    }
}
6. 关闭线程池
6.1 shutdown
将线程池的状态改变为SHUTDOWN状态 不会接受新任务,已经提交的任务不会停止 不会阻塞调用线程的执行
void shutdown ( ) ;
package com. dreamer. multithread. day09 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. TimeUnit ;
/** Shows that {@code shutdown()} lets already-submitted tasks finish without blocking the caller. */
public class Demo04 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (String label : new String[] {"first", "second", "third"}) {
            pool.submit(delayedReport(label));
        }
        pool.shutdown();
        System.out.println("main thread ending");
    }

    /** Builds a task that sleeps two seconds and then prints the thread name and the label. */
    private static Runnable delayedReport(String label) {
        return () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " " + label + " running");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}
6.2. shutdownNow
不会接受新任务 正在执行的任务会被打断(interrupt) 将等待队列中尚未执行的任务返回
List < Runnable > shutdownNow ( ) ;
package com. dreamer. multithread. day09 ;
import java. util. List ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. TimeUnit ;
/** Shows {@code shutdownNow()}: prints how many submitted tasks it hands back. */
public class Demo04 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (String label : new String[] {"first", "second", "third"}) {
            pool.submit(delayedReport(label));
        }
        List<Runnable> leftOver = pool.shutdownNow();
        int pending = leftOver.size();
        System.out.println(pending);
        System.out.println("main thread ending");
    }

    /** Builds a task that sleeps two seconds and then prints the thread name and the label. */
    private static Runnable delayedReport(String label) {
        return () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " " + label + " running");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}
线程池拓展
1. 异步模式之工作线程
1.1 Worker Thread
让有限的工作线程来轮流异步处理无限多的任务 分类:不同的任务类型应该使用不同的线程池
1.2 饥饿现象
- 两个工人是同一个线程池中的两个线程, 为客人点餐和后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点餐,等菜做好,上菜,在此期间,处理点餐的工人必须等待
- A工人处理了点餐任务,B工人把菜做好,然后上菜,配合正常
- 同时来了两个客人,A和B工人都去处理点餐了,没人做饭了,出现线程数不足导致的资源饥饿
正常
package com. erick. multithread. d7 ;
import java. util. Arrays ;
import java. util. List ;
import java. util. Random ;
import java. util. concurrent. * ;
/**
 * One "order" task on a two-thread pool: the order task submits a "cook" task to the
 * same pool and waits for its result.
 */
public class Demo02 {
    private static List<String> MENU = Arrays.asList("宫保鸡丁", "地三鲜", "辣子鸡丁", "红烧肉");
    private static Random random = new Random();

    /** Picks a random dish from the menu. */
    private static String cooking() {
        int pick = random.nextInt(MENU.size());
        return MENU.get(pick);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> {
            System.out.println("开始处理点餐");
            // The cooking step is submitted to the same pool as the ordering step.
            Callable<String> dish = () -> {
                System.out.println("开始做菜");
                return cooking();
            };
            Future<String> cook = pool.submit(dish);
            try {
                String result = cook.get();
                System.out.println("上菜:" + result);
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }
}
线程池饥饿
package com. erick. multithread. d7 ;
import java. util. Arrays ;
import java. util. List ;
import java. util. Random ;
import java. util. concurrent. * ;
/**
 * Two "order" tasks on a two-thread pool: each submits a "cook" task to the same pool
 * and then waits for it, which is the starvation scenario described in the notes.
 */
public class Demo02 {
    private static List<String> MENU = Arrays.asList("宫保鸡丁", "地三鲜", "辣子鸡丁", "红烧肉");
    private static Random random = new Random();

    /** Picks a random dish from the menu. */
    private static String cooking() {
        int pick = random.nextInt(MENU.size());
        return MENU.get(pick);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Two orders arrive at once; each one is handled by the helper below.
        pool.execute(() -> takeOrder(pool));
        pool.execute(() -> takeOrder(pool));
    }

    /** Handles one order: submits the cooking step to the same pool and waits for the dish. */
    private static void takeOrder(ExecutorService pool) {
        System.out.println("开始处理点餐");
        Callable<String> dish = () -> {
            System.out.println("开始做菜");
            return cooking();
        };
        Future<String> cook = pool.submit(dish);
        try {
            String result = cook.get();
            System.out.println("上菜:" + result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
解决方法
最简单的方法: 增加线程池的线程数量,但是不能从根本解决问题 解决方法:不同的任务类型,使用不同的线程池
2. 线程数量
过小,导致cpu资源不能充分利用,浪费性能 过大,线程上下文切换浪费性能,每个线程也要占用内存导致占用内存过多
2.1 CPU密集型
如果线程的任务主要是和cpu资源打交道,比如大数据运算,称为CPU密集型 线程数量: 核心数+1 +1: 保证某线程由于某些原因(操作系统方面)导致暂停时,额外线程可以启动,不浪费CPU资源
2.2. IO密集型
IO操作,RPC调用,数据库访问时,CPU是空闲的,称为IO密集型 更加常见: IO操作,远程RPC调用,数据库操作 线程数 = 核数 * 期望cpu利用率 * (CPU计算时间 + CPU等待时间) / CPU 计算时间
3. 调度功能
3.1 延时执行
package com. dreamer. multithread. day09 ;
import java. util. concurrent. Executors ;
import java. util. concurrent. ScheduledExecutorService ;
import java. util. concurrent. TimeUnit ;
/** Schedules two one-shot tasks, after one and five seconds, on a two-thread scheduler. */
public class Demo05 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        Runnable first = () -> System.out.println(Thread.currentThread().getName() + " first running");
        Runnable second = () -> System.out.println(Thread.currentThread().getName() + " second running");

        pool.schedule(first, 1, TimeUnit.SECONDS);
        pool.schedule(second, 5, TimeUnit.SECONDS);
    }
}
package com. dreamer. multithread. day09 ;
import java. util. concurrent. Executors ;
import java. util. concurrent. ScheduledExecutorService ;
import java. util. concurrent. TimeUnit ;
/** Schedules a failing task and a normal task on a single-thread scheduler. */
public class Demo06 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

        Runnable failing = () -> {
            // Deliberate ArithmeticException: this task fails before printing.
            int i = 1 / 0;
            System.out.println(Thread.currentThread().getName() + " first running");
        };
        Runnable healthy = () -> System.out.println(Thread.currentThread().getName() + " second running");

        pool.schedule(failing, 1, TimeUnit.SECONDS);
        pool.schedule(healthy, 2, TimeUnit.SECONDS);
    }
}
3.2 定时执行
- scheduleAtFixedRate: 按固定频率执行;如果任务的执行时间大于时间间隔,下一次就会紧接着立刻执行
- scheduleWithFixedDelay: 上一个任务执行完毕后,再延迟一定的时间才会执行下一次
package com. dreamer. multithread. day09 ;
import java. util. concurrent. Executors ;
import java. util. concurrent. ScheduledExecutorService ;
import java. util. concurrent. TimeUnit ;
/**
 * Runs a two-second task with {@code scheduleWithFixedDelay}: initial delay three
 * seconds, then two seconds between the end of one run and the start of the next.
 */
public class Demo07 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        Runnable slowTask = () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task is running");
        };
        pool.scheduleWithFixedDelay(slowTask, 3, 2, TimeUnit.SECONDS);
    }
}
4. 异常处理
4.1 不处理异常
package com. dreamer. multithread. day09 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
/** Submits a task that throws and does nothing about the failure. */
public class Demo08 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Runnable failing = () -> {
            // Deliberate ArithmeticException; the returned Future is never inspected.
            int i = 1 / 0;
            System.out.println(Thread.currentThread().getName() + " task running");
        };
        pool.submit(failing);
    }
}
4.2 任务执行者处理
package com. dreamer. multithread. day09 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
/** Submits a task that catches and prints its own exception. */
public class Demo08 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Runnable selfHandling = () -> {
            try {
                // Deliberate ArithmeticException, handled inside the task itself.
                int i = 1 / 0;
                System.out.println(Thread.currentThread().getName() + " task running");
            } catch (Exception e) {
                e.printStackTrace();
                return;
            }
        };
        pool.submit(selfHandling);
    }
}
4.3 线程池处理
package com. dreamer. multithread. day09 ;
import java. util. concurrent. ExecutionException ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. Future ;
import java. util. concurrent. TimeUnit ;
/** Submits a task that throws and surfaces the failure through {@code Future.get()}. */
public class Demo08 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Runnable failing = () -> {
            // Deliberate ArithmeticException; it is reported when the Future is read.
            int i = 1 / 0;
            System.out.println(Thread.currentThread().getName() + " task running");
        };
        Future<?> result = pool.submit(failing);
        TimeUnit.SECONDS.sleep(1);
        Object outcome = result.get();
        System.out.println(outcome);
    }
}