写在前面
本文一起看下zk的基本用法。
安装 。
1:数据结构
采用类似于linux系统的文件系统存储结构,但不同于Linux系统文件,zk每个节点都可以存储数据,结构如下图:
节点类型分为如下四种:
PERSISTENT,持久性ZNode。创建后,即使客户端与服务端断开连接也不会删除,只有客户端主动删除才会消失。
PERSISTENT_SEQUENTIAL,持久性顺序编号ZNode。和持久性节点一样不会因为断开连接后而删除,并且ZNode的编号会自动增加,比如创建/aa/bb,创建结果为/aa/bb00000001这种。
EPHEMERAL,临时性ZNode。客户端与服务端断开连接,该ZNode会被删除。
EPEMERAL_SEQUENTIAL,临时性顺序编号ZNode。和临时性节点一样,断开连接会被删除,并且ZNode的编号会自动增加,比如创建/aa/bb,创建结果为/aa/bb00000001这种。
2:操作
源码 。
//连接地址及端口号
private static final String SERVER_HOST = "127.0.0.1:5181";
//会话超时时间
private static final int SESSION_TIME_OUT = 2000;
2.1:Maven
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
2.2:create
private static void create() throws IOException, KeeperException, InterruptedException {
//参数一:服务端地址及端口号
//参数二:超时时间
//参数三:监听器
ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//获取事件的状态
Event.KeeperState state = watchedEvent.getState();
//判断是否是连接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk客户端已连接...");
}
}
}
});
zooKeeper.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("新增ZNode成功");
zooKeeper.close();
}
2.3:query
private static void query() throws IOException, KeeperException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//获取事件的状态
Event.KeeperState state = watchedEvent.getState();
//判断是否是连接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk客户端已连接...");
}
}
}
});
//数据的描述信息,包括版本号,ACL权限,子节点信息等等
Stat stat = new Stat();
//返回结果是byte[]数据,getData()方法底层会把描述信息复制到stat对象中
byte[] bytes = zooKeeper.getData("/java", false, stat);
//打印结果
System.out.println("ZNode的数据data:" + new String(bytes));//Hello World
System.out.println("获取到dataVersion版本号:" + stat.getVersion());//默认数据版本号是0
}
2.4:update
private static void update() throws IOException, KeeperException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//获取事件的状态
Event.KeeperState state = watchedEvent.getState();
//判断是否是连接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk客户端已连接...");
}
}
}
});
//获取节点描述信息
Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
System.out.println("更新ZNode数据...");
//更新操作,传入路径,更新值,版本号三个参数,返回结果是新的描述信息
Stat setData = zooKeeper.setData("/java", "fly!!!".getBytes(), stat.getVersion());
System.out.println("更新后的版本号为:" + setData.getVersion());//更新后的版本号为:1
}
2.5:del
private static void del() throws IOException, KeeperException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//获取事件的状态
Event.KeeperState state = watchedEvent.getState();
//判断是否是连接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk客户端已连接...");
}
}
}
});
Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
//删除ZNode
zooKeeper.delete("/java", stat.getVersion());
}
2.6:update watch
注意watch只会触发一次。
节点更新触发watch。
private static void testUpdateWatcher() throws Exception {
ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//获取事件的状态
Event.KeeperState state = watchedEvent.getState();
//判断是否是连接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk客户端已连接...");
}
}
}
});
zooKeeper.exists("/java", new MyWatcher());
//对ZNode进行更新数据的操作,触发监听器
zooKeeper.setData("/java", "fly".getBytes(), -1);
}
2.7:delete watch
注意watch只会触发一次。
节点删除触发watch。
private static void testDeleteWatcher() throws Exception {
ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//获取事件的状态
Event.KeeperState state = watchedEvent.getState();
//判断是否是连接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk客户端已连接...");
}
}
}
});
String path = "/java" + Math.random();
System.out.println(zooKeeper.create(path, "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL));
ZooKeeper zooKeeper1 = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//获取事件的状态
Event.KeeperState state = watchedEvent.getState();
//判断是否是连接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk客户端已连接...");
}
}
}
});
// 其他客户端监听
zooKeeper1.exists(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是exists()方法的监听器:" + event.getType().name());
}
});
Stat stat = new Stat();
zooKeeper.getData(path, false, stat);
// zooKeeper.delete(path, stat.getVersion());
// 连接关闭自动删除
zooKeeper.close();
Thread.sleep(Integer.MAX_VALUE);
}
3:实现分布式锁
3.1:原理
当有多个线程并发创建同一个节点时,zookeeper只允许一个线程创建成功,基于此我们就可以通过创建节点的方式来实现,这里的节点我们使用临时节点
,这里使用临时节点的原因是其在会话结束后会自动删除,客户端就不需要关心节点删除的问题,也避免了基于Redis实现分布式锁 时客户端异常断开还需要等到超时key才会删除的问题。具体流程如下:
1:用zookeeper中一个临时节点代表锁,比如在/exlusive_lock下创建临时子节点/exlusive_lock/lock。
2:所有客户端争相创建此节点,但只有一个客户端创建成功。
3:创建成功代表获取锁成功,此客户端执行业务逻辑
4:未创建成功的客户端,监听/exlusive_lock变更
5:获取锁的客户端执行完成后,删除/exlusive_lock/lock,表示锁被释放
6:锁被释放后,其他监听/exlusive_lock变更的客户端得到通知,再次争相创建临时子节点/exlusive_lock/lock。此时相当于回到了第2步。
这种方式的优点是实现简单,缺点如下:
1:实现的是非公平锁,容易出现线程的饿死,系统要避免出现饿死,因为饿死可以认为是一定程度上的死锁了
2:锁释放时,zk要同时通知所有节点,所有节点又会同时并发调用zk创建节点,争抢执行资格,当节点较多时,会给zk带来比较大的压力
上述两个问题,我们可以通过基于临时顺序节点
实现分布式锁的方案来解决,具体如下:
1:每个客户端往/exlusive_lock下创建有序临时节点/exlusive_lock/lock_。创建成功后/exlusive_lock下面会有每个客户端对应的节点,如/exlusive_lock/lock_000000001
2:客户端取得/exlusive_lock下子节点,并进行排序,判断排在最前面的是否为自己。
3:如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑
4:如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.
5:当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。
监听客户端重新执行第2步逻辑,判断自己是否获得了锁。
以上排队获取锁,实现了公平锁,解决了线程饿死的问题,每个节点只会有一个服务注册watcher,zk每次只需要通知一个节点,且每个服务创建节点的操作只需要执行一次(后续收到通知只需要判断自己创建过的节点是否排在首位就行了)
,也大大降低了zk的服务压力。
针对这两种方案我们接下来分别给出具体的编程实现,pom如下:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
3.2:基于临时节点
源码 。
核心代码如下,争抢创建节点失败的话,注册一个watcher,监听节点删除,再重复争抢过程,直到抢到:
try {
zooKeeper.create(LOCK_KEY_NAME , "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.err.println(Thread.currentThread().getName() + " 争抢锁成功,开始执行临界区业务!");
Thread.sleep((long) (Math.random() * 5000));
System.out.println(Thread.currentThread().getName() + " 执行临界区业务结束!");
// 业务执行完毕,可以关闭
canClose = true;
countDownLatch.countDown();
} catch (KeeperException.NodeExistsException e) {
try {
zooKeeper.exists(LOCK_KEY_NAME, watchedEvent -> {
// 删除
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
System.out.println("锁被释放了,开始抢!");
try {
tryAcquireLock(zooKeeper);
} catch (Exception interruptedException) {
interruptedException.printStackTrace();
}
}
});
System.out.println(Thread.currentThread().getName() + " 争抢锁失败,注册监听下次再抢!!!");
} catch (KeeperException keeperException) {
keeperException.printStackTrace();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
运行如下:
3.3:基于临时顺序节点
源码 。
核心代码如下,创建节点后判断自己是否为第一个,是的话则争抢成功,否则监听前一个节点,之后收到watch通知后判断是否为第一个节点,重复这个过程,直到成功,理论上只要收到通知自己已经处于第一个节点了,但道理虽然如此,程序还是要严谨:
// realLockPath不为空则说明是第一次已经创建节点,其前节点删除触发了watch
private static void tryAcquireLock(ZooKeeper zooKeeper, String realLockPath) {
boolean canClose = false;
try {
String lockPath = LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME;
if (realLockPath == null) {
realLockPath = zooKeeper.create(lockPath , "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
List<String> lockPaths = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
Collections.sort(lockPaths);
int index = lockPaths.indexOf(realLockPath.substring(LOCK_ROOT_PATH.length() + 1));
// 如果lockPath是序号最小的节点,则获取锁
if (index == 0) {
System.err.println(Thread.currentThread().getName() + " 争抢锁成功,开始执行临界区业务!lockPath: " + realLockPath);
Thread.sleep((long) (Math.random() * 5000));
System.out.println(Thread.currentThread().getName() + " 执行临界区业务结束!");
// 业务执行完毕,可以关闭
canClose = true;
countDownLatch.countDown();
} else {
// lockPath不是序号最小的节点,监听前一个节点
String preLockPath = lockPaths.get(index - 1);
String watchPath = LOCK_ROOT_PATH + "/" + preLockPath;
System.out.println(Thread.currentThread().getName() + " 争抢锁失败,注册监听下次再抢!!! watchPath: " + watchPath + " lockPath: " + realLockPath);
// 监听自己前一个节点的删除,理论上,watch执行一次肯定能拿到锁
String finalRealLockPath = realLockPath;
zooKeeper.exists(watchPath, watchedEvent -> {
// 删除
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
System.out.println("前节点锁被释放了,开始抢!");
try {
tryAcquireLock(zooKeeper, finalRealLockPath);
} catch (Exception interruptedException) {
interruptedException.printStackTrace();
}
}
});
}
} catch (KeeperException.NodeExistsException e) {
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (canClose) zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行如下:
对应zk的节点信息变化如下图:
写在后面
参考文章列表:
ZooKeeper入门,看这篇就够了! 。
ZooKeeper分布式锁实现java例子,附完整可运行源代码 。