文章目录
- 了解多线程
- CPU
- 进程(Process)
- 线程
- 多线程开发
- 多线程优点
- 实现方式
- 继承Thread类
- 实现Runnable接口
- 实现Callable接口
- 线程状态转换
- 线程状态
- 线程调度
- 调整线程优先级
- 线程睡眠
- 线程等待
- 线程让步
- 线程加入
- 线程唤醒
- 线程同步
- 线程同步方式
- 多线程间通信
- 线程池
- 了解线程池
- 定义
- 常见接口和子类
- 优缺点
- ThreadPoolExecutor类
- 线程池状态
- 线程池工作原理
- 线程池参数
- 工作流程
- 参数设置
- 工作队列大小设置
- 线程池种类(不同的线程池创建方法->不同创建参数)
了解多线程
多线程其实就是,老板请多个员工同时做事,一个进程就相当于一个项目组去做一个项目,多个线程组成了一个进程,就相当于这个项目组有多个员工同时做事.
CPU
CPU的中文名称是中央处理器,是进行逻辑运算用的,主要由运算器、控制器、寄存器三部分组成,从字面意思看运算就是起着运算的作用,控制器就是负责发出CPU每条指令所需要的信息,寄存器就是保存运算或指令的一些临时文件,这样可以保证更高的速度,也就是我们的线程运行在CPU之上。
- 单核 :单核的CPU是一种假的多线程,因为在一个时间单元内,也只能执行一个线程的任务。同时间段内有几个多线程需要CPU去运行时,CPU也只能交替去执行多个线程中的一个线程,但是由于其执行速度特别快,因此感觉不出来。
- 多核 :多核的CPU才能更好的发挥多线程的效率。
进程(Process)
- CPU从硬盘中读取一段程序到内存中,该执行程序的实例就叫做进程.
- 一个程序如果被CPU多次读取到内存中,变成多个独立的进程.将程序运行起来,我们称之为进程.进程是执行程序的一次执行过程,他是动态的概念.进程存在生命周期,也就是说程序随着程序的终止而销毁.进程之间是通过TCP/IP端口实现交互的.
理解:一个应用程序(一个进程就是一个软件),一个程序至少包含一个进程,一个进程中至少包含一条线程.
线程
- 当cpu处理数据时,某一个时刻点任何cpu都只能处理一个程序.
- 线程是进程中的实际运作单位,是进程的一条流水线,是程序的实际执行者,是最小的执行单位。通常在一个进程中可以包含若干个线程。线程是CPU调度和执行的最小单位。
- 一个进程可以有多个线程,如视频可以同时看图像、听声音、看弹幕,等等;
- 很多线程都是模拟出来的,真正的多线程是指有多个CPU,即多核,如服务器,如果是模拟出来的多线程,即一个CPU的情况下,在同一个时间点,CPU只能执行一个代码,因为切换很快,所以就有同时执行的错觉。
对与java程序来说,当在DOS命令窗口中输入:
java HelloWorld 回车之后。会先启动JVM,而JVM就是一个进程。
JVM再启动一个主线程调用main方法(main方法就是主线程)。
同时再启动一个垃圾回收线程负责看护,回收垃圾.
**注意 :**使用多线程机制之后,main方法结束只是主线程结束了,其他线程还没结束,但没有主线程也不能运行。最起码,现在的java程序中至少有两个线程并发,一个是 垃圾回收线程,一个是 执行main方法的主线程。
多线程开发
- 并发
同一对象被多个线程同时操作;(这是一种假并行。即一个CPU的情况下,在同一个时间点,CPU只能执行一个代码,因为切换的很快,所以就有同时执行的错觉)。
特点 :同时安排若干个任务,这些任务可以彼此穿插着进行;有些任务可能是并行的,比如买菜、发邮件和去洗脚的某些路是重叠的,这时你的确同时在做三件事;但进菜市场和发邮件和接娃三者是互斥的,每个时刻只能完成其中一件。换句话说,并发允许两个任务彼此干扰。
- 并行
你(线程)做你的事,我(线程)做我的事,咱们互不干扰并同时进行。
- 串行
一个程序处理当前进程,按顺序接着处理下一个进程,一个接着一个进行
特点 : 前一个任务没搞点,下一个任务就只能等着。
多线程优点
- 提高应用程序的响应。堆图像化界面更有意义,可以增强用户体验。
- 提高计算机系CPU的利用率
- 改善程序结构,将即长又复杂的进程分为多个线程,独立运行,利于理解和修改。
何时需要多线程
- 程序需要同时执行两个或多个任务
- 程序需要实现一些需要等待的任务时,如用户输入、文件读写操作、网络操作、搜索等。
- 需要一些后台运行的程序时。
实现方式
继承Thread类
步骤
- 定义Thread类的子类,并且重写该类的run()方法,该run()方法将作为线程的执行体.
- 创建Thread子类的实力,即创建了线程对象.
- 调用线程对象的start()方法来启动线程.
start()方法的调用后并不是立即执行多线程代码,而是使得该线程变为可运行状态(Runnable),什么时候巡行是由操作系统决定的.
start()方法不能重复调用,重复调用的话,会出现java.lang.IllegalThreadStateException异常。
中间线程睡眠随机时间
public class Main {
public static void main(String[] args) {
Thread1 thread1 = new Thread1("A");
Thread1 thread2 = new Thread1("B");
thread1.start();
thread2.start();
}
}
class Thread1 extends Thread{
private String name;
public Thread1(String name){
this.name = name;
}
public void run(){
for (int i = 0; i < 3; i++) {
System.out.println(name+"运行: "+i);
try{
int x = (int)(Math.random()*10)*100;
System.out.println(name+"睡眠: "+x);
sleep(x);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}
A运行: 0
B运行: 0
A睡眠: 500
B睡眠: 100
B运行: 1
B睡眠: 700
A运行: 1
A睡眠: 0
A运行: 2
A睡眠: 500
B运行: 2
B睡眠: 400
中间线程睡眠固定时间
public void run(){
for (int i = 0; i < 3; i++) {
System.out.println(name+"运行: "+i);
try{
System.out.println(name+"睡眠: "+200);
sleep(200);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
A运行: 0
B运行: 0
A睡眠: 200
B睡眠: 200
B运行: 1
B睡眠: 200
A运行: 1
A睡眠: 200
B运行: 2
B睡眠: 200
A运行: 2
A睡眠: 200
实现Runnable接口
- 定义Runnable接口的实现类,并实现该接口的run()方法,该run()方法将作为线程执行体。
- 创建Runnable实现类的实例,并将其作为Thread的target来创建Thread对象,Thread对象为线程对象。
- 调用线程对象的start()方法来启动线程
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
public class test8 {
public static void main(String[] args) {
new Thread(new Thread2("C")).start();
new Thread(new Thread2("D")).start();
}
}
class Thread2 implements Runnable {
private String name;
public Thread2(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
System.out.println(name + "运行: " + i);
try {
int x = (int) (Math.random() * 10) * 100;
System.out.println(name + "睡眠: " + x);
Thread.sleep(x);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
其实该方法本质上和继承Thread类差不多,看似只是将Run方法单独实现了一遍,但是从继承和实现的角度来说有很多好处.
实现Runnable接口比继承Thread类所具有的优势:
- 适合多个相同的程序代码的线程去处理同一个资源.
- 可以避免java中的单继承的限制.(通过多实现接口中的方法)
- 增加程序的健壮性,代码可以被多个线程共享,代码和数据独立.
- 线程池只能放入实现Runnable或Callable类线程,不能直接放入继承Thread的类.
实现Callable接口
- 创建Callable接口的实现类,并实现Call()方法,该call()方法将作为线程方法体,且该call()方法有返回值.然后再创建Callbale实现类的实例.
- 使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值。
- 使用FutureTask对象作为Thread对象的target创建并启动新线程。
- 调用FutureTask对象的get()方法来获得子线程执行结束后的返回值。
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
public class FutureTask<V> implements RunnableFuture<V>{
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
...
}
可以看到FutureTask是间接继承于Runnable接口的.Callabled的作用基本上就是使用call()方法代替了run()方法,只不过call()方法可以有返回值
public static void main(String[] args) throws Exception{
//1. 创建一个FutureTask类,未来任务类
//需要给一个Callable()接口实现类对象
FutureTask task = new FutureTask(new Callable() {
//call()方法就相当于run方法.只不过这个有方法值
@Override
public Object call() throws Exception {
//线程执行一个任务,执行之后可能会有一个执行结果
//模拟执行
System.out.println("begin");
Thread.sleep(1000);
System.out.println("sleep:"+1000);
System.out.println("end");
int a = 100;
int b = 200;
return a + b;
}
});
//创建线程对象
Thread t =new Thread(task);
//启动线程
t.start();
// 这里是main方法,这是在主线程中。
// 在主线程中,怎么获取t线程的返回结果?
// get()方法的执行会导致“当前线程阻塞”
Object obj = task.get();
System.out.println("线程执行结果:" + obj);
// main方法这里的程序要想执行必须等待get()方法的结束
// 而get()方法可能需要很久。因为get()方法是为了拿另一个线程的执行结果
// 另一个线程执行是需要时间的。
System.out.println("hello world!");
}
begin
sleep:1000
end
线程执行结果:300
hello world!
线程状态转换
线程状态
- 新建状态(New)
新创建了一个线程对象.
- 就绪状态(Runnable)
线程对象创建后,其他线程调用了该对象的start()方法,该状态的线程位于可运行线程池中,变得可运行,等待获取cpu的使用权.
- 运行状态(Running)
就绪状态的线程获取了cpu,执行程序代码
- 阻塞状态(Blocked)
阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行.知道线程进入就绪状态,才有机会转到运行状态.
阻塞分类
- 等待阻塞:运行的线程执行wait()方法,JVM会把该线程放入等待池中.(wait会释放持有的锁)
- 同步阻塞:运行的线程再获取对象的同步锁,若该同步锁被别的线程占用,则JVM会把该线程放入锁池中.
- 其他阻塞:运行的线程执行sleep()或join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态.当sleep()状态超时,join()等待线程终止或者超时,或者I/O处理完毕时,线程重新转入就绪状态.
锁:说白了就是线程对cpu的使用权
注意: sleep是不会释放持有的锁
- 死亡状态(Dead)
线程执行完了或者因异常退出了run()方法,该线程结束生命周期.
线程调度
调整线程优先级
- java线程有优先级,优先级高的线程会获得较多的运行机会.
- Thread类的setPriority()和getPriority()方法分别用来设置和获取线程的优先级.
- java线程的优先级用整数表示,取值范围是1-0;
线程睡眠
Thread.sleep(long millis)方法,使线程转到阻塞状态。millis参数设定睡眠的时间,以毫秒为单位。当睡眠结束后,就转为就绪(Runnable)状态。sleep()平台移植性好。
线程等待
Object类中的wait()方法,导致当前的线程等待,直到其他线程调用此对象的 notify() 方法或 notifyAll() 唤醒方法。这个两个唤醒方法也是Object类中的方法,行为等价于调用 wait(0) 一样。
sleep()和wait()的区别
sleep()是Thread类中的静态方法,而wait()是Object类中的成员方法;
sleep()可以在任何地方使用,而wait()只能在同步方法或同步代码块中使用;
sleep()不会释放锁,而wait()会释放锁,并需要通过notify()/notifyAll()重新获取锁。
线程让步
Thread.yield() 方法,暂停当前正在执行的线程对象,把执行机会让给相同或者更高优先级的线程。
线程加入
join()方法,等待其他线程终止。在当前线程中调用另一个线程的join()方法,则当前线程转入阻塞状态,直到另一个进程运行结束,当前线程再由阻塞转为就绪状态。
线程唤醒
Object类中的notify()方法,唤醒在此对象监视器上等待的单个线程。如果所有线程都在此对象上等待,则会选择唤醒其中一个线程。选择是任意性的,并在对实现做出决定时发生。线程通过调用其中一个 wait 方法,在对象的监视器上等待。 直到当前的线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争;例如,唤醒的线程在作为锁定此对象的下一个线程方面没有可靠的特权或劣势。类似的方法还有一个notifyAll(),唤醒在此对象监视器上等待的所有线程。
notify()、notifyAll()的区别
notify()
用于唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。
notifyAll()
用于唤醒所有正在等待相应对象锁的线程,使它们进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。
线程同步
定义
即当有一个线程在对内存进行操作时,其他线程都不可以对这个内存地址进行操作,直到该线程完成操作, 其他线程才能对该内存地址进行操作,而其他线程又处于等待状态,
线程同步方式
- 同步方法
即有synchronized关键字修饰的方法,由于java的每个对象都有一个内置锁,当用此关键字修饰方法时, 内置锁会保护整个方法。
在调用该方法前,需要获得内置锁,否则就处于阻塞状态。
需要注意, synchronized关键字也可以修饰静态方法,此时如果调用该静态方法,将会锁住整个类。
- 同步代码块
即有synchronized关键字修饰的语句块,被该关键字修饰的语句块会自动被加上内置锁,从而实现同步。
需值得注意的是,同步是一种高开销的操作,因此应该尽量减少同步的内容。通常没有必要同步整个方法,使用synchronized代码块同步关键代码即可。
- Lock锁同步
因此就需要一种机制来使得多个线程都只是进行读操作时,线程之间不会发生冲突,而通过Lock就可以办到。
总的来说Lock要比synchronized提供的功能更多,可定制化的程度也更高,Lock不是Java语言内置的,而是一个类。
lock()、tryLock()和lockInterruptibly()方法是用来获取锁的
unlock()方法是用来释放锁的。
tryLock()顾名思义,是用来尝试获取锁的,并且该方法有返回值,表示获取成功与否,获取成功返回true,失败返回false,从方法可以发现,该方法如果没有获取到锁时不会继续等待的,而是会直接返回值。
tryLock()的重载方法tryLock(long time, TimeUnit unit)功能类似,只是这个方法会等待一段时间获取锁,如果过了等待时间还未获取到锁就会返回false,如果在等待时间之内拿到锁则返回true。
- ReentrantLock
Java 5新增了一个java.util.concurrent包来支持同步,其中ReentrantLock类是可重入、互斥、实现了Lock接口的锁,它与使用synchronized方法和快具有相同的基本行为和语义,并且扩展了其能力。
需要注意的是,ReentrantLock还有一个可以创建公平锁的构造方法,但由于能大幅度降低程序运行效率,因此不推荐使用。
- volatile
volatile关键字为域变量的访问提供了一种免锁机制,使用volatile修饰域相当于告诉虚拟机该域可能会被其他线程更新,因此每次使用该域就要重新计算,而不是使用寄存器中的值。
需要注意的是,volatile不会提供任何原子操作,它也不能用来修饰final类型的变量。
- 原子变量
在java的util.concurrent.atomic包中提供了创建了原子类型变量的工具类,使用该类可以简化线程同步。
例如AtomicInteger 表可以用原子方式更新int的值,可用在应用程序中(如以原子方式增加的计数器),但不能用于替换Integer。
可扩展Number,允许那些处理机遇数字类的工具和实用工具进行统一访问。
多线程间通信
-
wait()、notify()、notifyAll()
-
如果线程之间采用synchronized来保证线程安全,则可以利用wait()、notify()、notifyAll()来实现线程通信。
-
这三个方法都不是Thread类中所声明的方法,而是Object类中声明的方法。
public class Object { public final native void notify(); public final native void notifyAll(); public final native void wait(long timeout) throws InterruptedException; ... }
-
wait()方法可以让当前线程释放对象锁并进入阻塞状态。
-
notify()方法用于唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。
-
notifyAll()用于唤醒所有正在等待相应对象锁的线程,使它们进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。
-
每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了已就绪(将要竞争锁)的线程,阻塞队列存储了被阻塞的线程。当一个阻塞线程被唤醒后,才会进入就绪队列,进而等待CPU的调度。反之,当一个线程被wait后,就会进入阻塞队列,等待被唤醒。
-
-
await()、signal()、signalAll()
- 如果线程之间采用Lock来保证线程安全,则可以利用await()、signal()、signalAll()来实现线程通信。
public interface Lock{ ... Condition newCondition(); }
- 这三个方法都是Condition接口中的方法,该接口是在Java 1.5中出现的,它用来替代传统的wait+notify实现线程间的协作,它的使用依赖于 Lock。
public interface Condition{ void await() throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; void signal(); void signalAll(); }
- 相比使用wait+notify,使用Condition的await+signal这种方式能够更加安全和高效地实现线程间协作。
- Conditon中的await()对应Object的wait(),Condition中的signal()对应Object的notify(),Condition中的signalAll()对应Object的notifyAll()。
-
BlockingQueue
阻塞队列(BlockingQueue) 是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
使用场景: 多线程并发处理,线程池!
写入如果队列满了就必须阻塞等待,如果队列为空则必须阻塞等待生产
BlockingQueue 不接受null值 试图添加一个null元素时会抛出异常
BlockingQueue 可以是限定容量的 超过给定容量时是无法添加的
阻塞队列核心方法
方法类型 | 抛出异常 | 特殊值( 有返回值) | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add | offer | put | offer |
移除 | remove | poll | take | poll |
判断队列首 | element | peek | - | - |
- Java 5提供了一个BlockingQueue接口,虽然BlockingQueue也是Queue的子接口,但它的主要用途并不是作为容器,而是作为线程通信的工具。
public interface BlockingQueue<E> extends Queue<E>{
}
- BlockingQueue具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。
- 程序的两个线程通过交替向BlockingQueue中放入元素、取出元素,即可很好地控制线程的通信。线程之间需要通信,最经典的场景就是生产者与消费者模型,而BlockingQueue就是针对该模型提供的解决方案。
案例1:抛出异常(add(),remove(),element())
当无法放入或者无法取出时会抛出异常
//指定队列大小为3
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
//为队列添加值 add()添加成功返回true
System.out.println(arrayBlockingQueue.add(1));
System.out.println(arrayBlockingQueue.add(2));
System.out.println(arrayBlockingQueue.add(3));
//找到队首
System.out.println(arrayBlockingQueue.element());
//超过队列大小 add会抛出异常 Queue full
System.out.println(arrayBlockingQueue.add("4"));
true
true
true
1
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at test9.main(test9.java:19)
//remove取出一个元素 返回取出的值 如果队列为空 remove会抛出异常
// NoSuchElementException
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
true
true
true
1
1
2
3
Exception in thread "main" java.util.NoSuchElementException
at java.util.AbstractQueue.remove(AbstractQueue.java:117)
at test9.main(test9.java:25)
案例2:特殊值(有返回值)(offer(),poll(),peek())
当是否可以放入或者是否可以取出时会返回true or false.
//队列的大小
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
//offer 添加一个元素 返回一个boolean值 成功返回true失败返回true
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(2));
System.out.println(blockingQueue.offer(3));
System.out.println(blockingQueue.offer(4));
System.out.println("----------------");
//检测队首元素
System.out.println(blockingQueue.peek());
//poll 取出一个元素 返回一个元素 队列为空时 取出null
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
true
true
true
false
----------------
1
1
2
3
null
案例3:阻塞(put(),take(),-)
放不进去了会一直阻塞直到有空位
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
try {
//put添加元素 没有返回值 满了一直阻塞
//队列大小为二 第三个元素放不进去 阻塞两秒过后就会结束
blockingQueue.put("1");
blockingQueue.put("2");
blockingQueue.put("3");
blockingQueue.put("4");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//取出元素 空了一直阻塞 返回值取出的元素
System.out.println(blockingQueue.take());;
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
案例4:有时间限制阻塞(offer(),poll(),-)
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
try {
//阻塞时间2秒.超过两秒就继续执行(参数 插入的数值 超时时间 和 单位 )
blockingQueue.offer("1",2, TimeUnit.SECONDS);
blockingQueue.offer("2",2, TimeUnit.SECONDS);
blockingQueue.offer("3",2, TimeUnit.SECONDS);
//无法插入时,超过时间限制就不再阻塞,继续执行.
blockingQueue.offer("4",2, TimeUnit.SECONDS);
System.out.println("------");
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
//无法取出时,超过时间限制就返回null
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
------
1
2
3
null
线程池
了解线程池
定义
系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互。在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable对象或Callable对象传给线程池,线程池就会启动一个空闲的线程来执行它们的run()或call()方法,当run()或call()方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个Runnable对象的run()或call()方法。
从Java 5开始,Java内建支持线程池。Java 5新增了一个Executors工厂类来产生线程池,该工厂类包含如下几个静态工厂方法来创建线程池。创建出来的线程池,都是通过ThreadPoolExecutor类来实现的。
常见接口和子类
我们先来看看Executor的UML图(常用的几个接口和子类):
Executor
是一个接口其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,Runnable任务开辟在新线程中的使用方法为:new Thread(new RunnableTask())).start(),但在Executor中,可以使用Executor而不用显示地创建线程:executor.execute(new RunnableTask());
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
ExecutorService
是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,返回 Future 对象,以及可跟踪一个或多个异步任务执行状况返回Future的方法;可以调用ExecutorService的shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。
通过 ExecutorService.submit() 方法返回的 Future 对象,可以调用isDone()方法查询Future是否已经完成。当任务完成时,它具有一个结果,你可以调用get()方法来获取该结果。你也可以不用isDone()进行检查就直接调用get()获取结果,在这种情况下,get()将阻塞,直至结果准备就绪,还可以取消任务的执行。Future 提供了 cancel() 方法用来取消执行 pending 中的任务。
public interface ExecutorService extends Executor {
void shutdown();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
}
优缺点
优点
- 降低资源消耗:通过重复利用已经创建的线程降低线程创建和销毁造成的消耗.
- 提高响应速度:当日到达时,任务可以不用等到线程创建就能立即执行.
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控.
缺点
- 非核心线程的创建时机
- 非核心线程创建的触发时机是:当前线程池中核心线程已满,且没有空闲的线程,还有任务等待队列已满,满足上面的所有条件,才会去创建线程去执行新提交的任务,如果线程池中的线程数量达到 maxinumPoolSize 的值,此时还有任务进来,就会执行拒绝策略,抛弃任务或者其他.
- 如果拒绝策略是抛弃任务的话,有一种场景,就会造成大量任务的丢弃,就是瞬时冲高的情况下。
- 排队任务调度策略
- 当线程池中核心线程数量已达标,且没有空闲线的情况下,在产生的任务,会加入到等待队列中去,这样一直持续下去,等到等待队列已满,在来的任务,会创建非核心线程去执行新提交的任务,那么就产生一种结果,在等待队列中的任务是先提交的任务,反而没有在此时提交的任务先执行。
- 任务的执行顺序和任务的提交顺序不一致,如果业务需求的任务是有先后依赖关系的,就会降低线程的调度效率
ThreadPoolExecutor类
线程池状态
线程池有五种状态
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- RUNNING
能接受新提交的任务,并且也能处理阻塞队列中的任务。
- SHUTDOWN
关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。
- STOP
不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。
- TIDYING
如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
-
TERMINATED
在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。在ThreadPoolExecutor类中的terminated()是如下定义的.但是在进入该方法前有条件.
protected void terminated() { }
进入TERMINATED的条件如下:
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } //加锁就是为了保证锁里的代码能狗正常运行,而不会被抢资源 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
对该方法解读如下:
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功。
最后将该锁解锁mainLock.unlock();
SHUTDOWN 状态 和 STOP 状态 先会转变为 TIDYING 状态,最终都会变为 TERMINATED
以下是线程状态转换图
线程池工作原理
线程池参数
在学习线程池工作原理之前,我们先来看看线程池创建的具体参数.以下是ThreadPoolExecutor类的初始构造方法.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以看到一共有七个参数可以设置.
-
int corePoolSize(核心工作线程数):
当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize时。
-
int maximumPoolSize(最大线程数)
线程池所允许的最大线程个数。当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。
-
long keepAliveTime(多余线程存活时间)
当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数。
this.keepAliveTime = unit.toNanos(keepAliveTime);
-
TimeUnit unit
线程存活时间的时间单位.
- BlockingQueue workQueue(任务队列容量(阻塞队列))
用于传输和保存等待执行任务的阻塞队列。
决定了缓存任务的排队策略.
ThreadPoolExecutor线程池推荐了三种等待队列,它们是:SynchronousQueue 、LinkedBlockingQueue 和 ArrayBlockingQueue。
有界队列:
SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于 阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
ArrayBlockingQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
无界队列:
LinkedBlockingQueue:基于链表结构的无界阻塞队列,它可以指定容量也可以不指定容量(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE)
PriorityBlockingQueue:是一个按照优先级进行内部元素排序的无界阻塞队列。队列中的元素必须实现 Comparable 接口,这样才能通过实现compareTo()方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue 不会保证优先级一样的元素的排序。
注意:keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。
- ThreadFactory threadFactory(线程创建工厂)
可以不指定
线程创建工厂用来按自定义方式来创建线程,说白了就是给创建线程时给个自定义名字和自定义run()方法.如果不指定的话会按照Executors.defaultThreadFactory()默认的defaultThreadFactory线程工厂来创建线程.
默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY
的优先级,以及名为 “pool-XXX-thread-”
的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。
- RejectedExecutionHandler handler(拒绝策略)
可以不指定
当线程池和队列都满了,线程池拒绝添加新任务时采取的策略,再加入线程会执行此策略.
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
可以自定义策略.
ThreadPoolExecutor类有四个已经定义好的策略:
策略 | BB |
---|---|
ThreadPoolExecutor.AbortPolicy() | 抛出RejectedExecutionException异常。默认策略 |
ThreadPoolExecutor.CallerRunsPolicy() | 由向线程池提交任务的线程来执行该任务 |
ThreadPoolExecutor.DiscardPolicy() | 抛弃当前的任务 |
ThreadPoolExecutor.DiscardOldestPolicy() | 抛弃最旧的任务(最先提交而没有得到执行的任务 |
较为实用的方法还是AbortPolicy提供的处理方式:抛出异常,由开发人员进行处理
这些参数在线程池创建完毕后还可以通过以下set方法进行修改.
public void allowcoreThreadTime0ut(boolean value)
public void setKeepAliveTime(long time,TimeUnit unit)
public void setMaximumPoolSize(int maximumPoolSize)
public void setCorePoolsize(int corePoolsize)
public void setThreadFactory(ThreadFactory threadFactory)
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
工作流程
线程池工作流程如图所示
- 判断核心线程池是否已满,没满则创建一个新的工作线程来执行任务。
- 判断任务队列是否已满,没满则将新提交的任务添加在工作队列。
- 判断整个线程池是否已满,没满则创建一个新的工作线程来执行任务,已满则执行饱和(拒绝)策略。
参数设置
首先得了解决定值
- tasks :每秒的任务数,假设为500~1000
- taskcost:每个任务花费时间,假设为0.1s
- responsetime:系统允许容忍的最大响应时间,假设为1s
再通过计算得到参考参数
- corePoolSize = 每秒需要多少个线程处理?
- threadcount = tasks/(1/taskcost) = tasks*taskcout = (500 ~ 1000)*0.1 = 50~100 个线程。
- corePoolSize设置应该大于50。
- 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可。
- workQueue
- workQueue= (coreSizePool/taskcost)*responsetime
- workQueue= 80/0.1*1 = 800。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行。
- 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
- maxPoolSize 最大线程数
- 在生产环境上我们往往设置成corePoolSize一样,这样可以减少在处理过程中创建线程的开销。
- Handler
- 根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理。
- keepAliveTime和allowCoreThreadTimeout采用默认通常能满足。
工作队列大小设置
根据任务的难易程度来设置工作队列大小
-
CPU密集型任务
尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。
-
IO密集型任务
可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。
-
混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
线程池种类(不同的线程池创建方法->不同创建参数)
Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
- newCachedThreadPool()
创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- newFixedThreadPool(int nThreads)
创建一个可重用的、具有固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newSingleThreadExecutor()
创建一个只有单线程的线程池,它相当于调用newFixedThread Pool()方法时传入参数为1。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newScheduledThreadPool(int corePoolSize)
创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。corePoolSize指池中所保存的线程数,即使线程是空闲的也被保存在线程池内。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
- newSingleThreadScheduledExecutor()
创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
以上五个方法都可以添加一个新的参数(ThreadFactory threadFactory),准确来说是ThreadPoolExecutor的构造函数可以添加这个参数.
它的作用是创建线程并指定具体的线程名字,方便追溯和分析。ThreadFactory实际上是一个接口,实例化有两种方式,一种是第三方实现,另一种是自定义实现。
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
- ExecutorService newWorkStealingPool(int parallelism)
创建持有足够的线程的线程池来支持给定的并行级别,该方法还会使用多个队列来减少竞争。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
- ExecutorService newWorkStealingPool():该方法是newWorkStealingPool(int parallelism)的简化版本。如果当前机器有4个CPU,则目标并行级别被设置为4,也就是相当于为前一个方法传入4作为参数。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}