原文地址在这里
临时节点具备数据自动删除的功能。当client与ZooKeeper连接和session断掉时,相应的临时节点就会被删除。zk有瞬时和持久节点,瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失,基于zk的瞬时有序节点实现分布式锁:
多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁;
其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点;
下一个序号的线程得到通知,继续执行;
以此类推,创建节点的时候,就确认了线程执行的顺序。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
zk 的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。exists、create、getChildren三个方法都可以添加watcher ,也就是在调用方法的时候传递true就是添加监听。注意这里Lock 实现了Watcher和AutoCloseable:
当前线程创建的节点是第一个节点就获得锁,否则就监听自己的前一个节点的事件:
/**
* 自己本身就是一个 watcher,可以得到通知
* AutoCloseable 实现自动关闭,资源不使用的时候
*/
@Slf4j
public class ZkLock implements AutoCloseable, Watcher {
private ZooKeeper zooKeeper;
/**
* 记录当前锁的名字
*/
private String znode;
public ZkLock() throws IOException {
this.zooKeeper = new ZooKeeper("localhost:2181",
10000,this);
}
public boolean getLock(String businessCode) {
try {
//创建业务 根节点
Stat stat = zooKeeper.exists("/" + businessCode, false);
if (stat==null){
zooKeeper.create("/" + businessCode,businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
//创建瞬时有序节点 /order/order_00000001
znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
//获取业务节点下 所有的子节点
List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
//获取序号最小的(第一个)子节点
Collections.sort(childrenNodes);
String firstNode = childrenNodes.get(0);
//如果创建的节点是第一个子节点,则获得锁
if (znode.endsWith(firstNode)){
return true;
}
//如果不是第一个子节点,则监听前一个节点
String lastNode = firstNode;
for (String node:childrenNodes){
if (znode.endsWith(node)){
zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
break;
}else {
lastNode = node;
}
}
synchronized (this){
wait();
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public void close() throws Exception {
zooKeeper.delete(znode,-1);
zooKeeper.close();
log.info("我已经释放了锁!");
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
}
这段代码实现了一个基于 ZooKeeper 的分布式锁,以下是它的实现步骤:
-
首先创建 ZooKeeper 客户端,并实现 Watcher 接口,在自身上注册监听器。
-
在 ZooKeeper 上创建一个业务根节点,例如 /businessCode,表示该业务下所有的分布式锁。
-
使用 ZooKeeper 的临时有序节点创建子节点,例如 /businessCode/businessCode_00001,表示当前节点占用了分布式锁,并记录在 znode 中。
-
获取业务节点下所有的子节点,并按节点名称排序。如果当前节点的名称是所有子节点中最小的,则获取分布式锁。
-
如果当前节点不是所有子节点中最小的,则监听前一个子节点的删除事件,等待前一个子节点释放锁。
-
当前一个子节点被删除时,重新执行第四步,即再次检查当前节点是否为所有子节点中最小的。
-
如果当前节点成功获取到锁,则执行业务操作;否则等待锁释放,重新获取锁。
-
业务操作完成后,执行 close() 方法释放锁,删除当前节点。
总体来说,这段代码实现了一种基本的分布式锁机制,通过 ZooKeeper 的临时有序节点和 Watcher 监听机制来实现。需要注意的是,此实现方式还可能存在死锁问题,当持有锁的节点出现网络故障或宕机时,会导致整个分布式锁失效。因此,在实际应用中,还需要综合考虑各种异常情况,确保分布式锁的正确性和高可用性。
针对死锁ChatGPT3.5的解决方案:
那段代码中存在潜在的死锁问题,即如果持有锁的节点出现异常,可能导致整个分布式锁失效。为了解决这个问题,可以考虑以下两种方案:
- 引入超时机制
在获取锁的过程中,可以引入超时机制,例如在等待前一个节点释放锁的过程中,设定一个超时时间,如果超过了这个时间仍未收到通知,则当前节点主动删除自己的临时节点,释放锁,并尝试重新获取锁。
- 心跳检测机制
对于已经获取到分布式锁的节点,可以在一定时间间隔内向 ZooKeeper 发送心跳消息,以检测自身是否还处于活跃状态。如果该节点长时间没有发送心跳消息或者 ZooKeeper 无法接收到该节点的心跳消息,就认为该节点出现了异常,需要主动删除自身的临时节点,释放锁,并尝试重新获取锁。
在前面提到的两种方案中,我选择了第一种方案,即引入超时机制来解决死锁问题。具体实现方式如下:
在获取锁的过程中,我们可以增加一个等待时间的参数 waitTime
,表示等待前一个节点释放锁的最长时间。在设置监听器之后,使用 CountDownLatch
等待前一个节点的删除事件通知,当出现以下情况之一时,即可跳出等待循环:
-
在等待时间内成功获取到锁,即当前节点为业务节点下所有临时节点中的最小节点。
-
在等待时间内等待到超时时间,说明前一节点无法正常释放锁,因此当前节点放弃等待,主动删除自己的临时节点,重新尝试获取锁。
修改后的代码如下所示:
public class DistributedLock implements AutoCloseable {
private static final String ROOT_NODE = "/businessCode"; // 业务根节点
private final ZooKeeper zkClient; // ZooKeeper 客户端
private final String lockName; // 分布式锁名称
private String currentLockNodePath; // 当前节点的 znode 路径
public DistributedLock(String zkServers, String lockName) throws IOException {
this.zkClient = new ZooKeeper(zkServers, 3000, event -> {});
this.lockName = lockName;
ensureRootNodeExists();
}
// 确保业务根节点存在
private void ensureRootNodeExists() {
try {
if (zkClient.exists(ROOT_NODE, false) == null) {
zkClient.create(ROOT_NODE, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new RuntimeException("Failed to ensure root node exists: " + ROOT_NODE, e);
}
}
// 获取锁
public void lock(long waitTime) {
try {
// 创建当前节点
currentLockNodePath = zkClient.create(
ROOT_NODE + "/" + lockName + "_",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// 等待前一个节点释放锁
List<String> children = zkClient.getChildren(ROOT_NODE, false);
String[] sortedNodes = children.stream().sorted().toArray(String[]::new);
int currentNodeIndex = Arrays.binarySearch(sortedNodes, currentLockNodePath.substring(currentLockNodePath.lastIndexOf("/") + 1));
if (currentNodeIndex == 0) {
// 当前节点为业务节点下所有临时节点中的最小节点,获取锁
return;
}
// 等待前一个节点的删除事件通知,如果超时仍未收到,则主动删除自己的临时节点
CountDownLatch latch = new CountDownLatch(1);
String previousNodePath = ROOT_NODE + "/" + sortedNodes[currentNodeIndex - 1];
Stat stat = zkClient.exists(previousNodePath, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
latch.countDown();
}
});
if (stat != null) {
if (latch.await(waitTime, TimeUnit.MILLISECONDS)) {
// 等待成功,重新执行获取锁操作
lock(waitTime);
} else {
// 超时未等到通知,主动删除自己的临时节点,放弃获取锁
zkClient.delete(currentLockNodePath, -1);
throw new RuntimeException("Failed to acquire lock within wait time: " + waitTime);
}
}
} catch (Exception e) {
throw new RuntimeException("Failed to acquire lock: " + lockName, e);
}
}
// 释放锁
@Override
public void close() {
try {
System.out.println("Releasing lock: " + lockName);
zkClient.delete(currentLockNodePath, -1);
zkClient.close();
} catch (Exception e) {
throw new RuntimeException("Failed to release lock: " + lockName, e);
}
}
}
在 lock()
方法中增加了一个参数 waitTime
,表示等待前一个节点释放锁的最长时间。在方法最后,如果超时未收到通知,则主动删除当前节点,放弃获取锁,并抛出异常提示用户。
需要注意的是,在使用该类时,需要确保所有节点设置的 waitTime
值是一致的。另外,为了让线程能够跳出等待循环,应该每隔一段时间检查一下当前的线程状态,例如通过 Thread.currentThread().isInterrupted()
判断线程是否被打断,如果被打断则立即跳出循环。
下面是使用心跳检测机制的代码实例:
public class DistributedLock implements AutoCloseable {
private static final String ROOT_NODE = "/businessCode"; // 业务根节点
private static final long HEARTBEAT_RATE = 1000L; // 心跳检测频率,单位毫秒
private final ZooKeeper zkClient; // ZooKeeper 客户端
private final String lockName; // 分布式锁名称
private volatile boolean locked = false; // 是否已经获取到了锁
private Thread heartbeatThread; // 心跳线程
private String currentLockNodePath; // 当前节点的 znode 路径
public DistributedLock(String zkServers, String lockName) throws IOException {
this.zkClient = new ZooKeeper(zkServers, 3000, event -> {});
this.lockName = lockName;
ensureRootNodeExists();
}
// 确保业务根节点存在
private void ensureRootNodeExists() {
try {
if (zkClient.exists(ROOT_NODE, false) == null) {
zkClient.create(ROOT_NODE, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new RuntimeException("Failed to ensure root node exists: " + ROOT_NODE, e);
}
}
// 获取锁
public void lock() {
try {
// 创建当前节点
currentLockNodePath = zkClient.create(
ROOT_NODE + "/" + lockName + "_",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// 等待前一个节点释放锁
while (!locked) {
List<String> children = zkClient.getChildren(ROOT_NODE, false);
String[] sortedNodes = children.stream().sorted().toArray(String[]::new);
int currentNodeIndex = Arrays.binarySearch(sortedNodes, currentLockNodePath.substring(currentLockNodePath.lastIndexOf("/") + 1));
if (currentNodeIndex == 0) {
// 当前节点为业务节点下所有临时节点中的最小节点,获取锁
locked = true;
startHeartbeatThread(); // 启动心跳线程
break;
}
// 等待前一个节点的删除事件通知
String previousNodePath = ROOT_NODE + "/" + sortedNodes[currentNodeIndex - 1];
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zkClient.exists(previousNodePath, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
latch.countDown();
}
});
if (stat != null) {
latch.await();
}
}
} catch (Exception e) {
throw new RuntimeException("Failed to acquire lock: " + lockName, e);
}
}
// 释放锁
@Override
public void close() {
try {
System.out.println("Releasing lock: " + lockName);
zkClient.delete(currentLockNodePath, -1);
stopHeartbeatThread(); // 停止心跳线程
zkClient.close();
} catch (Exception e) {
throw new RuntimeException("Failed to release lock: " + lockName, e);
}
}
// 启动心跳线程
private void startHeartbeatThread() {
heartbeatThread = new Thread(() -> {
while (true) {
try {
zkClient.setData(currentLockNodePath, null, -1);
Thread.sleep(HEARTBEAT_RATE);
} catch (Exception e) {
System.out.println("Failed to send heartbeat signal: " + lockName);
}
}
});
heartbeatThread.start();
}
// 停止心跳线程
private void stopHeartbeatThread() {
if (heartbeatThread != null) {
heartbeatThread.interrupt();
heartbeatThread = null;
}
}
}
在该代码实例中,我们创建了一个布尔型变量 locked
,表示当前是否已经获取到了锁。在获取锁的过程中,如果当前节点为业务节点下所有临时节点中的最小节点,则设置 locked
为 true
,同时启动心跳线程。心跳线程每隔一段时间就向 ZooKeeper 发送一次空数据以保持会话,从而保证自己的临时节点不会过期。
在释放锁的过程中,停止心跳线程即可。需要注意的是,心跳线程的 while
循环不能被打断,因为一旦被打断,线程就会退出,从而不再发送心跳信号,导致临时节点过期。因此,在捕获 InterruptedException
异常时只是简单地输出日志,并继续下一轮循环。