文章目录
- 1、zk分布式锁的实现原理
- 1_获取锁过程
- 2_释放锁
- 2、代码实现
- 1_创建客户端对象
- 2_使用和测试案例
1、zk分布式锁的实现原理
Z o o k e e p e r Zookeeper Zookeeper 就是使用临时顺序节点特性实现分布式锁的,官网。
- 获取锁过程 (创建临时节点,检查序号最小)
- 释放锁 (删除临时节点,监听通知)
1_获取锁过程
1、当第一个客户端请求过来时,
Z
o
o
k
e
e
p
e
r
Zookeeper
Zookeeper客户端会创建一个持久节点/locks
。
如果它(Client1)想获得锁,需要在locks节点下创建一个顺序节点lock1
。如图
2、接着,客户端 Client1 会查找locks下面的所有临时顺序子节点,判断自己的节点lock1是不是排序最小的那一个,如果是,则成功获得锁。
3、这时候如果又来一个客户端Client2前来尝试获得锁,它会在locks下再创建一个临时节点lock2。
4、客户端 Client2 一样也会查找locks下面的所有临时顺序子节点,判断自己的节点lock2是不是最小的,此时,发现lock1才是最小的,于是获取锁失败。
获取锁失败,它是不会甘心的,Client2 向它排序靠前的节点lock1注册Watcher事件,用来监听lock1是否存在,也就是说 Client2 抢锁失败进入等待状态。
5、此时,如果再来一个客户端Client3来尝试获取锁,它会在locks下再创建一个临时节点lock3。
6、同样的,Client3 一样也会查找locks下面的所有临时顺序子节点,判断自己的节点lock3是不是最小的,发现自己不是最小的,就获取锁失败。
它也是不会甘心的,它会向在它前面的节点lock2注册Watcher事件,以监听lock2节点是否存在。
2_释放锁
再来看看释放锁的流程, Z o o k e e p e r Zookeeper Zookeeper的「客户端业务完成或者故障」,都会删除临时节点,释放锁。
如果是任务完成,Client1 会显式调用删除lock1的指令。
如果是客户端故障了,根据临时节点得特性,lock1是会自动删除的。
lock1节点被删除后,Client2 一直监听着lock1。
lock1节点删除,Client2 立刻收到通知,也会查找locks下面的所有临时顺序子节点,发现lock2是最小,就获得锁。
同理,Client2 获得锁之后,Client3 持续监听。
2、代码实现
基于 Z o o k e e p e r Zookeeper Zookeeper 原生态的客户端类实现分布式是非常麻烦的,我们使用 a p a h c e apahce apahce 提供的一个 z o o k e e p e r zookeeper zookeeper 客户端来实现。
Curator (动物园园长): http://curator.apache.org/。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<!-- 据说是最厉害的版本 -->
<version>4.2.0</version>
</dependency>
recipes
是 Curator 族谱大全,里面包含
Z
o
o
k
e
e
p
e
r
Zookeeper
Zookeeper 和 framework。
1_创建客户端对象
package org.example.config;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author shenyang
* @version 1.0
* @info zookeeper_lock
* @since 2024/10/28 20:18
*/
@Configuration
public class CuratorConfig {
private static String connection = "192.168.100.121:2181,192.168.122:2181,192.168.100.122:2181";
@Bean
public CuratorFramework curatorFramework() {
// 声明重试策略 1秒1次,最多3次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//创建 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.newClient(connection, retryPolicy);
client.start();
return client;
}
}
2_使用和测试案例
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author shenyang
* @version 1.0
* @info zookeeper_lock
* @since 2024/10/28 20:18
*/
@RestController
@RequestMapping
public class TestController {
/**
* 模拟资源数量
*/
static int resources = 10;
@Resource
private CuratorFramework client;
@GetMapping("test")
public String test(@RequestParam("noId") int noId) throws Exception {
// 1.创建内部互斥锁(此锁可重入,实现原理就是客户端自己维护了一个计数器)
InterProcessMutex lock = new InterProcessMutex(client, "/path_" + noId);
try {
System.out.println("进入test方法,获取锁: " + noId + " ing");
// 2.加锁
lock.acquire();
System.out.println("资源被消耗ing" + " " + Thread.currentThread().getName() + " " + noId + " " + System.currentTimeMillis());
Thread.sleep(10000L);
resources--;
System.out.println("资源数量: " + resources);
} finally {
// 3.解锁
lock.release();
System.out.println("锁被释放了 ed " + Thread.currentThread() + " " + noId + " " + System.currentTimeMillis());
}
return "ok";
}
}
快速模拟两次 HTTP 请求输出如下: