Zookeeper学习笔记
- 四、客户端命令
- 4.1、新增节点
- 4.2、查询节点信息
- 4.3、节点类型
- 4.4、更新节点
- 4.5、删除节点
- 4.6、监听器
- 五、SpringBOOT整合Zookeeper
- 六、写数据流程
- 6.1、写流程之写入请求直接发送给Leader节点
- 6.2、写流程之写入请求发送给follower节点
- 七、服务器动态上下线监听
- 7.1、案例
- 八、分布式锁
- 8.1、分析
- 8.1、案例
- 九、其他
- 9.1、生产集群安装多少k合适
四、客户端命令
在bin目录下 集群客户端启动命令
./zkCli.sh -server 192.168.3.34:2181
4.1、新增节点
命令格式:
# -s 为有序节点, -e 为临时节点
create [-s] [-e] path data
创建持久化节点并写入数据:
# 创建 hadoop节点,内容为123456
create /hadoop "123456"
# 读取节点内容
get /hadoop
创建持久化有序节点。此时创建出来的节点名称为:指定的节点名+自增序号:
#创建出来的节点名称为:指定的节点名+自增序号:# 此时创建出来的节点名称为/a0000000001
create -s /a "a"
# 再创建/b时,节点名称为/b0000000002
create -s /b "b"
创建临时节点:
create -e /tmp "tmp"
# 创建完之后,通过get /tmp可以查到
get /tmp
# 使用quit退出当前会话
quit
# 重新打开zkCli,get /tmp 找不到该节点
get /tmp
创建临时有序节点,可用于分布式锁:
# 创建的临时节点:/t0000000004
create -s -e /t "tt"
4.2、查询节点信息
节点数据信息
ls -s /
节点属性说明:
- cZxid:数据节点创建时的事务ID
- ctime:数据节点创建时的时间
- mZxid:数据节点最后一次更新时的事务ID
- pZxid:数据节点最后一次更新时的时间
- cversion:子节点的更改次数
- dataVersion:节点数据的更改次数
- aclVersion:节点的ACL的更改次数
- ephemeralOwner:如果节点是临时节点,则表示创建该节点的会话sessionID。如果节点是持久节点,则该属性值为0
- dataLength:数据内容的长度
numChildren:数据节点当前的子节点个数
4.3、节点类型
持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
- 持久化目录节点
客户端与Zookeeper断开连接后,该节点依旧存在 - 持久化顺序编号目录节点
客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号由父节点维护 - 临时目录节点
客户端与Zookeeper断开连接后,该节点被删除 - 临时顺序编号目录节点
客户端与Zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号。让天下漫有难学的技
4.4、更新节点
set /hadoop "1234"
4.5、删除节点
delete /hadoop
4.6、监听器
1、监听原理详解
- 首先要有一个main()线程
- 在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet) ,一个负责监听(listener) 。
- 通过connect线程将注册的监听事件发送给Zookeeper。
- 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
- Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
- listener线程内部调用了process()方法。
监听节点数据的变化
get path [watch]
# 注册一次,只能监听一次。想再次监听,需要再次注册。
监听子节点增减的变化
ls path [watch]
五、SpringBOOT整合Zookeeper
SpringBOOT整合Zookeeper
六、写数据流程
6.1、写流程之写入请求直接发送给Leader节点
- 客户端想服务器leader写入数据
- leader向slave1写入
- slave1写完向leader发送ack表示自己写入完成
- 只要超过半数,就可以应答了 leader向客户端发送ack
- leader向slave2写入
- slave2写完向leader发送ack表示自己写入完成
6.2、写流程之写入请求发送给follower节点
- 客户端想服务器slave1写入数据
- slave1没有写入权限、将数据转发给leader
- leader写入后再向slave1写入
- slave1写完向leader发送ack表示自己写入完成
- 只要超过半数,就可以应答了 leader向客户端发送ack
- leader向slave2写入
- slave2写完向leader发送ack表示自己写入完成
七、服务器动态上下线监听
7.1、案例
先在客户端创建一个结点
create /servers "servers"
Server端 (创建3个 在 server.regist(“zk1”)改成不同的名字zk1、zk2、zk3)
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer01 {
//,分割不能有空格 在程序里写connectString不可使用ip,必须使用主机名
private String connectString = "master:2181,slave1:2181,slave2:2181";
//超时时间
private int sessionTimeout = 2000;
//zk客户端
private ZooKeeper zkClient;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeServer01 server=new DistributeServer01();
//1.获取zk连接
server.getConnect();
//2.注册服务器到zk集群
server.regist("zk1");
//3.启动业务逻辑(睡觉)
server.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void regist(String hostname) throws InterruptedException, KeeperException {
String s = zkClient.create("/servers/", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname+"已经上线");
}
private void getConnect() throws IOException {
zkClient= new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
}
Client端
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
//,分割不能有空格 在程序里写connectString不可使用ip,必须使用主机名
private String connectString = "master:2181,slave1:2181,slave2:2181";
//超时时间
private int sessionTimeout = 2000;
//zk客户端
private ZooKeeper zkClient;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeClient client=new DistributeClient();
//1.获取zk连接
client.getConnect();
//2.监听/servers下面子节点的增加和删除
client.getServerList();
//3.业务逻辑(睡觉)
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws InterruptedException, KeeperException {
List<String> children = zkClient.getChildren("/servers", true);
ArrayList<String> servers = new ArrayList<>();
for (String s:children){
try {
byte[] data = zkClient.getData("/servers/" + s, false, null);
servers.add(new String(data, "utf-8"));
}catch (Exception e){}
}
System.out.println(servers);
}
private void getConnect() throws IOException {
zkClient= new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
getServerList();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
});
}
}
逐步启动DistributeServer01 、DistributeServer02、DistributeServer013 启动一个看一下DistributeClient 的控制台
八、分布式锁
8.1、分析
- 接收到请求后,在/locks节点下创建一个临时顺序节点
- 判断自己是不是当前节点下最小的节点:是,获取到锁;不是,对前一个节点进行监听
- 获取到锁,处理完业务后,delete节点释放锁,然后下面的节点将收到通知,重复第二步判断
8.1、案例
DistributeLock
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributeLock {
//,分割不能有空格 在程序里写connectString不可使用ip,必须使用主机名
private String connectString = "master:2181,slave1:2181,slave2:2181";
//超时时间
private int sessionTimeout = 2000;
//zk客户端
private ZooKeeper zkClient;
private CountDownLatch countDownLatch=new CountDownLatch(1);
private CountDownLatch waitDownLatch=new CountDownLatch(1);
//前一个结点的路径
private String waitPath;
private String currentMode;
public DistributeLock() throws IOException, InterruptedException, KeeperException {
//获取连接
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// connectLatch如果连接上zk可以释放
if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
countDownLatch.countDown();
}
// waitLatch需要释放
if (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
waitDownLatch.countDown();
}
}
});
//等待zk连接成功
countDownLatch.await();
//判断根节点locks是否存在
Stat exists = zkClient.exists("/locks", false);
if(exists==null){
//创建根节点
zkClient.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
}
//对zk加锁
public void zkLock(){
//创建临时带序号结点
try {
currentMode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//判断创建的结点是否是最小的结点 是获取锁,如果不是监听前一个结点
List<String> children = zkClient.getChildren("/locks", false);
//如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断,谁最小
if (children.size() == 1){
return;
}else{
Collections.sort(children);
//获取节点名称seq-00000000
String thisNode = currentMode.substring( "/locks/".length());
//通过seq-00000000获取该节点在children集合的位置
int index = children.indexOf(thisNode) ;
//判断
if (index == -1 ){
System.out.println("数据异常");}
else if(index == 0){
//就一个节点,可以获取锁了
return;
}else {
waitPath="/locks/"+children.get(index-1);
zkClient.getData(waitPath,true,null);
//等待监听
waitDownLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//对zk解锁
public void UnzkLock(){
//删除结点
try {
zkClient.delete(currentMode,-1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
DistributeLockTest
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributeLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
final DistributeLock distributeLock1 = new DistributeLock();
final DistributeLock distributeLock2 = new DistributeLock();
new Thread(()->{
try {
distributeLock1.zkLock();
System.out.println("aaa线程获取锁");
Thread.sleep(5000);
distributeLock1.UnzkLock();
System.out.println("aaa线程释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"aaa").start();
new Thread(()->{
try {
distributeLock2.zkLock();
System.out.println("bbb线程获取锁");
Thread.sleep(5000);
distributeLock2.UnzkLock();
System.out.println("bbb线程释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"bbb").start();
}
}
九、其他
9.1、生产集群安装多少k合适
安装奇数台
生产经验:
- 10台服务器:3台zk;
- 20台服务器:5台zk;
- 100台服务器:11台zk
- 200台服务器:11台zk
服务器台数多:好处,提高可靠性;坏处:提高通信延时