AQS 是什么
AQS 的全称为 AbstractQueuedSynchronizer,翻译过来的意思就是抽象队列同步器,这个类在 java.util.concurrent.locks 包下面 Java 中的大部分同步类(Lock、Semaphore、ReentrantLock等) 都是基于 AQS 实现的 AQS 是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架 AQS 就是一个抽象类,主要用来构建锁和同步器 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { }
AQS 有哪些变量
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
private transient Thread exclusiveOwnerThread;
AQS 底层数据结构
state:初始值为 0,如果有线程持有变量为 1,如果是可重入锁,同个线程可以继续叠加 CLH(Craig Landin Hagersten)队列:是一个虚拟的双向队列、FIFO 队列,虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系,AQS 将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配 为什么使用双向而不用单向 ?
假设队列是单向的如:head -> n1 -> n2 -> tail,出队的时候获取 n1 很简单,head.next 就行了 入队就麻烦了,因为是一个 FIFO 队列,后放的元素要放在尾部,要遍历整个链表到 n2,然后 n2.next = n3;n3.next = tail,入队的复杂度就是 O(n) ,而且 tail 也失去他的意义 相反双向链表出队和入队都是 O(1) 时间复杂度,说白了空间换时间
condition 队列,只有使用到 Condition 对象才有的队列 单向队列,等待的线程就存在该队列中
AQS 扩展组件
线程池
import java. util. concurrent. * ;
public class ThreadPool {
private static final RejectedExecutionHandler handler = new ThreadPoolExecutor. DiscardPolicy ( ) ;
private static final int MAX_QUEUE_SIZE = 200 ;
public static final ThreadPoolExecutor POOL = new ThreadPoolExecutor (
20 ,
40 ,
0 ,
TimeUnit . MILLISECONDS ,
new LinkedBlockingQueue < > ( MAX_QUEUE_SIZE ) ,
Executors . defaultThreadFactory ( ) ,
handler
) ;
public static void shutdown ( ) {
POOL . shutdown ( ) ;
}
}
使用的日志依赖
< dependency>
< groupId> org.slf4j</ groupId>
< artifactId> slf4j-simple</ artifactId>
< version> 1.7.25</ version>
</ dependency>
CountDownLatch
等待一组线程执行完成,再执行下一步操作 例如:服务有多个地区,但是账号是全局唯一的,在用户注册的时候需要遍历每个区域是否存在账号,全部区域都判断账号不存在,再进行下一步,这时候就可以使用 CountDownLatch 同时校验每个区域是否存在账号
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import java. util. concurrent. CountDownLatch ;
import java. util. concurrent. TimeUnit ;
public class CountDownLatchExample {
private static final Logger log = LoggerFactory . getLogger ( CountDownLatchExample . class ) ;
private static final int threadCount = 200 ;
public static void main ( String [ ] args) throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch ( threadCount) ;
for ( int i = 0 ; i < threadCount; i++ ) {
final int threadNum = i;
ThreadPool . POOL . execute ( ( ) -> {
try {
test ( threadNum) ;
} catch ( Exception e) {
log. error ( "exception " , e) ;
} finally {
countDownLatch. countDown ( ) ;
}
} ) ;
}
countDownLatch. await ( 10 , TimeUnit . MILLISECONDS ) ;
log. info ( "finish" ) ;
ThreadPool . shutdown ( ) ;
}
public static void test ( int threadNum) throws Exception {
Thread . sleep ( 100L ) ;
log. info ( "{}" , threadNum) ;
Thread . sleep ( 100L ) ;
}
}
Semaphore
信号量:可以指定某个共享资源可以同时最多被几个线程访问 应用场景:数据库的连接,数据库同时最多可以被几个线程访问,通过 Semaphore 机制,可以限制最大的访问线程数
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import java. util. concurrent. Semaphore ;
import java. util. concurrent. TimeUnit ;
public class SemaphoreExample {
private static final Logger log = LoggerFactory . getLogger ( SemaphoreExample . class ) ;
private static final int threadCount = 20 ;
public static void main ( String [ ] args) throws Exception {
final Semaphore semaphore = new Semaphore ( 3 ) ;
for ( int i = 0 ; i < threadCount; i++ ) {
final int threadNum = i;
ThreadPool . POOL . execute ( ( ) -> {
try {
if ( semaphore. tryAcquire ( 3 , 500 , TimeUnit . MILLISECONDS ) ) {
test ( threadNum) ;
semaphore. release ( 3 ) ;
}
} catch ( Exception e) {
log. error ( "exception" , e) ;
}
} ) ;
}
ThreadPool . shutdown ( ) ;
}
private static void test ( int threadNum) throws Exception {
log. info ( "{}" , threadNum) ;
Thread . sleep ( 5000 ) ;
}
}
CyclicBarrier
循环屏障,当线程统一到达某个状态,再执行下一步操作 (这里和 CountDownLatch 一样) 当第一次的使用结束之后,还可以再使用执行下一步操作,循环使用
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import java. util. concurrent. CyclicBarrier ;
public class CyclicBarrierExample {
private static final Logger log = LoggerFactory . getLogger ( CyclicBarrierExample . class ) ;
private static final CyclicBarrier barrier = new CyclicBarrier ( 5 , ( ) -> {
log. info ( "callback is running" ) ;
} ) ;
public static void main ( String [ ] args) throws Exception {
for ( int i = 0 ; i < 10 ; i++ ) {
final int threadNum = i;
Thread . sleep ( 1000L ) ;
ThreadPool . POOL . execute ( ( ) -> {
try {
race ( threadNum) ;
} catch ( Exception e) {
log. error ( "exception" , e) ;
}
} ) ;
}
ThreadPool . POOL . shutdown ( ) ;
}
private static void race ( int threadNum) throws Exception {
Thread . sleep ( 1000 ) ;
log. info ( "{} is ready" , threadNum) ;
barrier. await ( ) ;
log. info ( "{} continue" , threadNum) ;
}
}
ReentrantLock
可以通过构造方法执行公平锁还是非公平锁,默认非公平锁 使用锁之后,一般都要在 finally 中释放锁
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import java. util. concurrent. locks. Lock ;
import java. util. concurrent. locks. ReentrantLock ;
public class ReentrantLockExample {
private static final Logger log = LoggerFactory . getLogger ( ReentrantLockExample . class ) ;
private final Lock lock = new ReentrantLock ( ) ;
private Integer i = 0 ;
public static void main ( String [ ] args) {
ReentrantLockExample main = new ReentrantLockExample ( ) ;
for ( int i = 0 ; i < 100 ; i++ ) {
ThreadPool . POOL . execute ( main:: testLock ) ;
}
ThreadPool . POOL . shutdown ( ) ;
}
public void testLock ( ) {
lock. lock ( ) ;
try {
i++ ;
log. info ( "i 的值:" + i) ;
} finally {
lock. unlock ( ) ;
}
}
}
Condition
Condition 对象,可以结合 ReentrantLock 一起使用 通过 condition 的单向队列维护等待的线程,在获取锁之后,调用 await() 方法让线程中从 AQS 队列中移除,然后加入到 condition 队列进行等待,当收到线程的通知之后,线程再回到 AQS 队列中执行
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import java. util. concurrent. locks. Condition ;
import java. util. concurrent. locks. ReentrantLock ;
public class ConditionalExample {
private static final Logger log = LoggerFactory . getLogger ( ConditionalExample . class ) ;
public static void main ( String [ ] args) {
ReentrantLock reentrantLock = new ReentrantLock ( ) ;
Condition condition = reentrantLock. newCondition ( ) ;
new Thread ( ( ) -> {
try {
reentrantLock. lock ( ) ;
log. info ( "wait signal" ) ;
condition. await ( ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
log. info ( "get signal" ) ;
reentrantLock. unlock ( ) ;
} ) . start ( ) ;
new Thread ( ( ) -> {
reentrantLock. lock ( ) ;
log. info ( "get lock" ) ;
try {
Thread . sleep ( 3000 ) ;
} catch ( InterruptedException e) {
e. printStackTrace ( ) ;
}
condition. signalAll ( ) ;
log. info ( "send signal ~ " ) ;
reentrantLock. unlock ( ) ;
} ) . start ( ) ;
}
}
ReentrantReadWriteLock
有读写锁的锁,读锁可以重入 需要留意如果,如果读取比较频繁,那么写锁会一直在等待
import java. util. Map ;
import java. util. Set ;
import java. util. TreeMap ;
import java. util. concurrent. locks. Lock ;
import java. util. concurrent. locks. ReentrantReadWriteLock ;
public class ReentrantReadWriteLockExample {
private final Map < String , Data > map = new TreeMap < > ( ) ;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock ( ) ;
private final Lock readLock = lock. readLock ( ) ;
private final Lock writeLock = lock. writeLock ( ) ;
public Data get ( String key) {
readLock. lock ( ) ;
try {
return map. get ( key) ;
} finally {
readLock. unlock ( ) ;
}
}
public Set < String > getAllKeys ( ) {
readLock. lock ( ) ;
try {
return map. keySet ( ) ;
} finally {
readLock. unlock ( ) ;
}
}
public Data put ( String key, Data value) {
writeLock. lock ( ) ;
try {
return map. put ( key, value) ;
} finally {
readLock. unlock ( ) ;
}
}
class Data {
}
}
StampedLock
可以通过悲观锁或者客观锁实现并发控制,乐观锁性能较好
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import java. util. concurrent. CountDownLatch ;
import java. util. concurrent. Semaphore ;
import java. util. concurrent. locks. StampedLock ;
public class StampedLockExample {
private static final Logger log = LoggerFactory . getLogger ( StampedLockExample . class ) ;
public static int clientTotal = 50 ;
public static int threadTotal = 20 ;
public static int count = 0 ;
private static final StampedLock lock = new StampedLock ( ) ;
public static void main ( String [ ] args) throws Exception {
final Semaphore semaphore = new Semaphore ( threadTotal) ;
final CountDownLatch countDownLatch = new CountDownLatch ( clientTotal) ;
for ( int i = 0 ; i < clientTotal; i++ ) {
ThreadPool . POOL . execute ( ( ) -> {
try {
semaphore. acquire ( ) ;
add ( ) ;
semaphore. release ( ) ;
} catch ( Exception e) {
log. error ( "exception" , e) ;
}
countDownLatch. countDown ( ) ;
} ) ;
}
countDownLatch. await ( ) ;
ThreadPool . POOL . shutdown ( ) ;
log. info ( "count:{}" , count) ;
}
private static void add ( ) {
long stamp = lock. writeLock ( ) ;
try {
count++ ;
} finally {
lock. unlock ( stamp) ;
}
}
class Point {
private double x, y;
private final StampedLock sl = new StampedLock ( ) ;
void move ( double deltaX, double deltaY) {
long stamp = sl. writeLock ( ) ;
try {
x += deltaX;
y += deltaY;
} finally {
sl. unlockWrite ( stamp) ;
}
}
double distanceFromOrigin ( ) {
long stamp = sl. tryOptimisticRead ( ) ;
double currentX = x, currentY = y;
if ( ! sl. validate ( stamp) ) {
stamp = sl. readLock ( ) ;
try {
currentX = x;
currentY = y;
} finally {
sl. unlockRead ( stamp) ;
}
}
return Math . sqrt ( currentX * currentX + currentY * currentY) ;
}
void moveIfAtOrigin ( double newX, double newY) {
long stamp = sl. readLock ( ) ;
try {
while ( x == 0.0 && y == 0.0 ) {
long ws = sl. tryConvertToWriteLock ( stamp) ;
if ( ws != 0L ) {
stamp = ws;
x = newX;
y = newY;
break ;
} else {
sl. unlockRead ( stamp) ;
stamp = sl. writeLock ( ) ;
}
}
} finally {
sl. unlock ( stamp) ;
}
}
}
}
基于 AQS 实现同步组件
定义内部类继承 AQS 重写 lock() unlock() tryAcquire() tryRelease() isHeldExclusively() 方法
import java. util. concurrent. locks. AbstractQueuedSynchronizer ;
public class ExampleLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire ( int arg) {
return compareAndSetState ( 0 , 1 ) ;
}
@Override
protected boolean tryRelease ( int arg) {
setState ( 0 ) ;
return true ;
}
@Override
protected boolean isHeldExclusively ( ) {
return getState ( ) == 1 ;
}
}
private Sync sync = new Sync ( ) ;
public void lock ( ) {
sync. acquire ( 1 ) ;
}
public void unlock ( ) {
sync. release ( 1 ) ;
}
}
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
public class ExampleMain {
private static final Logger log = LoggerFactory . getLogger ( ExampleMain . class ) ;
static int count = 0 ;
static ExampleLock lock = new ExampleLock ( ) ;
public static void main ( String [ ] args) throws InterruptedException {
Runnable runnable = ( ) -> {
try {
lock. lock ( ) ;
for ( int i = 0 ; i < 10000 ; i++ ) {
count++ ;
log. info ( "cur count: {}" , count) ;
}
} catch ( Exception e) {
e. printStackTrace ( ) ;
} finally {
lock. unlock ( ) ;
}
} ;
Thread thread1 = new Thread ( runnable) ;
Thread thread2 = new Thread ( runnable) ;
thread1. start ( ) ;
thread2. start ( ) ;
thread1. join ( ) ;
thread2. join ( ) ;
log. info ( "count: {}" , count) ;
}
}
参考地址