1、线程的创建
创建线程的方式有两种,
- 第一种是通过继承 Thread 类,重写run 方法;
- 第二种是通过实现 Runnable 接口
通过源码发现,创建线程只有一种方式那就是构造 Thread 类,而实现线程的执行单元则有两种方式,第一种是重写 Thread 类的 run 方法,第二种是实现 Runnable 接口的 run方法,并且将 Runnable 实例用作构造 Thread 的参数。
1.2、 拓展
Thread 类的 run 方法是不能和共享的,也就是说 A 线程不能把 B 线程的 run 方法当作自己的执行单元,而使用 Runnable 接口则很容易就能实现这一点,使用同一个Runnable 的实例构造不同的 Thread 实例。
2、线程的生命周期
2.1、NEW
用new 创建一个Thread对象时,它并不处于执行状态,因为调动start方法去启动该线程;此时线程状态为new状态,它只是thread对象的状态,在没有用start方法去启动该线程之前,该线程根本不存在,此时它与new 的一个普通对象没有区别。
new 状态时,通过start 进入Runnable状态。
2.2、Runnable
此时真正在JVM进程中创建了一个线程
此时是可执行状态,真正执行起来还要等CPU的调度。
严格来讲,Runnable 的线程只能意外终止或者进入 Running状态。
2. 3、Running
一旦 CPU 通过轮询或其他方式从任务可执行队列中选中了线程,那么此时它才能真正
地执行 run 方法里的逻辑代码。
可以发生如下状态转化:
- 直接进入Terminated状态,比如调用JDK已经不推荐使用的stop方法或者意外死亡
- 进入Blocked状态,如调用sleep、wait方法 join();或阻塞IO操作,获取某个锁资源从而加入到该锁的阻塞队列
- 进入Runnable状态 由于CPU调度器轮询放弃执行;线程主动调用yield方法,放弃CPU执行权
2. 4、Blocked
线程在Blocked状态可以切换到如下状态:
- 直接进入Terminated状态,比如调用JDK已经不推荐使用的stop方法或者意外死亡
- 进入Runnable状态。线程阻塞结束、获取到了某个锁资源、完成了指定时间的休眠、wait中的线程被其他线程notify/notifyall唤醒;线程在阻塞中被打断,比如其他线程调用了interrupt方法
2. 5、Terminated
Terminated是一个线程的最终转态,不会再切换到其他任何状态。
以下情况会导致线程进入Terminated状态。
- 线程正常结束
- 线程运行出错,意外结束
- JVM Crash,导致所有线程都结束
3、 深入理解 Thread 构造器
3.1Thread 命名
3.1.1 线程的默认命名
如果没有为线程显示的指定一个名字,那么线程将会以“Thread-”作为前缀与一个自增数字进行组合,这个自增数字在整个 JVM 进程中将会不断自增,且加了 synchronized 不会出现重复的情况:
for (int i = 0; i < 20; i++) {
new Thread(() -> System.out.println(Thread.currentThread().getName())).start();
}
3.1.2 修改线程的名字
修改线程的名称可以借助 setName 方法进行修改
new Thread(() -> {
System.out.println(Thread.currentThread().getName()); //Thread-0
Thread.currentThread().setName("测试线程");
System.out.println(Thread.currentThread().getName());//测试线程
}).start();
3.2 线程的父子关系
Thread 的所有构造函数,最终都会去调一个私有构造器,我们截取片段代码对其进行分析,不难发现新创建的任何一个线程的都会有一个父线程:
- 线程的创建是由另一个线程完成的;
- 被创建的线程属于创建它的线程的子线程。
- main 函数所在的线程是由 JVM 创建的,也就是 main 线程,那就意味着我们前面创建的所有线程,其父线程都是 main 线程。
3.3 Thread 与 ThreadGroup
在 Thread 的构造函数中,可以显示的指定线程的 Group
- main 线程所在的 ThreadGroup 名称为 main
- 如果在构造 Thread 的时候没有显示的指定一个ThreadGroup,那么子线程将会被加入父线程所在的线程组。
- 在默认设置中,当然除了子线程会和父线程同属于一个 Group 之外,它还会和父线程拥有同样的优先级,同样的 daemon,
3.4 守护线程
守护线程是一种比较特殊的线程,一般用于处理一些后台的工作,比如 JDK 的垃圾回收线程
如果JVM中有非守护线程存在,则JVM的进程不会退出
// t1.setDaemon(true); // 是否设置为守护线程
3.4.1 守护线程的作用
守护线程经常用作与执行一些后台任务,因此有时它也被称为后台线程,当你希望关闭某些线程的时候,或者退出 JVM 进程的时候,一些线程能够自动关闭,此时就可以考虑用守护线程为你完成这样的工作。
4、ThreadAPI 的详细介绍
4.1 sleep
4.1.1sleep 方法介绍
public static void sleep (long millis) throws InterruptedException
public static void sleep (long millis, int nanos) throws InterruptedException
sleep 方法会使当前线程进入指定毫秒数的休眠,暂停执行,虽然给定了一个休眠的时间,但是最终要以系统的定时器和调度器的精度为准,休眠有一个非常重要的特性,那就是其不会放弃 monitor 锁的所有权
Thread.sleep 只会导致当前线程进入指定时间的休眠。
4.1.2 使用 TimeUniT 替代 Thread.sleep
在 JDK1.5 以后,JDK 引入了一个枚举 TimeUnit,其对 sleep 方法提供了很好的封装,使用它可以省去时间单位的换算步骤,比如线程想休眠 3 小时 24 分 17 秒 88 毫秒,使用TimeUnit 来实现就非常简便优雅了
Thread.sleep(12257088L);
TimeUnit.HOURS.sleep(3);
TimeUnit.MINUTES.sleep(3);
TimeUnit.SECONDS.sleep(3);
TimeUnit.MILLISECONDS.sleep(3);
同样的时间表达,TimeUnit 显然清晰很多,强烈建议在使用 Thread.sleep 的地方,完全使用 TimeUnit 来代替,因为 sleep 能做的事情,TimeUnit 全部都能完成
4.1.3 sleep(0)
Thread.sleep(0) 是你的线程暂时放弃 cpu,也就是释放一些未用的时间片给其他线程或进程使用,就相当于一个让位动作。在线程中,调用 sleep(0)可以释放 cpu 时间,让线程马上重新回到就绪队列而非等待队列,sleep(0)释放当前线程所剩余的时间片(如果有剩余的话),这样可以让操作系统切换其他线程来执行,提升效率。
4.2、线程 yield
yield 方法属于一种启发式的方法,其会提醒调度器我愿意放弃当前的 CPU 资源,如果 CPU 的资源不紧张,则会忽略这种提醒。调用 yield 方法会使当前线程从 Running 状态切换到 Runnable 状态。
yield 只是一个提示(hint),CPU 调度器并不会担保每次都能满足 yield 提示。
在 JDK1.5 以前的版本中 yield 的方法事实上是调用了 sleep(0),但是他们之间存在着本质的区别,具体如下:
- sleep 会导致当前线程暂停指定的时间,没有 CPU 时间片的消耗;
- yield 只是对 CPU 调度器的一个提示,如果 CPU 调度器没有忽略这个提示,它会导致线程上下文的切换;
- sleep 会使线程短暂 block,会在给定的时间内试放 CPU 资源;
- yield 会使 Running 状态的 Thread 进入 Runnable 状态(如果 CPU 调度器没有忽略这个提示的话);
- sleep 几乎百分之百的完成了给定时间的休眠,而 yield 的提示并不能一定保证。
4.3、设置线程优先级
public final void setPriority (int newPriority)
public final int getPriority()
4.3.1、线程优先级介绍
理论上是优先级比较高的线程会获取优先被 CPU 调度的机会,但是事实上往往并不会如你所愿,设置线程的优先级同样也是一个 hint(暗示)操作。
- 对于 root 用户,他会 hint 操作系统你想要设置的优先级别,否则它会被忽略。
- 如果 CPU 比较忙,设置优先级可能会获得更多的 CPU 时间片,但是闲时优先级的高低几乎不会有任何作用。所以,不要在程序设计当中企图使用线程优先级绑定某些特定的业务,或者让业务严重依赖于线程优先级,这可能会让你大失所望。
4.3.2、线程优先级源码分析
设置线程优先级源码:
通过源码的分析,我们可以看出,线程的优先级不能小于 1 也不能大于 10,如果指定的线程优先级大于线程所在 group 的优先级,那么指定的优先级将会失败,取而代之的是 group 的最大优先级。
线程默认的优先级和它的父类保持一致,一般情况下都是 5,因为 main 线程的优先级就是 5,所以它派生出来的线程都是 5。
4.3.3、关于优先级的一些总结
一般情况下,不会对线程设定优先级别,更不会让某些业务严重的依赖线程的优先级别,比如权重,借助优先级设定某个任务的权重,这种方式是不可取的,一般定义线程的时候使用默认的优先级就好了。
4.4、获取线程 ID
public long getId() 获取线程的唯一 ID,线程的 ID 在整个 JVM 进程中都会是唯一的,并且是从 0 开始逐次递增。如果你在 main 线程(main 函数)中创建了一个唯一的线程,并且调用 getId()后发现其并不等于 0,也许你会纳闷,不应该是从 0 开始的吗?
这是因为在一个 JVM 进程启动的时候,实际上是开辟了很多个线程,自增序列已经有了一定的消耗,因此我们自己创建的线程绝非第 0 号线程。
Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName()); //Thread-0
Thread.currentThread().setName("测试线程");
System.out.println(Thread.currentThread().getName());//测试线程
});
thread.start();
System.out.println(thread.getId());//11
4.5、获取当前线程
public static Thread currentThread() 用于返回当前执行线程的引用
Thread t1 = new Thread(){
@Override
public void run() {
System.out.println(Thread.currentThread() == this);//true
}
};
t1.start();
String name = Thread.currentThread().getName();
System.out.println("main".equals(name)); //true
4.6、线程 interrupt
线程 interrupt,是一个非常重要的 API,也是经常使用的方法,与线程中断相关,相关的 API 有以下几个。
- public void interrupt()
- public static boolean interrupted()
- public boolean isInterrupted()
4.6.1、interrupt
以下方法的调用会使得当前线程进入阻塞状态,而调用当前线程的 interrupt 方法,就可以打断阻塞。
- Object 的 wait 方法;
- Object 的 wait(long)方法;
- Object 的 wait(lOng, int)方法;
- Object 的 sleep(long)方法;
- Thread 的 sleep(long)方法;
- Thread 的 join 方法;
- Thread 的 join(long)方法;
- Thread 的 join(long, int)方法;
- InterruptIbleChannel 的 io 操作;
- Selector 的 wakeup 方法。
上述若干方法都会使得当前线程进入阻塞状态,若另外的一个线程调用被阻塞线程的interrupt 方法,则会打断这种阻塞,因此这种方法有时会被称为可中断方法,注意,打断一个线程并不等于该线程的生命周期结束,仅仅是打断了当前线程的阻塞状态。
一旦线程在阻塞的情况下被打断,都会抛出一个称为 InterruptedException 的异常,这个异常就像一个 signal(信号)一样通知当前线程被打断了。
Thread t1 = new Thread(() -> {
try {
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException e) {
System.out.println("阻塞状态被中断");
e.printStackTrace();
}
});
t1.start();
// 短暂的阻塞是为了保证 t1 线程已启动
TimeUnit.MILLISECONDS.sleep(100);
// 中断 t1 线程的阻塞状态
t1.interrupt();
在一个线程内部存在着 名为interrupt flag 的标识,如果一个线程被 interrupt,那么它的 flag 将被设置,但是如果当前线程正在执行可中断方法被阻塞时,调用 interrupt 方法将其中断,反而会导致flag 被清除,关于这点我们在后面还会做详细的介绍。另外有一点需要注意的是,如果一个线程已经是死亡状态,那么尝试对其的 interrupt 会直接被忽略。
4.6.2、isInterrupted
isInterrupted 是 Thread 的一个成员方法,它主要判断当前线程是否被中断,该方法仅仅是对 interrupt 标识的一个判断,并不会影响标识发生任何改变,这个与我们即将学习到的 interrupted 是存在差别的。
Thread t1 = new Thread() {
@Override
public void run() {
while (true) {
}
}
};
t1.setDaemon(true);
t1.start();
TimeUnit.MILLISECONDS.sleep(100);
System.out.printf("Thread is interrupted ? %s\n", t1.isInterrupted()); //Thread is interrupted ? false
t1.interrupt();
System.out.printf("Thread is interrupted ? %s\n", t1.isInterrupted()); //Thread is interrupted ? true
上面的代码中定义了一个线程,并且在线程的执行单元中(run 方法)写了一个空的死循环,为什么不写 sleep 呢?因为 sleep 是可中断方法,会捕获到中断信号,从而干扰我们程序的结果。下面是程序运行的结果,记得手动结束上面的程序运行,或者你也可以将上面定义的线程指定为守护线程,这样就会随着主线程的结束导致 JVM 中没有非守护线程而自动退出。
可中断方法捕获到了中断信号之后,为了不影响线程中其他方法的执行,将线程的 interrupt 标识复位是一种很合理的设计
4.6.3、interrupted
interrupted 是一个静态方法,虽然其也用于判断当前线程是否被中断,但是它和成员方法 isInterrupted 还是又很大的区别,调用该方法会直接擦除掉线程的 interrupt标识,需要注意的是,如果当前线程被打断了,那么第一次调用 interrupted 方法会返回true,并且立即擦除了 interrupt 标识;第二次包括以后的调用永远都会返回 false,除非在此期间线程又一次被打算。
Thread t1 = new Thread() {
@Override
public void run() {
while (true) {
System.out.println(Thread.interrupted());
}
}
};
t1.setDaemon(true);
t1.start();
// 短暂的阻塞是为了保证 t1 线程已启动
TimeUnit.MILLISECONDS.sleep(2);
// 中断 t1 线程的阻塞状态
t1.interrupt();
在很多的 false 包围中发现了一个 true,也就是说 interrupted 方法判断到了其被中断,立即擦除了中断标识,并且只有这一次返回 true,后面的都将会是 false。
4.6.4、interrupt 注意事项
打开 Thread 的源码,不难发现, isInterrupted方法和 interrupted 方法都调用了同一个本地方法:
其中参数 ClearInterrupted 主要用来控制是否擦除线程 interrupt 的标识。
isInterrupted 方法的源码中该参数为 false,表示不想擦除
而 interrupted 静态方法中该参数则为 true,表示想要擦除:
4.7、线程 join
Thread 的 join 方法同样是一个非常重要的方法,使用它的特性可以实现很多比较强大的功能,Thread 的 API 为我们提供了三个不同的 join 方法,具体如下。
- public final void join() throws InterruptedException
- public final void join (long millis) throws InterruptedException
- public final void join (long millis, int nanos)throws InterruptedException
4.7.1、线程 join 方法详解
join 某个线程 A,会使当前线程 B 进入等待,直到线程 A 结束生命周期,或者到达给定的时间,那么在此期间 B 线程是处于 Blocked 的,而不是 A 线程。
// 1. 定义两个线程
Thread t1 = new Thread(() -> printNum());
Thread t2 = new Thread(() -> printNum());
// 2. 启动这两个线程
t1.start();
t2.start();
// 3. 执行这两个线程的 join 方法
t1.join();
t2.join();
// 4. main 线程循环输出
printNum();
}
private static void printNum() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "#" + i);
}
结合 Java8 的语法,创建了两个线程,分别启动,并且调用了每个线程的join 方法(注意:join 方法是被主线程调用的,因此在第一个线程还没结束生命周期的时候,第二个线程的 join 不会得到执行,但是此时,第二个线程也已经启动了),运行上面的程序,你会发现线程一和线程二会交替的输出直到他们结束生命周期,main 线程的循环才会开始运行,程序输出如下:
join 方法会使当前线程永远的等待下去,直到期间被另外的线程中断,或者 join 的线程执行结束,当然你也可以使用 join 的另外两个重载方法,指定毫秒数,在指定的时间到达之后,当前线程也会退出阻塞。
4.8、如何关闭一个线程
五、线程安全与数据同步
共享资源指的是多个线程同时对同一份资源进行访问(读写操作),被多个线程访问的资源就称为共享资源,如何保证多个线程访问到的数据是一致的,则被称为数据同步或者资源同步。
5.1 数据同步
5.1.1 数据不一致问题引入
写了一个简单的营业大厅叫号机程序,当时我们设定的最大号码是50,测试的时候就会出现数据不一致的情况,具体如下:
/**
* 简单的营业大厅叫号机程序,用来测试数据不一致问题
*/
public class CounterWindowRunnable implements Runnable{
// 最多受理 50 笔业务
private static final int MAX = 500;
// 起始号码,不做 static 修饰
private int index = 1;
@Override
public void run() {
while (index <= MAX) {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.format("请【%d】号到【%s】办理业务\n", index++,
Thread.currentThread().getName());
}
}
public static void main(String[] args) {
final CounterWindowRunnable task = new CounterWindowRunnable();
new Thread(task, "一号窗口").start();
new Thread(task, "二号窗口").start();
new Thread(task, "三号窗口").start();
new Thread(task, "四号窗口").start();
}
}
多次运行上述程序,每次都会有不一样的发现,但是总结起来主要有三个问题,具体如下:
- 第一,某个号码被略过没有出现。
- 第二,某个号码被多次显示。
- 第三,号码超过了最大值 500。
多次运行上面的程序,找出了数据不一致的几种情况,如下图所示。
5.1.2、数据不一致问题原因分析
A 号码被忽略
如图所示,线程的执行是由 CPU 时间片轮询调度的,假设此时线程 1 和 2 都执行到了 index=65 的位置,其中线程 2 将 index 修改为 66 之后未输出之前,CPU 调度器将执行权利交给了线程 1,线程 1 直接将其累加到了 67,那么 66 就被忽略了。
B 、号码重复出现
线程 1 执行 index+1,然后 CPU 执行权落入线程 2 手里,由于线程 1 并没有给 index赋予计算后的结果 393,因此线程 2 执行 index+1 的结果仍然是 393,所以会出现重复号码的情况。
C、号码超过了最大值
当 index=499 的时候,线程 1 和线程 2 都看到条件满足,线程 2 短暂停顿,线程 1 将 index 增加到了 500,线程 2 恢复运行后又将 500增加到了 501,此时就出现了超过最大值的情况。
注:
我们虽然使用了时序图的方式对数据同步问题进行了分析,但是这样的解释还是不够严谨,后面我们会讲解 Java 的内存模型以及 CPU 缓存等知识,到时候会更加清晰和深入的讲解数据不一致的问题。
5.2、初识 synchronized 关键字
synchronized 提供了一种排他机制,也就是在同一时间只能有一个线程执行某些操作。
synchronized 关键字可以实现一个简单的策略来防止线程干扰和内存一致性错误,如果一个对象对多个线程是可见的,那么对该对象的所有读或者写都将通过同步的方式来进行,具体表现如下:
- synchronized 关键字提供了一种锁的机制,能够确保共享变量的互斥访问,从而防止数据不一致问题的出现。
- synchronized 关键字包括 monitor enter 和 monitor exit 两个 JVM 指令,它能够保证在任何时候任何线程执行到 monitor enter 成功之前都必须从主内存中获取数据,而不是从缓存中,在 monitor exit 运行成功之后,共享变量被更新后的值必须刷入主内存(后面会重点介绍)
- synchronized 的指令严格遵守 java happens-before 规则,一个 monitor exit 指令之前必定要有一个 monitor enter(后面会详细介绍)
5.2.1 synchronized 关键字的用法
synchronized 可以用于对代码块或方法进行修饰,而不能够用于对 class 以及变量进行修饰。
同步方法语法:
[default|public|private|protected] synchronized [static] type method()
示例代码:
public synchronized void sync() {
...
}
public synchronized static void staticSync() {
.
同步代码块示例:
private final Object MUTEX = new Object();
public void sync() {
synchronized (MUTEX) {
...
}
}
使用synchronized 关键字优化叫号小程序
/**
* 简单的营业大厅叫号机程序,测试synchronized关键字
*/
public class CounterWindowRunnable2 implements Runnable{
// 最多受理 50 笔业务
private static final int MAX = 500;
// 起始号码,不做 static 修饰
private int index = 1;
private static final Object NUTEX = new Object();
@Override
public void run() {
synchronized (NUTEX){
while (index <= MAX) {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.format("请【%d】号到【%s】办理业务\n", index++,
Thread.currentThread().getName());
}
}
}
public static void main(String[] args) {
final CounterWindowRunnable2 task = new CounterWindowRunnable2();
new Thread(task, "一号窗口").start();
new Thread(task, "二号窗口").start();
new Thread(task, "三号窗口").start();
new Thread(task, "四号窗口").start();
}
}
程序运行发现不会有数据不一致的问题了。
5.2.2、线程堆栈分析
synchronized 关键字提供了一种互斥机制,也就是说在同一时刻,只能有一个线程访问同步资源,很多资料、书籍将 synchronized( mutex)称为锁,其实这种说法是不严谨的,准确地讲应该是某线程获取了与 mutex 关联的 monitor 锁(当然写程序的时候知道它想要表达的语义即可),下面我们来看一个简单的例子对其进行说明:
**
* 线程堆栈分析
*/
public class Mutex{
public static final Object MUTEX = new Object();
public void accessResource() {
synchronized (MUTEX) {
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
final Mutex mutex = new Mutex();
for (int i = 0; i < 5; i++) {
new Thread(mutex::accessResource).start();
}
}
}
上面的代码中定义了一个方法 accessResource,并且使用同步代码块的方式对accessResource 进行了同步,同时定义了 5 个线程调用 accessResource 方法,由于同步代码块的互斥性,只能有一个线程获取了 mutex monitor 的锁,其他线程只能进入阻塞状态,等待获取 mutex monitor 锁的线程对其进行释放,运行上面的程序然后打开JConsole 工具监控,如下图所示。
随便选中程序中创建的某个线程,会发现只有个线程在 IMED WAITING(sleeping)状态,其他线程都进人了 BLOCKED 状态,如图所示:
使用 jstack 命令打印进程的线程堆栈信息,选取其中几处关键的地方对其进行分析。Thread-0 持有 monitor <0x000000071118f160> 的锁并且处于休眠状态中,那么其他线程将会无法进入accessResource 方法,如图所示。
Thread-1 线程进入 BLOCKED 状态并且等待着获取 monitor <0x000000071118f160>的锁,其他的几个线程同样也是 BLOCKED 状态,如下图所示。
5.2.3 、使用 synchronized 需要注意的问题
这里罗列了几个初学者容易出现的错误:
A.与 monitor 关联的对象不能为空
private static final Object MUTEX = null;
private void accessResource() {
synchronized (MUTEX) {
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Mutex 为 null,很多人还是会犯这么简单的错误,每一个对象和一个 monitor 关联,对象都为 nul 了, monitor 肯定无从谈起。
B.synchronized 作用域太大
由于 synchronized 关键字存在排他性,也就是说所有的线程必须串行地经过synchronized 保护的共享区域,如果 synchronized 作用域越大,则代表着其效率越低,甚至还会丧失并发的优势。
@Override
public synchronized void run() {
//
}
上面的代码对整个线程的执行逻辑单元都进行了 synchronized 同步,从而丧失了并发的能力, synchronized 关键字应该尽可能地只作用于共享资源(数据)的读写作用域。
C.不同的 monitor 企图锁相同的方法
public class Task implements Runnable {
private final Object MUTEX = new Object();
@Override
public void run() {
synchronized (MUTEX) {
}
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(Task::new).start();
}
}
}
上面的代码构造了五个线程,同时也构造了五个 Runnable 实例, Runnable 作为线程逻辑执行单元传递给 Thread,然后你将发现, synchronized 根本互斥不了与之对应的作用域,线程之间进行 monitor lock 的争抢只能发生在与 monitor 关联的同一个引用上,上面的代码每一个线程争抢的 monitor 关联引用都是彼此独立的,因此不可能起到互斥的作用。
D.多个锁的交叉导致死锁
死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
5.3、This Monitor 和 Class Monitor 的详细介绍
5.3.1、this monitor
在下面的代码 ThisMonitor 中,两个方法 methodl 和 method2 都被 synchronized关键字修饰,启动了两个线程分别访问 methodl 和 method2。
public class ThisMonitor {
public synchronized void method1() {
System.out.println(Thread.currentThread().getName() + " enter to method1");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void method2() {
System.out.println(Thread.currentThread().getName() + " enter to method2");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ThisMonitor thisMonitor = new ThisMonitor();
new Thread(thisMonitor::method1, "T1").start();
new Thread(thisMonitor::method2, "T2").start();
}
}
运行程序将会发现只有一个方法被调用,另外一个方法根本没有被调用,分析线程的堆栈信息,执行 jdk 自带的 stack pid 命令,如图所示。
T1 线程获取了<0x000000071118f478>monitor 的 lock 并且处于休眠状态,而 T2 线程企图获取<0x000000071118f478> monitor的 lock 时陷入了 BLOCKED 状态,可见使用 synchronized 关键字同步类的不同实例方法,争抢的是同一个 monitor 的 lock。
而与之关联的引用则是 ThisMonitor 的实例引用,为了证实我们的推论,将上面的代码稍作修改,如下所示:
public class ThisMonitor02 {
public synchronized void method1() {
System.out.println(Thread.currentThread().getName() + " enter to method1");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void method2() {
synchronized (this) {
System.out.println(Thread.currentThread().getName() + " enter to method2");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ThisMonitor02 thisMonitor = new ThisMonitor02();
new Thread(thisMonitor::method1, "T1").start();
new Thread(thisMonitor::method2, "T2").start();
}
method1 保持方法同步的方式, method2 则采用了同步代码块的方式,并且使用的是 this 的 monitor,运行修改后的代码将会发现效果完全一样。
JDK官方有这样的描述:
https://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
When a thread invokes a synchronized method, it automatically acquires the intrinsic lock for that method’s object and releases it when the method returns. The lock release occurs even if the return was caused by an uncaught exception.
当线程调用同步方法时,它会自动获取该方法对象的内部锁,并在方法返回时释放它。即使返回是由未捕获的异常引起的,也会发生锁释放。
5.3.2、class monitor
有两个类方法(静态方法)分别使用 synchronized。对其进行同步。
public class ClassMonitor {
public synchronized static void method1() {
System.out.println(Thread.currentThread().getName() + " enter to method1");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized static void method2() {
System.out.println(Thread.currentThread().getName() + " enter to method2");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(ClassMonitor::method1, "T1").start();
new Thread(ClassMonitor::method2, "T2").start();
}
}
运行上面的例子,在同一时刻只能有一个线程访问 ClassMonitor 的静态方法,我们仍旧使用 jstack 命令分析其线程堆栈信息,如图所示。
T1 线程持有<0x000000071118ed60>monitor 的锁在正在休眠,而 T2 线程在试图获取<0x000000071118ed60> monitor 锁的时候陷入了 BLOCKED 状态,因此我们可以得出用 synchronized 同步某个类的不同静态方法争抢的也是同一个 monitor 的 lock,再仔细对比堆栈信息会发现与 5.3.1 节中关于monitor 信 息 不 一 样 的 地 方 在 于 (a java.lang.Class for
com.bjsxt.chapter05.demo05.ClassMonitor),由此可以推断与该 monitor关联的引用是ClassMonitord.class 实例。
对上面的代码稍作修改,然后运行会发现具有同样的效果
public class ClassMonitor02 {
public synchronized static void method1() {
System.out.println(Thread.currentThread().getName() + " enter to method1");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void method2() {
synchronized (ClassMonitor02.class) {
System.out.println(Thread.currentThread().getName() + " enter to method2");
try {
TimeUnit.MINUTES.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main (String[]args){
new Thread(ClassMonitor02::method1, "T1").start();
new Thread(ClassMonitor02::method2, "T2").start();
}
}
其中静态方法 methodl 继续保持同步方法的方式,而 method2 则修改为同步代码块的方式,使用 ClassMonitor.class 的实例引用作为 monitor,同样在 JDK 官方文档中对 ClassMonitor 也有比较权威的说明:
https://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html You might wonder what happens when a static synchronized method is invoked, since a static method is associated with a class, not an object. In this case, the thread acquires the intrinsic lock for the Class object associated with the class. Thus access to class’s static fields is controlled by a lock that’s distinct from the lock for any instance of the class.
您可能想知道调用静态同步方法时会发生什么,因为静态方法与类而不是对象相关联。在这种情况下,线程获取与类相关联的Class对象的内部锁。因此,对类的静态字段的访问由一个锁控制,该锁不同于类的任何实例的锁。
6、线程间通信
与网络通信等进程间通信方式不一样,线程间通信又称为进程内通信,多个线程实现互斥访问共享资源时会互相发送信号或等待信号,比如线程等待数据到来的通知,线程收到变量改变的信号等。来学习一下 Java 提供的原生通信 API,以及这些通信机制背后的内幕。
6.1、 单线程通信
6.1.1、初识 wait 和 notify
假设我们现在需要两个线程,一个负责生产数据,一个负责消费数据,理想状态下我们
希望一边生产一边消费。
/**
* 测试单线程通信
*/
public class ProduceConsumerVersion2 {
private final Object LOCK = new Object();
private int i = 0;
private boolean isProduce = false;
public void produce() {
synchronized (LOCK) {
// 如果生产了数据则等待消费者消费,否则生产数据
if (isProduce) {
try {
LOCK.wait();// 等待消费者消费
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
i++;
System.out.println("P -> " + i);
isProduce = true;
LOCK.notify();// 生产者已生产,通知消费者消费
}
}
}
public void consumer() {
synchronized (LOCK) {
// 如果生产者生产了数据则消费,否则等待生产者生产数据
if (isProduce) {
System.out.println("C ->" + i);
isProduce = false;
LOCK.notify();// 消费者已消费,通知生产者生产
} else {
try {
LOCK.wait();// 等待生产者生产
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProduceConsumerVersion2 pc = new ProduceConsumerVersion2();
new Thread(() -> {
while (true)
pc.produce();
}).start();
new Thread(() -> {
while (true)
pc.consumer();
}).start();
}
}
生产者生产数据以后会通知消费者消费,并进入阻塞等待状态;消费者消费以后会唤醒生产者生产数据,并进入阻塞等待状态,如此循环往复。从结果上看貌似已经实现了我们的需求,但是在多个消费者和生产者的情况下可能效果又会不一样了。
6.2、多线程通信
创建多个线程进行通信,,代码如下:
import java.util.stream.Stream;
/**
* 测试多线程通信
*/
public class ProduceConsumerVersion3 {
private final Object LOCK = new Object();
private int i;
private boolean isProduced;
// 生产者
public void produce() {
synchronized (LOCK) {
// 如果已生产者数据等待消费者消费,否则生产数据
if (isProduced) {
try {
LOCK.wait();// 等待消费者消费
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
i++;// 生产数据
System.out.println("P -> " + i);
isProduced = true;
LOCK.notify();// 唤醒消费者进行消费
}
}
}
// 消费者
public void consumer() {
synchronized (LOCK) {
// 如果已生产者数据就消费数据,否则等待生产者生产数据
if (isProduced) {
System.out.println("C -> " + i);
isProduced = false;
LOCK.notify();// 唤醒生产者生产数据
} else {
try {
LOCK.wait();// 等待生产者生产数据
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProduceConsumerVersion3 pc = new ProduceConsumerVersion3();
Stream.of("P1", "P2").forEach(p -> {
new Thread(() -> {
while (true) {
pc.produce();
}
}).start();
});
Stream.of("C1", "C2").forEach(p -> {
new Thread(() -> {
while (true) {
pc.consumer();
}
}).start();
});
}
}
运行代码后,出现以下结果:
6.2.1、notifyAll
通过 notifyAll 可以解决以上问题,我们只需要唤醒所有其他线程即可,然后等待 CPU调度获得锁资源继续执行。
import java.util.stream.Stream;
/**
* 测试多线程通信 notifyAll
*/
public class ProduceConsumerVersion4 {
private final Object LOCK = new Object();
private int i;
private boolean isProduced;
// 生产者
public void produce() {
synchronized (LOCK) {
// 如果已生产者数据等待消费者消费,否则生产数据
if (isProduced) {
try {
LOCK.wait();// 等待消费者消费
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
i++;// 生产数据
System.out.println("P -> " + i);
isProduced = true;
LOCK.notifyAll();// 唤醒消费者进行消费
}
}
}
// 消费者
public void consumer() {
synchronized (LOCK) {
// 如果已生产者数据就消费数据,否则等待生产者生产数据
if (isProduced) {
System.out.println("C -> " + i);
isProduced = false;
LOCK.notifyAll();// 唤醒生产者生产数据
} else {
try {
LOCK.wait();// 等待生产者生产数据
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProduceConsumerVersion4 pc = new ProduceConsumerVersion4();
Stream.of("P1", "P2").forEach(p -> {
new Thread(() -> {
while (true) {
pc.produce();
}
}).start();
});
Stream.of("C1", "C2").forEach(p -> {
new Thread(() -> {
while (true) {
pc.consumer();
}
}).start();
});
}
}
运行代码后发现之前的问题已经解决。
6.3、wait 和 sleep 的区别
- sleep 是属于 Thread 的方法,wait 属于 Object;
- sleep 方法不需要被唤醒,wait 需要。
- sleep 方法不需要 synchronized,wait 需要;
- sleep 不会释放锁,wait 会释放锁并将线程加入 wait 队列;
7、脏读
对于对象的同步和异步的方法,我们在设计自己的程序的时候,一定要考虑问题的整体,不然就会出现数据不一致的错误,很经典的错误就是脏读( dirtyread)示例代码如下:
import java.util.concurrent.TimeUnit;
/**
* 测试脏读
* 业务整体需要使用完整的 synchronized,保持业务的原子性。
*/
public class DirtyRead {
private String username = "bjsxt";
private String password = "123";
public synchronized void setValue(String username, String password) {
this.username = username;
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.password = password;
System.out.println("setValue 最终结果:username = " + username + " , password = " + password);
}
public void getValue() {
System.out.println("getValue 方法得到:username = " + this.username + " , password = " + this.password);
}
public static void main(String[] args) {
final DirtyRead dr = new DirtyRead();
Thread t1 = new Thread(() -> dr.setValue("z3", "456"));
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
dr.getValue();
}
}
在我们对一个对象的方法加锁的时候,需要考虑业务的整体性,即为 setValue/getValue 方法同时加锁 synchronized 同步关键字,保证业务(service)的原子性,不然会出现业务错误(也从侧面保证业务的一致性).
8、volatile 关键字
8.1、volatile 关键字的概念
自 Java1.5 版本起, volatile 关鍵字所扮演的角色越来越重要,该关鍵字也成为并发包的基础,所有的原子数据类型都以此作为修饰,相比 synchronized 关键字,volatile被称为“轻量级锁”,能实现部分 synchronized 关键字的语义。volatile 概念: volatile 关键字的主要作用是使变量在多个线程间可见。下面我们通过一个案例来了解一下 volatile:
import java.util.concurrent.TimeUnit;
/**
*了解一下 volatile
* @author tianqq
*/
public class VolatileDemo extends Thread {
private boolean isRunning = true;
// private volatile boolean isRunning = true;
private void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
@Override
public void run() {
System.out.println("进入 run 方法..");
int i = 0;
while (isRunning) {
//..
}
System.out.println("线程停止");
}
public static void main(String[] args) throws InterruptedException {
VolatileDemo rt = new VolatileDemo();
rt.start();
TimeUnit.SECONDS.sleep(2);
rt.setRunning(false);
System.out.println("isRunning 的值已经被设置了 false");
}
}
在 Java 中,每一个线程都会有一块工作内存区,其中存放着所有线程共享的主内存中的变量值的拷贝。当线程执行时,他在自己的工作内存区中操作这些变量。为了存取一个共享的变量个线程通常先获取锁定并去清除它的内存工作区,把这些共享变量从所有线程的共享内存区中正确的装入到他自己所在的工作内存区中,当线程解锁时保证该工作内存区中变量的值写回到共享内存中。一个线程可以执行的操作有使用(use)、赋值(assign)、装载(load)、存储(store)、锁定(lock)、解锁(unlock). 而主内存可以执行的操作有读(read)、写(write)、锁定(lock)、解锁(unlock)每个操作都是原子的。
volatile 的作用就是强制线程到主内存(共享内存)里去读取变量,而不去线程工作
内存区里去读取,从而实现了多个线程间的变量可见。也就是满足线程安全的可见性。
8.2、volatile 关键字的非原子性
volatile-关键字虽然拥有多个线程之间的可见性,但是却不具备同步性(也就是原子性),可以算上是一个轻量级的 synchronized,性能要比 synchronized 强很多,不会造成阻塞(在很多开源的架构里,比如 netty 的底层代码就大量使用 volatile,可见netty 性能一定是非常不错的。)这里需要注意:一般 volatile 用于只针对于多个线程可见的变量操作,并不能代替 synchronized 的同步功能,示例代码如下:
import java.util.concurrent.atomic.AtomicInteger;
/**
* * volatile 关键字不具备 synchronized 关键字的原子性(同步)
* @author tianqq
*/
public class VolatileNoAtomic extends Thread{
private static AtomicInteger count = new AtomicInteger(0);
private static void addCount() {
for (int i = 0; i < 1000; i++) {
count.incrementAndGet();
}
System.out.println(count);
}
@Override
public void run() {
addCount();
}
public static void main(String[] args) {
VolatileNoAtomic[] arr = new VolatileNoAtomic[10];
for (int i = 0; i < 10; i++) {
arr[i] = new VolatileNoAtomic();
}
for (int i = 0; i < 10; i++) {
arr[i].start();
}
}
}
volatile 关键字只具有可见性,没有原子性。要实现原子性建议使用 atomic 类的系列对象,支持原子性操作(注意 Atomica 类只保证本身方法原子性,并不保证多次操作的原子性)示例如下:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* atomic 类的系列对象
* @author tianqq
*/
public class AtomicUse {
private static AtomicInteger count = new AtomicInteger(0);
/**
* 多个 addAndGet 在一个方法内是非原子性的,
* 需要加 synchronized 进行修饰,保证 4 个 addAndGet 整体原子性
*/
public synchronized int multiAdd() {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
count.addAndGet(1);
count.addAndGet(2);
count.addAndGet(3);
count.addAndGet(4); // +10
return count.get();
}
public static void main(String[] args) throws InterruptedException {
final AtomicUse au = new AtomicUse();
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 100; i++) {
ts.add(new Thread(() -> System.out.println(au.multiAdd())));
}
for (Thread t : ts) {
t.start();
}
TimeUnit.SECONDS.sleep(2);
System.out.println("count = " + count);// 1000
}
}
9、模拟阻塞 Queue
BlockingQueue 即阻塞队列,它是基于 ReentrantLock,依据它的基本原理,我们可以实现 Web 中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:
在 Java 中,BlockingQueue 是一个接口,它有很多实现类:
它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于 take 与 put 操作的原理,却是类似的。
- put(an Object):把 an Object 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断,直到 BlockingQueue 里面有空间再继续。
- take:取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入。
下面我们通过 LinkedList 模拟一个阻塞 Queue,代码如下:
package threadLocal;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* LinkedList 模拟一个阻塞 Queue
* @author tianqq
*/
public class MyQueue {
// 1 需要一个承装元素的集合
private LinkedList<Object> list = new LinkedList<>();
// 2 需要一个计数器
private AtomicInteger count = new AtomicInteger(0);
// 3 需要制定上限和下限
private final int minSize = 0;
private final int maxSize;
// 4 构造方法
public MyQueue(int size) {
this.maxSize = size;
}
// 5 初始化一个对象 用于加锁
private final Object LOCK = new Object();
// put(an Object): 把 an Object 加到 BlockingQueue 里,
// 如果 BlockQueue 没有空间,则调用此方法的线程被阻断,直到 BlockingQueue 里面有空间再继续.
public void put(Object obj) {
synchronized (LOCK) {
// 空间满了,线程阻塞
while (count.get() == this.maxSize) {
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 1 加入元素
list.add(obj);
// 2 计数器累加
count.incrementAndGet();
// 3 通知另外一个线程(唤醒)
LOCK.notify();
System.out.println("新加入的元素为:" + obj);
}
}
// take: 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到BlockingQueue 有新的数据被加入.
public Object take() {
Object ret = null;
synchronized (LOCK) {
while (count.get() == this.minSize) {
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 1 做移除元素操作
ret = list.removeFirst();
// 2 计数器递减
count.decrementAndGet();
// 3 唤醒另外一个线程
LOCK.notify();
}
return ret;
}
public int getSize() {
return this.count.get();
}
public static void main(String[] args) {
final MyQueue mq = new MyQueue(5);
mq.put("a");
mq.put("b");
mq.put("c");
mq.put("d");
mq.put("e");
System.out.println("当前容器的长度:" + mq.getSize());// 5
Thread t1 = new Thread(() -> {
mq.put("f");
mq.put("g");
}, "t1");
t1.start();
Thread t2 = new Thread(() -> {
Object o1 = mq.take();
System.out.println("移除的元素为:" + o1);
Object o2 = mq.take();
System.out.println("移除的元素为:" + o2);
}, "t2");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
运行结如下:
10、ThreadLocal
Threadlocal 概念:线程局部变量,是一种多线程间并发访问变量的解决方案。与其synchronized 等加锁的方式不同, ThreadLocal 完全不提供锁,而使用以空间换时间的手段,为每个线程提供变量的独立副本,以保障线程安全。从性能上说, ThreadLocal 不具有绝对的优势,在并发不是很高的时候,加锁的性能会更好,但作为一套与锁完全无关的线程安全解决方案,在高并发量或者竞争激烈的场景,使用 ThreadLocal 可以在一定程度上减少锁竞争。
public class ConnThreadLocal {
public static ThreadLocal<String> th = new ThreadLocal<>();
public void setTh(String value) {
th.set(value);
}
public void getTh() {
System.out.println(Thread.currentThread().getName() + ":" + th.get());
}
public static void main(String[] args) throws InterruptedException {
final ConnThreadLocal ct = new ConnThreadLocal();
Thread t1 = new Thread(() -> {
ct.setTh("张三");
ct.getTh();
}, "t1");
Thread t2 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
ct.getTh();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
t1.start();
t2.start();
}
}
11、同步类容器
同步类容器都是线程安全的,同步容器类包括 Vector 和 HashTable,二者都是早期JDK 的一部分,此外还包括在 JDK1.2 当中添加的一些功能相似的类,这些同步的封装类是由 Collections.synchronizedXxx 等工厂方法创建的。
但在某些场景下可能需要加锁来保护复合操作。复合类操作如:迭代(反复访问元素,遍历完容器中所有的元素)、跳转(根据指定的顺序找到当前元素的下一个元素)、以及条件运算。这些复合操作在多线程并发地修改容器时,可能会表现出意外的行为,最经典的便是 ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题,示例代码如下:
/**
* 同步容器类
* @author tianqq
*/
public class ConcurrentModificationExceptionDemo {
public static void main(String[] args) {
Vector<String> list = new Vector<>();
list.add("111");
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "-----");
for (Iterator<String> iterator = list.iterator(); iterator.hasNext(); ) {
String element = iterator.next();
System.out.println("element = " + element);
}
});
Thread t2 = new Thread(() -> {
list.remove(0);
});
t1.start();
t2.start();
}
}
通过阅读源码发现,同步类容器其底层的机制无非就是用传统的 synchronized 关键字对每个公用的方法都进行同步,使得每次只能有一个线程访问容器的状态。这很明显不满足我们今天互联网时代高并发的需求,在保证线程安全的同时,也必须要有足够好的性能。
12、并发容器类
JDK5.0以后提供了多种并发类容器来替代同步类容器从而改善性能。同步类容器的状态都是串行化的。他们虽然实现了线程安全,但是严重降低了并发性,在多线程环境时,严重降低了应用程序的吞吐量。
并发类容器是专门针对并发设计的,使用 ConcurrentHashMap 来代替传统的HashTable,而且在ConcurrentHashMap中,添加了一些常见复合操作的支持。以及使用了CopyWriterArrayList代替Voctor,并发的 CopyOnWriteArraySet,,以及并发的 Queue,ConcurrentLinkedQueue 和 LinkedBlockingQueue,前者是高性能的队列,后者是以阻塞形式的队列,具体实现 Queue 还有很多,例如 ArrayBlockingQueue、PriorityBlockingQueue、SynchronousQueue 等。
12.1、CurrentHashMap
由于 HashMap 是线程不同步的,虽然处理数据的效率高,但是在多线程的情况下存在着安全问题,因此设计了 CurrentHashMap 来解决多线程安全问题。
HashMap 在 put 的时候,插入的元素超过了容量(由负载因子决定)的范围就会触发扩容操作,就是 rehash,这个会重新将原数组的内容重新 hash 到新的扩容数组中,在多线程的环境下,存在同时其他的元素也在进行 put 操作,如果 hash 值相同,可能出现同时在同一数组下用链表表示,造成闭环,导致在 get 时会出现死循环,所以 HashMap 是线程不安全的。
12.1.1 、JDK8 的 ConcurrentHashMap
JDK1.8 的实现已经摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 Synchronized 和 CAS 来操作,整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。Node 是 ConcurrentHashMap 存储结构的基本单元,继承于 HashMap 中的 Entry,用于存储数据,Node 数据结构很简单,就是一个链表,但是只允许对数据进行查找,不允许进行修改。
12.2、CopyOnWrite 容器
Copy-On-Writef 简称 COW,是一种用于程序设计中的优化策略。
JDK 里的 COW 容器有两种: CopyOnWriteArrayList 和 CopyOnWriteArraySet,
COW 容器非常有用,可以在非常多的并发场景中使用到。
什么是 CopyOnWrite 容器?
CopyOnWrite 容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。
13、并发 Queue
在并发队列上 JDK 提供了两套实现,一个是以 ConcurrentLinkedQueue 为代表的高性能队列,一个是以 BlockingQueue 接口为代表的阻塞队列,无论哪种都继承自 Queue。
A、ConcurrentLinkedQueue
是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常 ConcurrentLinkedQueue 性能好于 BlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,该队列不允许 null 元素。
ConcurrentLinkedQueue 重要方法:
add()和 offer()都是加入元素的方法(在 ConcurrentLinkedQueue 中,这俩个方法没有任何区别)
poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。
// 高性能无阻塞无界队列:ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>();
clq.offer("a");
clq.offer("b");
clq.offer("c");
clq.offer("d");
clq.add("e");
System.out.println(clq.size()); // 5
System.out.println(clq.poll()); // a 从头部取出元素,并从队列里删除
System.out.println(clq.size()); // 4
System.out.println(clq.peek()); // b
System.out.println(clq.size()); // 4
B 、BlockingQueue 接口
该接口有以下实现类:
B.1 ArrayBlockingQueue:
基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适合使用。
B.2 LinkedBlockingQueue:
基于链表的阻塞队列,同 ArrayBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),LinkedBlockingQueue 之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。他是一个无界队列。
B.3 SynchronousQueue:
一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费。
B.3 PriorityBlockingQueue:
基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compatory 对象来决定,也就是说传入队列的对象必须实现 Comparable 接口,在实现PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。
B.4 DelayQueue:
带有延迟时间的 Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。 DelayQueue 中的元素必须实现 Delayed 接口, DelayQueue 是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。
package threadLocal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author tianqq
*/
public class UseQueue {
public static void main(String[] args) throws Exception {
// 高性能无阻塞无界队列:ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>();
clq.offer("a");
clq.offer("b");
clq.offer("c");
clq.offer("d");
clq.add("e");
System.out.println(clq.size()); // 5
System.out.println(clq.poll()); // a 从头部取出元素,并从队列里删除
System.out.println(clq.size()); // 4
System.out.println(clq.peek()); // b
System.out.println(clq.size()); // 4
System.out.println("------------------------------------------------");
// 阻塞有界队列
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
array.add("e");
// array.add("f");
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
System.out.println("------------------------------------------------");
// 阻塞无界队列,可声明长度
LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<>();
lbq.offer("a");
lbq.offer("b");
lbq.offer("c");
lbq.offer("d");
lbq.offer("e");
lbq.add("f");
System.out.println(lbq.size());
lbq.forEach(System.out::println);
System.out.println("------------------------------------------------");
List<String> list = new ArrayList<>();
System.out.println(lbq.drainTo(list, 3));
System.out.println(list.size());
list.forEach(System.out::println);
System.out.println("------------------------------------------------");
// 无缓冲队列
final SynchronousQueue<String> q = new SynchronousQueue<>();
Thread t1 = new Thread(() -> {
try {
System.out.println(q.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
Thread t2 = new Thread(() -> q.add("asdasd"));
t2.start();
}
}