安装与配置
概念
基于观察者模式设计的分布式服务管理框架,负责存储和管理大家都关心数据,然后接受观察者的注册,一单这些数据的这状态发生了变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应
特点
-
zookeeper由一个领导者,和多个跟随者组成的集群
-
集群中有半数节点存活,zookeeper就可以正常服务,所以zookeeper适合安装奇数台服务器
-
全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据一致
-
更新请求顺序一致,来自同一个client的更新请求按其发送顺序依次执行
-
数据更新原子性:一次数据更新要么成功,要么失败
-
实时性,在一定时间范围内,client能读到最新数据
数据结构
Zookeeper数据模型的结构与Unix文件系统类似,整体可以看做是一棵树,每个节点可以被称作一个ZNode。每一个ZNode默认存储1M数据,每个ZNode都可以通过其路径唯一标识
应用环境
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、负载均衡等
-
统一命名服务
- 在分布式环境下,需要对环境进行统一命名,便于识别,例如IP用域名命名
-
统一配置管理
- 一个集群中,所有节点的配置信息是一致的,比如kafka集群
- 对配置文件修改后,希望快速同步到各个节点上
-
统一集群管理
- 根据节点实时做出调整,将节点信息写入zookeeper上的一个ZNode,监听这个ZNode可以获取他们实时的变化
-
服务器节点动态上下线
- 让服务器动态上线下线
-
负载均衡
- 让访问数最少的服务器去处理最新的客户端请求
安装
安装地址:阿里云镜像
放在linux内,解压
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz
将
/zookeeper/conf/zoo_example.cfg 改名为/zookeeper/conf/zoo.cfg
更改zoo.cfg配置,路径改为自己自定义的路径,避免存放为tmp文件夹
dataDir=/usr/local/zookeeper-3.7.1/zkData
启动
- 启动服务端(bin下)
./zkServer.sh start
- 启动客户端
./zkCli.sh
[root@localhost zookeeper-3.7.1]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.7.1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
配置详解
- zoo.cfg文件内的参数
#服务器leader与客户端follower心跳时间,单位毫秒
tickTime=2000
#LF初始通信时限,初始连接最多可以承受的tickTime数量
initLimit=10
#同步通信时限,如果超过syncLimit * tickTime,leader认为Follower死掉,从服务器节点删除
syncLimit=5
#保存zookeeper内的数据
dataDir=/usr/local/zookeeper-3.7.1/zkData
#通信端口
clientPort=2181
常用命令
数据模型
- 树形目录结构,类似Unix,拥有层次化的结构
- 每个节点被称为ZNode,每个节点会保存自己的数据和节点信息
- 节点拥有子节点,同时也允许少量数据存储在该节点之下
- 节点分类
- PERSISTENT 持久化节点
- EPHEMERAL 临时节点 : -e
- PERSISTENT_SEQUENTIAL 持久化顺序节点: -s
- EPHEMERAL_SEQUENTIAL 临时顺序节点: -es
Zookeeper 服务端
- 启动服务
./zkSever.sh start
- 查看状态
./zkSever.sh status
- 停止服务
./zkSever.sh stop
- 重启服务
./zkSever.sh restart
Zookeeper 客户端
- 启动
/zkCli.sh -server ip:端口
- 退出客户端
./zkClie.sh quit
- 增加节点
[zk: localhost:2181(CONNECTED) 1] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 5] create /app1 test #创建节点app1 数据为test
Created /app1
[zk: localhost:2181(CONNECTED) 6] ls /
[app1, zookeeper]
- 查
#获取节点存放的数据
[zk: localhost:2181(CONNECTED) 9] get /app1
test
- 给节点追加数据,使用set修改数据
[zk: localhost:2181(CONNECTED) 10] create /app2
Created /app2
[zk: localhost:2181(CONNECTED) 12] set /app2 test2
[zk: localhost:2181(CONNECTED) 15] get /app2
test2
- 删除节点
[zk: localhost:2181(CONNECTED) 16] delete /app1
[zk: localhost:2181(CONNECTED) 17] ls /
[app2, zookeeper]
- 删除全部,用来删除节点下有节点的情况
deleteall /app1
- 帮助命令
help
- 创建临时节点和顺序节点
#临时节点 生命周期维持到本次会话结束
[zk: localhost:2181(CONNECTED) 3] create -e /app1
Created /app1
[zk: localhost:2181(CONNECTED) 4] ls /
[app1, app2, zookeeper]
#顺序节点 每次建立按照顺序建立 可以容纳多个
[zk: localhost:2181(CONNECTED) 5] create -s /app3
Created /app30000000003
[zk: localhost:2181(CONNECTED) 6] ls /
[app1, app2, app30000000003, zookeeper]
[zk: localhost:2181(CONNECTED) 7] create -s /app3
Created /app30000000004
[zk: localhost:2181(CONNECTED) 8] create -s /app3
Created /app30000000005
[zk: localhost:2181(CONNECTED) 9] ls /
[app1, app2, app30000000003, app30000000004, app30000000005, zookeeper]
#创建临时顺序节点
[zk: localhost:2181(CONNECTED) 10] create -es /app3
Created /app30000000006
[zk: localhost:2181(CONNECTED) 11] ls /
[app1, app2, app30000000003, app30000000004, app30000000005, app30000000006, zookeeper]
#关闭本次会话再打开 临时节点都没了
[zk: localhost:2181(CONNECTED) 0] ls /
[app2, app30000000003, app30000000004, app30000000005, zookeeper]
- 查看节点详细信息
[zk: localhost:2181(CONNECTED) 5] ls -s /app2
[]
cZxid = 0x9
ctime = Sat May 13 07:15:26 PDT 2023
mZxid = 0xa
mtime = Sat May 13 07:15:38 PDT 2023
pZxid = 0x9
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
JavaAPI操作
Curator介绍
是Apache Zookeeper 的 Java客户端库
常见:
- 原生javaapi
- ZkClient
- Curator:目的是为了简化Zookeeper客户端的使用
Curator API
- maven依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>1.18.24</version>
</dependency>
<!-- 显示lombok日志信息-->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
连接客户端
@Test
public void testConnect(){
//重试策略 3s一次 重试10次
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000,10);
//第一种方式
//zkSever服务端地址和端口 多个用逗号隔开 会话超时时间(ms) 连接超时时间(ms) 重试策略
// CuratorFramework client = CuratorFrameworkFactory.newClient(
// "192.168.230.1:2181",
// 60 * 1000,
// 15 * 1000,
// retry);
// client.start();
//第二种方式
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.230.1:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry)
.namespace("zkServer") //名称空间 以后做的任何事情 会将命名的设置为根目录
.build();
client.start();
}
操作节点
优化连接
CuratorFramework client = null;
@Before
public void connect(){
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("192.168.230.1:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry)
.namespace("zkServer") //名称空间 以后做的任何事情 会将命名的设置为根目录
.build();
client.start();
}
/**
* 释放资源
*/
@After
public void close(){
if(client != null){
client.close();
}
}
增加节点
/**
* 创建节点
*/
@Test
public void create() throws Exception {
//1.基本创建 没有指定数据,将当前客户端ip作为数据存入
String s1 = client.create().forPath("/app1"); // /zkServer/app1
System.out.println(s1);
//2.带有数据的
String s2 = client.create().forPath("/app2","data hello".getBytes());
System.out.println(s2);
//3.创建临时节点 通过withMode(CreateMode.EPHEMERAL)指定类型
String s3 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(s3); //打个断点 查看客户端 可以看见app3 如果程序意外停止 app3不会被删除
//4.创建多级节点 creatingParentsIfNeeded() 父节点不存在也可也创建
String s4 = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
System.out.println(s4);
}
- 客户端结果
[zk: localhost:2181(CONNECTED) 13] ls /
[zookeeper]
#1.基本创建
[zk: localhost:2181(CONNECTED) 14] ls /
[zkServer, zookeeper]
[zk: localhost:2181(CONNECTED) 15] ls /zkServer
[app1]
[zk: localhost:2181(CONNECTED) 16] get /zkServer/app1
192.168.230.10
#2.带有数据的
[zk: localhost:2181(CONNECTED) 17] ls /
[zkServer, zookeeper]
[zk: localhost:2181(CONNECTED) 18] ls /zkServer
[app1, app2]
[zk: localhost:2181(CONNECTED) 19] get /zkServer/app2
data hello
#3.临时节点
[zk: localhost:2181(CONNECTED) 20] ls /
[zkServer, zookeeper]
[zk: localhost:2181(CONNECTED) 21] ls /zkServer
[app1, app2]
#debug还没关闭
[zk: localhost:2181(CONNECTED) 22] ls /zkServer
[app1, app2, app3]
# debug 关闭之后
[zk: localhost:2181(CONNECTED) 23] ls /zkServer
[app1, app2]
#4多级节点
[zk: localhost:2181(CONNECTED) 24] ls /zkServer
[app1, app2, app4]
[zk: localhost:2181(CONNECTED) 25] ls /zkServer/app4
[p1]
[zk: localhost:2181(CONNECTED) 26]
查询节点
/**
* 查询节点
*/
@Test
public void query() throws Exception {
//1.查询数据
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes)); //192.168.230.10
//2.查询子节点
List<String> list = client.getChildren().forPath("/");
list.forEach(System.out::println); //app1 app2 app4
//3.查询状态
Stat status = new Stat();
//将查询到的结果封装给status对象
client.getData().storingStatIn(status).forPath("/app1");
System.out.println(status); //33,33,1684034344358,1684034344358,0,0,0,0,14,0,33 懒蛋程序员
}
修改节点
@Test
public void update() throws Exception {
//1.修改数据,但是可能发生并发问题
// client.setData().forPath("/app1","testSet".getBytes());
//2.CAS修改 版本号法
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/app1");
int version = status.getVersion();
System.out.println(version); //1
//根据查询出的version进行修改,如果修改的version和内部的version不匹配,将会报错
client.setData().withVersion(version).forPath("/app1","haha".getBytes());
}
- 客户端
[zk: localhost:2181(CONNECTED) 28] ls -s /zkServer/app1
[]
cZxid = 0x21
ctime = Sat May 13 20:19:04 PDT 2023
mZxid = 0x44
mtime = Sun May 14 00:30:25 PDT 2023
pZxid = 0x21
cversion = 0
dataVersion = 2 #此处为2,上面的为1 操作过之后会+1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
删除节点
/**
* 删除
* @throws Exception e
*/
@Test
public void delete() throws Exception {
//1.普通删除
client.delete().forPath("/app1");
//2.删除全部(带子节点)
client.delete().deletingChildrenIfNeeded().forPath("/app4");
//3.一定成功的删除 防止网络原因没有成功删除
client.delete().guaranteed().forPath("/app3");
//4.回调 inBackground()
client.delete().guaranteed().inBackground(new BackgroundCallback(){
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("我被删除啦");
System.out.println(curatorFramework);
System.out.println(curatorEvent);
}
}).forPath("/app3");
}
- 控制台结果
我被删除啦
org.apache.curator.framework.imps.CuratorFrameworkImpl@1eda9cdc
CuratorEventImpl{type=DELETE, resultCode=-101, path='/app3', name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null, opResults=null}
Watch事件监听
zookeeper允许用户在指定节点上注册一些watcher,并且在一些特定事件触发的时候,zookeeper服务端将会将事件通知到感兴趣的客户端上,该机制是zookeeper实现分布式协调服务的重要特性。
根据watch机制可以实现发布订阅功能,可以让多个订阅者同时监听一个对象,当一个对象自身状态变化时,会通知所有订阅者。
zookeeper原生支持通过注册watch来进行事件监听,但是其使用并不是很方便,需要自己实现,比较繁琐
curator引入了cache来实现对zookeeper服务端事件的监听
zookeeper提供了三种watcher
- NodeCache:只是监听某一个特定的节点
- PathChildrenCache:监控一个ZNode的字节点
- TreeCache:可以监控整个树上的所有节点,类似PathChildrenCache和NodeCache的组合
- NodeCache普通节点监听
/**
* 给指定节点 注册监听
*/
@Test
public void NodeCacheTest() throws Exception {
//1.创建NodeCache对象
NodeCache nodeCache = new NodeCache(client, "/app1");
//2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
//对之后的操作进行记录
//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3.开启监听 true表示,开启监听,将会一次性加载缓存内的数据
nodeCache.start(true);
while (true){
}
}
- PathChildrenCache监听子节点
/**
* 监听某个节点的所有子节点们 只能监听儿子们 孙子们和自己不监听
*/
@Test
public void PathChildrenCacheTest() throws Exception {
//true 是否缓存状态信息
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//绑定监听
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点变化");
System.out.println(curatorFramework);
System.out.println(pathChildrenCacheEvent);
//监听子节点数据变更
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//特别对修改数据感兴趣
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
//getData() 是获取孩子数据 getData().getData() 是获得具体孩子修改数据
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
});
pathChildrenCache.start();
while (true);
}
新增节点和修改节点
[zk: localhost:2181(CONNECTED) 47] create /zkServer/app2/p2
Created /zkServer/app2/p2
#控制台输出
子节点变化
org.apache.curator.framework.imps.WatcherRemovalFacade@d013913
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/app2/p2', stat=101,101,1684052651069,1684052651069,0,0,0,0,0,0,101
, data=null}}
[zk: localhost:2181(CONNECTED) 49] set /zkServer/app2/p2 aaa
#控制台输出
子节点变化
org.apache.curator.framework.imps.WatcherRemovalFacade@d013913
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p2', stat=101,103,1684052651069,1684052677940,1,0,0,0,3,0,101
, data=[97, 97, 97]}}
aaa #因为if只对修改感兴趣
- TreeCache,上面两个合体
/**
* 监听自己和子节点变化
*/
@Test
public void TreeCacheTest() throws Exception {
//1.创建
TreeCache treeCache = new TreeCache(client, "/app2");
//2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("子节点变化");
System.out.println(curatorFramework);
System.out.println(treeCacheEvent);
//监听子节点数据变更
}
});
//3.开启
treeCache.start();
while (true);
}
分布式锁
除了使用mysql、redis实现分布式锁以外,现在更多的使用zookeeper实现分布式锁
原理
**核心思想:**使用临时顺序节点,当客户端需要获取锁,则创建节点(不能有多个客户端创建同一个节点),使用完锁之后删除该节点
-
客户端获取锁的时候,在lock节点下创建临时顺序节点
-
然后获取lock下面所有子节点,客户端获取所有子节点之后,如果发现自己创建的子节点序号最小,那么就任务该客户端获取到了锁,使用完之后,将该节点删除
-
如果自己不是最小的,说明自己还没获取到锁,那么客户端需要找到比自己小的那个节点,同时对其注册事件监听,监听删除事件
-
如果发现比自己小的那个节点被删除,则客户端的watcher会收到通知,此时再判断自己创建的节点是否是lock序列最小的,如果是获取锁,不是重复第三步
模拟12306售票
Curator中有五种锁案例:
- InterProcessSemaphorMutex:分布式排他锁(非可重入)
- InterProcessMutex:分布式可重入排他锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
编写案例
- 12306类
/**
* @author 我见青山多妩媚
* @date 2023/5/14 0014 17:04
* @Description TODO
*/
public class Ticket12306 implements Runnable{
//票数
private int tickets = 10;
//分布式锁
private InterProcessMutex lock;
public Ticket12306(){
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000,10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.230.1:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry)
.build();
client.start();
lock = new InterProcessMutex(client,"/lock");
}
@Override
public void run() {
while (true){
//获取锁 3s内
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0){
System.out.println(Thread.currentThread().getName()+":"+tickets);
tickets--;
}else {
break;
}
}catch (Exception e){
e.printStackTrace();
} finally {
try {
//释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
- 测试类
/**
* @author 我见青山多妩媚
* @date 2023/5/14 0014 16:42
* @Description TODO
*/
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket = new Ticket12306();
//创建客户端
Thread t1 = new Thread(ticket, "携程");
Thread t2 = new Thread(ticket, "飞猪");
t1.start();
t2.start();
}
}
运行没问题,我们看客户端
[zk: localhost:2181(CONNECTED) 53] ls /
[lock, zkServer, zookeeper]
[zk: localhost:2181(CONNECTED) 54] ls /lock
[_c_780e8659-46bd-44fa-ae1a-c6d63df66a76-lock-0000001543, _c_a1d87292-26c3-452c-b02b-f9c4cf19711e-lock-0000001542]
这里142肯定比143先获取锁,因为他小
长时间不用时,lock节点将会被自动删除