Zookeeper从零入门笔记
- 一、入门
- 1. 概述
- 2. 特点
- 3. 数据结构
- 4. 应用场景
- 二、本地
- 1.安装
- 2. 参数解读
- 三、集群操作
- 3.1.1 集群安装
- 3.2 选举机制
- 1. 第一次启动
- 2. 非第一次启动
- 3.3 ZK集群启动停止脚本
- 3.4 客户端命令行操作
- 3.2.1 命令行语法
- 3.2.2 节点类型(持久/短暂/有序号/无序号)
- 3.2.4 监听器原理
- 3.3 删除节点
- 3.5 客户端API操作
- 3.6 监听节点变化
- 3.7 判断Znode是否存在
- 3.8 写数据原理
- 四、服务器动态上下线监听案例
- 五、ZooKeeper分布式锁案例
- 5.1、分布式锁-成熟框架curator
- 六、企业面试真题
一、入门
1. 概述
2. 特点
3. 数据结构
4. 应用场景
统一命名服务:nginx也可以实现
统一配置管理:
统一集群管理:
服务器动态上下线:
软负载均衡:
二、本地
1.安装
2. 参数解读
三、集群操作
3.1.1 集群安装
遇到问题:1防火墙未关闭,2配置错误
sudo systemctl stop firewalld
加到zok.conf中:
############cluster#################
server.2=192.168.88.130:2888:3888
server.3=192.168.88.131:2888:3888
server.4=192.168.88.132:2888:3888
3.2 选举机制
1. 第一次启动
2. 非第一次启动
3.3 ZK集群启动停止脚本
#!/bin/bash
case $1 in
"start"){
for i in 192.168.88.130 192.168.88.131 192.168.88.132
do
echo -------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
}
;;
"stop"){
for i in 192.168.88.130 192.168.88.131 192.168.88.132
do
echo -------- zookeeper $i 停止 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
}
;;
"status"){
for i in 192.168.88.130 192.168.88.131 192.168.88.132
do
echo -------- zookeeper $i 状态 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
}
;;
esac
3.4 客户端命令行操作
3.2.1 命令行语法
3.2.2 节点类型(持久/短暂/有序号/无序号)
临时节点:退出客户端以后删除
有序节点:可重复,key后自带序号
create /sanguo/shuguo/zhangfei #创建永久无序节点
create -s /sanguo/shuguo/guanyu #创建永久有序节点
create -s -e /sanguo/wuguo/zhouyu1 #创建临时有序节点
create -e /sanguo/wuguo/zhouyu2 #创建临时无序节点
3.2.4 监听器原理
3.3 删除节点
3.5 客户端API操作
3.6 监听节点变化
3.7 判断Znode是否存在
3.8 写数据原理
分为两种情况:
1、请求发送给Leader节点;
- Client发送write请求给Leader,Leader收到消息先写
- Leader将write请求转发给下一个Follower节点,Follow收到写入 完成后回一个ack给上一个节点(Leader)
- 大于半数的Leader节点完成写入(共3,完成2),Leader返回给Clinet一个ack表示完成
- Leader继续给其他Follower发送write请求,Follower接收到请求写入,完成后返回ack
2、请求发送给Follower节点;
- Client发送write请求给Follower,Follower收到消息先转发给Leader
- Leader接收到write请求,先执行,然后转发给Follower
- Follower接收到write请求。执行,然后回复一个ack
- 大于半数节点已write,Leader返回一个ack给最先接触到Client的节点(Follower),Follower再返回一个ack给Client
- Leader给剩下的Follower发送write请求,Follower写完回复ack给Leader
四、服务器动态上下线监听案例
package com.atguigu.case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.sql.Array;
import java.util.ArrayList;
import java.util.List;
/**
* 2023年11月24日
* <p>
* case1 服务器动态上下限
* <p>
* 1、获取zk连接
* 2、监听节点
* 3、业务代码(睡觉
*/
public class DistributeClient {
ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeClient zk = new DistributeClient();
zk.connect();
zk.watcherList();
zk.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void watcherList() throws InterruptedException, KeeperException {
List<String> children = zooKeeper.getChildren("/servers", true, null);
ArrayList<String> strings = new ArrayList<String>();
for (String child : children) {
byte[] data = zooKeeper.getData("/servers/" + child, false, null);
strings.add(new String(data));
}
System.out.println(strings);
}
private void connect() throws IOException {
String connectString = "192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181";
int sessionTimeout = 300000;
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
try {
watcherList();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
});
}
}
package com.atguigu.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
/**
* case 1
* 服务器动态上下线例子
* <p>
* 1. 创建zk连接
* 2. 注册服务器到zk集群
* 3. 业务代码(睡觉
*/
public class DistributeServer {
ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeServer zk = new DistributeServer();
zk.connect();
zk.register(args[0]);
zk.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void register(String hostName) throws InterruptedException, KeeperException {
zooKeeper.create("/servers/"+hostName, hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostName + "已注册上");
}
private void connect() throws IOException {
String connectString = "192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181";
int sessionTimeout = 3000000;
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
}
五、ZooKeeper分布式锁案例
package com.atguigu.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* 分布式锁
*/
public class DistributedLock {
public ZooKeeper zooKeeper;
public CountDownLatch connectLatch = new CountDownLatch(1);
public CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentNode;
DistributedLock() throws IOException, InterruptedException, KeeperException {
//连接zk
String connectString = "192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181";
int sessionTimeout = 3000;
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//连接完成后释放connectLatch
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
//当前锁节点为最小锁时,释放waitLatch
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
//连接完成后继续执行
connectLatch.await();
}
public void lock() throws InterruptedException, KeeperException {
// 检测locks节点是否存在,不存在就建一个
Stat exists = zooKeeper.exists("/locks", false);
if (exists == null){
zooKeeper.create("/locks",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//创建锁节点
currentNode = zooKeeper.create("/locks/res-",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判断是否是头一个,是的话给锁
List<String> locks = zooKeeper.getChildren("/locks", false);
Collections.sort(locks);
//若数组内无值,说明就是第一位可以直接给锁
if (locks.size() == 0){
}else {
//算出创建锁的位置,判断是否是最小的,是的话就给锁,否则对上一个锁进行监听
int location = locks.indexOf(currentNode.substring("/locks/".length()));
if (location == 0){
//最小给锁
}else if (location == -1){
//错误
System.out.println("错误");
}else {
waitPath = locks.get(location - 1);
zooKeeper.getData(waitPath,true,null);
//等待监听
waitLatch.await();
}
}
}
public void unlock() throws InterruptedException, KeeperException {
zooKeeper.delete(currentNode,-1);
}
}
package com.atguigu.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class Client {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
new Thread(new Runnable() {
public void run() {
try {
DistributedLock distributedLock = new DistributedLock();
distributedLock.lock();
System.out.println("线程1启动,获取锁");
Thread.sleep(5 * 1000);
distributedLock.unlock();
System.out.println("线程1释放锁");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
DistributedLock distributedLock = new DistributedLock();
distributedLock.lock();
System.out.println("线程2启动,获取锁");
Thread.sleep(5 * 1000);
distributedLock.unlock();
System.out.println("线程2释放锁");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
}
}
5.1、分布式锁-成熟框架curator
六、企业面试真题
EPOCH:leader任期