一、读写锁
读写锁概述
1️⃣ 什么是读写锁?
读写锁是一种多线程同步机制,用于在多线程环境中保护共享资源的访问
与互斥锁不同的是,读写锁允许多个线程同时读取共享资源,但在有线程请求写操作时,必须将其他读写锁的请求等待,只有该线程可以写入共享资源
读写锁可以提高并发访问性能,特别适用于读多写少的场景。在存在多个读线程的情况下,读写锁可以避免串行化读的过程,从而提高读取操作的并发性能。
读写锁的实现一般包括两种状态:读态和写态
- 在读态下,多个线程可以同时获得读锁,共享资源可以被多个线程同时访问,并且读锁的数量可以动态调整
- 在写态下,任何一个线程获得了写锁,都将暂停其他任何读锁和写锁操作执行,直到写锁的操作完成并释放锁
需要注意的是,读写锁也存在死锁问题 ,即当写锁等待获取读锁,而读锁持有者等待获取写锁时,会形成死锁,因此使用读写锁时需要特别注意避免死锁
2️⃣ 获取锁的条件:
- 线程进入读锁的前提条件:
- 没有其他线程的写锁
- 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)
- 线程进入写锁的前提条件:
- 没有其他线程的读锁
- 没有其他线程的写锁
3️⃣ 读写锁有以下三个重要的特性:
(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
(2)重进入:读锁和写锁都支持线程重进入
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁
4️⃣ 读写锁和互斥锁有什么区别?
互斥锁和读写锁都是多线程同步机制,都可以用于保护共享资源的访问。它们之间的主要区别在于锁的类型和锁的粒度
互斥锁是一种独占锁(也称为排他锁),一次只能有一个线程持有该锁,如果有其他线程请求该锁,则必须等待前一个线程释放锁。它适用于读写操作频率较少,而需要保证资源访问的原子性和同步性的情况
读写锁是一种共享锁,允许多个线程同时持有读锁进行读操作,但是在某个线程持有写锁进行写操作时,其他所有的读写锁请求线程都必须等待释放。它适用于读操作频率较高,而写操作的频率较少的情况
因此,互斥锁比读写锁更适用于对共享资源进行频繁的读写的场景,而读写锁则更适用于对共享资源进行频繁读取的场景
5️⃣ 我们需要了解几个概念:
(1)乐观锁与悲观锁
乐观锁和悲观锁是用于多线程并发控制的两种不同的思路,其中乐观锁和悲观锁的实现方式和适用场景也各不相同
乐观锁是一种基于版本号或时间戳的乐观并发控制机制。它假设对于同一数据的并发执行操作的互斥性较低,因此在访问数据时不会上锁,而是通过比较数据的版本号或时间戳,判断是否可以执行更新操作。如果在比较过程中发现数据已经被其他线程更改,则当前线程的操作会被取消并重新执行。 乐观锁适用于读多写少的情况,因为一旦发生冲突需要重新操作,其假设并发度较高,比较适用于并发不太激烈的场景。
悲观锁是一种基于排他锁的悲观并发控制机制。它假设对于同一数据的并发执行操作的互斥性较高,因此在访问数据时会上锁,避免其他线程同时访问数据。只有当前线程释放锁后,其他线程才能继续访问这个数据。悲观锁适用于读写差别较小的情况,因为一旦上锁后,其他线程只能等待,对同步的执行顺序有较强的控制能力,比较适用于并发激烈的场景。
两种锁在实现上也有不同,乐观锁的实现方式包括使用 CAS 操作和基于版本号或时间戳的比较;悲观锁的实现方式包括使用互斥锁、读写锁和信号量等
锁的选择取决于并发操作的特点和对数据一致性的要求。
(2)行锁与表锁
表锁和行锁是数据库中常见的锁类型,用于对数据进行并发控制和保证数据一致性
表锁指锁定整张表,以保证并发操作的原子性和同步性
在表锁定状态下,其他并发操作需要等待锁释放才能继续执行。表锁适用于对整张表进行操作,或表中的数据行在短时间内不会被频繁修改的情况。
行锁则是锁定表中的某行(或某些行),以保证并发操作的互斥性
在行锁定状态下,其他并发操作只能等待锁释放后才能继续执行。行锁适用于对表中的数据行进行频繁修改的情况
相较于表锁,行锁有更好的并发性和灵活性,可以避免不必要的等待时间和死锁问题
但是行锁的耗时和占用内存资源相对较大,因此它的实现也需要选取适合的算法和技术
需要根据实际情况选择表锁或行锁来保证数据库操作的正确性和高效性。在实际应用中,也需要考虑锁定粒度、锁定记录数量、锁的占用时间等因素,以提高并发性和降低系统开销。
读写锁案例
JAVA 的并发包提供了读写锁 ReentrantReadWriteLock,它表示两个锁,一个是共享锁(读锁);一个是排他锁(写锁)
1️⃣ 接下来我们不使用锁来实现读写数据的案例
package com.atguigu.threeday;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Bonbons
* @version 1.0
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for(int i = 1; i <= 6; i++){
final int num = i;
new Thread(() -> {
// Variable used in lambda expression should be final or effectively final
myCache.put(num + "", num);
}, String.valueOf(i)).start();
}
for(int i = 1; i <= 6; i++){
final int num = i;
new Thread(() -> {
myCache.get(num + "");
}, String.valueOf(i)).start();
}
}
}
class MyCache{
// 定义Map集合
Map<String, Object> map = new HashMap<String, Object>();
// 添加操作
public void put(String key, Object value){
try {
System.out.println(Thread.currentThread().getName() + "正在添加数据");
map.put(key, value);
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + "添加数据成功");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 查询操作
public Object get(String key){
Object result = null;
try {
System.out.println(Thread.currentThread().getName() + "正在读取数据");
result = map.get(key);
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + "读取数据成功");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return result;
}
}
我们发现不使用锁会出现一个问题:
当我们没有写完数据呢,已经开始读了,这就导致了无法读到数据
2️⃣ 使用ReentrantReadWriteLock改进
package com.atguigu.threeday;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Bonbons
* @version 1.0
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for(int i = 1; i <= 6; i++){
final int num = i;
new Thread(() -> {
// Variable used in lambda expression should be final or effectively final
myCache.put(num + "", num);
}, String.valueOf(i)).start();
}
for(int i = 1; i <= 6; i++){
final int num = i;
new Thread(() -> {
myCache.get(num + "");
}, String.valueOf(i)).start();
}
}
}
class MyCache{
// 定义Map集合
Map<String, Object> map = new HashMap<String, Object>();
// 创建读写锁
ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 添加操作
public void put(String key, Object value){
// 添加写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在添加数据");
map.put(key, value);
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + "添加数据成功");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 释放写锁
rwLock.writeLock().unlock();
}
}
// 查询操作
public Object get(String key){
// 添加读锁
rwLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName() + "正在读取数据");
result = map.get(key);
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + "读取数据成功");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 释放读锁
rwLock.readLock().unlock();
}
return result;
}
}
此时我们发现,当写线程执行后,只有释放锁了才能开始下一个写线程的写操作
当所有写线程结束后,所有读线程同时开始读取数据
读写锁深入
1️⃣ 读写锁的演变过程
2️⃣ 读写锁降级
读写锁降级是指在持有写锁的情况下,又获取锁的过程中申请读锁,然后释放写锁的过程,而不是直接释放写锁
这种技术可以避免频繁的锁状态切换,从而提高应用程序的性能和响应速度。
读写锁降级的过程通常包括以下几个步骤:
- 获取写锁
- 对共享资源进行写操作
- 在持有写锁的情况下,获取读锁
- 释放写锁
- 对共享资源进行读操作
- 释放读锁
通过读写锁降级,可以有效避免在读写锁状态切换时,由于锁状态不一致导致的死锁和线程饥饿等问题。同时,读写锁降级的实现也需要注意避免潜在的数据竞争等问题,以保证程序的正确性和稳定性。
需要注意的是,读写锁降级是一种高级应用技术,在实际应用中需要根据具体的场景和需求进行合理的设计和实现。如果不慎处理不当,可能会导致一系列的死锁和数据一致性问题。
package com.atguigu.threeday;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Bonbons
* @version 1.0
* 演示读锁不能升级为写锁,就是没有读完不能写
*/
public class ReadToWrite {
public static void main(String[] args) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
// 添加读锁
readLock.lock();
System.out.println("正在读数据");
// 添加写锁
writeLock.lock();
System.out.println("正在写数据");
// 释放读锁
readLock.unlock();
// 释放写锁
writeLock.unlock();
}
}
但是读锁不能升级为写锁:
读锁不能升级为写锁(也被称为不能升级为互斥锁),是因为它会导致死锁和并发性能问题。这是因为,升级锁的过程本质上是从一种锁类型切换到另一种锁类型,而这个过程需要释放原有的锁并重新获取新的锁。如果尝试将读锁升级为写锁,就需要先将所有持有该读锁的线程释放该锁,才能申请写锁,这个过程称为“升级梯队”(promotion hierarchy),从而导致线程等待和性能下降。
直白一点:就是读的过程中不能写,因为可能存在多个读进程,我当前的进程如果修改了数据,那么其他读进程读到的数据可能不一致
3️⃣ 案例演示:
package com.atguigu.threeday;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Bonbons
* @version 1.0
* 演示读锁不能升级为写锁,就是没有读完不能写
*/
public class ReadToWrite {
public static void main(String[] args) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
// 添加读锁
readLock.lock();
System.out.println("正在读数据");
// 添加写锁
writeLock.lock();
System.out.println("正在写数据");
// 释放读锁
readLock.unlock();
// 释放写锁
writeLock.unlock();
}
}
二、阻塞队列
阻塞队列概述
1️⃣ 什么是阻塞队列?
Concurrent 包中,BlockingQueue 很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了 BlockingQueue 家庭中的所有成员,包括他们各自的功能以及常见使用场景。
阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出
- 当队列是空的,从队列中获取元素的操作将会被阻塞
- 当队列是满的,从队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
2️⃣ 常用的队列
- 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
- 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
3️⃣ 为什么需要 BlockingQueue?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue 都给你一手包办了
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
- 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
- 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒
4️⃣ 什么是 BlockingQueue?
BlockingQueue 是一个接口,是Java中用于多线程编程的队列。它可以在多线程中实现线程数据的传递和控制,是一个多线程并发编程的重要工具
BlockingQueue接口继承自java.util.Queue,是一个阻塞队列。它提供了多种阻塞队列的实现方式,如ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue、PriorityBlockingQueue和SynchronousQueue等。
BlockingQueue的主要特点是线程安全、阻塞等待和队列容量限制等。它提供了一些方法,如**put()、take()、putIfAbsent()、offer()、poll()**等方法,这些方法的实现都是线程安全的,并且可以通过方法的阻塞等待和队列容量限制等来控制多线程并发访问的顺序和数量。
BlockingQueue的使用场景包括但不限于:流量控制、任务调度、事件处理等场景
阻塞队列的架构
阻塞队列的架构通常由三部分组成:
- 队列:用于存储数据的数据结构。阻塞队列可以使用数组、链表、栈等数据结构来实现。在每个队列上,都有一个指针或游标(cursor),指向即将读或写的元素。
- 同步器:用于实现多线程同步的机制。同步器可以是锁、信号量、条件等。通过同步器,阻塞队列可以实现线程阻塞、获取锁和释放锁等操作。
- 底层实现:用于实现阻塞队列的底层逻辑。底层实现通常包括如何处理线程阻塞、通知其他线程和释放锁等。
阻塞队列的架构可以根据实际应用和需求进行优化和改进。例如,可以通过精细的锁机制、缓存池和批处理等技术来提高队列的并发性和性能
还可以借助内存映射文件、消息队列和共享内存等机制来实现高效地数据传输和交换
在实际开发中,需要根据实际场景和需求来选择阻塞队列的具体实现方式,并定制化相关参数以优化性能。
阻塞队列分类
阻塞队列是Java多线程中常用的一种数据结构,它具有多用途且灵活的特点,可以应用在生产消费者模式、流量控制、线程池等多种场景中。根据实现方式和特性,阻塞队列一般可以分为以下几种类型:
ArrayBlockingQueue:数组实现的阻塞队列,有固定容量。当队列达到容量限制时,put方法就会阻塞,直到队列有位置空出来。在读取队列数据时也遵循类似的阻塞规则
LinkedBlockingQueue:链表实现的阻塞队列,可以指定容量,如果不指定则默认为Integer.MAX_VALUE。如果容量已满,put方法就会阻塞,直到队列有位置空出来。在读取队列数据时也遵循类似的阻塞规则
DelayQueue:基于优先级堆的延时队列,元素必须实现Delayed接口。这种队列中的元素只有在指定的时间后才会被消费。如果队列中没有可消费的元素,则take方法会阻塞,直到消费元素为止
PriorityBlockingQueue:基于优先级的无界阻塞队列,元素需要实现Comparable接口或传入比较器Comparator。该队列是按照内部元素的优先级排序,每次出队列的是优先级最高的元素。如果队列为空,则take方法会阻塞,直到有元素进队列
SynchronousQueue:=容量为1的阻塞同步队列,生产者线程将元素放入队列中之后就会被阻塞,直到消费者线程取走这个元素后才继续执行。这种队列可以用于传递任务和交换数据等场景
总之,在具体使用时,需要根据不同场景和需求来选择适合的阻塞队列类型,并了解各种阻塞队列的特性和限制,以便做出合理的设计和实现。
阻塞队列核心方法
接下来通过代码演示具体的使用方法:
1️⃣ 第一组:超出范围会抛出异常
package com.atguigu.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author Bonbons
* @version 1.0
*/
public class BlockingQueueDemo {
public static void main(String[] args) {
// 容量我们设置为2,只能放两个元素
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);
// 使用add添加元素
for (int i = 0; i < 3; i++) {
final int num = i;
new Thread(() -> {
queue.add(num);
System.out.println(Thread.currentThread().getName() + "添加了一个元素");
}, String.valueOf(i + 1)).start();
}
}
}
// 使用add添加元素
for (int i = 0; i < 2; i++) {
final int num = i;
new Thread(() -> {
queue.add(num);
System.out.println(Thread.currentThread().getName() + "添加了一个元素");
}, String.valueOf(i + 1)).start();
}
//使用remove删除元素,队列元素个数为0时调用此方法会抛出异常
for (int i = 0; i < 3; i++) {
final int num = i;
new Thread(() -> {
queue.remove();
System.out.println(Thread.currentThread().getName() + "删除了一个元素");
}, String.valueOf(i + 1)).start();
}
2️⃣ 超出范围会返回特殊值:
package com.atguigu.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author Bonbons
* @version 1.0
*/
public class BlockingQueueDemo {
public static void main(String[] args) {
// 容量我们设置为2,只能放两个元素
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);
// 使用offer添加元素
for (int i = 0; i < 3; i++) {
final int num = i;
new Thread(() -> {
System.out.println("添加元素是否成功: " + queue.offer(num));
}, String.valueOf(i + 1)).start();
}
}
}
如果添加元素成功就会返回 true,当队列已经达到了容量的最大值,那么就会添加失败返回 false
// 使用offer添加元素
for (int i = 0; i < 2; i++) {
final int num = i;
new Thread(() -> {
System.out.println("添加元素是否成功: " + queue.offer(num));
}, String.valueOf(i + 1)).start();
}
// 使用poll删除元素
for (int i = 0; i < 3; i++) {
final int num = i;
new Thread(() -> {
System.out.println("删除元素: " + queue.poll());
}, String.valueOf(i + 1)).start();
}
当队列已经为空时,再调用poll方法删除元素会返回 null,如果有元素就会返回删除的元素
3️⃣ 超过范围就会导致队列处于阻塞状态
(1)队列已满添加元素
package com.atguigu.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author Bonbons
* @version 1.0
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 容量我们设置为2,只能放两个元素
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);
// 使用put添加元素
for (int i = 0; i < 3; i++) {
queue.put(i);
}
}
}
(2)队列为空删除元素
package com.atguigu.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author Bonbons
* @version 1.0
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 容量我们设置为2,只能放两个元素
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);
// 使用put添加元素
for (int i = 0; i < 2; i++) {
queue.put(i);
}
// 使用take取出元素
for (int i = 0; i < 3; i++) {
System.out.println(queue.take());
}
}
}
三、线程池
线程池概述
线程池是一种管理和重用线程的机制,它允许可以多次运行的任务被交给一组预定义的线程,而不是每次都创建新线程
在这个模式中,当一个线程完成了它的工作后,它不会被销毁,而是继续等待处理新的任务。线程池可以 减少线程的创建和销毁次数,降低系统资源的消耗,提高程序的运行效率
- 线程池通常由三个部分组成:
- 任务队列 用于存储任务;
- 线程池管理器 用于管理线程,包括线程的创建、销毁、监控和调度等
- 任务执行器 用于处理任务队列中的任务,通常采用了先进先出(FIFO)策略,从任务队列中取出一个任务分配给线程池中的一个可用线程执行
线程池的好处包括:提高系统吞吐量、降低线程创建和销毁的开销、统一任务分配和执行、对于资源的管理和可控等
- 常用的线程池有四种类型:
- FixedThreadPool和SingleThreadExecutor对于任务量和线程数都是固定的
- CachedThreadPool可以创建任意数量的线程,但是需要确保有足够的系统资源
- ScheduledThreadPoolExecutor则用于实现任务的定时执行或周期执行
需要注意的是,在使用线程池时需要避免出现线程泄露、资源争夺、线程饥饿等问题。并且,需要根据不同的应用场景和需求来选择合适的线程池类型和参数,以充分发挥线程池的优势。
线程池架构
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executor,ExecutorService,ThreadPoolExecutor 这几个类
线程池使用方式
先介绍在 Java 中,创建线程池有以下三种方式:
(1)使用 Executors 工厂类创建线程池:
Executors 是 Java 提供的一个工厂类,可以用它来创建不同类型的线程池
常用的有 newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor 等
Executors 工厂类创建线程池:
// 创建一个固定大小的线程池,大小为 5
ExecutorService executor = Executors.newFixedThreadPool(5);
// 创建一个可以缓存线程的线程池,大小为 Integer.MAX_VALUE。当有新任务时,如果没有空闲的线程,则创建一个新线程执行任务。如果线程超过 60 秒没用,就会被回收
ExecutorService executor = Executors.newCachedThreadPool();
// 创建一个只有一个线程的线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
(2)继承 ThreadPoolExecutor 类并自定义线程池:
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable>workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
// 具体的实现逻辑
}
(3)直接使用 ForkJoinPool 类创建线程池:
// 创建一个默认大小的 ForkJoinPool 线程池
ForkJoinPool executor = new ForkJoinPool();
注意:无论哪种方式创建线程池,都应该在使用完毕之后调用 shutdown 方法关闭线程池,以释放线程池占用的资源。
1️⃣ Executors.newFixedThreadPool(int) 一池N线程
package com.atguigu.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Bonbons
* @version 1.0
* 演示1池N线程的情况
*/
public class ThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try{
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
2️⃣ Executors.newSingleThreadExecutor() 一个任务一个任务执行,一池一线程
package com.atguigu.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Bonbons
* @version 1.0
* 演示1池1线程的情况
*/
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ExecutorService threadPool1 = Executors.newSingleThreadExecutor();
try{
// 一个线程处理十个任务
for (int i = 1; i <= 10; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool1.shutdown();
}
}
}
3️⃣ Executors.newCachedThreadPool() 线程池根据需求创建线程,可扩容,遇强则强
package com.atguigu.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Bonbons
* @version 1.0
* 演示1池可扩容的线程池
*/
public class ThreadPoolDemo3 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
try{
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
线程池底层原理
线程池的七个参数
线程池底层工作流程
1️⃣ 底层工作流程
- 任务提交后,先看常驻核心线程数量是否达到最大值
- 没有达到,就创建核心线程执行任务;达到了就去判断等待队列是否已经满了
- 等待队列还有位置,就将当前任务添加到等待队列中;等待队列没有位置就去判断是否达到了最大线程池数量
- 没有达到就创建非核心线程执行任务,达到了就执行我们的拒绝策略处理任务
- 等待队列还有位置,就将当前任务添加到等待队列中;等待队列没有位置就去判断是否达到了最大线程池数量
- 没有达到,就创建核心线程执行任务;达到了就去判断等待队列是否已经满了
2️⃣ 四种拒绝策略
在 Java 中,线程池是通过 Executor 框架来实现的。当线程池达到容量上限并且任务队列已满时,会触发拒绝策略,即无法继续执行新的任务。JDK 内置的四种拒绝策略如下:
- 直接抛出异常(AbortPolicy):线程池默认的拒绝策略。当线程池达到容量上限并且任务队列已满时,会抛出一个 RejectedExecutionException 异常
- 队列满时丢弃新任务(DiscardPolicy):当线程池达到容量上限并且任务队列已满时,直接丢弃新提交的任务,不做任何处理
- 队列满时丢弃队列最早的任务(DiscardOldestPolicy):当线程池达到容量上限并且任务队列已满时,丢弃队列中最早加入的任务,并尝试将新任务添加到队列末尾
- 将任务交给调用线程来执行(CallerRunsPolicy):当线程池达到容量上限并且任务队列已满时,将任务交给调用线程来执行。也就是说,任务将不会被线程池中的线程执行,而是由调用 execute 方法的线程来执行
可以通过 ThreadPoolExecutor 的构造函数或者 setRejectedExecutionHandler 方法来设置拒绝策略,例如:
ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());
// 或者
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
需要注意的是,当线程池处于关闭状态时,无论使用什么拒绝策略,新提交的任务都会被直接拒绝。因此,在关闭线程池前应该确保所有任务都已经提交并执行完毕。
自定义线程池
1️⃣ 我们使用 ThreadPoolExecutor 自定义线程池
package com.atguigu.pool;
import java.util.concurrent.*;
/**
* @author Bonbons
* @version 1.0
* 演示自定义线程池
*/
public class ThreadPoolDemo4 extends ThreadPoolExecutor {
public ThreadPoolDemo4(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public static void main(String[] args) {
// 自定义线程池
ThreadPoolExecutor threadService = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy()
);
// 模拟十个顾客发起请求
try{
for (int i = 1; i <= 10; i++) {
threadService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "窗口卖票");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "售票结束");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
// 关闭线程池
threadService.shutdown();
}
}
}
2️⃣ Executor 和 ThreadPoolExecutor 的关系:
在 Java 中,ExecutorService 和 ThreadPoolExecutor 都是用来管理线程池的类,它们之间的关系可以理解为“面向接口编程”
ExecutorService 是一个接口,定义了线程池的基本功能,如提交任务、关闭线程池等。ThreadPoolExecutor 则是 ExecutorService 接口的一个实现类,实现了线程池的具体逻辑
ThreadPoolExecutor 中对于线程池的管理和任务执行的细节进行了更加详细的配置,可以根据实际需求来调整线程池的容量大小、拒绝策略、任务队列类型等参数
-
因此,ExecutorService 和 ThreadPoolExecutor 的主要区别在于:
- 功能不同:
- ExecutorService 定义了线程池的基本功能
- 而 ThreadPoolExecutor 是实现了这些功能的具体线程池类
- 接口与实现:
- ExecutorService 是一个接口,可以通过工厂方法 Executors 创建不同类型的线程池,例如 newFixedThreadPool、newCachedThreadPool 等
- ThreadPoolExecutor 是 ExecutorService 接口的一个具体实现类,它可以被自定义实现,也可以使用默认的实现策略
- 粒度不同:
- ExecutorService 对线程池的管理和控制的粒度较为粗略,只提供了基本的线程池接口
- 而 ThreadPoolExecutor 提供了更细致的配置参数,可以实现更全面、更精细的线程池控制和管理
- 功能不同:
因此,在实际开发中,可以根据具体需求选择使用 ExecutorService 接口或者 ThreadPoolExecutor 类来管理线程池
如果需要更加灵活、细节化的线程池控制,就可以选择使用 ThreadPoolExecutor 类
如果只需要基本的线程池功能,而不关心细节,可以使用 ExecutorService 接口或工厂方法来快速创建线程池并提交任务
四、Fork/Join分支合并框架
1️⃣ 什么是Fork/Join分支合并框架?
Fork/Join(分支合并)框架是在 Java 7 中引入的一种并行计算框架,旨在通过利用多个处理器来提高程序的执行效率
该框架的特点是能够 将一个大任务拆分成多个小任务并行执行,然后将这些小任务的结果合并成最终的结果
-
Fork/Join 框架主要有两个核心组件:ForkJoinPool 和 ForkJoinTask
-
ForkJoinPool 是一种特殊的线程池,它内部维护了一个工作队列和一个任务队列,用于管理 ForkJoinTask
-
在 ForkJoinPool 中,所有的线程都是 Worker 线程,它们会从工作队列中获取任务并执行
-
ForkJoinTask 则是具体的任务实现类,它可以是一个大任务,也可以是将一个大任务拆分成的小任务
-
-
ForkJoinTask 中有两个核心方法:fork 和 join
- fork 方法用于将一个大任务拆分成多个小任务,并将这些小任务提交到 ForkJoinPool 中执行。
- join 方法则用于等待所有子任务执行完成,并将它们的结果合并成最终的结果
除了 fork 和 join 方法之外,ForkJoinTask 还提供了其他一些有用的方法,例如 getSurplusQueuedTaskCount 用于获取等待执行的任务数量等
在 Fork/Join 框架中,一般会通过递归的方式将一个大任务不断拆分成更小的任务,直到每个任务都足够小,无法再继续拆分为止,然后将最小的任务提交给 ForkJoinPool 中的 Worker 线程执行,最后将所有任务的执行结果合并成最终的结果
总之,Fork/Join(分支合并)框架可以大幅提高并行计算的效率,尤其是在多核处理器的系统中更为明显,但需要开发者针对实际业务场景来合理运用
2️⃣ 案例演示:1-100加和,最大的差值不能超过十
package com.atguigu.forkjoin;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* @author Bonbons
* @version 1.0
* 拆分合并演示
*/
public class ForkJoinDemo {
public static void main(String[] args) {
// 创建MyTask对象
MyTask myTask = new MyTask(0, 100);
// 创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
// 获取最终合并之后的结果
try {
Integer result = forkJoinTask.get();
System.out.println(result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}finally {
// 关闭连接池
forkJoinPool.shutdown();
}
}
}
class MyTask extends RecursiveTask<Integer>{
// 定义几个变量
private static final Integer VALUE = 10;
private int start;
private int end;
private int result;
// 创建有参构造
public MyTask(int start, int end){
this.start = start;
this.end = end;
}
// 实现拆分细节
@Override
protected Integer compute() {
// 判断是否满足拆分条件
if((end - start) > VALUE){
// 获取中间值
int mid = (end + start) / 2;
// 拆分左边
MyTask task1 = new MyTask(start, mid);
MyTask task2 = new MyTask(mid + 1, end);
//执行拆分
task1.fork();
task2.fork();
//合并 [join方法的作用是阻塞当前线程并等待获取结果]
result = task1.join() + task2.join();
}else{
// 将范围内所有的元素进行计算
for (int i = start; i <= end; i++) {
result += i;
}
}
return result;
}
}
执行结果如下:
五、CompletableFuture异步回调
1️⃣ 我们需要知道什么是 CompletableFuture ?
CompletableFuture 是 Java 8 中引入的一种新型异步编程方式(也是 Java 平台上第一种真正意义上的异步编程方式),它为异步回调提供了强大而灵活的支持
CompletableFuture 提供了一种简洁的方式来处理异步操作的结果,这种方式允许你在执行异步计算时,指定回调函数,一旦计算完成,就可以自动触发这些回调函数
异步回调为异步编程提供了一种友好的机制,它将回调函数作为异步任务完成后所执行的函数传递给异步任务本身。在异步任务完成后,回调函数被自动触发执行,确保异步任务的结果可以被处理。
2️⃣ Java 8 CompletableFuture 类提供了如下异步回调方法:
- thenApply():将一个 CompletableFuture 的结果传递给一个函数,并返回一个代表结果的 CompletableFuture
- thenAccept():接受一个 Consumer 并在 CompletableFuture 完成时执行该 Consumer
- thenRun():当 CompletableFuture 完成时执行一个 Runnable
- thenCompose():当 CompletableFuture 完成时,将一个函数作为参数,并返回一个 CompletableFuture
- handle():传递一个 BiFunction 接口的实例,它的参数为 CompletableFuture 异步计算结果和异常,可以对结果进行相应处理
- whenComplete():当 CompletableFuture 触发时,执行一个 BiConsumer 接口的实例
3️⃣ CompletableFuture 的几个核心特性:
-
异步执行:CompletableFuture 可以在子线程中执行异步任务,避免阻塞主线程
-
回调通知:在异步任务完成时,可以触发回调通知,通知函数可以是同步或异步的方法
-
链式调用:CompletableFuture 可以通过链式调用方式完成多个异步任务之间的依赖和合并
-
合并操作:CompletableFuture 提供了多种合并方法,可以将多个 CompletableFuture 的结果进行合并,并最终获得一个结果
4️⃣ Completable 的两个异步调用方法介绍
runAsync 和 supplyAsync 是 Java 8 中 CompletableFuture 类提供的异步执行任务的方法
(1)runAsync 方法使用 Runnable 函数式接口
允许我们在后台执行一个没有返回值的异步任务。该方法返回一个 CompletableFuture<Void>
对象
下面是一个例子,展示如何使用 runAsync 启动一个异步任务:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 异步任务
System.out.println("Async Task is running...");
});
(2)supplyAsync 方法使用 Supplier 函数式接口
允许我们在后台执行一个有返回值的异步任务,该方法返回一个 CompletableFuture<U>
对象,其中 U 是我们所定义的返回值类型
下面是一个例子,展示如何使用 supplyAsync 启动一个异步任务,并在任务完成后获取它的返回值:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异步任务
return "Async Task is Done.";
});
String result = future.get(); // 在这里等待异步任务完成并获取返回结果
System.out.println(result);
需要注意的是,get() 方法会阻塞调用线程,直到任务执行完成并返回结果。如果任务执行时间很长,这种方式可能会对性能造成影响。因此,我们可以通过提供回调函数来在任务完成时获得返回值,而不必等待任务完成:
CompletableFuture.supplyAsync(() -> {
// 异步任务
return "Async Task is Done.";
}).thenAccept(result -> {
// 异步任务完成之后的回调方法
System.out.println(result);
});
当任务成功完成时,thenAccept 方法会异步地调用回调函数。如果任务抛出异常,则会调用 exceptionally 方法提供的回调函数
5️⃣ 案例演示:
package com.atguigu.completable;
import java.util.concurrent.CompletableFuture;
/**
* @author Bonbons
* @version 1.0
* 演示有返回值的异步调用和没有返回值的异步调用
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception{
// 异步调用没有返回值
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "completableFuture1");
});
completableFuture1.get();
// 异步调用有返回值
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "completableFuture2");
//模拟出现异常
// int i = 1 / 0;
return 1024;
});
completableFuture2.whenCompleteAsync((t, u) -> {
// 第一个参数t获取到的是返回值,第二个参数u获取到的是异常信息
System.out.println("t = " + t);
System.out.println("u = " + u);
}).get();
}
}
当我们在返回值前添加一个异常时, u 就会保存异常信息