本文Java代码地址: https://gitee.com/blackjie_1/ljUp/tree/master/zookeeperDemo
个人博客网站:什么是快乐
基于docker 安装
拉取zookeeper 3.4.10
docker pull zookeeper:3.4.10
启动服务端
docker run -d -p 2181:2181 -v /root/docker/zookeeper:/home/zookeeper --name zookeeper_1 --restart always zookeeper:3.4.10
启动客户端
docker run -it --``rm` `--link zookeeper_one:zookeeper zookeeper zkCli.sh -server zookeeper
或者
docker exec -it zookeeper_1 zkCli.sh
数据模型
其数据结构类似一个树形结构,每个节点可以拥有子节点并能存储1M的数据
1、持久化
2、临时节点 -e
3、持久化顺序节点 -s
4、临时顺序节点 -es
客户端命令
连接本地zookeeper
docker exec -it zookeeper_1 zkCli.sh
或者
docker run -it --rm --link zookeeper_1:zookeeper_1 zookeeper:3.4.10 zkCli.sh -server zookeeper_1
退出
quit
查看节点
ls /节点名称
创建节点
create /节点名称 [数据]
create -e /节点名称 [数据] 临时节点,当前会话断开时,临时节点会删除
create -s /节点名称 [数据] 顺序节点,节点名称后会有编号
create -s /节点名称 [数据] 临时的顺序节点
获取数据
get /节点名称
设置数据
set /节点名称 [数据]
删除
delete /节点名称
delete all /节点名称 删除节点及节点下所有节点
Java代码操作
maven依赖
<!-- 本次学习zookeeper版本是3.4.10-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
增删改查
连接
public void curatorFramework() {
//重试策略
ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 10);
//第一种方式
// CuratorFramework client = CuratorFrameworkFactory
// .newClient(zookeeperUrl, 60000, 15000, exponentialBackoffRetry);
//第二种方法
client = CuratorFrameworkFactory.builder()
.connectString("192.168.106.128:2181")
.connectionTimeoutMs(15000)
.sessionTimeoutMs(60000)
.retryPolicy(exponentialBackoffRetry)
.namespace("lj")
.build();
client.start();
}
新增
/**
* zookeeper 创建节点 持久、临时、顺序 带有数据
* create
* 1、创建节点并带有数据
* 2、设置节点类型
* 3、创建多级节点
*/
@Test
public void curatorCreate() throws Exception {
//基本创建
String app1 = client.create().forPath("/app1");
//创建带有数据的节点
String s1 = client.create().forPath("/app2", "李杰_app2".getBytes(StandardCharsets.UTF_8));
//创建节点默认类型:持久化。可通过withMode方法设置类型
String s2 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "李杰_临时".getBytes(StandardCharsets.UTF_8));
//创建多级节点
//creatingParentsIfNeeded :创建多级节点,如果父节点不存在则创建父节点
String s3 = client.create().creatingParentsIfNeeded().forPath("/app4/app5", "李杰_多级节点".getBytes(StandardCharsets.UTF_8));
System.out.println(s3);
}
查询
/**
* zookeeper 查询节点
* 1、查询字节点 ls /
* 2、获取数据 get /
* 3、查询节点状态 ls -s
*/
@Test
public void curatorQuery() throws Exception {
//获取数据 getData
byte[] bytes = client.getData().forPath("/app1");
//查询子节点 getChildren
List<String> strings = client.getChildren().forPath("/");
//查询子节点信息+数据信息
//stat 用于获取节点信息,结果会放在stat对象中
Stat stat = new Stat();
byte[] bytes1 = client.getData().storingStatIn(stat).forPath("/app2");
System.out.println(new String(bytes1));
}
修改
/**
* zookeeper 修改节点数据
* 1、修改数据
* 2、根据版本修改数据
*/
@Test
public void curatorUpdate() throws Exception {
//1、修改数据
// client.setData().forPath("/app2","app2_修改".getBytes(StandardCharsets.UTF_8));
//2、根据版本修改数据 withVersion
//获取版本号
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app2");
//根据版本号修改数据
client.setData()
.withVersion(stat.getVersion())
.forPath("/app2","app2_version_update".getBytes(StandardCharsets.UTF_8));
}
删除
/**
* zookeeper 删除节点
* 1、删除单个节点
* 2、删除带有子节点的节点
* 3、必须成功的删除
* 4、回调函数
*/
@Test
public void curatorDelete() throws Exception {
//1、删除数据
client.delete().forPath("/app1");
//2、删除带有子节点的节点 deletingChildrenIfNeeded
client.delete().deletingChildrenIfNeeded().forPath("/app4");
//3、删除子节点 (必须删除成功,本质是重试策略) guaranteed
client.delete().guaranteed().forPath("/app4");
//4、回调函数 inBackground 。在删除后执行的方法
client.delete().guaranteed().inBackground(new BackgroundCallback(){
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("我被删除了");
System.out.println(curatorEvent);
}
}).forPath("/app4");
}
监听器
* 监听节点
* 1、监听单个节点 nodeCache
* 2、监听节点下所有子节点 PathChildrenCache
* 3、监听单个节点和节点下的所有子节点 TreeCache
监听单个节点
@Test
public void curatorFrameworkWatch() throws Exception {
//监听单个节点
//1、创建监听对象
NodeCache nodeCache = new NodeCache(client,"/app2",false);
//2、注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("节点发生改变,当前值:"+new String(data));
}
});
//3、开启监听
nodeCache.start();
while (true){
}
}
监听某节点下的所有子节点
/**
* 监听某节点的所有子节点
* @throws Exception
*/
@Test
public void curatorFrameworkWatchChildren() throws Exception {
//监听某节点的所有子节点
//1、创建监听对象PathChildrenCache
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
//2、注册监听
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
//会监听很多数据,包括节点新增,删除,修改连接等
System.out.println("节点发生改变");
System.out.println(event);
//监听子节点数据发生变化
if(PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(event.getType())){
// 确实是子节点数据发生变化,获取变化后的值
byte[] data = event.getData().getData();
String s = new String(data);
System.out.println(s);
}
}
});
//3、开启监听
pathChildrenCache.start();
while (true){
}
}
监听某节点和其所有的子节点
/**
* 监听某节点和其所有子节点
* @throws Exception
*/
@Test
public void curatorFrameworkWatchAll() throws Exception {
//1、创建监听对象PathChildrenCache
TreeCache pathChildrenCache = new TreeCache(client, "/app2");
//2、注册监听
pathChildrenCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
//会监听很多数据,包括节点新增,删除,修改连接等
System.out.println("节点发生改变");
System.out.println(event);
//监听子节点数据发生变化
if(TreeCacheEvent.Type.NODE_UPDATED.equals(event.getType())){
// 确实是节点数据发生变化,获取变化后的值
byte[] data = event.getData().getData();
String s = new String(data);
System.out.println(s);
}
}
});
//3、开启监听
pathChildrenCache.start();
while (true){
}
}
分布式锁
简略概念:多机器下的对于锁的处理。实现方式:
1、redis (性能高,但是不是很可靠)
2、数据库实现(获得锁:数据库新增一条唯一数据。释放锁:删除新增的数据。锁等待:等新增成功。此思想同样可以用redis实现。)
3、zookeeper
Java代码实现
本次使用的锁是InterProcessMutex
主要步骤:
1、构建CuratorFramework client 对象
2、通过client 构建InterProcessMutex 对象:lock= new InterProcessMutex(client, “/lock”);
3、执行业务前获取锁:boolean acquire = lock.acquire(5, TimeUnit.SECONDS);
4、业务结束后释放锁:lock.release();
模拟售票
public class ZookeeperLockTests {
private static class Tick12306{
private int tick=100;
public int buyTick(){
int result=0;
if(tick>0){
result=tick;
tick--;
}else{
System.out.println("无票了");
return -1000;
}
return result;
}
}
private static class OtherTick implements Runnable{
//抢票机构名称
private String name;
//12306票池
private Tick12306 tick12306;
//分布式锁
private InterProcessMutex lock;
public OtherTick(String name,Tick12306 tick12306){
this.name=name;
this.tick12306=tick12306;
//重试策略
ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.106.128:2181")
.connectionTimeoutMs(15000)
.sessionTimeoutMs(60000)
.retryPolicy(exponentialBackoffRetry)
.namespace("lj")
.build();
client.start();
lock = new InterProcessMutex(client, "/lock");
}
//抢票
@Override
public void run() {
while (tick12306.tick>0){
try {
//获取锁
boolean acquire = lock.acquire(5, TimeUnit.SECONDS);
if(acquire){
System.out.println(this.name+"抢票:"+tick12306.buyTick());
}
}catch (Exception e){
e.printStackTrace();
}finally {
try {
//锁释放
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) {
Tick12306 tick12306 = new Tick12306();
OtherTick t1 = new OtherTick("携程", tick12306);
OtherTick t2 = new OtherTick("飞猪", tick12306);
Thread thread1 = new Thread(t1);
Thread thread2 = new Thread(t2);
thread1.start();
thread2.start();
}
}