JUC高并发编程
1.JUC概述
1.1 什么是JUC
JUC就是java.util.concurrent工具包的简称。这是一个处理线程的工具包,JDK1.5开始出现的。
1.2 线程和进程概念
进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程——资源分配的最小单位。
线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程——程序执行的最小单位。
1.3 线程的状态
1.3.1 线程状态枚举类:Thread.State
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,(新建)
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,(准备就绪)
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,(阻塞)
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,(不见不散)
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,(过时不候)
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;(终结)
}
1.3.2 wait/sleep 的区别:
- sleep 是 Thread 的静态方法,wait 是 Object 的方法,任何对象实例都 能调用。
- sleep 不会释放锁,它也不需要占用锁。wait 会释放锁,但调用它的前提 是当前线程占有锁(即代码要在 synchronized 中)。
- 它们都可以被 interrupted 方法中断。
管程:在操作系统中叫Monitor监视器,在Java中叫锁。他是一种同步机制,保证同一个时间,只有一个线程访问被保护数据或者代码。jvm同步基于进入和退出,使用管程对象实现的。
1.4 并发与并行
1.4.1 串行模式
串行表示所有任务都一一按照先后顺序进行。串行意味着必须先装完一车柴才能运行这车柴,只有运送到了,才能卸下这车柴,并且只有完成了这整个三个步骤,才能进行下一个步骤。
串行是一次只能取得一个任务,并执行这个任务。
1.4.2 并行模式
并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。并行模式相当于将长长的一条队列,划分成了多条段队列,所以并行缩短了任务对列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多喝CPU。
1.4.3 并发
==并发(concurrent)值的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行。==但这不是重点,在描述并发的时候也不会去扣这种字眼是否精确,==并发的重点在于它是一种现象,并发描述的是多进程同时运行的现象。==但实际上,对于单核心CPU来说,同一时刻只能运行一个线程。所以,这里的"同时运行"表示的不是真的同一时刻有多个线程运行的现象,这是并行的概念,而是提供一种功能让用户看来多个程序同时运行起来来了,但实际上这些程序中的进程不是一直霸占CPU的,二是执行一会停一会。
要解决大并发问题,通常是将大任务分解成多个小任务,由于操作系统对进程的调度是随机的,所以切分成多个小任务后,可能会从任一小任务处执行。这可能会出现一些现象:
- 可能出现一个小任务执行了多次,还没开始下个任务的情况。这时一般会采用队列或类似的数据结构不来存放各个小任务的成果。
- 可能出现还没准备好第一步就执行第二步的可能。这时,一般采用多路复用或异步的方式,比如只有准备好产生了时间通知才执行某个任务。
- 可以多进程/多线程的方式并行执行这些小任务。也可以单进程/单线程执行这些小任务,这时很可能要配合多路复用才能达到较高的效率。
1.5 管程
管程(monitor)是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现),但是这样并不能保证进程以设计的顺序执行。
JVM 中同步时基于进程和退出管程对象实现的,每个对象都会有一个管程对象,管程会随着java对象一同创建和销毁。
执行线程首先要保持有管程对象,然后才能执行方法,当方法完成之后会释放管程,方法在执行时候会持有管程,其他线程无法再获取用一个管程。
1.6 用户线程和守护线程
用户线程:平时用到的普通线程,自定义线程
守护线程:运行在后台,是一种特殊的线程,比如垃圾回收
当主线程结束后,用户线程还在运行,JVM 存活
如果没有用户线程,都是守护线程,JVM 结束
2.Lock接口
2.1 Synchronized
2.1.1 synchronized 关键字
synchronized 是Java中的关键字,是一种同步锁。它修饰的对象有以下集中:
-
修饰一个代码块。被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象;
-
修饰一个方法。被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;
-
修改一个静态的方法。其作用范围是整个静态方法,作用的对象是这个类的所有对象;
-
修改一个类。其作用的范围是synchronized后面括号括起来的部分,作用主的对象是这个类的所有对象。
如果一个代码块被 synchronized 修饰了,当一个线程获取了对应的锁,并执 行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里 获取锁的线程释放锁只会有两种情况: 1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有; 2)线程执行发生异常,此时 JVM 会让线程自动释放锁。 那么如果这个获取锁的线程由于要等待 IO 或者其他原因(比如调用 sleep 方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一 下,这多么影响程序执行效率。 因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等 待一定的时间或者能够响应中断),通过 Lock 就可以办到。
2.2 什么是 Lock:
Lock
实现提供了比使用 synchronized
方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition
对象。
Lock 与 synchronized 的区别:
- Lock 不是Java语言内置的,synchronized 是 Java 语言的关键字,因此是内置特性Lock 是一个类,通过这个类可以实现同步访问;
- Lock 和 synchronized 有一点非常大的不同,采用 synchronized 不需要用户手动释放锁,当 synchronized 方法或者 synchronized 代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,若果没有主动释放锁,有可能导致出现死锁现象。
2.2.1 Lock 接口
public interface Lock {
// 获取锁
void lock();
// 如果当前线程未被中断,则获取锁
void lockInterruptibly() throws InterruptedException;
// 仅在调用时锁为空闲状态才获取锁
boolean tryLock();
// 如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 返回绑定到此Lock实例的新Condition实例
Condition newCondition();
}
下面来逐个讲述 Lock 接口中每个方法的使用
2.2.2 lock()
获取锁。
如果锁不可用,处于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。
采用 Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用 Lock 必须在 try{}catch{}
块中进行,并且将释放锁的操作放在 finally
块中进行,以保证锁一定会被释放,防止死锁的发生。通常使用 Lock来进行同步的话,是以下面这种形式去使用的:
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
2.2.3 newCondition
返回绑定到此 Lock
实例的新 Condition
实例。
关键字 synchronized
与 wait()/notify()
这两个方法一起使用可以实现等待/通知模式,Lock 锁的 newCondition()
方法返回的Condition
对象,Condition
类也可以实现等待/通知模式。
用 notify() 通知时,JVM 会随机唤醒某个等待的线程,使用 Condition 类可以进行选择性通知,Condition 比较常用的两个方法:
- void signal()
唤醒一个等待线程。
如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。 - void await()会使当前线程等待,同时会释放锁,当其他线程调用 signal()时,线程会重
新获得锁并继续执行。
2.3 ReentrantLock
一个可重入的互斥锁 Lock
,它具有与使用 synchronized
方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。
ReentrantLock
将由最近成功获得锁,并且还没有释放该锁的线程所拥有。当锁没有被另一个线程所拥有时,调用 lock
的线程将成功获取该锁并返回。如果当前线程已经拥有该锁,此方法将立即返回。可以使用 isHeldByCurrentThread()
和 getHoldCount()
方法来检查此情况是否发生。
2.4 ReadWriteLock
ReadWriteLock
维护了一对相关的锁
,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁
可以由多个 reader 线程同时保持。写入锁
是独占的。
public interface ReadWriteLock{
// 返回用于读取操作的锁。
Lock readLock();
// 返回用于写入操作的锁。
Lock writeLock()
}
一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成 2 个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的ReentrantReadWriteLock
实现了 ReadWriteLock 接口。ReentrantReadWriteLock 里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和 writeLock()用来获取读锁和写锁。
下面通过几个例子来看一下 ReentrantReadWriteLock 具体用法。
假如有多个线程要同时进行读操作的话,先看一下 synchronized 达到的效果:
public class Test {
private ReentrantReadWriteLock rwl = new
ReentrantReadWriteLock();
public static void main(String[] args) {
final Test test = new Test();
new Thread() {
public void run() {
test.get(Thread.currentThread());
}
;
}.start();
new Thread() {
public void run() {
test.get(Thread.currentThread());
}
;
}.start();
}
public synchronized void get(Thread thread) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在进行读操作");
}
System.out.println(thread.getName() + "读操作完毕");
}
}
而改成用读写锁的话:
public class Test {
private ReentrantReadWriteLock rwl = new
ReentrantReadWriteLock();
public static void main(String[] args) {
final Test test = new Test();
new Thread() {
public void run() {
test.get(Thread.currentThread());
}
;
}.start();
new Thread() {
public void run() {
test.get(Thread.currentThread());
}
;
}.start();
}
public void get(Thread thread) {
rwl.readLock().lock();
try {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在进行读操作");
}
System.out.println(thread.getName() + "读操作完毕");
} finally {
rwl.readLock().unlock();
}
}
}
说明 thread1 和 thread2 在同时进行读操作。这样就大大提升了读操作的效率。
2.5 Synchronized 与 Lock的区别
Lock 和 synchronized 有以下几点不同:
- Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
- synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过 unlock() 去释放锁,则很可能造成死锁现象,因此使用 Lock 时需要在finally快中释放锁;
- Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
- 通过 Lock 可以知道有没有成功获取锁,而 synchronized 确无法办到。
- Lock 可以提高多个线程进行读写操作的效率。
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(既有大量线程同时竞争),此时 Lock 的性能要远远优于 synchronized。
3.线程间通信
3.1 概念
线程间通信的模型有两种:共享内存和消息传递,一下方式都是基本这两种模型来实现的。
3.2 场景:消费者和生产者
两个线程,一个线程对当前值加1,另一个线程对当前值减1,要求用线程减通信。
// 1.创建资源类,定义属性和操作方法
class Share {
// 初始值
private int number = 0;
// +1的方法
public synchronized void incr() throws InterruptedException {
// 2.判断、干活、通知
if (number != 0) {
// number 不是0,等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + " :: " + number);
// 通知其他线程
this.notifyAll();
}
public synchronized void decr() throws InterruptedException {
// 判断、干活、通知
if (number != 1) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + " :: " + number);
this.notifyAll();
}
}
public class ThreadDemo1 {
// 3.创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
Share share = new Share();
// 创建第一个线程
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, "AA").start();;
// 创建第二个线程
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
},"BB").start();;
}
}
结果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z07UALHB-1688185732836)(https://images-1313675740.cos.ap-shanghai.myqcloud.com/image-%E7%BA%BF%E7%A8%8B-%E7%94%9F%E4%BA%A7%E8%80%85%E4%B8%8E%E6%B6%88%E8%B4%B9%E8%80%85.png)]
3.3 虚假唤醒问题:
现有四个线程:AA与CC做+1操作,BB与DD做-1操作;
由于wait()方法特点是在哪里等待在哪里被唤醒,if 只会判断一次,当被唤醒后,条件不生效,所以将 if 换成 while中。
// 1.创建资源类,定义属性和操作方法
class Share {
// 初始值
private int number = 0;
// +1的方法
public synchronized void incr() throws InterruptedException {
// 2.判断、干活、通知
while (number != 0) {
// number 不是0,等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + " :: " + number);
// 通知其他线程
this.notifyAll();
}
public synchronized void decr() throws InterruptedException {
// 判断、干活、通知
while (number != 1) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + " :: " + number);
this.notifyAll();
}
}
// 3.创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
Share share = new Share();
// 创建第一个线程
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, "AA").start();;
// 创建第二个线程
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
},"BB").start();;
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
},"CC").start();;
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
},"DD").start();;
}
使用 Lock 实现上边的案例:
// 创建资源类,在资源类中定义属性和操作方法
class Share {
// 初始化变量
private int number = 0;
// 创建Lock
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// +1
public void incr() throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断、干活。通知
while (number != 0) {
// 等待
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + "::" + number);
// 通知其他线程(唤醒)
condition.signalAll();
} finally {
// 解锁
lock.unlock();
}
}
// -1
public void decr() throws InterruptedException {
// 上锁
lock.lock();
try {
while (number != 1) {
// 等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "::" + number);
// 唤醒
condition.signalAll();
} finally {
// 解锁
lock.unlock();
}
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
Share share = new Share();
new Thread(()->{
for(int i=1;i<=10;i++) {
try {
share.incr();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for(int i=1;i<=10;i++) {
try {
share.decr();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
},"BB").start();
}
}
4.线程间定制化通信
案例:启动三个线程,按照如下要求:AA打印5次,BB打印10次,CC打印15次;进行10轮打印。
案例分析:
代码:
// 创建资源类
class ShareResource {
// 定义标志位
private int flag = 1;// 1 AA 2 BB 3CC
// 创建Lock锁
private Lock lock = new ReentrantLock();
// 创建三个Condition
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
// 打印5次,参数第几轮
public void print5(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (flag != 1) {
c1.await();
}
// 干活
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + ":轮数: " + loop);
}
// 修改标志位
flag = 2;
// 通知其他线程
c2.signal();
} finally {
// 释放锁
lock.unlock();
}
}
// 打印10次
public void print10(int loop) throws InterruptedException {
lock.lock();
try {
// 判断
while (flag != 2) {
c2.await();
}
// 干活
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + ":轮数: " + loop);
}
// 修改标志位
flag = 3;
// 通知其他线程
c3.signal();
} finally {
lock.unlock();
}
}
// 打印15次
public void print15(int loop) throws InterruptedException {
lock.lock();
try {
// 判断
while (flag != 3) {
c3.await();
}
// 干活
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i + ":轮数: " + loop);
}
// 修改标志位
flag = 1;
// 通知其他线程
c1.signal();
} finally {
lock.unlock();
}
}
}
public class ThreadDemo3 {
public static void main(String[] args) {
ShareResource sr = new ShareResource();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
sr.print5(i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
sr.print10(i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, "BB").start();
new Thread(() -> {
for(int i=1;i<=10;i++) {
try {
sr.print15(i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
},"CC").start();
}
}
总结:
synchronized
实现同步的基础:Java中的每一个对象都可以作为锁。
具体表现为一下3种形式:
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的Class对象。
对于同步方法块,锁是 synchronized 括号里配置的对象。
5.集合的线程安全
5.1 集合不安全演示
/**
* List线程不安全
*
* @author 蜡笔小新
*
*/
public class ThreadDemo4 {
public static void main(String[] args) {
// 创建ArrayList对象
List<String> list = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
// 往集合中添加数据
list.add(UUID.randomUUID().toString().substring(0, 8));
// 从集合中获取数据
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
演示结果:
5.2 解决方案 - Vector
public class ThreadDemo4 {
public static void main(String[] args) {
// 创建ArrayList对象
List<String> list = new Vector<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
// 往集合中添加数据
list.add(UUID.randomUUID().toString().substring(0, 8));
// 从集合中获取数据
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
但是该方法比较古老,是JDK1.0的时候出现的。
5.3 解决方案 - Collections
/**
* List线程不安全
*
* @author 蜡笔小新
*
*/
public class ThreadDemo4 {
public static void main(String[] args) {
List<String> list = Collections.synchronizedList(new ArrayList<>());
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
// 往集合中添加数据
list.add(UUID.randomUUID().toString().substring(0, 8));
// 从集合中获取数据
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
这种解决方案也不经常使用。
5.4 解决方案 - CopyOnWriteArrayList
/**
* List线程不安全
*
* @author 蜡笔小新
*
*/
public class ThreadDemo4 {
public static void main(String[] args) {
List<String> list = new CopyOnWriteArrayList<String>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
// 往集合中添加数据
list.add(UUID.randomUUID().toString().substring(0, 8));
// 从集合中获取数据
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
这种方法是JUC自带的,也是经常使用的一种方法。它相当于线程安全的ArrayList。和ArrayList一样,它是个可变数组;但是和ArrayList不同的是,它具有一下特性:
- 它最合适于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多余可变操作,需要在遍历期间防止线程间的冲突。
- 他是线程安全的。
- 因为通常需要复制整个基础数组,所以可变操作(add()、set()和remove()等)的开销很大。
- 迭代器支持 hasNext(),next()等不可变操作,但不支持可变 remove()等操作。
- 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。
复制思想:
当我们往一个容器中添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后往新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
CopyOnWriteArrayList 源码:
我们不妨来看一下Java中的Set是否是线程安全的呢?
public static void main(String[] args) {
Set<String> set = new HashSet<>();
for (int i = 1; i <= 30; i++) {
new Thread(()->{
// 往集合中添加数据
set.add(UUID.randomUUID().toString().substring(0,8 ));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
测试结果:
可以看到,Java中的Set也不是线程安全的。Java也提供了响应的CopyOnWriteArraySet
来保证Set的线程安全。
public class ThreadDemo4 {
public static void main(String[] args) {
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 30; i++) {
new Thread(()->{
// 往集合中添加数据
set.add(UUID.randomUUID().toString().substring(0,8 ));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
同样Map也是的。
public class ThreadDemo4 {
public static void main(String[] args) {
Map<String,String> map = new HashMap<>();
for (int i = 1; i <= 30; i++) {
String key = String.valueOf(i);
new Thread(()->{
// 往集合中添加数据
map.put(key,UUID.randomUUID().toString().substring(0,8 ));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
测试结果:
JDK中提供了ConcurrentHashMap<K,V>
类来确保Map的线程安全。
public class ThreadDemo4 {
public static void main(String[] args) {
Map<String,String> map = new ConcurrentHashMap<>();
for (int i = 1; i <= 30; i++) {
String key = String.valueOf(i);
new Thread(()->{
// 往集合中添加数据
map.put(key,UUID.randomUUID().toString().substring(0,8 ));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
总结:
只要是解决线程安全的类,他们的的源码中的add()、put()方法都是加了synchronized
关键字的,而原始的List、Map、Set的add()、put()方法是没有加改关键字的。
6.多线程锁
6.1 Synchronized 的8种情况
/**
* 1.标准访问,先打印短信还是邮件
* ------sendMSG
* ------sendEmail
* 2.停4秒在短信内,先打印短信还是邮件
* ------sendMSG
* ------sendEmail
* 3.新增普通的hello方法,是先打印短信还是hello
* -------getHello
* ------sendMSG
* 4.现在有两部手机,先打印短信还是邮件
* ------sendEmail
* ------sendMSG
* 5.两个静态同步方法,1部手机,先打印短信还是邮件
* ------sendMSG
* ------sendEmail
* 6.两个静态同步方法,2部手机,先打印短信还是邮件
* ------sendMSG
* ------sendEmail
* 7.1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件
* ------sendEmail
* ------sendMSG
* 8.1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
*
*/
public class ThreadDemo5 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
Phone phone1 = new Phone();
new Thread(() -> {
try {
phone.sendMSG();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AA").start();
Thread.sleep(100);
new Thread(() -> {
try {
phone1.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "BB").start();
}
}
class Phone {
// 带static关键字 锁的是Phone.class字节码文件
public static synchronized void sendMSG() throws InterruptedException {
// 停留4秒
TimeUnit.SECONDS.sleep(4);
System.out.println("------sendMSG");
}
// 不带static关键字 锁的是当前对象(this)
public synchronized void sendEmail() {
System.out.println("------sendEmail");
}
public void getHello() {
System.out.println("---------getHello");
}
}
结论:
一个对象里面如果有多个synchronized
方法,某一时刻内,只要一个线程去调用其中的一个synchronized
方法了,其他的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized
方法。
锁的是当前对象this
,被锁定后,其他的线程都不能进入到当前对象的其他的synchronized
方法。
加个普通方法后发现和同步锁无关。换成两个对象后,不是同一把锁了,情况立刻变化。
synchronized
实现同步的基础:Java 中的每一个对象都可以作为锁。
具体表现为以下3种形式:
- 对于普通同步方法,锁是当前实例对象。
- 对于静态同步方法,锁是当前类的
Class
对象。 - 对于同步方法块,锁是
synchronized
括号里配置的对象。
当一个线程视图访问同步代码块时,他首先必须得到锁,退出或抛出异常时必须释放锁。也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,所以无需等待该该实例对象以获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有静态条件的。
但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要是他们同一个类的实例对象。
6.2 公平锁与非公平锁
非公平锁:每个线程获取锁的顺序是按照线程访问锁的先后顺序获取的,最前面的线程总是最先获取到锁。效率高。
公平锁:每个线程获取锁的顺序是随机的,并不会遵循先来先得的规则,所有线程会竞争获取锁。效率低。
6.3 可重入锁
可重入性:就是一个线程不用释放,可以重复的获取一个锁n次,而不出现死锁的状况。只是在释放的时候,也需要响应的释放n次。
可重入锁有:
synchronized
(隐式)Rntrantlock
(显式)
使用ReentrantLock
的注意点
ReentrantLock
和 Synchronized
不一样,需要手动释放锁,所以使用 ReentrantLock
的时候一定要手动释放锁,并且加锁次数和释放锁次数要一样。
// 演示什么是可重入锁(Synchronized):可重入锁就是可以重复获取相同的锁
public class ReentrantLockDemo1 {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized (this) {
System.out.println("第一次获取锁,这个锁是:" + this);
int index = 1;
while (true) {
index++;
synchronized (this) {
System.out.println("第二次获取锁,这个锁是:" + this);
}
if (index == 10) {
break;
}
}
}
}
}, "AA").start();
}
}
Synchronized演示结果:
// 演示什么是可重入锁(ReentrantLock)
public class ReentrantLockDemo2 {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
// 创建线程
new Thread(() -> {
// 上锁
try {
reentrantLock.lock();
System.out.println("第1次获取锁,这个锁是:" + reentrantLock);
int index = 1;
while (true) {
index++;
try {
// 上锁
reentrantLock.lock();
System.out.println("第" + index + "次获取锁,这个锁是:" + reentrantLock);
if (index == 10) {
break;
}
} finally {
// 解锁
reentrantLock.unlock();
}
}
} finally {
// 解锁
reentrantLock.unlock();
}
}).start();
}
}
ReentrantLock:演示结果
可以发现不管是Synchronized 还是 ReentrantLock,都没有发生死锁,都可以多次获取相同的锁。
6.4 死锁
所谓死锁,是指多个进程在运行过程中因争夺资源而造成的一种僵局,当进程处于这种僵持状态时,若无外力作用,它们都将无法在向前推进。
如图,线程A持有锁A,线程B持有锁B,线程A又试图获取锁B,同时线程B试图获取锁A。线程A和线程B相互等待,如果没有外力干涉,就再无法执行下去。
产生死锁的原因:
- 因竞争资源发生死锁现象:系统中供多个进程共享的资源的数目不足以满足全部进程的需要时,就会引起对诸资源的竞争而发生死锁现象。
- 可剥夺资源和不可剥夺资源:可剥夺资源是指某进程在获得该类资源时,该资源同样可以被其他进程或系统剥夺,不可剥夺资源是指当系统把该类资源分配给某个进程时,不能强制收回,只能在该进程使用完成后自动释放。
- 竞争不可剥夺资源:系统中不可剥夺资源的数目不足以满足诸进程运行的要求,则发生在运行进程中,不同的进程因争夺这些资源陷入僵局。
- 竞争临时资源
- 进程推进顺序不当发生死锁
产生死锁的四个必要条件:
- 互斥条件:进程对所分配的到的资源不允许其他进程进行访问,若其他进程访问该资源,只能等待,直至占有该资源的进程使用完成后释放该资源。
- 请求和保持条件:进程获得一定的资源之后,又对其他资源发出请求,但是该资源可能被其他进程占有,此时请求阻塞,但又对自己获得的资源保持不放。
- 不可剥夺条件:是指进程已获得的资源,在未完成使用之前,不可被剥夺,只能在是用完之后自己释放。
- 环路等待条件:是指进程发生死锁后,必然存在一个进程<——>资源之间的环形链。
处理死锁的基本方法:
- 预防死锁:通过设置一些限制条件,去破坏产生死锁的必要条件
- 避免死锁:在资源分配过程中,使用某种方法避免系统进入不安全状态,从而避免发生死锁
- 检测死锁:允许死锁的发生,但是通过系统的检测之后,采取一些措施,将死锁清除掉
- 解除死锁:该方法与检测死锁配合使用
案例演示:
/**
* @author cpl
* @date 2023/2/16
* @apiNote:演示死锁
*/
public class DeadLockDemo1 {
// 创建两个对象
static Object A = new Object();
static Object B = new Object();
public static void main(String[] args) {
// 创建线程
new Thread(()->{
synchronized (A){
System.out.println(Thread.currentThread().getName()+" 持有A视图获取B");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (B){
System.out.println(Thread.currentThread().getName()+" 获取B");
}
}
},"A").start();
new Thread(()->{
synchronized (B){
System.out.println(Thread.currentThread().getName()+" 持有B视图获取A");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized(A){
System.out.println(Thread.currentThread().getName()+" 获取A");
}
}
},"B").start();
}
}
验证是否是死锁:使用 jsp
命令与 jstack
JVM自带堆栈跟踪工具
然后使用 jstack 12868
7.Callable&Future接口
7.1 创建线程的方式
- 继承
Thread
类 - 实现
Runnable
接口 - 实现
Callable
接口 - 线程池方式
7.2 Runnable 与 Callable 的区别
目前我们学习了有两种创建线程的方法,一种是通过创建 Thread
类,另一种是通过使用 Runnable
创建线程。但是,Runnable 缺少的一项功能是,当线程终止时(即 run()方法完成时),我们无法使线程返回结果。为了支持此功能,Java中提供了 Callable
接口。
现在我们使用Callable
接口创建线程
Callable 接口的特点如下:
- 为了实现
Runnable
,需要实现不返回任何内容的 run() 方法,而对于 Callable ,需要实现在完成时返回结果的 call() - call() 方法可以引发异常,而 run() 则不能
- 为实现
Callable
而必须重写 call() 方法 - 不能直接替换 runnable ,因为 Thread 类的构造方法没有参数为 Callable 的
无法直接使用Callable创建线程,因为 Thread
类中的构造方法没有参数是 Callable
的。这里需要使用一个特殊类。
这个类就是 FutureTask
,它即实现了 Runnable 接口,又有一个含有 Callable 为参数的构造器方法。
7.3 Future接口
当 call() 方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果。为此,可以使用 Future 对象。
将 Future 视为保存结果的对象——它可能暂时不保存结果,但将来会保存(一旦 Callable 返回)。Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式。要实现此接口,必须重写5中方法,这里列出了重要的方法,如下:
-
boolean cancel(boolean mayInterruptIfRunning) 试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。 此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。 参数: mayInterruptIfRunning - 如果应该中断执行此任务的线程,则为 true;否则允许正在运行的任务运行完成 返回: 如果无法取消任务,则返回 false,这通常是由于它已经正常完成;否则返回 true
-
boolean isCancelled() 如果在任务正常完成前将其取消,则返回 true。 返回: 如果任务完成前将其取消,则返回 true
-
boolean isDone() 如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。 返回: 如果任务已完成,则返回 true
-
V get() throws InterruptedException, ExecutionException 如有必要,等待计算完成,然后获取其结果。 返回: 计算的结果 抛出: CancellationException - 如果计算被取消 ExecutionException - 如果计算抛出异常 InterruptedException - 如果当前的线程在等待时被中断
-
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。 参数: timeout - 等待的最大时间 unit - timeout 参数的时间单位 返回: 计算的结果 抛出: CancellationException - 如果计算被取消 ExecutionException - 如果计算抛出异常 InterruptedException - 如果当前的线程在等待时被中断 TimeoutException - 如果等待超时
7.4 FutureTask
Java 库具有具体的 FutureTask 类型,该类型实现 Runnable 和 Future,并方便地将两种功能组合在一起。可以通过为其构造函数提供 Callable 来创建 FutureTask。然后,将 FutureTask 对象提供给 Thread 的构造函数以创建 Thread 对象。因此,间接地使用 Callable 创建线程。
核心原理:
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成
- 将主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执行状态
- 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果
- 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法
- 一旦计算完成,就不能再重新开始或取消计算
- get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常
- get 只计算一次,因此 get 方法放到最后
7.5 使用 Callable 和 Future
案例:
class CreateThread2 implements Callable {
@Override
public Long call() throws Exception {
try {
System.out.println(Thread.currentThread().getName()+"线程进入了call方法,开始准备睡觉");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"线程睡醒了");
} catch (InterruptedException e) {
e.printStackTrace();
}
return System.currentTimeMillis();
}
}
public class CreateThread {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 使用Runnable创建线程
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"线程进入了run方法");
} catch (Exception e) {
e.printStackTrace();
}
}
},"AA").start();
Callable callable = new CreateThread2();
FutureTask<Long> futureTask1 = new FutureTask<>(callable);
new Thread(futureTask1, "BB").start();
for (int i=1;i<=10;i++){
Long resutl = futureTask1.get();
System.out.println(resutl);
}
}
}
结果:
8.JUC强大的辅助类
JUC中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过多时Lock锁的频繁操作。这三种辅助类为:
- CountDownLatch:减少计数
- CyclicBarrier:一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (循环栅栏)
- Semaphore:信号灯
下面分别介绍:
8.1 CountDownLatch
CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行减1的操作,使用 await 方法等待计数器不大于0,然后继续执行 await 方法之后的语句。
CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,这些线程会阻塞
其他线程调用 countDown 方法会将计数器减1(调用 countDown 方法的线程不会阻塞)
当计数器值为0时,因 await 方法阻塞的线程会被唤醒,继续执行。
场景:6 个同学陆续离开教室后值班同学才可以关门。
/**
* @author cpl
* @date 2023/2/18
* @apiNote
* JUC辅助类之CountDownLatch
*/
public class CountDownLatchDemo {
// 场景:6 个同学陆续离开教室后值班同学才可以关门。
public static void main(String[] args) {
for(int i=1;i<=6;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"号同学离开教室");
},String.valueOf(i)).start();
}
System.out.println(Thread.currentThread().getName()+"班长锁门了");
}
}
结果:
使用 CountDownLatch 后
结果:
锁门的那个县城永远是最后一个运行。
8.2 CyclicBarrier
CyclicBarrier 从字面理解大概是循环阻塞的意思,在使用CyclicBarrier的构造方法第一个参数是目标障碍数,每执行一次CyclicBarrier障碍数就会加1,如果达到了目标障碍数,才会执行 cyclicBarrier.await() 之后的语句。可以将 CyclicBarrier 理解为加1操作。
场景:等待约好一起出去玩的朋友
结果:
8.3 Semaphore
Semaphore 的构造方法中传入的第一个参数是最大信号量(可以看成最大线程池),每个信号量初始化为一个最多只能分发一个许可证。使用 acquire 方法获得许可证,release 方法释放许可。
场景:抢车位,6辆汽车3个停车位
结果:
9.ReentrantReadWriteLock读写锁
9.1 读写锁介绍
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,Java 的并发包提供了读写锁 ReentrantReadWriteLock
,他表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排它锁。
9.1.1 线程进入读锁的前提条件:
- 没有其他线程的写锁
- 没有写请求,或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)
9.1.2 线程进入写锁的前提条件:
- 没有其他线程的读锁
- 没有其他线程的写锁
9.1.3 读写锁三个重要特征:
- 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平由于公平
- 重进入:读锁和写锁都支持线程重进入
- 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级称为读锁。
9.2 读写锁案例
场景:使用 ReentrantReadWriteLock 对一个 hashmap 进行读和写操作
未上读写锁的情况:
// 创建资源类
class MyCache {
// 创建Map结合
private volatile Map<String, Object> map = new HashMap<>();
// 创建读写锁
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 放数据
public void put(String key, Object value) {
// 上写锁
// readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "正在写操作" + key);
try {
// 暂停一会
TimeUnit.SECONDS.sleep(3);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 释放写锁
}
}
// 取数据
public Object get(String key) {
try {
System.out.println(Thread.currentThread().getName() + "正在读取操作" + key);
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object result = map.get(key);
System.out.println(Thread.currentThread().getName() + "读完了" + key);
return result;
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
// 创建多个写线程
for (int i = 1; i <= 5; i++) {
final int num = i;
new Thread(() -> {
myCache.put(num + "", num + "");
}, String.valueOf(i)).start();
}
// 创建多个读线程
for (int i = 1; i <= 5; i++) {
final int num = i;
new Thread(() -> {
myCache.get(num + "");
}, String.valueOf(i)).start();
}
}
}
未上读写锁测试结果:
上读写锁:
在读写操作方法中加上相对应的读或写锁
上了读写锁测试结果:
可以看到这个结果很正常,并且(1写,1写完了,4写,4写完了…;而一起读取)也体现了 写锁是排它锁,读锁是共享锁
9.3 锁降级
锁降级:当某一个线程上了写锁之后,自己仍然可以上读锁,之后在释放写锁,这个过程就是锁降级
使用场景:当多线程情况下,更新完数据要立刻查询刚更新完的数据
(因更新完数据释放写锁后还持有读锁,所有线程要获得写锁都要等待读锁释放,这时持有读锁的线程可以查到刚更新完的数据)
弊端:适合读多写少的场景,如果锁降级的同时设置成了非公平锁可能会导致写锁很长时间获取不到
ReentrantReadWriteLock
ReentrantReadWriteLock
支持锁降级,但是不支持 锁升级
下面代码说明 ReentrantReadWriteLock
锁降级的使用
public class Degradate {
public static void main(String[] args) {
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
// 锁降级 当前线程持有写锁,然后获得读锁,将写锁释放,这样就完成了锁降级
//锁降级
writeLock.lock();
System.out.println("获得写锁");
readLock.lock();
System.out.println("获得读锁");
writeLock.unlock();
System.out.println("释放写锁");
readLock.unlock();
System.out.println("释放读锁");
}
}
执行结果:
调换代码顺序:
结果:
可以看到,程序还在运行,但也只获得了读锁。从而证明了,读锁不能升级为写锁。
9.4 小结
文字描述:
在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)
原因:当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程 “升级” 为写锁;而对于获得写锁的线程,他一定独占了读写锁,因此可以继续让他获取读锁,当他同时获取了读锁和写锁后,还可以先释放写锁继续持有读锁,这样一个写锁就 “降级” 为了读锁。
10.BlockingQueue阻塞队列
10.1 BlockingQueue简介
Concurrent 包中,BlockingQueue 很好的解决了多线程中,如何高效安全”传输“数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了 BlockingQueue 家庭中的所有成员,包括他们各自的功能以及常见使用场景。
阻塞队列,顾名思义,首先它是一个队列,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
当队列是空的,从队列中获取元素的操作将会被阻塞
当队列是满的,往队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
常用的队列主要有以下两种:
- 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
- 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的时间(栈)
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要 BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的”生产者“和”消费者“模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。
- 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
- 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒
10.2 常见的 BlockingQueue
10.2.1 ArrayBlockingQueue(常用)
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入
元素会导致操作受阻塞;试图从空队列中提取
元素将导致类似阻塞。
此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true
而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。
ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是公用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其
在性能上完全占不到任何便宜。 ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还
可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
一句话总结:ArrayBlockingQueue是一个由数组支持的有界阻塞队列
10.2.2 LinkedBlockingQueue(常用)
一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。
链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE
。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一
个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据
时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;
只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过
构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份
数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。
而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生
产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发
的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列
的并发性能
一句话总结:有链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以
10.2.3 DelayQueue
Delayed
元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed
元素。如果延迟都还没有期满,则队列没有头部,并且 poll
将返回 null
。当一个元素的 getDelay(TimeUnit.NANOSECONDS)
方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take
或 poll
移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size
方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。
DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
一句话总结:使用优先级队列实现的延迟误解阻塞队列。
10.2.4 PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来
决定),但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而
只会在没有可消费的数据时,阻塞数据的消费者。
因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费
数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。
在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。
一句话总结:支持优先级排序的无界阻塞队列。
10.2.5 SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。
公平模式和非公平模式的区别:
- 公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
- 非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
一句话总结: 一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。
10.2.6 LinkedTransferQueue
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。
一句话总结:有链表组成的无界阻塞队列。
10.2.7 LinkedBlockingDeque
LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。
对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况:
- 插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再讲该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异常
- 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数
一句话总结:有链表组成的双向阻塞队列。
10.3 小结
- 在多线程领域:所谓阻塞,在某些情况下会挂起线程(既阻塞),一旦条件满足,被挂起的线程又会自动被唤醒
- 为什么需要
BlockingQueue
? 在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。使用后我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了
10.4 阻塞队列的核心方法
BlockingQueue
方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null
或 false
,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:
抛出异常 | 当阻塞队列满时,再往队列里add插入元素会抛出IllegalStateException: Queue full 当阻塞队列空时,再往队列外remove移除元素会抛NosuchElementException |
---|---|
特殊值 | 插入方法,成功true失败false 移除方法,成功返回队列的元素,队列里没有就返回null |
一直阻塞 | 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据或响应中断退出 当阻塞队列空时,消费者线程视图从队列里take元素,队列会一直阻塞消费者线程直到队列可用 |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
放入数据:
- offer(E e);:表示如果可能的话,将anObject 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回true,否则返回false。(笨方法不阻塞当前执行方法的线程)
- offer(E e, long timeout, TimeUnit unit) :可以设定等待时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败
- put(E e):把 e 加入到 BlockingQueue 里,如果 BlockingQueue 没有空间,则调用此方法的线程被阻断,直到 BlockingQueue 里面有空间再继续。
获取数据:
- E poll() :检索并删除该队列的头部,如果队列为空则返回null
- poll(long timeout, TimeUnit unit) :检索并删除此队列的头部,在指定等待时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
- E take():检索并删除此队列的队头,若 BlockingQueue为空,阻断进入等待状态直到 BlockingQueue有新的数据被加入。
- drainTo(Collection<? super E> c):
- 用法1:将阻塞队列中的元素全部转移到给定的集合中
- 用法2:用于线程间通信,a线程给提交任务给b线程,b采用poll主动拉取的方式,如果没有值也不会阻塞,而不是使用take直接阻塞
11.ThreadPool线程池
11.1 线程池概述
线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护者多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
**线程池优势:**线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,在从队列中取出任务来执行。
他的主要特点为:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
- 提高线程的客观理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
- Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor ,Executors,ExectorService,ThreadPoolExecutor 这几个类。
11.2 线程池架构
Executor 是接口 ,Executor 是辅助工具类,就好像 Collection是接口,Collections是工具类。
11.3 线程池使用方式
- Executors.newFixedThreadPool(int nThreads):一池N线程
- Executors.newSingleThreadExecutor():一个任务一个任务执行,一池一线程
- Executors.newCachedThreadPool():线程池根据需求创建线程,可扩容,遇强则强
/**
* @author cpl
* @date 2023/2/20
* @apiNote
* 线程使用方式
*/
public class ThreadPoolDemo {
public static void main(String[] args) {
// 一池5线程
// 银行的五个窗口
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
// 银行的一个窗口
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
// 一池可扩容线程
ExecutorService threadPool3 = Executors.newCachedThreadPool();
try {
for (int i = 1; i <= 20; i++) {
// 执行
threadPool3.execute(() -> {
System.out.println(Thread.currentThread().getName() + "正在办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool3.shutdown();
}
}
}
对比 new Thread() 创建和销毁线程都非常耗时,使用线程池可以达到线程复用/重用
池化:享元模式,如连接池、常量池
11.4 线程池底层原理
这三种线程池都是创建一个ThreadPoolExecutor
11.5 线程池的七个参数
int corePoolSize:核心线程数量
int maximumPoolSize:最大线程数
long keepAliveTime:最大空闲时间
TimeUnit unit:keepAliveTime 时间单位
BlockingQueue<Runnable> workQueue:任务队列
ThreadFactory threadFactory:线程工厂
RejectedExecutionHandler handler:拒绝策略
其中有三个重要参数决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列。maximumPoolSize - 最大线程数。
当提交线程数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池线程数,直到达到 maximumPoolSize 最大线程数配置。此时,在多余的任务,则会触发线程池的拒绝策略。
总结来说,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize),就会触发线程池的拒绝策略。
11.6 线程池底层工作流程
- 首先,调用execute()执行方法时,先判断线程池中工作线程的数量小于corePoolSize,则调用addWork()方法,在addWorker()方法里创建新线程(判断当前线程池的状态,shutdown,stop等不执行,返回false)
- 创建线程成功,ReentrantLock加锁,放入HashSet 中解锁 t.start() 执行线程。如果大于corePoolSize,则调用 workQueue.offer(command) 放入阻塞队列中,阻塞队列采用 ReentrantLock 加锁保证线程安全
- 如果阻塞队列满了,调用addWorker(command , false) 方法,新建线程加入 HashSet 中,如果> maximumPoolSize ! addWorker(command,false)
true 调用 reject(command) 执行拒绝策略方法 - addWorker() 中线程创建好了,t.start() 执行任务就是调用 runWorker(Worker w) 方法 ,Runnable task = w.firstTask; 第一个任务直接执行,后续的调用getTask() 从队列中 for(;😉 调用 workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)或者 workQueue.task() 从阻塞队列头部获取任务,方法内部 ReentrantLock 加锁保证线程安全
JDK内置的拒绝策略:
- AbortPolicy(默认):直接抛出 RejectedExecutionException 异常组织系统正常运行
- CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务
- DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略
注意事项:
项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是FixedThreadPool 和 SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE,容易导致 OOM。所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参数,自定义线程池
11.7 自定义线程池
/**
* @author cpl
* @date 2023/2/20
* @apiNote 自定义创建线程池
*/
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "正在办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
12.Fork/Join分支合并框架
12.1 Fork/Join框架简介
Fork/Join 它可以将一个大的任务分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情:
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
**任务分割:**首先 Fork/Join 框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
**执行任务合并结果:**分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里获取数据,然后合并这些数据。
在Java的 Fork/Join 框架中,使用两个类完成上述操作
- ForkJoinTask:我们要使用Fork/Join 框架,首先要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join框架提供了两个子类:
- RecursiveAction:用于没有返回结果的任务
- RecursiveTask:用于又返回结果的任务
- ForkJoinPool:ForkJoin 需要通过 ForkJoinPool 来执行
- RecursiveTask:继承后可以实现递归(自己调自己)调用的任务
Fork/Join 框架的实现原理:
ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool , 而ForkJoinWorkerThread 负责执行这些任务
12.2 Fork 方法
分支合并池: 类比=> 线程池
递归任务:继承后可以实现递归(自己调自己)调用的任务
12.3 Fork 方法的实现原理
当我们调用 ForkJoinTask 的 fork() 方法时,程序会把任务放在 ForkJoinWorkerThread 的 pushTask 的 workQueue 中,异步地执行这个任务,然后立即返回结果
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后在调用 ForkJoinPool 的 signalWork() 方法唤醒或创建一个工作线程来执行任务。代码如下:
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
12.4 Join 方法
Join 方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看ForkJoinTask 的 join 方法的实现,代码如下:
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
他首先调用 doJoin() 方法,通过 doJion() 方法得到当前任务的状态来判断返回什么结果,任务状态有4种:
已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和 出现异常(EXCEPTIONAl)
- 如果任务状态是已完成,则直接返回执行结果
- 如果任务状态是被取消,则直接抛出 CancellationException
- 如果任务状态是抛出异常,则直接抛出对应的异常
12.5 doJoin() 方法的实现原理:
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
执行流程如下:
首先通过查询任务状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;
如果没有执行完,则从任务数组里取出任务并执行
如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL
12.6 Fork/Join 框架的异常处理
ForkJoinTask 在执行的时候可能会抛出异常,但是我们没有办法在主线程里直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常。
getException 方法返回 Throwable 对象,如果任务被取消了则返回 CancellationException。如果任务没有完成或者没有抛出异常则返回 null
12.7 案例
案例:生成一个计算任务,计算 1+2+3…+1000,每 100 个数切分一个子任务
class MyTask extends RecursiveTask<Integer> {
/**
* 拆分差值不能超过10
*/
private final static Integer value = 10;
/**
* 二分法左端的值
*/
private int lVal;
/**
* 二分法右端的值
*/
private int rVal;
/**
* 计算的结果
*/
private int result;
public MyTask(int lVal, int rVal) {
this.lVal = lVal;
this.rVal = rVal;
}
/**
* 拆分与合并的过程
*
* @return
*/
@Override
protected Integer compute() {
// 判断差值是否大于10
if (rVal - lVal <= value) {
// 小于等于10做相加操作
for (int i = lVal; i <= rVal; i++) {
result = result + i;
}
} else {
// 大于10进一步做拆分
int mid = (lVal + rVal) / 2;
// 拆分左侧
MyTask t1 = new MyTask(lVal, mid);
t1.fork();
// 拆分右侧
MyTask t2 = new MyTask(mid + 1, rVal);
t2.fork();
// 合并结果
result = t1.join() + t2.join();
}
return result;
}
}
/**
*
* @author
* @date
*/
public class ForkJoinTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建任务对象
MyTask myyTask = new MyTask(1,100 );
// 创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myyTask);
// 获取最终合并之后的结果
Integer result = forkJoinTask.get();
System.out.println(result);
forkJoinPool.shutdown();
}
}
测试结果:
13.CompletableFuture异步回调
13.1 CompletaleFuture 简介
CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常信息。
CompletableFuture 实现了 Future,CompletionStage 接口,实现了 Future 接口就可以兼容现在有现成池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的 CompletableFuture 类。
13.2 Future 与 CompletableFuture
Future 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Future ,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future 的主要缺点如下:
(1) 不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
(2) 不支持进一步的非阻塞调用
通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能
(3) 不支持链式调用
对于 Future 执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的
(4) 不支持多个 Future 合并
比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后,执行某些函数,是没法通过 Future 实现的
(5) 不支持异常处理
Future 的 API 没有任何的异常处理的 API,所以在异步运行时,如果除了问题是不好定位的
13.3 CompletableFuture 入门
13.3.1 使用 CompletableFuture
场景:主线程里面创建了一个 CompletableFuture ,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 异步调用 没有返回值
*/
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " completableFuture1");
});
completableFuture1.get();
/**
* 异步调用 有返回值
*/
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " integerCompletableFuture");
//int a = 1 / 0;
return 1024;
});
integerCompletableFuture.whenComplete((t, u) -> {
System.out.println("-----" + t);// 正常返回值
System.out.println("-----" + u);// 异常信息
});
}
}
演示结果:
Task.get();
System.out.println(result);
forkJoinPool.shutdown();
}
}
测试结果:
[外链图片转存中...(img-wJFcu51G-1688185732866)]
## 13.CompletableFuture异步回调
### 13.1 CompletaleFuture 简介
CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常信息。
CompletableFuture 实现了 Future,CompletionStage 接口,实现了 Future 接口就可以兼容现在有现成池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的 CompletableFuture 类。
### 13.2 Future 与 CompletableFuture
Future 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Future ,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
**Future 的主要缺点如下:**
(1) 不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
(2) 不支持进一步的非阻塞调用
通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能
(3) 不支持链式调用
对于 Future 执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的
(4) 不支持多个 Future 合并
比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后,执行某些函数,是没法通过 Future 实现的
(5) 不支持异常处理
Future 的 API 没有任何的异常处理的 API,所以在异步运行时,如果除了问题是不好定位的
### 13.3 CompletableFuture 入门
#### 13.3.1 使用 CompletableFuture
场景:主线程里面创建了一个 CompletableFuture ,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止
```java
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 异步调用 没有返回值
*/
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " completableFuture1");
});
completableFuture1.get();
/**
* 异步调用 有返回值
*/
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " integerCompletableFuture");
//int a = 1 / 0;
return 1024;
});
integerCompletableFuture.whenComplete((t, u) -> {
System.out.println("-----" + t);// 正常返回值
System.out.println("-----" + u);// 异常信息
});
}
}
演示结果: