Lock
互斥锁:
1、锁的可重入性:
当一个线程调用object.lock()获取到锁,进入临界区后,还可以再次调用object.lock()。
通常锁都应该设计为可重入,否则就会发生死锁。 比如synchronized就是可重入,在一个synchronized方法中可以继续调用另一个synchronized方法。
2、Lock:
基本认识:
public interface Lock { void lock(); //可以被中断 void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
ReentrantLock实现Lock接口,它的实现都在Sync类中:
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); }// ... }
Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和非公平锁。
如果一个线程来了不排队,直接去抢锁,就是非公平。 这也是默认的构造方法,目的是为了提高效率。
锁的基本原理:
Sync 的父类 AbstractQueuedSynchronizer,被称作队列同步器(AQS),它的父类是AbstractOwnableSynchronizer(AOS)。 看命名,都是Synchronizer结尾,因此,此锁具有备synchronized 功能,可以阻塞一个线程。 为了实现一把具有阻塞或唤醒功能的锁,需要几个要素:
- 需要一个state变量,标记该锁的状态。state变量至少有两个值:0、1。对state变量的操作, 使用CAS(Compare and Swap)保证线程安全。
- 需要记录当前是哪个线程持有锁。
- 需要底层支持对一个线程进行阻塞或唤醒操作。
- 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要使用 CAS。
针对1和2,Sync的两个父类AOS、AQS已有对应的实现:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private transient Thread exclusiveOwnerThread; // 记录持有锁的线程 } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private volatile int state; // 记录锁的状态,通过CAS修改state的值。 }
state可以大于1,例如,同样一个线程,调用5次lock,state会变成5;然后调用5次unlock,state减为0。
- 当state=0时,没有线程持有锁,exclusiveOwnerThread=null;
- 当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;
- 当state > 1时,说明该线程重入了该锁;
针对第3点:Unsafe类提供了阻塞或唤醒线程的一对操作,park/unpark。 LockSupport工具类进行了进一步封装:
public class LockSupport { // ... private static final Unsafe U = Unsafe.getUnsafe(); public static void park() { U.park(false, 0L); } public static void unpark(Thread thread) { if (thread != null) U.unpark(thread); } }
当一个线程中调用park(),该线程就会被阻塞; 然后另一个线程中调用
unpark(Thread thread),传入一个被阻塞的线程,就可以将其唤醒(notify只能唤醒一个不确定的线程)。
针对第4点:AOS这个父类中,还实现了一个双向链表的阻塞队列,存放阻塞的线程:
public abstract class AbstractQueuedSynchronizer { // ... static final class Node { volatile Thread thread; // 每个Node对应一个被阻塞的线程 volatile Node prev; volatile Node next; // ... } private transient volatile Node head; private transient volatile Node tail; // ... }
head指向第一个Node的位置,tail指向下一个要添加的位置。 初始为空,head和tail都指向null,入队时往tail处添加,tail往后移指向下一个null;出队时,将head指向的Node移除,head往后移。 所以,当head=tail=null时,代表队列为空。
ReentrantLock在公平性和非公平性上的实现差异:
非公平锁:如果state为0,直接将当前线程设置为锁持有者,并设置state的值; 如果state不是0,但锁的持有者是当前线程,直接更新state。
公平锁:如果state为0,要看看队列中有没有其他等待线程,如果没有才将当前线程设置为持有者; 如果state不为0,和上面一样。
阻塞队列与唤醒机制:
lock.lock()
调用lock.lock(),最终会到AQS中的核心方法,acquire:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
解析:
- 如果tryAcquire没有获取到锁,就调用acquireQueued去获取。
- 在acquireQueued中先调用addWaiter: 为当前线程生成一个Node,然后把Node放入双向链表的尾部。 此时还未阻塞,需要调用acquireQueued。 线程一旦进入acquireQueued方法,就会无限期阻塞,即使其他线程调用interrupt也无法唤醒,直到方法结束,也就是它获取到锁那一刻才会被唤醒。此时,会删除队列的第一个Node。
阻塞方法parkAndCheckInterrupt,其实就是调用了LockSupport.park方法。
3.此外,acquireQueued有个返回值,代表当前线程有没有中断标志(在阻塞期间,可能有其他线程给他发送过中断信号,但此时无法响应),如果有会调用selfInterrupt(),自己给自己发送一下中断信号,重新响应一下中断。
lock.unlock()
unlock 不区分公不公平,直接释放锁后,唤醒head节点,让其获取锁。代码逻辑在AQS中:public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
如果尝试释放锁成功,就调用unparkSuccessor唤醒头节点,让其获取锁。
tryRelease中,就是判断当前线程是否持有锁,并state的值减到0为止。 参数中的releases,在上层调用unlock时默认传的1,因此,lock了几次,就要调用unlock几次,才能真正的释放锁。
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
lockInterruptibly ():
ReentrantLock除了lock()方法,还可以调用lockInterruptibly (),此方法可以响应中断。 底层调用了AQS中的acquireInterruptibly:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
里面的tryAcquire只是个模版方法,分别被FairSync和 NonfairSync实现。 当tryAcquire中没有获取到锁时,会执行doAcquireInterruptibly,判断如果有其他线程发了中断信号,则抛出异常,不会一直阻塞。
tryLock():
ReentrantLock中,其实用的比较多的,还有tryLock。 它是基于非公平锁的tryAcquire实现逻辑,如果拿到锁就返回true,否则返回false,不会一直阻塞等待。
读写锁:
与上面的互斥锁ReentrantLock相比,读写锁(ReadWriteLock)也是实现了Lock接口。 但是,它可以满足:读读不互斥(一个线程获取了读锁,其他线程还能获取读锁),读写互斥(一个线程获取了读锁,其他线程就不能获取写锁。 反之亦然。),写写互斥(一个线程获取了写锁,其他线程不能再获取写锁)。 ReadWriteLock也是个接口,具体逻辑由ReentrantReadWriteLock实现(RRWL)。 而在RRWL中,有两个内部类,读锁与写锁,也是实现了Lock。 因此,在使用读写锁时,要分别获取读锁与写锁:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock ();Lock readLock = readWriteLock . readLock ();readLock . lock ();// 进行读取操作readLock . unlock ();Lock writeLock = readWriteLock . writeLock ();writeLock . lock ();// 进行写操作writeLock . unlock ();
实际上,两把锁都只是同一把锁的两个视图而已,他们只有一个sync对象, 所以,在同一个对象中,也才能实现读写互斥的逻辑:当对象中state=0时,说明没有线程持有锁;当state != 0时,要么有线程持有读锁,要么有线程持有写锁。再通过 sharedCount(state)和exclusiveCount(state)判断到底是读线程还是写线程持有了该锁。
从构造方法可以看出,共用了一个sync,sync也同样实现了公平,非公平的逻辑,并继承AQS。public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
因此,两把锁的逻辑实现,其实就是调用了sync的方法(AQS的方法,由多个sync继承实现):acquire/release(互斥锁和读写锁的写锁)、acquireShared/releaseShared(读写锁的读锁),公平和非公平(是否需要阻塞)在sync不同的子类中实现:
static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; // 写线程抢锁的时候是否应该阻塞 final boolean writerShouldBlock() { // 写线程在抢锁之前永远不被阻塞,非公平锁 return false; } // 读线程抢锁的时候是否应该阻塞 final boolean readerShouldBlock() { // 读线程抢锁的时候,当队列中第一个元素是写线程的时候要阻塞(即便是非公平,也要排在写线程之后) return apparentlyFirstQueuedIsExclusive(); } }
static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; // 写线程抢锁的时候是否应该阻塞 final boolean writerShouldBlock() { // 写线程在抢锁之前,如果队列中有其他线程在排队,则阻塞。公平锁 return hasQueuedPredecessors(); } // 读线程抢锁的时候是否应该阻塞 final boolean readerShouldBlock() { // 读线程在抢锁之前,如果队列中有其他线程在排队,阻塞。公平锁 return hasQueuedPredecessors(); } }
对于公平,都需要排队获取锁; 对于非公平,就要分情况了:
- 写锁:当state=0没有其他线程持有锁(或者state!=0,但是持锁的是自己),直接获取锁,不用排队。
- 读锁:如果队列的第一个是写线程,先让写线程获取锁,否则可能导致写线程一直获取不到。
Condition:
Condition本身也是一个接口,其功能和wait/notify类似,必须同Lock一起使用。 所以,Lock接口中,有一个和创建Conditon的方法。
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; // 所有的Condition都是从Lock中构造出来的 Condition newCondition(); boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); }
public interface Condition { void await() throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; long awaitNanos(long nanosTimeout) throws InterruptedException; void awaitUninterruptibly(); boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
我们知道,如果使用wait notify,是无差别唤醒。 假如只有一个生产者和一个消费者还好,如果有多个,可能出现生产者通知生产者、消费者通知消费者的问题(如果没有使用notifyAll,还可能出现死锁)。 而使用Condition,就可以精确唤醒,具体用法就是在Lock中new 两个Condition,分别给生产者和消费者使用(生产者使用condition1来等待,也可以唤醒condition2; 消费者则反过来)。
StampedLock:
StampedLock是在JDK8中新增的,可以支持读写不互斥。
ReentrantReadWriteLock 采用的是 “ 悲观读 ” 的策略,当第一个读线程拿到锁之后,第二个、第三个读线程还可以拿到锁,使得写线程一直拿不到锁,可能导致写线程 “ 饿死 ” 。虽然在其公平或非公平的实现中,都尽量避免这种情形,但还有可能发生。StampedLock 引入了 “ 乐观读 ” 策略,读的时候不加读锁,读出来发现数据被修改了,再升级为 “ 悲观读” ,相当于降低了 “ 读 ” 的地位,把抢锁的天平往 “ 写 ” 的一方倾斜了一下,避免写线程被饿死。