文章目录
- 0:AQS简介-常见面试题
- AQS具备特性
- state表示资源的可用状态
- AQS定义两种资源共享方式
- AQS定义两种队列
- 自定义同步器实现时主要实现以下几种方法:
- 同步等待队列
- 条件等待队列
- 1:AQS应用之ReentrantLock
- ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
- ReentrantLocak源码流程图
- ReentrantLock加锁示例代码-代码git地址
- 2:AQS应用之Semapho
- Semaphore 是什么?
- 怎么使用 Semaphore?
- 构造方法
- 重要方法
- 基本使用
- Semapho源码流程图
- Semapho示例代码git地址
- 3:AQS应用之CountDownLatch
- CountDownLatch是什么?
- CountDownLatch如何工作?
- API
- CountDownLatch应用场景例子
- 代码如下:
- CountDownLatch源码流程图
- CountDownLatch示例代码git地址
- 4:CyclicBarrier
- API
- 应用场景
- 示例代码:
- CyclicBarrier示例代码git地址
- 5:AQS应用之BlockingQueue
- 队列类型
- 队列数据结构
- 常见的4种阻塞队列
- ArrayBlockingQ
- LinkedBlockingQueue
- DelayQueue
- BlockingQueue API
- 添加元素
- 检索元素
- 多线程生产者-消费者示例
- BlockingQueue各种队列底层流程图
- BlockingQueue代码示例
0:AQS简介-常见面试题
- Java并发编程核心在于java.concurrent.util包
- 而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,
- AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。
AQS具备特性
- 阻塞等待队列
- 共享/独占
- 公平/非公平
- 可重入
- 允许中断
- 除了Lock外,Java.concurrent.util当中同步器的实现Latch,Barrier,BlockingQueue等,都是基于AQS框架实现。
- 一般通过定义内部类Sync继承AQS
- 将同步器所有调用都映射到Sync对应的方法
- AQS内部维护属性volatile int state (32位)
state表示资源的可用状态
State三种访问方式
getState()、setState()、compareAndSetState()
AQS定义两种资源共享方式
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
- Share-共享,多个线程可以同时执行,Semaphore/CountDownLatch
AQS定义两种队列
- 同步等待队列
- 条件等待队列
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():
该线程是否正在独占资源。只有用到condition才需要去实现它 - tryAcquire(int):
独占方式。尝试获取资源,成功则返回true,失败则返回false。 - tryRelease(int):
独占方式。尝试释放资源,成功则返回true,失败则返回false。 - tryAcquireShared(int):
共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 - tryReleaseShared(int):
共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
同步等待队列
- AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
条件等待队列
- Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁
1:AQS应用之ReentrantLock
- ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。
- 而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。
//使用ReentrantLock进行同步
ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁
lock.lock(); //加锁
lock.unlock(); //解锁
ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
- 在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:
1、FairSync 公平锁的实现
2、NonfairSync 非公平锁的实现 - 这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。
ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁,默认实现的非公平锁
package com.zgs.lock.reentrant_lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author guisong.zhang
* @date 2024/3/9 23:27
* @description ReentrantLock实现加锁示例代码 取出
*/
public class Test {
// private static ReentrantLock reentrantLock = new ReentrantLock();
private static MyLock reentrantLock = new MyLock();
public static void main(String[] args) {
new Thread(() -> {
reentrantLock.lock();
withdrawMoney();
reentrantLock.unlock();
}, "取钱线程1").start();
new Thread(() -> {
reentrantLock.lock();
withdrawMoney();
reentrantLock.unlock();
}, "取钱线程2").start();
}
public static void withdrawMoney() {
System.out.println(Thread.currentThread().getName() + ":开始取钱");
sleep(3000);
System.out.println(Thread.currentThread().getName() + ":取钱完成");
}
private static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
}
}
}
- 自己实现一个锁
package com.zgs.lock.reentrant_lock;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
/**
* @author guisong.zhang
* @date 2024/3/9 23:41
* @description 自定义lock
*/
public class MyLock {
private static final Unsafe unsafe = getUnsafe();
private volatile int state;
private static long stateOffSet;
static {
try {
stateOffSet = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 自己实现线程阻塞的几种方式
* 1:wait:wait需要搭配 synchronized 使用,没必要因为synchronized就会加锁,这并不是我们自己实现的锁。
* 2:sleep:解锁时间不确定,怎么唤醒呢,所以也不行
* 3:park:
* 4:while(true)自旋:
*/
public void lock() {
//判断当前线程是否需要加锁
while (!unsafe.compareAndSwapInt(this, stateOffSet, 0, 1)) {
System.out.println(Thread.currentThread().getName() + ":正在自选尝试加锁");
}
System.out.println(Thread.currentThread().getName() + ":加锁成功");
}
public void unlock() {
state = 0;
}
private static Unsafe getUnsafe() {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
- park的使用
package com.zgs.lock.reentrant_lock;
import java.util.concurrent.locks.LockSupport;
/**
* @author guisong.zhang
* @date 2024/3/10 22:44
* @description 类描述
*/
public class ParkTest {
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
System.out.println("线程1开始执行");
LockSupport.park();
System.out.println("线程1解除阻塞,继续执行了.....");
}, "线程1");
thread1.start();
Thread thread2 = new Thread(() -> {
System.out.println("线程2开始执行");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(thread1);
}, "线程2");
thread2.start();
}
}
ReentrantLocak源码流程图
ReentrantLock加锁示例代码-代码git地址
2:AQS应用之Semapho
Semaphore 是什么?
- Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。
怎么使用 Semaphore?
构造方法
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
重要方法
public void acquire() throws InterruptedException
public void release()
tryAcquire(long timeout, TimeUnit unit)
基本使用
需求场景
- 资源访问,服务限流(Hystrix里限流就有基于信号量方式)。
代码实现
package com.zgs.lock.semapho;
import java.util.concurrent.Semaphore;
/**
* @author guisong.zhang
* @date 2024/3/11 23:09
* @description semapho测试类
* 默认实现非公平锁
*/
public class SemaphoRunnerTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(new Task(semaphore, "张贵松-线程" + i)).start();
}
}
static class Task extends Thread {
Semaphore semaphore;
public Task(Semaphore semaphore, String name) {
super(name);
this.semaphore = semaphore;
}
@Override
public void run() {
try {
//获取资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "获取到资源,时间:" + System.currentTimeMillis());
Thread.sleep(5000);
//释放资源
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
-
输出结果
-
从打印结果可以看出,一次只有两个线程执行 acquire(),只有线程进行 release() 方法后才会有别的线程执行 acquire()。
Semapho源码流程图
Semapho示例代码git地址
3:AQS应用之CountDownLatch
CountDownLatch是什么?
- CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行
- 使用场景:Zookeeper分布式锁,Jmeter模拟高并发等
CountDownLatch如何工作?
- CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。
- 当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务
API
CountDownLatch.countDown()
CountDownLatch.await();
CountDownLatch应用场景例子
- 比如陪媳妇去看病。医院里边排队的人很多,如果一个人的话,要先看大夫,看完大夫再去排队交钱取药。现在我们是双核,可以同时做这两个事(多线程)。
- 假设看大夫花3秒钟,排队交费取药花5秒钟。我们同时搞的话,5秒钟我们就能完成,然后一起回家(回到主线程)
代码如下:
- CountDownLatchRunner
package com.zgs.lock.countdown_latch;
import java.util.concurrent.CountDownLatch;
/**
* @author: guisong.zhang
* @date: 2024/3/12 15:12:09
* @description CountDownLatch测试类
**/
public class CountDownLatchRunner {
public static void main(String[] args) throws InterruptedException {
long timeNow = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new SeeDoctorTask(countDownLatch)).start();
new Thread(new QueueTask(countDownLatch)).start();
countDownLatch.await();
System.out.println("等待所有线程执行完毕后继续执行——cost time:" + (System.currentTimeMillis() - timeNow));
}
}
- QueueTask
package com.zgs.lock.countdown_latch;
import java.util.concurrent.CountDownLatch;
/**
* @author: guisong.zhang
* @date: 2024/3/12 15:27:57
* @description TODO
**/
public class QueueTask extends Thread {
private CountDownLatch countDownLatch;
public QueueTask(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println("开始在医院药房排队买药....");
Thread.sleep(5000);
System.out.println("排队成功,可以开始缴费买药");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
}
}
- SeeDoctorTask
package com.zgs.lock.countdown_latch;
import java.util.concurrent.CountDownLatch;
/**
* @author: guisong.zhang
* @date: 2024/3/12 15:28:13
* @description TODO
**/
public class SeeDoctorTask extends Thread {
private CountDownLatch countDownLatch;
public SeeDoctorTask(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println("开始看医生");
Thread.sleep(3000);
System.out.println("结束看医生");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != countDownLatch) {
countDownLatch.countDown();
}
}
}
}
CountDownLatch源码流程图
CountDownLatch示例代码git地址
4:CyclicBarrier
- 栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
- CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
API
cyclicBarrier.await();
应用场景
- 可以用于多线程计算数据,最后合并计算结果的场景。
- 例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,
- 先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,
- 最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
示例代码:
package com.zgs.lock.cyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author: guisong.zhang
* @date: 2024/3/12 16:52:33
* @description CyclicBarrierRunner测试类
**/
public class CyclicBarrierRunner {
static class WorkerThread implements Runnable {
private final CyclicBarrier barrier;
private int id;
public WorkerThread(CyclicBarrier barrier, int id) {
this.barrier = barrier;
this.id = id;
}
@Override
public void run() {
try {
// 模拟线程做准备工作或任务执行
System.out.println("Worker " + id + " started.");
Thread.sleep(1000); // 假设执行耗时操作
System.out.println("Worker " + id + " is about to reach the barrier.");
// 当前线程到达屏障点并等待其他线程
barrier.await();
System.out.println("Worker " + id + " passed the barrier and can continue now.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int numberOfWorkers = 5; // 线程数量
CyclicBarrier barrier = new CyclicBarrier(numberOfWorkers, () -> {
System.out.println("All workers have reached the barrier. Executing a barrier action...");
});
ExecutorService executorService = Executors.newFixedThreadPool(numberOfWorkers);
for (int i = 0; i < numberOfWorkers; i++) {
executorService.execute(new WorkerThread(barrier, i + 1));
}
// 关闭线程池
executorService.shutdown();
}
}
CyclicBarrier示例代码git地址
5:AQS应用之BlockingQueue
- BlockingQueue,是java.util.concurrent 包提供的用于解决并发生产者 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制,在许多生产场景里都可以看到这个工具的身影。
队列类型
- 无限队列 (unbounded queue ) - 几乎可以无限增长
- 有限队列 ( bounded queue ) - 定义了最大容量
队列数据结构
队列实质就是一种存储数据的结构
- 通常用链表或者数组实现
- 一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
- 主要操作:入队(EnQueue)与出队(Dequeue)
常见的4种阻塞队列
- ArrayBlockingQueue 由数组支持的有界队列
- LinkedBlockingQueue 由链接节点支持的可选有界队列
- PriorityBlockingQueue 由优先级堆支持的无界优先级队列
- DelayQueue 由优先级堆支持的、基于时间的调度队列
ArrayBlockingQ
- 队列基于数组实现,容量大小在创建ArrayBlockingQueue对象时已定义好
- 数据结构如下图:
- 队列创建:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();
- 应用场景
在线程池中有比较多的应用,生产者消费者场景 - 工作原理
基于ReentrantLock保证线程安全,根据Condition实现队列满时的阻塞
LinkedBlockingQueue
- 是一个基于链表的无界队列(理论上有界)
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
- 上面这段代码中,blockingQueue 的容量将设置为 Integer.MAX_VALUE 。
- 向无限队列添加元素的所有操作都将永远不会阻塞,[注意这里不是说不会加锁保证线程安全],因此它可以增长到非常大的容量。
- 使用无限 BlockingQueue 设计生产者 - 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息 。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。
DelayQueue
- 由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现。
- 队列创建
BlockingQueue<String> blockingQueue = new DelayQueue();
- 要求
入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口 - 应用场景
电影票 - 工作原理:
队列内部会根据时间优先级进行排序。延迟类线程池周期执行。
BlockingQueue API
- BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。
添加元素
检索元素
在构建生产者 - 消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。
多线程生产者-消费者示例
-
代码git地址
-
代码说明
-
接下来我们创建一个由两部分组成的程序 - 生产者 ( Producer ) 和消费者 ( Consumer) 。
-
生产者将生成一个 0 到 100 的随机数(十全大补丸的编号),并将该数字放在BlockingQueue 中。我们将创建 16 个线程(潘金莲)用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。
-
需要记住的重要一点是,我们需要阻止我们的消费者线程无限期地等待元素出现在队列中。
-
从生产者(潘金莲)向消费者(武大郎)发出信号的好方法是,不需要处理消息,而是发送称为毒( poison ) 丸 ( pill ) 的特殊消息。 我们需要发送尽可能多的毒 ( poison )丸 ( pill ) ,因为我们有消费者(武大郎)。然后当消费者从队列中获取特殊的毒 (poison ) 丸 ( pill )消息时,它将优雅地完成执行。
-
以下生产者的代码:
package com.zgs.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
/**
* @author: guisong.zhang
* @date: 2024/3/7 15:26:09
* @description TODO
**/
public class NumbersProducer implements Runnable {
private BlockingQueue<Integer> numbersQueue;
private final int poisonPill;
private final int poisonPillPerProducer;
public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
@Override
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
System.out.println("潘金莲‐" + Thread.currentThread().getId() + "-号,给武大郎的泡药!");
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
System.out.println("潘金莲‐" + Thread.currentThread().getId() + "-号,往武大郎的药里放入第" + j + 1 + "颗毒丸!");
}
}
}
- 我们的生成器构造函数将 BlockingQueue 作为参数,用于协调生产者和使用者之间的处理。我们看到方法 generateNumbers() 将 100 个元素(生产100副药给武大郎吃)放入队列中。
- 它还需要有毒 ( poison ) 丸 ( pill ) (潘金莲给武大郎下毒)消息,以便知道在执行完成时放入队列的消息类型。该消息需要将 poisonPillPerProducer 次放入队列中。
- 每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素。从队列中取出一个 Integer 后,它会检查该消息是否是毒 ( poison) 丸 ( pill )(武大郎看潘金莲有没有下毒) ,
- 如果是,则完成一个线程的执行。否则,它将在标准输出上打印出结果以及当前线程的名称。
package com.zgs.lock;
import java.util.concurrent.BlockingQueue;
/**
* @author: guisong.zhang
* @date: 2024/3/7 15:30:16
* @description TODO
**/
public class NumbersConsumer implements Runnable {
private BlockingQueue<Integer> queue;
private final int poisonPill;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
System.out.println("武大郎‐" + Thread.currentThread().getId() + "-号,喝药‐编号:" + number);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- 需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。
- 既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素
- 我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:
package com.zgs.lock;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author: guisong.zhang
* @date: 2024/3/7 15:34:24
* @description TODO
**/
public class Main {
public static void main(String[] args) {
int BOUND = 10;
int N_PRODUCERS = 16;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
//潘金莲给武大郎熬药
for (int i = 1; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
//武大郎开始喝药
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
//潘金莲开始投毒,武大郎喝完毒药GG
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
}
}
- BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者(武大郎)。
- 我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。
- 这里要注意的最重要的事情是 BlockingQueue 用于协调它们之间的工作。