全局锁是控制全局系统之间同步访问共享资源的一种方式。
下面介绍zookeeper如何实现全民锁,讲解他锁和共享锁两类全民锁。
排他锁
排他锁(Exclusive Locks),又被称为写锁或独占锁,如果事务T1对数据对象O1加上排他锁,那么整个加锁期间,只允许事务T1对O1进行和读取更新操作,其他事务都不能进行读或写。
定义锁:
/exclusive_lock/lock
实现方式:
利用zookeeper的同级节点的唯一性特性,在需要获取排他锁时,所有的客户端尝试通过调用create()接口,在/exclusive_lock节点下创建临时子节点/exclusive_lock/lock,最终只有一个客户端能力创建成功,那么此客户端就获得了一轮锁。同时,所有没有获取到锁的客户端可以在/exclusive_lock节点上注册一个子节点变更的watcher监听事件,以便重新争取获得锁。
共享锁
共享锁(Shared Locks),约定读锁。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1读取进行操作,事务也只能对这个数据对象其他数据对象加共享锁,直到该数据对象上的所有共享锁都释放。
定义锁:
/shared_lock/[hostname]-请求类型W/R-序号
实现方式:
1、客户端调用create方法创建类似定义锁定方式的临时顺序节点。
2、客户端调用 getChildren 接口来获取所有已创建的子节点列表。
3、判断是否获得锁,对于读请求如果所有比自己小子节点都是读请求或者没有比自己序号小子节点,表明已经成功获取共享锁,同时开始执行度逻辑。对于写请求,如果自己不是序号最小的子节点,那么就进入等待。
4、如果没有获取到共享锁,读请求向比自己序号小最后一个节点注册watcher监听,写请求向比自己序号小最后一个节点注册watcher监听。
实际开发过程中,可以通过curator工具包封装的API帮助我们实现全球锁。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>x.x.x</version>
</dependency>
策展人的几种锁方案:
- 1、InterProcessMutex:多重可重入排锁
- 2、InterProcessSemaphoreMutex:遍布它锁
- 3、InterProcessReadWriteLock:全局读写锁
下面的例子模拟50个线程使用重入排它锁InterProcessMutex同时争抢锁:
public class InterprocessLock {
public static void main(String[] args) {
CuratorFramework zkClient = getZkClient();
String lockPath = "/lock";
InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
//模拟50个线程抢锁
for (int i = 0; i < 50; i++) {
new Thread(new TestThread(i, lock)).start();
}
}
static class TestThread implements Runnable {
private Integer threadFlag;
private InterProcessMutex lock;
public TestThread(Integer threadFlag, InterProcessMutex lock) {
this.threadFlag = threadFlag;
this.lock = lock;
}
@Override
public void run() {
try {
lock.acquire();
System.out.println("第"+threadFlag+"线程获取到了锁");
//等到1秒后释放锁
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static CuratorFramework getZkClient() {
String zkServerAddress = "192.168.3.39:2181";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
return zkClient;
}
}
控制台每间隔一个工作站输出一条记录: