· ## 4)ZooKeeper JavaAPI 操作
4.1)Curator介绍
•Curator 是 Apache ZooKeeper 的Java客户端库。
•常见的ZooKeeper Java API :
•原生Java API
•ZkClient
•Curator
•Curator 项目的目标是简化 ZooKeeper 客户端的使用。
•Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。
•官网:http://curator.apache.org/
4.2)JavaAPI操作建立连接
1,搭建项目
创建项目curator-zk
引入pom和日志文件
资料文件夹下pom.xml和log4j.properties
/**
* 建立连接
*/
@Test
public void testConnect() {
/**
* @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
* @param sessionTimeoutMs 会话超时时间 单位ms
* @param connectionTimeoutMs 链接超时时间 单位ms
* @param retryPolicy 重试策略
*/
/// 创建重试策略 休眠时间, 重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
/// 1 第一种方式
// CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
// 60 * 1000, 15 * 1000, retryPolicy);
// /// 开启链接
// curatorFramework.start();
/// 2 第二种方式
CuratorFramework build = CuratorFrameworkFactory.builder().connectString("192.168.0.102 :2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("itheima").build();
build.start();
}
4.3)Zookeeper JavaAPI操作-创建节点
/**
- 创建节点:create 持久 临时 顺序 数据
-
- 基本创建 :create().forPath(“”)
-
- 创建节点 带有数据:create().forPath(“”,data)
-
- 设置节点的类型:create().withMode().forPath(“”,data)
-
- 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath(“”,data)
*/
节点可以分为四大类:
- 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath(“”,data)
PERSISTENT 持久化节点
EPHEMERAL 临时节点 :-e
PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es
package com.itheima.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest {
private CuratorFramework build;
/**
* 建立连接
*/
@Before
public void testConnect() {
/**
* @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
* @param sessionTimeoutMs 会话超时时间 单位ms
* @param connectionTimeoutMs 链接超时时间 单位ms
* @param retryPolicy 重试策略
*/
/// 创建重试策略 休眠时间, 重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
/// 1 第一种方式
// CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
// 60 * 1000, 15 * 1000, retryPolicy);
// /// 开启链接
// curatorFramework.start();
/// 2 第二种方式
build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("itheima").build();
build.start();
}
/**
* 创建节点 create 持久 临时 顺序 数据
* 1 基本创建
* 2 创建节点, 带有数据
* 3 设置节点的类型
* 4 创建多级节点
*/
@Test
public void testCreate() throws Exception {
/// 1 基本创建
//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
String path = build.create().forPath("/app1");
System.out.println(path);
}
@Test
public void testCreate2() throws Exception {
/// 2 创建节点, 带有数据
//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
/// 第二个参数是getBytes数组
String path = build.create().forPath("/app2","hehe".getBytes());
System.out.println(path);
}
@Test
public void testCreate3() throws Exception {
/// 3 设置节点的类型
/// 默认类型:持久化 临时的
String path = build.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(path);
}
@Test
public void testCreate4() throws Exception {
/// 4 创建多级节点
/// 默认类型:持久化 临时的
/// creatingParentContainersIfNeeded():如果父节点不存在,则创建父节点
String path = build.create().creatingParentContainersIfNeeded().forPath("/app4/p1");
System.out.println(path);
}
@After
public void close() {
if(build != null) {
build.close();
}
}
}
4.4)ZookeeperJavaAPI操作-查询节点
/**
- 查询节点:
-
- 查询数据:get: getData().forPath()
-
- 查询子节点: ls: getChildren().forPath()
-
- 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
*/
- 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
package com.itheima.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class CuratorTest {
private CuratorFramework build;
/**
* 建立连接
*/
@Before
public void testConnect() {
/**
* @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
* @param sessionTimeoutMs 会话超时时间 单位ms
* @param connectionTimeoutMs 链接超时时间 单位ms
* @param retryPolicy 重试策略
*/
/// 创建重试策略 休眠时间, 重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
/// 1 第一种方式
// CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
// 60 * 1000, 15 * 1000, retryPolicy);
// /// 开启链接
// curatorFramework.start();
/// 2 第二种方式
build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("itheima").build();
build.start();
}
/**
* 查询节点:
* 1 查询数据 get
* 2 查询子节点 ls
* 3 查询节点状态信息 ls -s
* @throws Exception
*/
@Test
public void testGet1() throws Exception {
///1 查询数据 get
byte[] bytes = build.getData().forPath("/app1");
System.out.println(new String(bytes)); ///192.168.198.1
}
@Test
public void testGet2() throws Exception {
///2 查询子节点 ls
//List<String> strings = build.getChildren().forPath("/app4");///[p1]
List<String> strings = build.getChildren().forPath("/"); ///[app2, app1, app4]
System.out.println(strings);
}
@Test
public void testGet3() throws Exception {
///3 查询节点状态信息 ls -s
Stat status = new Stat();
System.out.println(status); ///0,0,0,0,0,0,0,0,0,0,0
byte[] bytes = build.getData().storingStatIn(status).forPath("/app1"); //3,3,1673978538957,1673978538957,0,0,0,0,13,0,3
System.out.println(status);
}
@After
public void close() {
if(build != null) {
build.close();
}
}
}
4.5)Zookeeper JavaAPI操作-修改节点
/**
- 修改数据
-
- 基本修改数据:setData().forPath()
-
- 根据版本修改: setData().withVersion().forPath()
-
- version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
*/
- version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
package com.itheima.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class CuratorTest {
private CuratorFramework build;
/**
* 建立连接
*/
@Before
public void testConnect() {
/**
* @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
* @param sessionTimeoutMs 会话超时时间 单位ms
* @param connectionTimeoutMs 链接超时时间 单位ms
* @param retryPolicy 重试策略
*/
/// 创建重试策略 休眠时间, 重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
/// 1 第一种方式
// CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
// 60 * 1000, 15 * 1000, retryPolicy);
// /// 开启链接
// curatorFramework.start();
/// 2 第二种方式
build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("itheima").build();
build.start();
}
///============================SET==============================================
/**
* 修改数据
* 1. 基本修改数据:setData().forPath()
* 2. 根据版本修改: setData().withVersion().forPath()
* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
*
* @throws Exception
*/
@Test
public void testSet() throws Exception {
//1. 基本修改数据:setData().forPath()
build.setData().forPath("/app1", "itcast".getBytes());
}
@Test
public void testSetForVersion() throws Exception {
// * 2. 根据版本修改: setData().withVersion().forPath()
// * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
Stat status = new Stat();
build.getData().storingStatIn(status).forPath("/app1");
int version = status.getVersion(); /// 查询出来的 1
System.out.println(version);
build.setData().withVersion(version).forPath("/app1", "haha".getBytes());
}
@After
public void close() {
if(build != null) {
build.close();
}
}
}
4.6)Zookeeper JavaAPI操作-删除节点
/**
- 删除节点: delete deleteall
-
- 删除单个节点:delete().forPath(“/app1”);
-
- 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath(“/app1”);
-
- 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath(“/app2”);
-
- 回调:inBackground
- @throws Exception
*/
package com.itheima.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class CuratorTest {
private CuratorFramework build;
/**
* 建立连接
*/
@Before
public void testConnect() {
/**
* @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
* @param sessionTimeoutMs 会话超时时间 单位ms
* @param connectionTimeoutMs 链接超时时间 单位ms
* @param retryPolicy 重试策略
*/
/// 创建重试策略 休眠时间, 重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
/// 1 第一种方式
// CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
// 60 * 1000, 15 * 1000, retryPolicy);
// /// 开启链接
// curatorFramework.start();
/// 2 第二种方式
build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("itheima").build();
build.start();
}
///============================delete==============================================
/**
* 删除节点: delete deleteall
* 1. 删除单个节点:delete().forPath("/app1");
* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
* 4. 回调:inBackground
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
// 1. 删除单个节点:delete().forPath("/app1");
build.delete().forPath("/app1");
}
@Test
public void testDelete1() throws Exception {
// 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
build.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDelete2() throws Exception {
// 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
build.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete3() throws Exception {
//4. 回调:inBackground
build.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("我被删除了~");
System.out.println(curatorEvent);//CuratorEventImpl{type=DELETE, resultCode=0, path='/app1',
// name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null, opResults=null}
}
}).forPath("/app1");
}
@After
public void close() {
if(build != null) {
build.close();
}
}
}
4.7)Zookeeper JavaAPI操作-Watch监听概述
•ZooKeeper 允许用户在指定节点上注册一些Watcher(监听器),并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
•ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
•ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便
需要开发人员自己反复注册Watcher,比较繁琐。
•Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
•ZooKeeper提供了三种Watcher:
•NodeCache : 只是监听某一个特定的节点
•PathChildrenCache : 监控一个ZNode的子节点.
•TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合