前置工作
创建Maven工程
导入依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
配置log4j
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n
API 操作实例
初始化方法
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
//process方法可以为空 在这里设置的监听并不是监听一次就关闭而是一直监听
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("----------init------------");
List<String> children = null;
try {
children = zkClient.getChildren("/sanguo", true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
for (String child : children) {
System.out.println(child);
}
}
});
}
创建节点
create方法的参数:
- path:要创建的节点路径
- data:节点的数据
- acl:一个List<ACL>类型的权限集合
-
createMode:创建的节点类型
@Test
public void create() throws InterruptedException, KeeperException {
//创建节点并返回节点路径/sanguo/wuguo
String Znode = zkClient.create("/sanguo/wuguo", "liubei".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("Znode : "+Znode);
}
获取子节点并监听
只要在init方法中新建了Watcher类并实现了process方法,就会一直监听而不是监听一次就自动关闭。
@Test
public void getChildren() throws InterruptedException, KeeperException {
System.out.println("--------------------");
/**设置watcher为true之后就会自动执行init方法中watcher的重写的process方法
* 如果process方法为空,则这里只会监听一次就关闭监听
**/
List<String> children = zkClient.getChildren("/sanguo", true);
/**这个for循环只会被执行一次 真正的监听在初始化ZooKeeper对象中重写Wather的process方法里
*一旦监听的节点有变化,就会从process方法输出监听到的变化结果
**/
for (String child : children) {
System.out.println(child);
}
//延时程序 作用就是让客户端一直开启 以方便返回监听到的变化结果
Thread.sleep(Long.MAX_VALUE);
}
我们发现,我们监听的是"/sanguo"但是当我们在它的孙目录"/sanguo/shuguo"下创建一个新节点时监听就不会生效,这是因为它只能监听子节点。
判断节点是否存在
@Test
public void exits() throws InterruptedException, KeeperException {
//返回节点状态 参数1是节点 参数2是是否开启监听
Stat stat = zkClient.exists("/sanguo", false);
System.out.println(stat==null?"not exist":"exit");
}
写操作流程
我们之前的操作都是在向zookeeper集群写数据,因为我们可以把zookeeper集群看做一个分布式小型文件系统,它存储着大量的Znode数据,只要一台服务器修改了Znode信息,我们集群中的每个节点都可以立即更新。
写数据流程通常只有两种情况:
- 直接发送写入请求给Leader
- 直接发送写入请求给Flower
1. 直接发送给Leader
- 客户端向Leader发送写数据请求
- 写入数据会首先写入领导者节点,领导者将数据同步到所有副本节点,当超过半数节点完成数据同步并回复成功消息后,写操作就算完成了,并通知客户端写入成功。(加入我们的zookeeper集群有三台节点,那么只要两台(Leader+任意一台Flower)同步完成就算写入成功)
- 之后剩余未同步的节点也会马上将数据同步,并告诉Leader自己同步完成。
2. 直接发送请求给Flower
- 客户端发送写入请求给Flower
- Flower没有写入的权限,于是将请求转发给Leader
- Leader收到请求,自己先写入一份,再将数据同步到所有副本节点,同样当超过半数的节点写入完成后,写操作就算完成了。
- 这时候Leader不会去告诉客户端写入完成,而是由向它转发请求的Flower原路返回给客户端。
- 同样之后剩余未同步的节点也会立即将客户端写入的数据同步,并告诉Leader自己已经同步完成。