ZooKeeper数据模型
- ZooKeeper是一个树形目录服务,其数据模型和Uiix的文件目录树很类似,拥有一个层次化结构。
- 这里面的每一个节点都被称为:ZNode,每个节点上都会保存自己的数据和节点信息。
- 节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
- 节点可以分为四大类:
-
- PEFSISTENT持久化节点
- EPHEMERAL临时节点:-e
- PERSISTENT_SEQUENTIAL持久化顺序节点:-s
- EPHEMERAL_SEQUENTIAL临时顺序节点:-es
ZooKeeper服务端常用命令
- 启动ZooKeeper服务:
./zkServer.sh start
- 查看ZooKeeper服务:
./zkServer.sh status
- 停止ZooKeeper服务:
./zkServer.sh stop
- 重启ZooKeeper服务:
./zkServer.sh restart
ZooKeeper客户端命令
- ./zkCli.sh -server localhost:2181连接服务端,如果是单机后面的可以省略不写。
- ls [/] :查看指定节点下子节点
- create [/app] [hrbu]:创建一个名为/app1的子节点,并存放数据。
- get [/app] :获取节点下的数据。
- set [/app] [hrbu]:给指定节点设置数据
- delete [/app] :删除指定节点 ps:此命令无法删除存在子节点的节点,如果要删除带有子节点的节点可以是使用deleteall [/app] 命令。
- quit 断开连接
- help 查看命令帮助
- create -e [/app] 创建临时节点,会话关闭就会删除
- create -s [/app] 创建顺序节点
- create -es [/app] 创建临时顺序节点
- ls -s [/app] 查看节点的详细信息
使用Curator API操作Zookeeper
建立连接
@Test
public void testConnect() {
//重试策略
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);
//第一种方式
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.130.120:2181", 60 * 1000, 15 * 1000, retry);
//第二种方式
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.130.120:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry).namespace("hrbu").build();
//开启连接
client.start();
}
参数解读
- connectString – list of servers to connect to (ZooKeeper的地址)
sessionTimeoutMs – session timeout (会话超时时间)
connectionTimeoutMs – connection timeout (连接超时时间)
retryPolicy – retry policy to use (重试策略)
会话超时时间和连接超时时间有默认值。
第二种链式编程的方式可以指定一个工作空间,在此客户端下的所有操作都会将此工作空间作为根目录。
注意
如果使用的是云服务器需要将指定端口打开
firewall-cmd --zone=public --add-port=2181/tcp --permanent
开放端口
firewall-cmd --zone=public --list-ports
查看已经开放的端口
systemctl restart firewalld
重启防火墙生效
最后别忘了在服务器的安全组里面添加端口,将2181端口打开
添加节点
@Test
public void testCreate1() throws Exception {
//基本创建
CreateBuilder createBuilder = client.create();
//创建时不指定数据,会将当前客户端ip存到里面
createBuilder.forPath("/app1");
//指定数据
createBuilder.forPath("/app2", "hello".getBytes());
}
@Test
public void testCreate2() throws Exception {
CreateBuilder createBuilder = client.create();
//设置节点类型,默认的类型是持久化
//CreateMode是枚举类型
createBuilder.withMode(CreateMode.EPHEMERAL).forPath("/app3");
}
@Test
public void testCreate3() throws Exception {
CreateBuilder createBuilder = client.create();
//创建多级节点,如果父节点不存在,则创建父节点。
createBuilder.creatingParentContainersIfNeeded().forPath("/app4/app4_1");
}
查询节点
@Test
public void testGet() throws Exception {
//查询数据
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
//查询子节点
List<String> strings = client.getChildren().forPath("/app4");
strings.forEach(System.out::println);
//查询节点状态信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
}
修改节点
@Test
public void testSet() throws Exception {
//修改数据
client.setData().forPath("/app1","hrbu".getBytes());
//根据版本修改
int version = 0;
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
version = stat.getVersion();
client.setData().withVersion(version).forPath("/app1", "HRBU".getBytes());
}
删除节点
@Test
public void testDelete() throws Exception {
//删除单个节点
client.delete().forPath("/app4/app4_1");
//删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
//强制删除
client.delete().guaranteed().forPath("/app4");
//回调
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("执行删除操作");
}
}).forPath("/app4");
}
Watch事件监听
-
Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper实现分布式协调服务的重要特性。
-
ZooKeeper中引入了Watcher机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
-
ZooKeeper原生支持通过注册Watcher来进行事件监听,但是使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。
-
Curator引入了Cache来时限对Zookeeper服务端事件的监听。
-
ZooKeeper提供了三种Watcher:
-
- NodeCache:只是监听某一个特定的节点。
- PathChildrenCache:监控一个Node的子节点。
- TreeCache:可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合。
NodeCache
@Test
public void testNodeCache() throws Exception {
//NodeCache:指定一个节点注册监听器
//创建NodeCache对象
final NodeCache nodeCache = new NodeCache(client, "/app1");
//注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("app1节点发生变化");
//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("变化后的节点:"+new String(data));
}
});
//开启监听,如果为true,则开启则开启监听,加载缓冲数据
nodeCache.start(true);
}
PathChildrenCache
@Test
public void testPathChildrenCache() throws Exception {
//PathChildrenCache:监听某个节点的所有子节点
//创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/hrbu", true);
//绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点发生变化");
System.out.println(pathChildrenCacheEvent);
//监听子节点的数据变更,并且得到变更后的数据
//获取类型
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//判断类型
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
//获取数据
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
});
//开启
pathChildrenCache.start();
}
TreeCache
@Test
public void testTreeCache() throws Exception {
//创建监听器
TreeCache treeCache = new TreeCache(client, "/");
//注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("节点发生变化");
System.out.println(treeCacheEvent);
}
});
//开启
treeCache.start();
}
分布式锁实现
概述
- 我们在进行单机应用开发,涉及并发同步的时候,,我们往往采用synchronized或者lock的方式来解决多线程间的代码同步问题,这时候多线程的运行都是在同一个JVM之下,没有任何问题。
- 但当我们的应用时分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
- 那么就需要一种更加高级的锁机制,来处理跨机器进程之间的数据同步问题,这就是分布式锁。
Zookeeper分布式锁原理
- 核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
-
- 1.客户端获取锁时,在lock节点下创建临时顺序节点。
- 2.然后获取lock下面的所有子节点,客户端获取到所有的子节之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
- 3.如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
- 4.如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否时lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取比自己小的一个节点并注册监听。
Curator实现分布式锁API
-
在Curator中有五种锁方案:
-
- InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
-
- InterProcessMutex:分布式可重入排它锁
-
- InterProcessReadWriteLock:分布式读写锁
-
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
-
- InterProcessSemaphoreV2:共享信号量
案例
package com.hrbu.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class Ticket12306 implements Runnable{
private int tickets = 10;//数据库的票数
private InterProcessMutex lock ;
public Ticket12306(){
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("8.130.32.75:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
//开启连接
client.start();
lock = new InterProcessMutex(client,"/lock");
}
@Override
public void run() {
while(true){
//获取锁
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0){
System.out.println(Thread.currentThread()+":"+tickets);
Thread.sleep(100);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
package com.hrbu.curator;
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//创建客户端
Thread t1 = new Thread(ticket12306,"携程");
Thread t2 = new Thread(ticket12306,"飞猪");
t1.start();
t2.start();
}
}