线程并行的逻辑
一个线程问题
起手先来看一个线程问题:
public class NumberExample {
private int cnt = 0;
public void add() {
cnt++;
}
public int get() {
return cnt;
}
}
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
NumberExample example = new NumberExample ();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
此处启用1000个线程对NumberExample的一个对象example进行操作,输出的结果却总是小于1000,此处即是线程出现了问题。追溯到底层,线程出现问题的根源无非三种:
- 原子性
- 有序性
- 可见性
原子性: 分时复用引起
原子性的定义是一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。但是由于CPU分时复用(线程切换)的存在,会破坏一段代码我们概念里的原子性,CPU在切换线程时,会把我们理解的“一个原子操作”(一个完整逻辑)打乱
int i = 1;
// 线程1执行
i += 1;
// 线程2执行
i += 1;
上述 i+=1,在CPU中需要三条指令:
- 将变量 i 从内存读取到 CPU寄存器;
- 在CPU寄存器中执行 i + 1 操作;
- 将最后的结果i写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。
线程1执行了第一条指令后,就切换到线程2执行,假如线程2执行了这三条指令后,再切换会线程1执行后续两条指令,将造成最后写到内存中的i值是2而不是3。
可见性: CPU缓存引起
可见性:一个线程对共享变量的修改,另外一个线程能够立刻看到。
举个简单的例子:
//线程1执行的代码
int i = 0;
i = 10;
//线程2执行的代码
j = i;
假若执行线程1的是CPU1,执行线程2的是CPU2。由上面的分析可知,当线程1执行 i =10这句时,会先把i的初始值加载到CPU1的高速缓存中,然后赋值为10,那么在CPU1的高速缓存当中i的值变为10了,却没有立即写入到主存当中。
此时线程2执行 j = i,它会先去主存读取i的值并加载到CPU2的缓存当中,注意此时内存当中i的值还是0,那么就会使得j的值为0,而不是10.
这就是可见性问题,线程1对变量i修改了之后,线程2没有立即看到线程1修改的值。
有序性: 重排序引起
有序性即程序执行的顺序按照代码的先后顺序执行。
int i = 0;
boolean flag = false;
i = 1;
flag = true;
上面代码定义了一个int型变量,定义了一个boolean类型变量,然后分别对两个变量进行赋值操作。从代码顺序上看,语句1是在语句2前面的,那么JVM在真正执行这段代码的时候会保证语句1一定会在语句2前面执行吗? 不一定,为什么呢? 这里可能会发生指令重排序(Instruction Reorder)。
在执行程序时为了提高性能,编译器和处理器常常会对指令做重排序。重排序分三种类型:
- 编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
- 指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction-Level Parallelism, ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
- 内存系统的重排序。由于处理器使用缓存和读 / 写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
Java中的线程实现机制
不像GC,JVM提供了一套安全自动的机制,多线程角度Java并没有一套自动且安全的线程机制,只是提供了一些基础支持和技术点,将灵活多变的线程问题交给程序员。Java提供支持的主要核心还是对于原子性、有序性、可见性的处理。
原子性
x = 10;
在Java中直接将数值10赋值给x,也就是说线程执行这个语句的会直接将数值10写入到工作内存中
y = x;
x++;
但是上述两个语句,其中 y=x 包含2个操作,它先要去读取x的值,再将x的值写入工作内存,虽然读取x的值以及 将x的值写入工作内存 这2个操作都是原子性操作,但是合并在一起就不是原子性了。 x++ 则包括3个操作:读取x的值,进行加1操作,写入新的值。
Java内存模型只保证了基本读取和赋值是原子性操作,如果要实现更大范围操作的原子性,可以通过synchronized和Lock来实现。由于synchronized和Lock能够保证任一时刻只有一个线程执行该代码块,那么自然就不存在原子性问题了,从而保证了原子性。
可见性
Java提供了volatile关键字来保证可见性。当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。而普通的共享变量不能保证可见性,因为普通共享变量被修改之后,什么时候被写入主存是不确定的,当其他线程去读取时,此时内存中可能还是原来的旧值,因此无法保证可见性。
而 volatile 变量的内存可见性是基于内存屏障(Memory Barrier)实现,内存屏障又叫内存栅栏,是一个 CPU 指令。在程序运行时,为了提高执行性能,编译器和处理器会对指令进行重排序,JMM 为了保证在不同的编译器和 CPU 上有相同的结果,通过插入特定类型的内存屏障来禁止+ 特定类型的编译器重排序和处理器重排序,插入一条内存屏障会告诉编译器和 CPU:不管什么指令都不能和这条 Memory Barrier 指令重排序。
public class Test {
private volatile int number;
public void update() {
number = 100;
}
}
public class App{
public static void main(String[] args) {
Test test = new Test();
test.update();
}
}
通过 hsdis 和 jitwatch 工具可以得到编译后的核心部分汇编代码:
0x0000000002951563: and $0xffffffffffffff87,%rdi
0x0000000002951567: je 0x00000000029515f8
0x000000000295156d: test $0x7,%rdi
0x0000000002951574: jne 0x00000000029515bd
0x0000000002951576: test $0x300,%rdi
0x000000000295157d: jne 0x000000000295159c
0x000000000295157f: and $0x37f,%rax
0x0000000002951586: mov %rax,%rdi
0x0000000002951589: or %r15,%rdi
0x000000000295158c: lock cmpxchg %rdi,(%rdx) //在 volatile 修饰的共享变量进行写操作的时候会多出 lock 前缀的指令
0x0000000002951591: jne 0x0000000002951a15
0x0000000002951597: jmpq 0x00000000029515f8
0x000000000295159c: mov 0x8(%rdx),%edi
0x000000000295159f: shl $0x3,%rdi
0x00000000029515a3: mov 0xa8(%rdi),%rdi
0x00000000029515aa: or %r15,%rdi
lock 前缀的指令在多核处理器下会引发两件事情:
- 将当前处理器缓存行的数据写回到系统内存。
- 写回内存的操作会使在其他 CPU 里缓存了该内存地址的数据无效。
为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存(L1,L2 或其他)后再进行操作,但操作完不知道何时会写到内存。如果对声明了 volatile 的变量进行写操作,JVM 就会向处理器发送一条 lock 前缀的指令,将这个变量所在缓存行的数据写回到系统内存。为了保证各个处理器的缓存是一致的,实现了缓存一致性协议(MESI),每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到处理器缓存里。
所有多核处理器下还会完成:当处理器发现本地缓存失效后,就会从内存中重读该变量数据,即可以获取当前最新值。
volatile 变量通过这样的机制就使得每个线程都能获得该变量的最新值。
另外,通过synchronized和锁机制也能够保证可见性,synchronized和锁机制能保证同一时刻只有一个线程获取锁然后执行同步代码,并且在释放锁之前会将对变量的修改刷新到主存当中。因此可以保证可见性。关于锁和synchronized的内容,后续会单独梳理
有序性
在Java里面,有一套Happens-Before 规则,具体包括:
- 单一线程原则:在一个线程内,在程序前面的操作先行发生于后面的操作。
- 管程锁定规则:一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。
- volatile 变量规则:对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作
- 线程启动规则:Thread 对象的 start() 方法调用先行发生于此线程的每一个动作
- 线程加入规则:Thread 对象的结束先行发生于 join() 方法返回
- 线程中断规则:对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过 interrupted() 方法检测到是否有中断发生。
- 对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。
- 传递性:如果操作 A 先行发生于操作 B,操作 B 先行发生于操作 C,那么操作 A 先行发生于操作 C。
在Happens-Before规则的基础上通过volatile关键字来保证一定的“有序性”。基于volatitle变量规则,对一个 volatile 域的写,happens-before 于任意后续对这个 volatile 域的读。
//假设线程A执行writer方法,线程B执行reader方法
class VolatileTest {
int a = 0;
volatile boolean flag = false;
public void writer() {
a = 1;
flag = true;
}
public void reader() {
if (flag) {
int i = a;
……
}
}
}
public static void main(String[] args) {
final VolatileTest example = new VolatileTest();
// 创建线程A,执行writer方法
Thread threadA = new Thread(() -> example.writer(example));
// 创建线程B,执行reader方法
Thread threadB = new Thread(() -> {
// 等待线程A设置flag为true
example.reader(example);
});
threadA.start();
threadB.start();
}
}
根据 happens-before 规则,上面过程会建立 3 类 happens-before 关系。
- 根据程序次序规则:1 happens-before 2 且 3 happens-before 4。
- 根据 volatile 规则:2 happens-before 3。
- 根据 happens-before 的传递性规则:1 happens-before 4。
因为以上规则,当线程 A 将 volatile 变量 flag 更改为 true 后,线程 B 能够迅速感知。
另外可以通过synchronized和锁机制来保证有序性,很显然,synchronized和锁保证每个时刻是有一个线程执行同步代码,相当于是让线程顺序执行同步代码,自然就保证了有序性。当然JMM是通过Happens-Before 规则来保证有序性的。
线程的状态
新建(New)
创建后尚未启动。
可运行(Runnable)
可能正在运行,也可能正在等待 CPU 时间片。
包含了操作系统线程状态中的 Running 和 Ready。
阻塞(Blocking)
等待获取一个排它锁,如果其线程释放了锁就会结束此状态。
无限期等待(Waiting)
等待其它线程显式地唤醒,否则不会被分配 CPU 时间片。
进入方法 | 退出方法 |
---|---|
没有设置 Timeout 参数的 Object.wait() 方法 | Object.notify() / Object.notifyAll() |
没有设置 Timeout 参数的 Thread.join() 方法 | 被调用的线程执行完毕 |
LockSupport.park() 方法 | - |
限期等待(Timed Waiting)
无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。
调用 Thread.sleep() 方法使线程进入限期等待状态时,常常用“使一个线程睡眠”进行描述。
调用 Object.wait() 方法使线程进入限期等待或者无限期等待时,常常用“挂起一个线程”进行描述。
睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态。
阻塞和等待的区别在于,阻塞是被动的,它是在等待获取一个排它锁。而等待是主动的,通过调用 Thread.sleep() 和 Object.wait() 等方法进入。
进入方法 | 退出方法 |
---|---|
Thread.sleep() 方法 | 时间结束 |
设置了 Timeout 参数的 Object.wait() 方法 | 时间结束 / Object.notify() / Object.notifyAll() |
设置了 Timeout 参数的 Thread.join() 方法 | 时间结束 / 被调用的线程执行完毕 |
LockSupport.parkNanos() 方法 | - |
LockSupport.parkUntil() 方法 | - |
死亡(Terminated)
可以是线程结束任务之后自己结束,或者产生了异常而结束
Java中线程使用方法
创建线程
有三种使用线程的方法:
- 实现 Runnable 接口;
- 实现 Callable 接口;
- 继承 Thread 类。
实现 Runnable 接口
实现Java中的Runnalbel,主要是实现 run() 方法,然后通过 Thread 调用 start() 方法来启动线程。
public class MyRunnable implements Runnable {
public void run() {
System.out.print("this is thread")
}
}
public static void main(String[] args) {
MyRunnable instance = new MyRunnable();
Thread thread = new Thread(instance);
thread.start();
}
实现 Callable 接口
与 Runnable
接口不同,Runnable
任务没有返回结果,而 Callable
任务可以返回一个 Future
对象,该对象可以用于获取任务的结果。同样是使用Thread调用start方法启动线程
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
继承 Thread 类
Thread类本身即是实现了Runable 接口,继承Thread类实现run方法即可
public class MyThread extends Thread {
public void run() {
// ...
}
}
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
线程控制
线程休眠—Sleep
sleep(millisec) 是Thread对象的方法,会休眠当前正在执行的线程,sleep() 可能会抛出 InterruptedException,因为异常不能跨线程传播回 main() 中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。
public class SleepExample {
public static void main(String[] args) {
// 创建第一个线程
Thread thread1 = new Thread(() -> {
System.out.println("Thread 1 is running.");
try {
for(int i = 0; i < 10;i++){
Thread.sleep(2000);
System.out.println("Thread 1 out:"+i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 创建第二个线程
Thread thread2 = new Thread(() -> {
for(int i = 10; i < 20;i++){
System.out.println("Thread 2 out:"+i);
}
});
// 启动线程
thread1.start();
thread2.start();
}
}
线程让出资源—yield
yield()
方法是 Thread
类的一个静态方法,它可以让当前正在执行的线程放弃当前的CPU时间片,从而允许相同优先级的其他线程有机会执行。这个方法并不会使线程进入阻塞状态,而是让线程回到可运行状态(running state),并且可能在下一次时间片被调度执行。
注意:该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。
线程通信
当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。
Join
在Java中,join
是一个线程Thread对象的方法,它允许一个线程等待另一个线程完成执行。具体来说,当一个线程A调用另一个线程B的 join()
方法时,线程A会暂停执行,直到线程B完成其任务并终止。
public class ThreadJoinTest {
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
System.out.println("Thread 1 is running.");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 1 has finished.");
});
Thread thread2 = new Thread(() -> {
try {
thread1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 2 starts after Thread 1 has finished.");
});
thread1.start();
thread2.start();
}
}
thread2
调用了 thread1.join()
,这意味着 thread2
会等待 thread1
完成执行后才开始运行。
join的使用关键点
-
阻塞当前线程:调用
join()
的线程将被阻塞,直到被调用join()
的线程完成执行。 -
确保线程执行顺序:
join
方法可以用来确保线程的执行顺序,使得调用join()
的线程在被调用线程执行完成后再继续执行。 -
重载方法:
join()
方法有两个重载版本:public final void join()
:无限期地等待该线程终止。public final synchronized void join(long millis)
:等待该线程终止的时间最长为指定的毫秒数,或者该线程终止。
-
异常处理:如果线程在等待期间被中断,
join()
方法会抛出一个InterruptedException
。 -
同步机制:由于
join()
是一个同步方法,因此它可以用来实现线程间的同步。 -
使用场景:
join
方法通常用于确保在执行某些操作之前,必须先完成其他线程中的某些任务。
wait() notify() notifyAll()
调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。
这些方法都是定义在 Object
类中的,因此它们可以被任何Java对象使用。
public class WaitNotifyExample {
private int ticket = 0;
public synchronized void sellTicket() {
while (ticket <= 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ticket--;
System.out.println("Sold a ticket, remaining: " + ticket);
notify();
}
public synchronized void printTickets() {
while (ticket > 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("No tickets left.");
notifyAll();
}
public static void main(String[] args) {
WaitNotifyExample example = new WaitNotifyExample();
Thread seller = new Thread(() -> {
for (int i = 0; i < 5; i++) {
example.sellTicket();
}
});
Thread printer = new Thread(() -> {
example.printTickets();
});
seller.start();
printer.start();
}
}
sellTicket()
方法在没有票可卖时调用 wait()
,而 printTickets()
方法在有票时调用 wait()
。当票被卖出或打印完时,相应的方法会调用 notify()
来唤醒等待的线程。
wait() 和 sleep() 的区别
- wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;
- wait() 会释放锁,sleep() 不会。
线程池
Executor框架主要用于管理线程和执行异步任务。它提供了一种更简单、更强大的方式去管理线程,比手动创建和管理线程更加高效和方便。
Executor框架包含:
- Executor - 一个接口,定义了执行提交的
Runnable
任务的方法。 - Executors - 一个工厂类,用于创建不同类型的线程池。
newCachedThreadPool()
: 创建一个可根据需要创建新线程的线程池,但是会回收空闲线程。newFixedThreadPool(int nThreads)
: 创建一个具有固定数量线程的线程池。newSingleThreadExecutor()
: 创建一个单线程的执行器,即只有一个线程执行任务。newScheduledThreadPool(int corePoolSize)
: 创建一个支持定时及周期性任务执行的线程池。
-
ThreadPoolExecutor - 一个具体的线程池实现,允许更精细的控制。
- 可以设置核心线程数、最大线程数、工作队列、线程存活时间等。
-
ScheduledExecutorService - 一个扩展了
ExecutorService
的接口,支持延迟和周期性的任务执行。 -
ExecutorService - 一个扩展了
Executor
的接口,提供了额外的方法来控制任务的生命周期,例如:submit(Runnable task)
: 提交一个任务用于执行,并返回一个Future
对象,表示异步执行的结果。shutdown()
: 启动有序关闭。awaitTermination(long timeout, TimeUnit unit)
: 等待线程池关闭。invokeAll(Collection<? extends Callable<T>> tasks)
: 执行给定的任务集合。
-
Future - 一个接口,表示异步计算的结果。可以用来检查任务是否完成,取消任务,以及获取计算结果。
-
Callable - 一个接口,类似于
Runnable
,但它可以返回结果和抛出异常。
public class ExecutorExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务到线程池
for (int i = 0; i < 5; i++) {
int finalI = i;
executor.submit(() -> {
System.out.println("Task " + finalI + " executed by " + Thread.currentThread().getName());
});
}
// 关闭线程池
executor.shutdown();
}
}
同其他“池”一样,实际在Java中使用多线程多数以线程池的形式,使用线程池的好处有:
- 资源优化:线程池可以重用线程,减少了创建和销毁线程的开销。
- 提高响应速度:任务提交后可以立即返回,线程池中的空闲线程可以立即执行任务。
- 提高吞吐量:通过合理配置线程池,可以提高任务执行的吞吐量。
- 管理任务:可以统一管理任务的执行,包括提交任务、取消任务、等待任务完成等。