一 zk实现分布式锁
1.1 使用临时顺序节点 的问题
接上一篇文章,每个请求要想正常的执行完成,最终都是要创建节点,如果能够避免争抢必然可以提高性能。这里借助于zk的临时序列化节点,实现分布式锁
1. 主要修改了构造方法和lock方法:
2.并添加了getPreNode获取前置节点的方法。
存在的问题就是羊群效应。
1.2 使用临时顺序节点+watch监听实现阻塞锁
假如当前有1000个节点在等待锁,如果获得锁的客户端释放锁时,这1000个客户端都会被唤醒,这种情况称为“羊群效应”。
在这种羊群效应中,zookeeper需要通知1000个客户端,这会阻塞其他的操作,最好的情况应该只唤醒新的最小节点对应的客户端。应该怎么做呢?在设置事件监听时,每个客户端应该对刚好在它之前的子节点设置事件监听,例如子节点列表为/locks/lock-0000000000、/locks/lock-0000000001、/locks/lock-0000000002,序号为1的客户端监听序号为0的子节点删除消息,序号为2的监听序号为1的子节点删除消息。
1.3 使用临时顺序节点+watch监听实现阻塞锁的算法逻辑
1.客户端连接zookeeper,并在/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/locks/lock-0000000000,第二个为/locks/lock-0000000001,以此类推;
2.客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁;
3.执行业务代码;
4.完成业务流程后,删除对应的子节点释放锁。
二 操作步骤
2.1 代码
2.1.1 zk的客户端
初始化则创建临时序列化节点
2.1.2 分布式锁代码
package com.atguigu.distributed.lock.config;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @ClassName: ZkDistributedTempLock
* @Description: TODO
* @Author: admin
* @Date: 2024/01/06 11:05:09
* @Version: V1.0
**/
public class ZkDistributedTempLock implements Lock {
private static final String ROOT_PATH = "/d-zk";
private String path;
private ZooKeeper zooKeeper;
public ZkDistributedTempLock(ZooKeeper zooKeeper,String lockName) throws KeeperException, InterruptedException {
this.zooKeeper = zooKeeper;
// this.path = ROOT_PATH + "/" + lockName+"-";
this.path = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
public void lock() {
try {
String preNode = getPreNode(path);
// 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
if (StringUtils.isEmpty(preNode)) {
return;
} else {//有前一个节点,阻塞,对前一个节点进行监听
CountDownLatch countDownLatch = new CountDownLatch(1);
if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("当前节点=="+path+" 前一个节点:"+ROOT_PATH + "/" + preNode);
countDownLatch.countDown();
}
}) == null) {
return;
}
// 阻塞。。。。
countDownLatch.await();
return;
}
} catch (InterruptedException | KeeperException e) {
// 重新检查。是否获取到锁
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
lock();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock(){
try {
this.zooKeeper.delete(path, 0);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
/**
* 获取指定节点的前节点
* @param path
* @return
*/
private String getPreNode(String path){
System.out.println("path:"+path);
try {
// 获取当前节点的序列化号
Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
// 获取根路径下的所有序列化子节点
List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
// 判空
if (CollectionUtils.isEmpty(nodes)){
return null;
}
// 获取前一个节点
Long flag = 0L;
String preNode = null;
for (String node : nodes) {
// 获取每个节点的序列化号
Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
if (serial < curSerial && serial > flag){
flag = serial;
preNode = node;
}
}
return preNode;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
@Override
public Condition newCondition() {
return null;
}
}
2.1.3 service层
2.1.4 controller层
2.2 nginx代理多服务节点访问
1.服务启动
2.nginx启动
3.jemeter访问
2.3 测试
数据库初始化
测试后