大纲
1.Curator的可重入锁的源码
2.Curator的非可重入锁的源码
3.Curator的可重入读写锁的源码
4.Curator的MultiLock源码
5.Curator的Semaphore源码
1.Curator的可重入锁的源码
(1)InterProcessMutex获取分布式锁
(2)InterProcessMutex的初始化
(3)InterProcessMutex.acquire()尝试获取锁
(4)LockInternals.attemptLock()尝试获取锁
(5)不同客户端线程获取锁时的互斥实现
(6)同一客户端线程可重入加锁的实现
(7)客户端线程释放锁的实现
(8)客户端线程释放锁后其他线程获取锁的实现
(9)InterProcessMutex就是一个公平锁
(1)InterProcessMutex获取分布式锁
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",
5000,
3000,
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端");
//获取分布式锁
InterProcessMutex lock = new InterProcessMutex(client, "/locks/myLock");
lock.acquire();
Thread.sleep(1000);
lock.release();
}
}
(2)InterProcessMutex的初始化
设置锁的节点路径basePath + 初始化一个LockInternals对象实例。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final String basePath;
private static final String LOCK_NAME = "lock-";
...
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
this(client, path, LOCK_NAME, 1, driver);
}
//初始化InterProcessMutex
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//1.设置锁的节点路径
basePath = PathUtils.validatePath(path);
//2.初始化一个LockInternals对象实例
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
}
public class LockInternals {
private final LockInternalsDriver driver;
private final String lockName;
private volatile int maxLeases;
private final WatcherRemoveCuratorFramework client;
private final String basePath;
private final String path;
...
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
this.driver = driver;
this.lockName = lockName;
this.maxLeases = maxLeases;
this.client = client.newWatcherRemoveCuratorFramework();
this.basePath = PathUtils.validatePath(path);
this.path = ZKPaths.makePath(path, lockName);
}
...
}
(3)InterProcessMutex.acquire()尝试获取锁
LockData是InterProcessMutex的一个静态内部类。一个线程对应一个LockData实例对象,用来描述线程持有的锁的具体情况。多个线程对应的LockData存放在一个叫threadData的ConcurrentMap中。LockData中有一个原子变量lockCount,用于锁的重入次数计数。
在执行InterProcessMutex的acquire()方法尝试获取锁时:首先会尝试取出当前线程对应的LockData数据,判断是否存在。如果存在,则说明锁正在被当前线程重入,重入次数自增后直接返回。如果不存在,则调用LockInternals的attemptLock()方法尝试获取锁。默认情况下,attemptLock()方法传入的等待获取锁的时间time = -1。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final String basePath;
private static final String LOCK_NAME = "lock-";
//一个线程对应一个LockData数据对象
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
...
//初始化InterProcessMutex
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//设置锁的路径
basePath = PathUtils.validatePath(path);
//初始化LockInternals
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
@Override
public void acquire() throws Exception {
//获取分布式锁,会一直阻塞等待直到获取成功
//相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用
if (!internalLock(-1, null)) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程对应的LockData数据
LockData lockData = threadData.get(currentThread);
if (lockData != null) {
//可重入计算
lockData.lockCount.incrementAndGet();
return true;
}
//调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if (lockPath != null) {
//获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象
LockData newLockData = new LockData(currentThread, lockPath);
//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
//LockData是InterProcessMutex的一个静态内部类
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
protected byte[] getLockNodeBytes() {
return null;
}
...
}
(4)LockInternals.attemptLock()尝试获取锁
先创建临时节点,再判断是否满足获取锁的条件。
步骤一:首先调用LockInternalsDriver的createsTheLock()方法创建一个临时顺序节点。其中creatingParentContainersIfNeeded()表示级联创建,forPath(path)表示创建的节点路径名称,withMode(CreateMode.EPHEMERAL_SEQUENTIAL)表示临时顺序节点。
步骤二:然后调用LockInternals的internalLockLoop()方法检查是否获取到了锁。在LockInternals的internalLockLoop()方法的while循环中,会先获取排好序的客户端线程尝试获取锁时创建的临时顺序节点名称列表。然后获取当前客户端线程尝试获取锁时创建的临时顺序节点的名称,再根据名称获取在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径,也就是获取一个封装好这些信息的PredicateResults对象。
具体会根据节点名称获取当前线程创建的临时顺序节点在节点列表的位置,然后会比较当前线程创建的节点的位置和maxLeases的大小。其中maxLeases代表了同时允许多少个客户端可以获取到锁,默认是1。如果当前线程创建的节点的位置小,则表示可以获取锁。如果当前线程创建的节点的位置大,则表示获取锁失败。
获取锁成功,则会中断LockInternals的internalLockLoop()方法的while循环,然后向外返回当前客户端线程创建的临时顺序节点路径。接着在InterProcessMutex的internalLock()方法中,会将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象,然后把该LockData对象存放到InterProcessMutex.threadData这个Map中。
获取锁失败,则通过PredicateResults对象先获取前一个节点路径名称。然后通过getData()方法获取前一个节点路径在zk的信息,并添加Watcher监听。该Watcher监听主要是用来唤醒在LockInternals中被wait()阻塞的线程。添加完Watcher监听后,便会调用wait()方法将当前线程挂起。
所以前一个节点发生变化时,便会通知添加的Watcher监听。然后便会唤醒阻塞的线程,继续执行internalLockLoop()方法的while循环。while循环又会继续获取排序的节点列表 + 判断当前线程是否已获取锁。
public class LockInternals {
private final LockInternalsDriver driver;
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
this.driver = driver;
this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称
...
}
...
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
//获取当前时间
final long startMillis = System.currentTimeMillis();
//默认情况下millisToWait=null
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//默认情况下localLockNodeBytes也是null
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;//是否已经获取到锁
boolean isDone = false;//是否正在获取锁
while (!isDone) {
isDone = true;
//1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//2.检查是否获取到了锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
if (hasTheLock) {
return ourPath;
}
return null;
}
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//唤醒LockInternals中被wait()阻塞的线程
client.postSafeNotify(LockInternals.this);
}
};
//检查是否获取到了锁
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
...
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = getSortedChildren();
//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {//获取锁成功
//返回true
haveTheLock = true;
} else {//获取锁失败
//获取前一个节点路径名称
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//通过getData()获取前一个节点路径在zk的信息,并添加watch监听
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//默认情况下,millisToWait = null
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true;//timed out - delete our node
break;
}
wait(millisToWait);//阻塞
} else {
wait();//阻塞
}
}
}
}
...
return haveTheLock;
}
List<String> getSortedChildren() throws Exception {
//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
return getSortedChildren(client, basePath, lockName, driver);
}
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = client.getChildren().forPath(basePath);
//对节点名称进行排序
List<String> sortedList = Lists.newArrayList(children);
Collections.sort(
sortedList,
new Comparator<String>() {
@Override
public int compare(String lhs, String rhs) {
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
...
}
public class StandardLockInternalsDriver implements LockInternalsDriver {
...
//级联创建一个临时顺序节点
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
//默认情况下传入的lockNodeBytes=null
if (lockNodeBytes != null) {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
} else {
//创建临时顺序节点
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
//maxLeases代表的是同时允许多少个客户端可以获取到锁
//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
boolean getsTheLock = ourIndex < maxLeases;
//获取当前节点需要watch的前一个节点路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
(5)不同客户端线程获取锁时的互斥实现
maxLeases代表了同时允许多少个客户端可以获取到锁,默认值是1。能否获取锁的判断就是:线程创建的节点的位置outIndex < maxLeases。当线程1创建的节点在节点列表中排第一时,满足outIndex = 0 < maxLeases = 1,可以获取锁。当线程2创建的节点再节点列表中排第二时,不满足outIndex = 1 < maxLeases = 1,所以不能获取锁。从而实现线程1和线程2获取锁时的互斥。
(6)同一客户端线程可重入加锁的实现
客户端线程重复获取锁时,会重复调用InterProcessMutex的internalLock()方法。在InterProcessMutex的internalLock()方法中:线程第一次获取锁成功会创建一个LockData对象,并存放在一个Map中。线程第二次获取锁时,便会从这个Map中取出这个LockData对象,并对LockData对象中的重入计数器lockCount进行递增,接着就返回true。以此实现可重入加锁。
(7)客户端线程释放锁的实现
客户端线程释放锁时会调用InterProcessMutex的release()方法。
首先对LockData里的重入计数器进行递减。当重入计数器大于0时,直接返回。当重入计数器为0时才执行下一步删除节点的操作。
然后删除客户端线程创建的临时顺序节点,client.delete().guaranteed().forPath(ourPath)。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
...
@Override
public void release() throws Exception {
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程对应的LockData对象
LockData lockData = threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
//1.首先对LockData里的重入计数器lockCount进行递减
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount > 0) {
//当重入计数器大于0时,直接返回
return;
}
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
//2.当重入计数器为0时执行删除节点的操作
internals.releaseLock(lockData.lockPath);
} finally {
threadData.remove(currentThread);
}
}
...
}
public class LockInternals {
...
final void releaseLock(String lockPath) throws Exception {
client.removeWatchers();
revocable.set(null);
deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception {
//删除节点
client.delete().guaranteed().forPath(ourPath);
}
...
}
(8)客户端线程释放锁后其他线程获取锁的实现
由于在节点列表里排第二的节点对应的线程会监听排第一的节点,而当持有锁的客户端线程释放锁后,排第一的节点会被删除掉。所以在节点列表里排第二的节点对应的客户端,便会收到zk的通知。于是会回调执行该线程添加的Watcher的process()方法,也就是唤醒该线程,让其继续执行while循环获取锁。
public class LockInternals {
...
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//唤醒LockInternals中被wait()阻塞的线程
client.postSafeNotify(LockInternals.this);
}
};
//检查是否获取到了锁
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
...
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = getSortedChildren();
//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//5.获取当前线程创建的节点在节点列表中的位置+是否可以获取锁+前一个节点的路径名称
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {//获取锁成功
//返回true
haveTheLock = true;
} else {//获取锁失败
//获取前一个节点路径名称
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//通过getData()获取前一个节点路径在zk的信息,并添加watch监听
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//默认情况下,millisToWait = null
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true;//timed out - delete our node
break;
}
wait(millisToWait);//阻塞
} else {
wait();//阻塞
}
}
}
}
...
return haveTheLock;
}
...
}
(9)InterProcessMutex就是一个公平锁
因为所有客户端线程都会创建一个顺序节点,然后按申请锁的顺序进行排序。最后会依次按自己所在的排序来尝试获取锁,实现了所有客户端排队获取锁。
2.Curator的非可重入锁的源码
(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用
(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码
(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用
非可重入锁:同一个时间只能有一个客户端线程获取到锁,其他线程都要排队,而且同一个客户端线程是不可重入加锁的。
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",//zk的地址
5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
3000,//连接zk时的超时时间
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端,完成zk的连接");
//非可重入锁
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, "/locks");
lock.acquire();
Thread.sleep(3000);
lock.release();
}
}
(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码
Curator的非可重入锁是基于Semaphore来实现的,也就是将Semaphore允许获取Lease的客户端线程数设置为1,从而实现同一时间只能有一个客户端线程获取到Lease。
public class InterProcessSemaphoreMutex implements InterProcessLock {
private final InterProcessSemaphoreV2 semaphore;
private final WatcherRemoveCuratorFramework watcherRemoveClient;
private volatile Lease lease;
public InterProcessSemaphoreMutex(CuratorFramework client, String path) {
watcherRemoveClient = client.newWatcherRemoveCuratorFramework();
this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
}
@Override
public void acquire() throws Exception {
//获取非可重入锁就是获取Semaphore的Lease
lease = semaphore.acquire();
}
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception {
Lease acquiredLease = semaphore.acquire(time, unit);
if (acquiredLease == null) {
return false;
}
lease = acquiredLease;
return true;
}
@Override
public void release() throws Exception {
//释放非可重入锁就是释放Semaphore的Lease
Lease lease = this.lease;
Preconditions.checkState(lease != null, "Not acquired");
this.lease = null;
lease.close();
watcherRemoveClient.removeWatchers();
}
}