ZooKeeper的节点操作
ZooKeeper的节点类型
ZooKeeper其实也是一个分布式集群,其中维护了一个目录树结构,在这个目录树中,组成的部分是一个个的节点。ZooKeeper的节点可以大致分为两种类型: 短暂类型 和 持久类型
- 短暂类型ephemeral: 客户端和服务器断开后,创建的节点自己删除。
- 持久类型persistent: 客户端和服务器断开后,创建的节点不删除(默认情况)。
节点类型 | 描述信息 |
---|---|
EPHEMERAL | 临时节点,在会话结束后自动被删除。 |
EPHEMERAL_SEQUENTIAL | 临时顺序节点,在会话结束后会自动被删除。会在给定的path节点名称后添加一个序列号。 |
PERSISTENT | 永久节点,在会话结束后不会被自动删除。 |
PERSISTENT_SEQUENTIAL | 永久顺序节点,在会话结束后不会被自动删除。会在给定的path节点名称后添加一个序列号。 |
ZooKeeper的Shell操作
打开Shell客户端
-
连接到当前节点的Server服务
[root@qianfeng01 ~]# zkCli.sh
复制代码 -
连接到其他节点的Server服务
[root@qianfeng01 ~]# zkCli.sh -server qianfeng02:2181
复制代码
Shell操作
ls
作用: 查看某个节点下的子节点
选项:
-s 查看具体信息,包括time、version等信息
注意事项: 需要使用绝对路径查看
示例:
ls /
ls /zookeeper
ls -s /zookeeper/config
复制代码
create
作用: 创建一个节点,可以设置节点的初始内容
选项:
-e: 设置短暂类型节点
-s: 设置顺序节点
示例:
create /test
create /test2 "content message"
create -e /test3 "content message"
create -e -s /test "content message"
复制代码
get
作用: 获取节点存储的值
选项:
-s: 同时获取版本描述信息,例如: time、version等
示例:
get /zookeeper/config
get -s /zookeeper/config
复制代码
hehe 节点数据信息
cZxid = 0x800000002 节点创建时的zxid
ctime = Thu May 09 03:41:15 CST 2019 节点创建的时间
mZxid = 0x800000002 对应节点最近一次修改的时间,与子节点无关
mtime = Thu May 09 03:41:15 CST 2019 节点最近一次更新的时间
pZxid = 0x800000002 对应节点与子节点(或者子节点)的修改的时间,与孙子节点无关
cversion = 0 子节点数据更新次数
dataVersion = 0 本节点数据更新次数
aclVersion = 0 节点授权信息(ACL)的更新次数
ephemeralOwner = 0x0 如果该节点为临时节点,ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是临时节点,ephemeralOwner值为0
dataLength = 4 节点的数据长度
numChildren = 0 子节点的个数
set
作用: 设置节点存储的值
示例:
set /test "content message"
复制代码
delete
作用: 删除节点,只能删除空节点,即没有子节点的节点
示例:
delete /test
复制代码
deleteAll
作用: 删除节点,可以递归删除所有的子节点
示例:
deleteAll /test
复制代码
addWatch
作用: 监听节点,当这个节点发生变化(内容、创建、删除)会得到通知
示例:
addWatch /test
复制代码
removewatches
作用: 移除对节点的监听
示例:
removewatches /test
复制代码
quit
作用: 退出客户端
复制代码
IDEA操作ZooKeeper的API操作
pom依赖
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
复制代码
初始化ZooKeeper客户端对象
package com.qianfeng.zk;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
/**
* ZooKeeper的API操作
*
* @author 千锋大数据教研院
*/
public class ZkAPI {
// Zookeeper客户端对象
private ZooKeeper zkCli;
@Before
public void init() throws IOException, InterruptedException {
// 连接到ZooKeeper的Server端
String connectString = "qianfeng01:2181,qianfeng02:2181,qianfeng03:2181";
// 连接超时时间
int sessionTimeout = 5000;
// 初始化一个ZooKeeper客户端实例,需要参数: 服务端、连接超时时间、观察者做回调
zkCli = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 暂不做任何处理
}
});
}
}
复制代码
创建节点(同步)
注意: 以下的API操作,需要使用到zkCli对象。在上述的初始化部分已经完成了对zkCli对象的初始化。后面的所有操作,只需要将方法粘贴到ZkAPI类中即可。需要导入的包,也在上方的初始化部分导入完成了。
// ACL权限类型:
// OPEN_ACL_UNSAFE: 完全开放的ACL,任何连接的客户端都可以操作该节点
// CREATOR_ALL_ACL: 只有创建者才有ACL权限
// READ_ACL_UNSAFE: 只能读取ACL
//
// CreateMode:
// EPHEMERAL: 临时型
// EPHEMERAL_SEQUENTIAL: 临时顺序型
// PERSISTENT: 永久型
// PERSISTENT_SEQUENTIAL: 永久顺序型
//
// 同步创建节点,遇到不正常的情况直接抛出异常。
@Test
public void createNode() throws InterruptedException, KeeperException {
zkCli.create("/ApiNode1", "ApiContent1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
复制代码
文末扫码有福利!
创建节点(异步)
@Test
public void createNodeAsync() throws InterruptedException {
zkCli.create("/ApiNode2", "ApiContent2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new CreateNodeCallBack(), "");
// 因为是异步操作,如果不等待一下,操作直接就结束了,来不及等到回调事件的触发
Thread.sleep(Integer.MAX_VALUE);
}
private static class CreateNodeCallBack implements AsyncCallback.StringCallback {
/**
* 异步创建节点回调方法
* @param i 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
* @param s 创建的节点路径
* @param o 创建节点时传递的ctx
* @param s1 在ZK上实际创建的节点名(针对顺序节点)
*/
@Override
public void processResult(int i, String s, Object o, String s1) {
System.out.println("i = " + i + ", s = " + s + ", o = " + o + ", s1 = " + s1);
switch (i) {
case 0:
System.out.println("节点创建成功");
break;
case -4:
System.out.println("客户端与服务端连接已断开");
break;
case -110:
System.out.println("指定节点已存在");
break;
case -112:
System.out.println("会话已过期");
break;
}
}
}
复制代码
删除节点(同步)
// 同步删除,遇到无法正常删除的情况,直接异常
@Test
public void deleteNode() throws Exception {
zkCli.delete("/ApiNode1", -1);
}
复制代码
删除节点(异步)
@Test
public void deleteNodeAsync() throws Exception {
zkCli.delete("/ApiNode1", -1, new DeleteNodeCallBack(), "");
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 删除节点回调
*/
private static class DeleteNodeCallBack implements AsyncCallback.VoidCallback {
/**
* 异步删除节点回调方法
* @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
* @param path 删除的节点路径
* @param ctx 删除节点时传递的ctx
*/
@Override
public void processResult(int rc, String path, Object ctx) {
System.out.println("删除结果:rc=" + rc + ",path=" + path + ",ctx=" + ctx);
switch (rc) {
case 0:
System.out.println("节点删除成功");
break;
case -4:
System.out.println("客户端与服务端连接已断开");
break;
case -112:
System.out.println("会话已过期");
break;
default:
System.out.println("服务端响应码" + rc + "未知");
break;
}
}
}
复制代码
修改节点内容(同步)
@Test
public void setNode() throws InterruptedException, KeeperException {
zkCli.setData("/ApiNode1", "hello".getBytes(), -1);
}
复制代码
修改节点内容(异步)
@Test
public void setNodeAsync() throws Exception {
zkCli.setData("/ApiNode2", "hello".getBytes(), -1, new SetNodeCallBack(), "");
Thread.sleep(Integer.MAX_VALUE);
}
private static class SetNodeCallBack implements AsyncCallback.StatCallback {
/**
* @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
* @param path 修改的节点路径
* @param ctx 修改节点时传递的ctx
* @param stat 节点状态,由服务器端响应的新stat替换
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
switch (rc) {
case 0:
System.out.println("节点数据设置成功");
break;
case -4:
System.out.println("客户端与服务端连接已断开");
break;
case -112:
System.out.println("会话已过期");
break;
default:
System.out.println("服务端响应码" + rc + "未知");
break;
}
}
}
复制代码
获取节点内容(同步)
@Test
public void getNode() throws InterruptedException, KeeperException {
// 实例化对象,用于记录节点的状态信息
Stat stat = new Stat();
// 获取数据
byte[] data = zkCli.getData("/ApiNode1", true, stat);
// 打印数据
System.out.println(new String(data));
// 打印节点状态信息
System.out.println(stat);
System.out.println(stat.getVersion());
System.out.println(stat.getCtime());
}
复制代码
获取节点内容(异步)
@Test
public void getNodeAsync() throws InterruptedException {
zkCli.getData("/ApiNode1", true, new GetNodeCallBack(), "");
Thread.sleep(Integer.MAX_VALUE);
}
private static class GetNodeCallBack implements AsyncCallback.DataCallback {
/**
* 获取数据的回调
* @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
* @param path 获取数据的节点路径
* @param ctx 获取数据时传递的ctx
* @param data 获取到的节点数据
* @param stat 获取到的节点状态
*/
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
switch (rc) {
case 0:
System.out.println("节点数据获取成功: " + new String(data));
break;
case -4:
System.out.println("客户端与服务端连接已断开");
break;
case -112:
System.out.println("会话已过期");
break;
default:
System.out.println("服务端响应码" + rc + "未知");
break;
}
}
}
复制代码
获取所有子节点(同步)
@Test
public void getChildren() throws InterruptedException, KeeperException {
List<String> children = zkCli.getChildren("/", true);
System.out.println(children);
}
复制代码
获取所有子节点(异步)
@Test
public void getChildrenAsync() throws InterruptedException {
zkCli.getChildren("/", true, new GetChildrenCallBack(), "");
Thread.sleep(Integer.MAX_VALUE);
}
private static class GetChildrenCallBack implements AsyncCallback.ChildrenCallback {
/**
* 获取到所有的子节点的回调
* @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
* @param path 获取子节点的节点路径
* @param ctx 调用方法传递的ctx
* @param children 所有的子节点
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch (rc) {
case 0:
System.out.println("子节点获取成功: " + children);
break;
case -4:
System.out.println("客户端与服务端连接已断开");
break;
case -112:
System.out.println("会话已过期");
break;
default:
System.out.println("服务端响应码" + rc + "未知");
break;
}
}
}