【ZooKeeper】第二章 JavaAPI 操作
文章目录
- 【ZooKeeper】第二章 JavaAPI 操作
- 一、Curator 简介
- 二、Curator API
- 1.建立连接
- 2.创建节点
- 3.查询节点
- 4.修改节点
- 5.删除节点
- 6.Watch 事件监听
- 三、分布式锁
- 四、案例:12306售票
一、Curator 简介
- Curator 是 Apache ZooKeeper 的 Java 客户端库
- 常见的 ZooKeeper Java API:
- 原生Java API
- ZkClient
- Curator
- Curator 项目的目标是简化 Zookeeper 客户端的使用
- Curator 最初是 Netflix 研发的,后来捐献给了 Apache 基金会
- 官网:curator.apache.org
二、Curator API
1.建立连接
public void testConnect(){
//1.第一种方式
/*
* @param connectString list of servers to connect to 格式:ip:port,ip:port
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
* @param retryPolicy retry policy to use
*/
String connectString = "172.20.10.2:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client1 = CuratorFrameworkFactory.newClient(connectString, 60*1000, 15*1000, retryPolicy);
//2.第二种方式
CuratorFramework client2 = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(60*1000)
.connectionTimeoutMs(15*1000)
.retryPolicy(retryPolicy)
//操作目录时会自动拼接前缀 /sisyphus
//.namespace("sisyphus")
.build();
//开启连接
//client1.start();
client2.start();
}
2.创建节点
public void testCreate() throws Exception {
//1.基本创建
//如果创建节点时没有指定数据,则默认将当前客户端的ip作为数据存储
String path1 = client.create().forPath("/app1");
System.out.println(path1);
//2.创建带数据的节点
String path2 = client.create().forPath("/app2", "hello".getBytes());
System.out.println(path2);
//3.设置节点的类型
String path3 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(path3);
//4.创建多级节点
String path4 = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1");
System.out.println(path4);
}
3.查询节点
public void testGet() throws Exception {
//testCreate();
//1.查询数据:get
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));
//2.查询子节点:ls
List<String> path = client.getChildren().forPath("/app4");
System.out.println(path);
//3.查询节点的状态信息:ls -s
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
}
4.修改节点
public void testSet() throws Exception {
//1.修改数据
client.setData().forPath("/app1", "demo".getBytes());
//2.根据版本修改
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
int version = stat.getVersion();
client.setData().withVersion(version).forPath("/app1", "demo".getBytes());
}
5.删除节点
public void testDelete() throws Exception{
//1.删除单个节点
client.delete().forPath("/app1");
//2.删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
//3.必须成功的删除,会在会话有效期内不断重试直至成功
client.delete().guaranteed().forPath("/app2");
//4.回调
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("我被删除了");
System.out.println(event);
}
}).forPath("/app2");
}
6.Watch 事件监听
- ZooKeeper 允许用户在指定节点上注册一些 Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上,该机制是 ZooKeeper 实现分布式协调服务的重要特性
- ZooKeeper 中引入了 Watcher 机制实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者
- ZooKeeper 原生支持通过注册 Watcher 来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册 Watcher,比较繁琐
- Curator 引入了 Cache 来实现对 ZooKeeper 服务端事件的监听
- ZooKeeper 提供了三种 Watcher
- NodeCache:只是监听某一个特定的节点
- PathChildrenCache:监控一个 ZNode 的子节点
- TreeCache:可以监控整个树上的所有节点,类似于 PathChildrenCache 和 NodeCache 的组合
public void testNodeCache() throws Exception {
//1.创建NodeCache对象
final NodeCache nodeCache = new NodeCache(client, "/app1");
//2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
//获取修改后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(Arrays.toString(data));
}
});
//3.开启监听,如果设置为true,则开启监听时加载缓存数据
nodeCache.start(true);
TimeUnit.SECONDS.sleep(60);
}
@Test
public void testPathChildrenCache() throws Exception{
//1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
//2.绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点变化了");
System.out.println(pathChildrenCacheEvent);
//监听子节点的数据变更,并且拿到变更后的数据
//获取类型
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//判断类型是否是CHILD_UPDATED
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("数据变更!");
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(Arrays.toString(data));
}
}
});
//3.开启监听
pathChildrenCache.start();
TimeUnit.SECONDS.sleep(120);
}
三、分布式锁
- 在我们进行单机应用开发时涉及并发同步的时候,我们往往采用 synchronized 或者 Lock 的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个 JVM 之下,没有任何问题
- 但当我们的应用是分布式集群工作的情况下,属于多 JVM 下的工作环境,跨 JVM 之间已经无法通过多线程的锁解决同步问题
- 那么就需要一种更高级的锁机制,来处理这种跨机器的进程之间的数据同步问题——这就是分布式锁
ZooKeeper 分布式锁原理
- 核心思想:当客户端要获取锁,则创建节点,使用完锁,删除该节点
- 客户端获取锁时,在 lock 节点下创建临时(防止某个集群节点宕机无法释放锁)顺序节点
- 然后获取 lock 下面的所有子节点,客户端获取到所有的子节点后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除
- 如果发现自己创建的节点并非 lock 所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
- 如果发现比自己小的那个节点被删除,则客户端的 Watcher 会收到相应通知,此时再次判断自己创建的节点是否是 lock 子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听
在 Curator 中有五种锁方案
- InterProcessSemaphoreMutex:分布式排他锁(非可重入锁)
- InterProcessMutex:分布式可重入排他锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
四、案例:12306售票
模拟 12306 的数据库和被调用的减库存方法
package com.sisyphus.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class Ticket12306 implements Runnable{
private int tickets = 10; //数据库的票数
private InterProcessMutex lock;
public Ticket12306() {
String connectString = "172.20.10.2:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(60*1000)
.connectionTimeoutMs(15*1000)
.retryPolicy(retryPolicy)
//操作目录时会自动拼接前缀 /sisyphus
.namespace("sisyphus")
.build();
client.start();
lock = new InterProcessMutex(client, "/lock");
}
@Override
public void run() {
while(true){
//加锁
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0){
System.out.println(Thread.currentThread() + ":" + tickets);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//释放锁
try{
lock.release();
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
用多个线程模拟 12306 多个服务器之间的锁竞争
package com.sisyphus.curator;
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//创建客户端
Thread t1 = new Thread(ticket12306, "携程");
Thread t2 = new Thread(ticket12306, "飞猪");
t1.start();
t2.start();
}
}