Java 并发和多线程编程推荐《Java 并发编程实战》和《多线程编程实战指南》,前者是外国非常受欢迎的书籍的翻译本,后者是国人写的书,符合国人的思维模式。
进程、线程与任务
在操作系统中会运行多个程序,一个运行中的程序就是一个进程,如一个运行中的 Idea 就是一个进程,一个 Java 虚拟机就是一个进程。进程是程序向操作系统申请资源的基本单位。
一个进程中可以包含多个线程,同一个进程中的所有线程共享该进程中的资源,如内存空间、文件句柄等。线程是进程中可独立执行的最小单位。
线程所要完成的计算被称为任务。特定的线程总是执行着特定的任务。
一个程序往往需要完成许多独立的任务,也就是包含多个线程,操作系统抽象出进程和线程的改变,这样操作系统只需要管理进程,由进程来管理许多的线程,可以降低由操作系统直接管理线程的复杂度。
Java 中线程的实现
Java 是一门面向对象设计语言,所以在 Java 中线程也是一个对象,它是 Java 标准类库 java.lang.Thread。Thread 类或其子类的一个实例就是一个线程。
Thread 类的两个常用构造器是:Thread() 和 Thread(Runnable target)。对应了 Java 中创建线程的两种方式,一种是使用第 1 个构造函数,继承 Thread 类并调用 start() 方法,第二种是实现 Runnable 接口然后传给第 2 个构造函数并调用 start() 方法。
线程的处理逻辑需要写在 run() 方法中,但是不能通过调用 run() 方法来启动线程,而是需要调用 start() 方法。从 JVM 的运行时数据区来看,线程拥有程序计数器、虚拟机栈和本地方法栈,这些需要 JVM 来分配,start() 方法会请求 JVM 分配这些资源并向操作系统申请一个线程,而调用 run() 方法只会在当前线程中执行处理逻辑。由此可见,相比普通对象,创建线程对象的成本要高一些。
Java 中的线程存在层次关系,在当前线程中创建的其他线程被称为当前线程的子线程,子线程也可以有子线程,子线程部分属性的默认值会继承父线程中的属性值,如线程优先级、是否为守护线程等。虽然线程之间有父子的层次关系,但是父线程和子线程的生命周期并没有必然的联系,即父线程运行结束后,子线程可以继续运行,子线程运行结束也不妨碍其父线程的继续运行。
Thread 类
属性
线程的属性包括线程的编号(ID)、名称(name)、是否为守护线程(Daemon)和优先级(Priority)。
线程的编号由 JVM 分配,在 JVM 运行中的一个时间点中,线程的编号是唯一的,也就是说 JVM 会复用已经终止的之前的线程的编号。所以该属性不能用来做线程的唯一标识。
线程的名称可以由程序员设置,默认的线程名称和线程的编号有关,该线程定义名称有助于代码调试和问题定位。
线程分为守护线程和非守护线程,该属性的默认值与父线程的该属性的值相同,非守护线程会阻止进程的终止,但是守护线程不会,也就是说当进程中只剩下守护线程的时候,即使该守护线程还在运行,进程也会终止。守护线程适合执行一些重要性不是很高的任务,例如监控其他线程的执行情况。
Java 定义了 1~10 的 10 个优先级,默认是 5,如果线程有父线程,则默认优先级与父线程的优先级值相等。Java 中定义的优先级数量和操作系统中的并不一样,而且不恰当地设置该属性可能导致线程饥饿,所以不推荐修改该属性。
方法
- static Thread currentThread():该方法返回当前线程
- void run():用于定义线程的任务处理逻辑
- void start():启动相应线程
- void join():等待相应线程运行结束,如线程 A 调用线程 B 的 join 方法,那么线程 A 的运行会被暂停,直到线程 B 运行结束
- static void yield():线程礼让,使当前线程失去 CPU 的时间片,但是当前线程有机会重新占用 CPU 时间片,所以这个方法是不可靠的
- static void sleep(long millis):使当前线程暂停指定时间
线程的生命周期
在 Java 中,线程的状态定义在 Thread 类内部的 State 枚举类中,State 枚举类包含的值有:
- NEW:一个已创建而未启动的线程处于该状态。由于一个线程实例只能被启动一次,因此一个线程只可能有一次处于该状态。
- RUNNABLE:正在运行中的线程和正在等待 CPU 时间片的线程(就绪状态)处于该状态。
- BLOCKED:当线程发起阻塞时 IO 操作后,线程会处于该状态。处于该状态的线程并不会占用处理器资源,当阻塞式 IO 操作完成后,该线程的状态又可以转换为 RUNNABLE 状态。
- WAITING:当线程执行了某些特定方法之后就会处于这种等待其他线程执行另外一些特定操作的状态。能够使其执行线程变更为 WAITING 状态的方法包括:Object.wait()、Thread.join() 和 LockSupport.park(Object)。能够使相应线程从 WAITING 变更为 RUNNABLE 状态的相应方法包括:Object.notify() 和 LockSupport.unpark(Object)。
- TIMED_WAITING:该状态与 WAITING 类似,差别在于处于该状态的线程并非无限制等待其他线程执行特定操作,而是处于带有时间限制的等待状态。当其他线程没有在指定时间内执行该线程所期望的特定操作时,该线程的状态自动转换为 RUNNABLE。
- TERMINATED:已经执行结束的线程处于该状态。由于一个线程实例只能被启动一次,因此一个线程也只可能有一次处于该状态。
线程生命周期流转图:
线程的终止
Thread 类有有些用于线程停止的方法:stop()、suspend(),但是这些方法已经被废弃。
Java 平台为每个线程唯一一个被称为中断标记的布尔型状态变量用于表示相应线程是否接收到了中断,中断标记值为 true 表示相应线程收到了中断。目标线程可以通过 Thread.currentThread().isInterrupted() 调用来获取该线程的中断标记值,也可以通过 Thread.interrupted() 来获取并重置中断标记值,即 Thread.interrupted() 会返回当前线程的中断标记值并将当前线程中断标记重置为 false。调用一个线程的 interrupt() 相当于将该线程的中断标记置为 true。
目标线程检查中断标记后所执行的操作,被称为目标线程对中断的相应,简称中断响应。设有个发起线程 originator 和目标线程 target,那么 target 对中断的响应一般包括:
- 无影响。originator 调用 target.interrupt() 不会对 target 的运行产生任何影响。这种情形也可以称为目标线程无法对中断进行响应。InputStream.read()、ReentrantLock.lock() 以及申请内部锁等阻塞方法/操作就属于这种类型。
- 取消任务的运行。originator 调用 target.interrupt() 会使 target 在侦测到中断那一刻所执行的任务被取消,而这并不会影响 target 继续处理其他任务。
- 工作者线程停止。originator 调用 target.interrupt() 会使 target 终止,即 target 的生命周期状态变更为 TERMINATED。
Java 标准库中的许多阻塞方法如 Object.wait()、Object.notify()、Thread.sleep() 等对中断的响应方式都是抛出 InterruptedException 等异常。也有些阻塞方法如 InputStream.read()、Lock.lock() 无法中断异常。
能够响应中断的方法通常是执行阻塞前判断中断标记,若中断标记值为 true 则抛出 InterruptedException 异常。按照惯例,抛出 InterruptedException 异常的方法,通常会在其抛出该异常时将当前线程的线程中断标记重置为 false。
如果发现线程给目标线程发送中断的那一刻,目标线程已经由于执行了一些阻塞方法而被暂停(声明周期状态为 WAITING 或者 BLOCKED)了,那么此时 JVM 可能会设置会将该线程唤醒,从而使目标线程得到响应中断的机会。由此可将,给目标线程发送中断还能够产生唤醒目标线程的效果。
线程停止的原因有 run() 方法执行结束的正常停止和运行中抛出异常的异常停止。因此我们可以给线程设置一个布尔类型的线程停止标记,目标线程检测到该标记为 true 使让其 run() 方法返回,这样就实现了线程的终止。
我们之前提到的线程中断标记也是一个布尔类型的,它是否可以用来做线程停止标记呢?
由于线程中断标记可能会被目标线程的某些方法清空,因此从通用性的角度来看线程中断标记并不能作为线程停止标记!而如果只用一个布尔类型的线程停止标记,当线程执行了一些阻塞方法的时候不会检查线程停止标记,所以我们需要将线程停止标记和线程中断标记结合使用。
当需要停止目标线程的时候,除了修改线程停止标记为 true 之外,还需要给目标线程发送线程中断标记。
串行、并行和并发
串行是指一次执行一个任务,多个任务被依次执行,task1 -> task2 -> task3。则执行时间是所有任务耗时的总和。
并行是指一次执行多个任务,多个任务同时开始执行,则执行时间是最长耗时任务的执行时间。
并发是指一次执行一个任务,如先执行 task1,在 task1 执行一段时间后暂停 task1 的执行,转而执行 task 2,在 task 2 执行一段时间后暂停 task2 转而执行 task 3,依次类推,直到所有任务都执行完成。
并发描述的就是多线程运行在一个 CPU 中和情形,线程任务可能会因为 CPU 时间片到期或者发生阻塞式 IO 而暂时不会用到处理器资源,这时候线程任务还没有执行完成,为了充分利用 CPU 资源,提高程序的性能,操作系统不会等待当前线程,而是把当前线程的调用栈和当前指令等资源保存起来,然后执行另一个线程任务。
竞态
在多线程编程中,对于同样的输入,程序的输出有时候是正确的有时候是错误的。这种一个计算结果的正确性与时间有关的现象被称为竞态。竞态是多线程编程的产物,即使运行程序的 CPU 是单核的,也会导致竞态的产生。
竞态产生的条件
竞态的产生伴随着在多线程下对共享变量的访问,竞态产生的条件是一个线程读取共享变量并以该共享变量为基础进行计算的期间另外一个线程更新了该共享变量的值而导致的读取脏数据或者丢失更新的结果。
对于局部变量来说,不同的线程访问的是各自的副本,并不存在共享的情况下,所以局部变量的使用不会导致竞态!
竞态的模式
- read-modify-write
该模式描述的场景是线程 A 读取了一组共享变量并更新了共享变量的值,在还没有将修改后的值同步回主内存的时候,线程 B 从主内存读取了共享变量的旧值,这就造成了读脏数据,线程 B 继续依赖共享变量的旧值计算更新共享变量的结果,该结果是错误的结果,然后线程 B 将错误的结果同步到主内存,最后线程 A 将更新同步到主内存,线程 A 覆盖了线程 B 的更新,这就造成了丢失更新。
read-modify-write 二维表如下:
时间/线程 | 线程 A | 线程 B |
---|---|---|
t1 | 从主内存中读取共享变量 var | |
t2 | 在 CPU 中修改共享变量 var | |
t3 | 从主内存中读取共享变量 var | |
t4 | 在 CPU 中修改共享变量 var | |
t5 | 将更新后的共享变量同步会主内存 | |
t6 | 将更新后的共享变量同步会主内存 |
- check-then-act
该模式描述的场景是线程 A 读取了共享变量的值并将该值用于条件判断语句决定后续执行代码块 C1,(如使用 if 条件判断),在线程 A 执行条件判断语句之后,执行 C1 代码块之前,另一个线程 B 修改了共享变量的值导致此时基于共享变量进行判断会执行代码块 C2,但是线程 A 依旧是执行代码块 C1。
check-then-act 的二维表如下:
时间/线程 | 线程 A | 线程 B |
---|---|---|
t1 | 从主内存中读取共享变量 var | |
t2 | 判断共享变量 var,确定执行代码块 C1 | |
t3 | 从主内存中读取共享变量 var | |
t4 | 在 CPU 中修改共享变量 var(基于共享新值判断会执行代码块 C2) | |
t5 | 将更新后的共享变量同步会主内存 | |
t6 | 执行代码块 C1 |
线程安全
在 Java 多线程编程中,如果一个类在单线程环境下能够运行正常,并且在多线程环境下,在其使用方不必为其做任何改变的情况下也能运行正常,那么我们就称其是线程安全的。反之,如果一个类的单线程环境下运行正常而在多线程环境下则无法正常运行,那么这个类就是非线程安全的。
一个类如果不是线程安全的,我们就说它在多线程环境下直接使用存在线程安全问题。在 Java 标准库中定义了线程安全的类如 Vector、CopyOnWriteArrayList 和 HashTable 等,也定义了非线程安全的类如 ArrayList、HashMap 等。
线程安全问题概括来说表现为 3 个方面:原子性、可见性和有序性。
原子性
原子(Atomic)的字面意思是不可分割的。对于涉及共享变量访问的操作,若该操作从其执行线程以外的任意线程来看是不可分割的,那么该操作就是原子操作。相应地我们称该操作具有原子性。
在 Java 语言中,变量的写操作和读操作都是原子操作,这由 JVM 来保证。
Java 提供了多种方式来实现一组操作的原子性,如使用锁,使用 CAS,这也从侧面说明了锁和 CAS 保证了操作的原子性。
理解原子操作这个概念主要需要注意以下两点:
- 原子操作是针对访问共享变量的操作而言的。也就是说,仅涉及局部变量访问的操作无所谓是否是原子的,或者干脆把这一类操作都看成原子操作。
- 原子操作时从该操作的执行线程以外的线程来描述的,也就是说它只有在多线程环境下有意义。
可见性
在多线程环境下,一个线程对某个共享变量进行更新之后,后续访问该变量的线程可能无法立即读取到这个更新的结果,甚至永远也无法读取这个更新的结果。这就是线程安全问题的另外一种表现形式:可见性(Visibility)。
由于 CPU 处理速度和内存的访问速度存在数量级之间的差距,所以 CPU 并不是直接访问的主内存,而是通过寄存器和高速缓存来间接访问,变量会先从主内存加载到高速缓存,CPU 从高速缓存中读取变量的值。在更新的时候,CPU 会先将更新写入高速缓存,在未来的某个时间点同步到主内存中,额外的高速缓冲导致了多线程之间的可见性问题。
Java 提供了 Volatile 关键字来保证线程对共享变量的可见性。被标注了 Volatile 关键字的变量在线程从高速缓冲中读取之前会先从主内存中更新该变量的值,在线程将变量写入高速缓存之后会立即将更新后的值同步到主内存中。
同时,JVM 保证了以下场景的可见性:
- 父线程在启动子线程之前对共享变量的更新对于子线程来说是可见的。
- 一个线程终止后该线程对共享变量的更新对于调用该线程的 join 方法的线程而言是可见的。
但这两种场景在工作中大概率用不到,因为现在谁还用 new 创建线程,都是用线程池了(狗头。
有序性
有序性指在什么情况下一个处理器上运行的一个线程所执行的内存访问操作在另外一个处理器上运行的其他线程看起来是乱序的。
重排序是对内存访问有关的操作所做的一种优化,它可以在不影响单线程正确性的情况下提升程序的性能。但是,它可能对多线程程序的正确性产生影响,即它可能会导致线程安全问题。
Java 程序中的重排序会发生两个地方,分别是使用 javac 编译源代码和 JIT 编译器翻译字节码的时候。
如 Java 中实例化对象的操作:Object obj = new Object
,在处理器中对应了三个操作,分别是:
- 分配 Object 实例所需的内存空间,并获得一个指向该空间的引用 ref
- 调用 Object 构造函数初始化 Object 实例
- 将 Object 实例引用 ref 复制给实例变量 obj
这三个操作在处理器中可能不是顺序执行的,操作 3 可能在操作 2 之前执行,所以 obj 指向的实例可能还没有初始化完成,而使用该未初始化完成的实例将造成不可预知的错误。
指令重排序能够提高性能,所以不能完全禁止指令重排序,但是操作系统提供了指令来部分禁止指令重排序,即可以让多线程访问共享变量的代码片段禁止指令重排序。
Java 中保证有序性的方式有:锁、volatile 关键字。
锁将代码分为三个区域:获取锁之前,临界区和获取锁之后。这三个区域内部还是允许指令重排序的,但是不允许区域之间指令重排序。
volatile 关键字通过使用内存屏障可以禁止访问共享变量之前的区域指令重排序。
线程同步机制
Java 平台提供的线程同步机制包括锁、volatile 关键字、CAS、final 关键字、static 关键以及一些相关的 API,如 Object.wait()/Object.notify() 等。
锁
Java 平台中的锁分为内部锁和显示锁。内部锁是通过 synchronized 关键字实现的,显示锁是通过 java.concurrent.locks.Lock 接口的实现类实现的。
线程在其获得锁之后和释放锁之前这段时间内锁执行的代码被称为临界区。
锁能够保护共享数据以实现线程安全,其作用包括保障原子性、可见性和有序性。
锁通过互斥保障原子性。所谓互斥,就是指一个锁依次只能被一个线程持有。
我们知道,可见性的保障是通过写线程冲刷处理器缓存和读线程刷新处理器缓存这两个动作实现的。锁的获得隐含着刷新处理器缓存这个动作,而锁的释放隐含着冲刷处理器缓存这个动作。因此,锁能够保障可见性。
锁也能够保障有序性,锁会将代码分为获取锁之前、临界区、释放锁之后三个区域,区域内部允许指令重排序,但是区域之间禁止指令重排序。
可重入锁
一个线程在持有一个锁的时候能够再次获取该锁。
读写锁
读写锁允许多个线程同时读取共享变量,但是只允许一次只有一个线程对共享变量进行更新。
volatile 关键字
volatile 关键字保证了共享变量的可见性和有序性。
CAS
CAS(Compare and Swap)是对一种处理器指令的称呼。CAS 是一个原子的 if-then-act 操作。
即 CAS 保障了原子性。
static 关键字和 final 关键字
Java 中类的初始化实际上采用了延迟加载的基础,即一个类被 JVM 加载之后,该类的所有静态变量的值都仍然是其默认值,直到有个线程初次访问了该类的任意一个静态变量才使这个类被初始化----类的静态初始化块(“static{}”)被执行,类的所有静态变量被赋予初始值。
public class ClassLazyInitDemo {
public staic void main(String[] args) {
System.in.out.println(Collaborator.class.hashCode()); //语句1
System.in.out.println(Collaborator.number); //语句2
System.in.out.println(Collaborator.flag);
}
static class Collaborator {
static int number = 1;
static boolean flag = true;
static {
System.in.out.println("Collaborator initializing...");
}
}
}
上述 Demo 中,语句1 仅仅使该类被 JVM 加载,而并没有使其初始化(即 static 块执行),只有执行语句2 的时候才初始化。
static 关键字在多线程环境下由其特殊的涵义,它能够保证一个线程即使在未使用其他同步机制的情况下也总是可以读取到一个类的静态变量的初始值而不是默认值。但是这种可见性保障仅限于线程初次读取该变量。在该场景中 static 关键字保障的是可见性。
对于引用型静态变量,static 关键字还能够保障一个线程读取到该变量的初始值时,这个值所指向的对象已经初始化完毕。在该场景中 static 关键字保障的是有序性。
final 关键字保障其他线程访问共享变量的时候总是能够读取到变量的初始值而不是默认值。在该场景中 final 关键字保障的是可见性。
对于引用型 final 字段,final 关键字还进一步确保该字段所引用的对象已经初始化完毕。在该场景中 final 关键字保障的是有序性。
挖掘可并发点
要实现多线程编程的目标 ---- 并发计算,我们首先需要找到程序中哪些处理是可以并发化,即由串行改为并发的。这些可并发化的处理被称为可并发点。
基于数据的分割实现并发化
如果程序的原始输入数据的规模比较大,那么可以采用基于数据的分割。其基本思想就是将原始输入数据按照一定的规则分解为若干规模较小的子输入,并使用工作者线程对这些子输入进行处理,每个工作者线程处理后会形成子输出,最后我们将所有的子输出合并在一起就是整个并发任务的输出。
在基于数据的分割方式中,主线程按照一定的规则将原始输入分割成小的子输出,然后创建工作者线程接收每一个子输出,工作者线程会独立完成所有的处理步骤,然后得到部分的输出结果,主线程在等待所有的工作者线程都处理完成之后,合并所有的子结果即可得到总的输出结果。主线程在分割输入和合并输出的过程中可能会带来额外的性能损耗和程序复杂性。
基于数据的分割产生的工作者线程是同质工作者线程,即任务处理逻辑相同的线程。
输入、输出与工作者线程之间的关系:
基于任务的分割实现并发化
基于任务分割的基本思想是将原始任务按照一定的规则分解成若干子任务,并使用专门的工作者线程去执行这些子任务,这时,工作者线程之间存在依赖关系,后一个工作线程的输入往往是前一个工作者线程的输出,这引入了线程之间的数据交互,可能会增加额外的复杂性。
基于任务的分割产生的是异质工作者线程,即任务处理逻辑各异的线程。
输出、输出与工作者线程之间的关系:
Java 线程同步实用类
Object.wait()/Object.notify()
在 Java 平台中,Object.wait()/Object.wait(long) 以及 Object.notify()/Object.notifyAll() 可用于实现等待和通知,Object.wait() 可以让线程进入 WAITING 状态,Object.notify() 可以唤醒一个进入 WAITING 状态的线程。相应的,Object.wait() 的执行线程就被称为等待线程;Object.notify() 的执行线程就被称为通知线程。由于 Object 类是 Java 中任何对象的父类,因此使用 Java 中的任何对象都能够实现等待和通知。
使用 Object.wait() 实现线程等待的模板代码如下:
// 在调用 wait 方法前需获得相应对象的内部锁
synchronized(someObject) {
while(保护条件不成立) {
// 调用 Object.wait() 暂停当前线程
someObject.wait();
}
// 代码执行到这里说明保护条件已经满足
// 执行目标动作
doAction();
}
其中保护条件是一个包含共享变量的布尔表达式。当共享变量被其他线程(通知线程)更新之后使相应的保护条件得以成立时,这些线程会通知等待线程。由于一个线程只有在持有一个对象的内部锁的情况下才能够调用该对象的 wait 方法,因此 Object.wait() 调用总是放在相应对象所引导的临界区之中。
注意:等待线程在被唤醒、继续运行到其再次持有相应对象的内部锁的这段时间内,由于其他线程可能抢先获得相应的内部锁并更新了相关共享变量而导致该线程所需的保护条件不成立。所以,对保护条件的判断以及 Object.wait() 调用应该放在循环语句之中,以确保目标动作只有在保护条件成立的情况下才能够执行!
上述模板代码的执行流程如下:
- 当前线程获取到 someObject 的内部锁,进入同步代码块。
- 判断保护条件是否成立
- 保护条件不成立,调用 someObject.wait() 方法暂停当前线程,因为 someObject.notify() 需要在同步块中执行,因此当前线程在暂停之后会释放持有的 someObject 内部锁,此时 wait() 方法还没有返回
- 其他线程在同步代码块中更新保护条件,并调用 someObject.notify()/someObject.notifyAll() 来通知唤醒等待线程
- 由于同一个对象的同一方法可以被多个线程执行,因此 someObject 对象上可能存在多个等待线程,所以当前线程被唤醒后会先尝试获取 someObject 的内部锁,如果获取到了内部锁,while 语句体中 someObject.wait() 才会返回
- 再次判断保护条件是否成立,如果成立,则执行目标动作 doAction()
- 最后退出同步代码块,释放 someObject 对象的内部锁
使用 Object.notify() 实现通知,其代码模板如下:
// 在调用 notify() 方法前需获得相应对象的内部锁
synchronized(someObject) {
// 更新等待线程的保护条件涉及的共享变量
updateSharedState();
// 唤醒等待线程
someObject.notify();
}
包含上述模板代码的方法被称为通知方法,它包含两个要素:更新共享变量、唤醒等待线程。由于一个线程只有在持有一个对象的内部锁的情况下才能够执行该对象的 notify 方法,因此 Object.notify() 调用总是放在相应对象内部锁所引导的临界区之中。因此 Object.wait() 在暂停其执行线程的同时必须释放相应的内部锁;否则通知线程无法获得相应的内部锁,也就无法执行相应对象的 notify 方法来通知等待线程!
notify() 方法调用应该尽可能放在靠近临界区结束的地方,以便等待线程在其被唤醒之后能够尽快再次获得相应的内部锁。
因为 wait()/notify() 可能会造成过早唤醒、信号丢失和欺骗性唤醒等问题,所以在日常工作中并不常使用它来实现线程同步,Java 标准类库提供了更高级的线程同步实用类,我们使用这些来解决工作中遇到的线程同步问题。
CountDownLatch
CountDownLatch 可以用来实现一个或多个线程等待其他线程完成一组特定的操作之后才继续运行。这组操作被称为先决操作。
CountDownLatch 内部会维护一个用于表示未完成的先决操作数量的计数器。CountDownLatch.countDown() 每被执行一次就会使相应实例的计数器值减少 1。当计数器不为 1 时,CountDownLatch.await() 的执行线程会暂停,这些线程就被称为相应 CountDownLatch 上的等待线程。CountDownLatch.countDown() 相当于一个通知方法,它会在计数器值达到 0 的时候唤醒相应实例上的所有等待线程。计数器的初始值是在 CountDownLatch 的构造参数中指定的,如下声明所示:public CountDownLatch(int count)
。
CountDownLatch 的使用是一次性的,当计数器的值达到 0 之后,该计数器的值就不再发生变化。
CyclicBarrier
有时候多个线程可能需要互相等待对方执行到代码中的某个地方,这时这些线程才能够继续执行。
使用 CyclicBarrier 实现等待的线程被称为参与方。除最后一个参与方外,任何参与方执行 CyclicBarrier.await() 都会导致线程被暂停。最后一个线程执行 CyclicBarrier.await() 会使得使用相应 CyclicBarrier 实例的其他所有参与方被唤醒,而最后一个线程自身并不会被暂停。
CyclicBarrier 使可以重复使用的。
阻塞队列
JDK 1.5 中引入的接口 java.util.concurrent.BlockingQueue 定义了一种线程安全的队列----阻塞队列,阻塞队列可用于线程之间传输数据,经典用例是在生产者-消费者模式中作为线程之间的传输通道。
阻塞队列按照其存储空间的容量是否受限制来划分,可分为有界队列和无界队列。有界队列的存储容量限制是由应用程序指定的,无界队列的最大存储容量为 Integer.MAX_VALUE。
一般而言,一个方法或操作如果能够导致其执行线程被暂停,那么我们就成相应的方法或操作为阻塞方法或者阻塞操作。在阻塞队列中,即有阻塞方法也有非阻塞方法,其中 take()/put() 方法对就是阻塞方法,poll()/offer() 方法对就是非阻塞方法。
Semaphore
JDK 1.5 中引入的标准库类 java.util.concurrent.Semaphore 被称为信号量。
Semaphore.acquire()/release() 分别用于申请配额和返还配额。Semaphore.acquire() 在成功获得一个配额后会立即返回,如果当前的可用配额不足,那么 Semaphore.acquire() 会暂停当前线程,直到其他线程通过 Semaphore.release() 返还了配额。
Semaphore.acquire() 和 Semaphore.release() 总是配对使用,Semaphore.release() 调用总是应该放在一个 finally 块中,以避免当前线程获取的配额无法返还。模板代码如下:
public void template(){
semaphore.acqiure();
try{
doSomething();
} finally {
semaphore.release();
}
}
PipedInputStream 和 PipedOutputStream
PipedInputStream 和 PipedOutputStream 是 InputStream 和 OutputStream 的子类,它们可用于实现线程间的直接输入和输出,而不必借用文件、数据库、网络连接等其他数据交换中介。
PipedInputStream 和 PipedOutputStream 适合在两个线程间使用,即适用于单生产者–单消费者的情形。
Exchanger
JDK 1.5 中引入的标准库类 Java.util.concurrent.Exchanger 可以用来实现双缓冲。Exchanger 相当于一个只有两个参与者的 CyclicBarrier。
生产者线程执行 Exchanger.exchange(V) 时将参数 x 指定为一个已经填充完毕的缓冲区,消费者线程执行 Exchanger.exchange(V) 时将参数 x 指定为一个空的阔这已经使用过的缓冲区。生产者线程在执行 Exchanger.exchange(V) 之后将进入等待状态,直到有消费者线程执行 Exchanger.exchange(V) 方法。
线程活性故障
死锁
两个或者更多线程互相等待对方释放需要的锁而被永远暂停的现象被称为死锁。
避免死锁的方法有:降低锁的粒度、指定获取多个锁时的顺序。
锁死
唤醒线程由于某种原因终止,导致需要被唤醒的线程永远处于等待状态的现象被称为锁死。
饥饿
指线程有机会获取需要的资源,但是由于并发量太大,等待获取相同资源的线程太多,每次都没有抢到资源的持有权而导致一致获取不到资源的现象被称为线程饥饿。
线程安全的对象设计
多线程编程下发生线程安全问题的原因是多个线程访问同一个对象的共享变量,为了保障线程安全我们需要在访问共享变量的代码区域添加锁,添加锁保证了多线程在并发访问共享变量的时候从并发执行降级为串行执行,这也减低了系统的吞吐量。我们通过对类进行一些设计,可以让多线程在访问对象的时候即使不使用任何线程同步机制也能够保证线程安全,这些对象被称为线程安全对象。其中包括无状态对象、不可变对象和线程特有对象。
无状态对象
无状态对象不含任何实例变量,且不包含任何静态变量或者其包含的静态变量都是只读的。
不可变对象
不可变对象是指一经创建其状态就保持不变的对象。不可变对象具有固有的线程安全性。
一个严格意义上的不可变对象要同时满足以下所有条件:
- 类本身使用 final 修饰,这是为了防止通过创建子类来改变其定义的行为。
- 所有字段都是用 final 修饰的,因为 final 修饰不仅仅是从语义上说明被修饰字段的值不可改变,更重要的是这个语义在多线程环境下保证了被修饰字段的初始化安全,即 final 修饰的字段在对其他线程可见时,它必定是初始化完成的。
- 对象在初始化过程中没有逸出,防止其他类在对象初始化过程中修改其状态
- 任何字段,若其引用了其他状态可变的对象,如集合、数组等,则这些字段必须是 private 修饰的,并且这些字段值不能对外暴露。若有相关方法要返回这些字段值,则应该进行防御性复制。
线程特有对象
每个线程创建各自的实例,一个实例只能被一个线程访问的对象被称为线程特有对象。在 Java 中,我们使用 ThreadLocal 来实现线程特有对象。
并发集合
JDK 1.5 的 java.util.concurrent 包中引入了一些线程安全的集合对象,它们被称为并发集合,这些对象通常作为同步集合的替代品,它们与常用的非线程安全集合对象之间的对应关系如下表所示:
非线程安全对象 | 并发集合类 | 共同接口 | 遍历实现方式 |
---|---|---|---|
ArrayList | CopyOnWriteArrayList | List | 快照 |
HashSet | CopyOnWriteArraySet | Set | 快照 |
LinkedList | ConcurrentLinkedQueue | Queue | 准实时 |
HashMap | ConcurrentHashMap | Map | 准实时 |
TreeMap | ConcurrentSkipListMap | SortedMap | 准实时 |
TreeSet | ConcurrentSkipListSet | SortedSet | 准实时 |
线程管理
线程的未捕获异常
如果线程的 run 方法抛出未捕获的异常,那么随着 run 方法的退出,相应的线程也提前终止。对于线程的这种异常终止,JDK 1.5 为了解决这个问题引入了 UncaughtExceptionHandler 接口(未捕获异常处理器),该接口定义在 Thread 类内部,它只包含一个方法,所以它是一个函数式接口:
@FunctionalInterface
public interface UncaughtExceptionHandler {
/**
* Method invoked when the given thread terminates due to the
* given uncaught exception.
* <p>Any exception thrown by this method will be ignored by the
* Java Virtual Machine.
* @param t the thread
* @param e the exception
*/
void uncaughtException(Thread t, Throwable e);
}
Thread 类中定义两个 UncaughtExceptionHandler 了,一个是实例变量,一个是静态变量:
// null unless explicitly set
private volatile UncaughtExceptionHandler uncaughtExceptionHandler;
// null unless explicitly set
private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;
实例变量引用的 UncaughtExceptionHandler 是每个线程独有的,静态变量引用的 UncaughtExceptionHandler 是所有线程公有的,Thread 在抛出未捕获异常后会先使用实例变量引用的 UncaughtExceptionHandler,如果实例变量为 null(没有定义),则会使用静态变量的 UncaughtExceptionHandler。这两个变量都定义了 getter/setter 方法,用户可以自定义线程的未捕获异常处理器。
在线程的 run 方法抛出未捕获异常而终止线程之前,JVM 会运行 UncaughtExceptionHandler.uncaughtException() 方法,我们可以在该方法中做一些有意义的事情,比如将线程异常终止的相关信息记录到日志文件中,甚至为异常终止的线程创建并启动一个替代线程。
线程池
线程是一种昂贵的资源,其开销主要包括以下几个方面。
- 线程的创建与启动的开销。与普通的对象相比,Java 线程还占用了额外的存储空间----栈空间。并且,线程的启动会产生相应的线程调度开销。
- 线程的销毁。
- 线程调度的开销。线程的调度会导致上下文切换,从而增加处理器资源的消耗,使得应用程序本身可以使用的处理器资源减少。
在 Java 中我们会为大对象定义对象池,来避免频繁的创建大对象,Thread 也是一个对象,我们也可以使用对象池来维护一定数量的 Thread,这就是线程池,不过它的实现方式与普通的对象池不同,线程池内部可以预先创建一定数量的工作者线程,客户端代码并不需要向线程池借用线程而是将其需要执行的任务作为一个对象提交给线程池,线程池可能将这些任务缓存在工作队列之中,而线程池内部的各个工作者线程则不断地从队列中取出任务并执行。因此,线程池可以被看做基于生产者----消费者模式的一种服务,该服务内部维护的工作者线程相当于消费者,线程池的客户端线程相当于生产者线程,客户端代码提交给线程池的任务相当于 “产品”,线程池内部用于缓存任务的队列相当于传输通道。
Java 中线程池的顶级接口是 java.util.concurrent.Executor,常用的实现类是 java.util.concurrent.ThreadPoolExecutor,创建线程池是需要定义七大参数,反映在代码中是 ThreadPoolExecutor 中包含七个参数的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePollSize:线程池的核心线程数
- maximumPoolSize:线程池的最大线程数
- keepAliveTime:空闲线程的存活时间
- unit:时间单位
- workQueue:工作队列
- threadFactory:线程工厂
- handler:拒绝策略
将一个任务提交到线程池后,任务的生命周期如下:
- 如果提交任务时,线程中当前线程数小于核心线程数,则直接创建线程来执行任务
- 如果提交任务时,线程中当前线程数大于核心线程数,则将任务添加到工作队列中
- 如果工作队列已满,则直接创建线程来执行任务
- 如果工作队列已满,且当前线程数等于最大线程数,则执行配置的拒绝策略
submit 和 execute
线程池中定义了两种提交任务的方法,分别时 submit 和 execute,其定义如下:
void execute(Runnable command);
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
execute 方法没有返回值,使用 execute 方法提交的任务在线程抛出未捕获异常后会执行配置的未捕获异常处理器,submit 方法返回一个 Future 对象,它代表执行任务后返回结果的抽象,使用 submit 方法提交的任务在线程抛出未捕获异常后不会执行配置的未捕获异常处理器,因为线程池将未捕获异常传递给了 Future,当通过调用 Future.get() 方法获取任务结果的时候即可获取未捕获异常。如果在工作者线程还没有执行结束的时候调用 Future.get() 方法,则会阻塞调用线程,直到工作者线程执行结束之后,Future.get() 才会返回。通过 Future.isDone() 方法可以判断工作者线程时候已经执行结束,Future.cancel(boolean mayInterruptIfRunning) 方法可以取消任务的执行,当此时任务还在等待队列中时,则会移除并不再执行该任务,mayInterruptIfRunning 为 ture 时会给工作者线程发送中断请求。
实用工具类
JDK 标准类库定义了线程池的实用工具类 java.util.concurrent.Executors,使用它可以快速创建线程池。
- Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
即一个核心线程数为 0,最大线程数为 Integer.MAX_VALUE,工作者线程允许的最大空闲空间为 60 秒,内部以 SynchronousQueue 为工作队列的一个线程池。
从以上定义我们可以看出该线程池中的任务不会进入等待队列而是直接创建线程池来执行,且线程池中的线程数量可以看成是无限的,线程在空闲了 60s 后会被线程池回收。
- Executors.newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
即一个以无界队列为工作队列,核心线程数等于最大线程数等于 nThreads,且空闲工作者线程不会被自动清理的线程池。这是一种线程池大小一旦达到其核心线程池大小就既不会增加也不会减少工作者线程的固定大小的线程池。因此,这样的线程池实例一旦不再需要,我们必须主动将其关闭。
- Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
该线程池类似于 Executors.newFixedThreadPoll(1) 所返回的线程池。该线程池便于我们实现单生产者----单消费者模式。
Java 多线程程序的调试与测试
一个真实的 Java 系统运行时往往有上百个线程在运行,如果没有相应的工具能够对这些线程进行监视,那么这些线程对于我们来说就是黑盒。对线程进行监视的主要途径时获取并查看程序的线程转储(Thread Dump)。一个线程转储包含了获取这个线程转储的那一刻该程序的线程信息。这些信息包括程序中有哪些线程以及这些线程的具体信息。
获取线程转储的方式如下所示:
- 执行命令:jstack -l PID
- jvisualvm 工具
- JMC 工具