Java API
znode是zooKeeper集合的核心组件,zookeeper API提供了一小组方法使用zookeeper集合来操纵znode的所有细节。
客户端应该遵循以下步骤,与zookeeper服务器进行清晰和干净的交互。
- 连接到zookeeper服务器。zookeeper服务器为客户端分配会话ID。
- 定期向服务器发送心跳。否则,zookeeper服务器将过期会话ID,客户端需要重新连接。
- 只要会话ID处于活动状态,就可以获取/设置znode。
- 所有任务完成后,断开与zookeeper服务器的连接。如果客户端长时间不活动,则zookeeper服务器将自动断开客户端。
pom.xml
<properties>
<java.version>1.8</java.version>
<!-- zookeeper -->
<zookeeper.version>3.4.14</zookeeper.version>
</properties>
<dependencies>
<!-- Spring Boot Begin -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Boot End -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!--添加zookeeper3.4.9版本-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<exclusions>
<!--排除zookeeper自带的slf4j,不然有引用slf4j会jar冲突-->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<version>${zookeeper.version}</version>
</dependency>
</dependencies>
连接
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
- connectionString - zookeeper主机
- sessionTimeout - 会话超时(以毫秒为单位)
- watcher - 实现“监视器”对象。zookeeper集合通过监视器对象返回连接状态。
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
* Title:Zookeeper连接
* Description:
* @author WZQ
* @version 1.0.0
* @date 2021/2/3
*/
public class ZookeeperConnection {
public static void main(String[] args) {
try {
// 计数器对象,countDown一次放行
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:服务器的ip和端口
// arg2:客户端与服务器之间的会话超时时间 以毫秒为单位的
// arg3:监视器对象
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
// 会话编号
System.out.println(zooKeeper.getSessionId());
zooKeeper.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
新增节点
// 同步方式
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
// 异步方式
create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)
- path - znode路径。例如,/node1 /node1/node11
- data - 要存储在指定znode路径中的数据
- acl - 要创建的节点的访问控制列表。zookeeper API提供了一个静态接口
- ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE,返回打开znode的acl列表。
- createMode - 节点的类型,这是一个枚举。
- callBack - 异步回调接口
- **ctx **- 传递上下文参数
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZKCreate {
String IP="192.168.60.130:2181";
ZooKeeper zooKeeper;
@Before
public void before()throws Exception{
// 计数器对象
CountDownLatch countDownLatch=new CountDownLatch(1);
// arg1:服务器的ip和端口
// arg2:客户端与服务器之间的会话超时时间 以毫秒为单位的
// arg3:监视器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
}
@After
public void after()throws Exception{
zooKeeper.close();
}
@Test
public void create1()throws Exception{
// arg1:节点的路径
// arg2:节点的数据
// arg3:权限列表 world:anyone:cdrwa
// arg4:节点类型 持久化节点
zooKeeper.create("/create/node1","node1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void create2() throws Exception {
// Ids.READ_ACL_UNSAFE world:anyone:r
zooKeeper.create("/create/node2", "node2".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void create3() throws Exception {
// world授权模式
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("world", "anyone");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.READ, id));
acls.add(new ACL(ZooDefs.Perms.WRITE, id));
zooKeeper.create("/create/node3", "node3".getBytes(), acls, CreateMode.PERSISTENT);
}
@Test
public void create4() throws Exception {
// ip授权模式
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("ip", "192.168.60.130");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zooKeeper.create("/create/node4", "node4".getBytes(), acls, CreateMode.PERSISTENT);
}
@Test
public void create5() throws Exception {
// auth授权模式
// 添加授权用户
zooKeeper.addAuthInfo("digest", "itcast:123456".getBytes());
zooKeeper.create("/create/node5", "node5".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
}
@Test
public void create6() throws Exception {
// auth授权模式
// 添加授权用户
zooKeeper.addAuthInfo("digest", "itcast:123456".getBytes());
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("auth", "itcast");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.READ, id));
zooKeeper.create("/create/node6", "node6".getBytes(), acls, CreateMode.PERSISTENT);
}
@Test
public void create7() throws Exception {
// digest授权模式
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("digest", "itheima:qlzQzCLKhBROghkooLvb+Mlwv4A=");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zooKeeper.create("/create/node7", "node7".getBytes(), acls, CreateMode.PERSISTENT);
}
@Test
public void create8() throws Exception {
// 持久化顺序节点
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zooKeeper.create("/create/node8", "node8".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(result);
}
@Test
public void create9() throws Exception {
// 临时节点
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zooKeeper.create("/create/node9", "node9".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(result);
}
@Test
public void create10() throws Exception {
// 临时顺序节点
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zooKeeper.create("/create/node10", "node10".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(result);
}
@Test
public void create11() throws Exception {
// 异步方式创建节点
zooKeeper.create("/create/node11", "node11".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
// 0 代表创建成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 节点的路径
System.out.println(name);
// 上下文参数
System.out.println(ctx);
}
}, "I am context");
Thread.sleep(10000);
System.out.println("结束");
}
}
更新节点
// 同步方式
setData(String path, byte[] data, int version)
// 异步方式
setData(String path, byte[] data, int version,AsyncCallback.StatCallback callBack, Object ctx)
- path- znode路径
- data - 要存储在指定znode路径中的数据。
- version- znode的当前版本。每当数据更改时,ZooKeeper会更新znode的版本号。
- callBack-异步回调接口
- ctx-传递上下文参数
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
public class ZKSet {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;
@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}
@After
public void after() throws Exception {
zooKeeper.close();
}
@Test
public void set1() throws Exception {
// arg1:节点的路径
// arg2:修改的数据
// arg3:数据版本号 -1代表版本号不参与更新,写其他数字版本号则必须是zooKeeper的dataVersion,不会无法更新成功
Stat stat = zooKeeper.setData("/set/node1", "node13".getBytes(), -1);
// 当前节点的版本号
System.out.println(stat.getVersion());
}
@Test
public void set2() throws Exception {
zooKeeper.setData("/set/node1", "node14".getBytes(), -1, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 0代表修改成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数对象
System.out.println(ctx);
// 属性描述对象
System.out.println(stat.getVersion());
}
}, "I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}
删除节点
// 同步方式
delete(String path, int version)
// 异步方式
delete(String path, int version, AsyncCallback.VoidCallback callBack, Object ctx)
- path - znode路径。
- version - znode的当前版本
- **callBack **- 异步回调接口
- ctx - 传递上下文参数
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
public class ZKDelete {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;
@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}
@After
public void after() throws Exception {
zooKeeper.close();
}
@Test
public void delete1() throws Exception {
// arg1:删除节点的节点路径
// arg2:数据版本信息 -1代表删除节点时不考虑版本信息
zooKeeper.delete("/delete/node1",-1);
}
@Test
public void delete2() throws Exception {
// 异步使用方式
zooKeeper.delete("/delete/node2", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
// 0代表删除成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数对象
System.out.println(ctx);
}
},"I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}
查看节点
// 同步方式
getData(String path, boolean b, Stat stat)
// 异步方式
getData(String path, boolean b,AsyncCallback.DataCallback callBack, Object ctx)
- path - znode路径。
- b- 是否使用连接对象中注册的监视器。
- stat - 返回znode的元数据。
- callBack-异步回调接口
- ctx-传递上下文参数
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
public class ZKGet {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;
@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}
@After
public void after() throws Exception {
zooKeeper.close();
}
@Test
public void get1() throws Exception {
// arg1:节点的路径
// arg3:读取节点属性的对象
Stat stat=new Stat();
byte [] bys=zooKeeper.getData("/get/node1",false,stat);
// 打印数据
System.out.println(new String(bys));
// 版本信息
System.out.println(stat.getVersion());
}
@Test
public void get2() throws Exception {
//异步方式
zooKeeper.getData("/get/node1", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
// 0代表读取成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数对象
System.out.println(ctx);
// 数据
System.out.println(new String(data));
// 属性对象
System.out.println(stat.getVersion());
}
},"I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}
查看子节点
// 同步方式
getChildren(String path, boolean b)
// 异步方式
getChildren(String path, boolean b,AsyncCallback.ChildrenCallback callBack,Object ctx)
- path - Znode路径。
- b- 是否使用连接对象中注册的监视器。
- callBack - 异步回调接口。
- ctx-传递上下文参数
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZKGetChid {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;
@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}
@After
public void after() throws Exception {
zooKeeper.close();
}
@Test
public void get1() throws Exception {
// arg1:节点的路径
List<String> list = zooKeeper.getChildren("/get", false);
for (String str : list) {
System.out.println(str);
}
}
@Test
public void get2() throws Exception {
// 异步用法
zooKeeper.getChildren("/get", false, new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
// 0代表读取成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数对象
System.out.println(ctx);
// 子节点信息
for (String str : children) {
System.out.println(str);
}
}
},"I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}
检查节点是否存在
// 同步方法
exists(String path, boolean b)
// 异步方法
exists(String path, boolean b,AsyncCallback.StatCallback callBack,Object ctx)
- path- znode路径。
- b- 是否使用连接对象中注册的监视器。
- callBack - 异步回调接口。
- ctx-传递上下文参数
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
public class ZKExists {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;
@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}
@After
public void after() throws Exception {
zooKeeper.close();
}
@Test
public void exists1() throws Exception {
// arg1:节点的路径
Stat stat=zooKeeper.exists("/exists1",false);
// 节点的版本信息
System.out.println(stat.getVersion());
}
@Test
public void exists2() throws Exception {
// 异步方式
zooKeeper.exists("/exists1", false, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 0 代表方式执行成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数
System.out.println(ctx);
// 节点的版本信息
System.out.println(stat.getVersion());
}
},"I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}
事件监听机制
事件监听机制watcher
zookeeper提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者 .
zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力。
watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式
Watcher实现由三个部分组成:
- Zookeeper服务端
- Zookeeper客户端
- 客户端的ZKWatchManager对象
客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端, 接着客户端的Watch管理器会触发相关Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程
watcher特性:
特性 | 说明 |
---|---|
一次性 | watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册 |
客户端顺序回 | watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行 |
轻量级 | WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容 |
时效性 | watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知 |
watcher接口设计
Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举类:KeeperState、EventType
-
Watcher通知状态**(KeeperState)**
KeeperState是客户端与服务端连接状态发生变化时对应的通知类型。路径为org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,其枚举属性如下:
枚举属性 说明 SyncConnected 客户端与服务器正常连接时 Disconnected 客户端与服务器断开连接时 Expired 会话session失效时 AuthFailed 身份认证失败时 -
Watcher事件类型**(EventType)**
EventType是数据节点(znode)发生变化时对应的通知类型。EventType变化时KeeperState永远处于SyncConnected通知状态下;当KeeperState发生变化时,EventType永远为None。其路径为org.apache.zookeeper.Watcher.Event.EventType,是一个枚举类,枚举属性如下:
枚举属性 说明 None 无 NodeCreated Watcher监听的数据节点被创建时 NodeDeleted Watcher监听的数据节点被删除时 NodeDataChanged Watcher监听的数据节点内容发生变更时(无论内容数据是否变化) NodeChildrenChanged Watcher监听的数据节点的子节点列表发生变更时
注:客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用get等方法重新获取;
捕获相应的事件:
zookeeper客户端连接的状态和zookeeper对znode节点监听的事件类型,下面我们来讲解如何建立zookeeper的watcher监听。在zookeeper中采用zk.getChildren(path, watch)、zk.exists(path, watch)、zk.getData(path, watcher, stat)
这样的方式为某个znode注册监听。
表以node-x节点为例,说明调用的注册方法和可监听事件间的关系:
注册方式 | Created | ChildrenChanged | Changed | Deleted |
---|---|---|---|---|
zk.exists(“/node-x”,watcher) | 可监控 | 可监控 | 可监控 | |
zk.getData(“/node-x”,watcher) | 可监控 | 可监控 | ||
zk.getChildren(“/node-x”,watcher) | 可监控 | 可监控 |
案例代码
连接状态
客服端与服务器的连接状态
KeeperState通知状态
SyncConnected:客户端与服务器正常连接时
Disconnected:客户端与服务器断开连接时
Expired:会话session失效时
AuthFailed:身份认证失败时
事件类型为:None
代码
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
* Title:自定义Watcher监听类
* Description:
* @author WZQ
* @version 1.0.0
* @date 2021/2/4
*/
public class ZKConnectionWatcher implements Watcher {
// 计数器对象
static CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接对象
static ZooKeeper zooKeeper;
@Override
public void process(WatchedEvent event) {
try {
// 事件类型
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
//连接成功放行
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("断开连接!");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("会话超时!");
//超时重连
zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("认证失败!");
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
// 阻塞线程等待连接的创建
countDownLatch.await();
// 会话id
System.out.println(zooKeeper.getSessionId());
// 添加授权用户
zooKeeper.addAuthInfo("digest1","itcast1:1234561".getBytes());
byte[] bs=zooKeeper.getData("/node1",false,null);
System.out.println(new String(bs));
Thread.sleep(50000);
zooKeeper.close();
System.out.println("结束");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
exists方法
检查节点是否存在
// 使用连接对象ZooKeeper的监视器
exists(String path, boolean b)
// 自定义监视器
exists(String path, Watcher w)
// NodeCreated:节点创建
// NodeDeleted:节点删除
// NodeDataChanged:节点内容发生变化
- path- znode路径。
- b- 是否使用连接对象中注册的监视器。
- w-监视器对象。
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* Title:exists
* Description:数据发生变化,打印watcher内容
* @author WZQ
* @version 1.0.0
* @date 2021/2/4
*/
public class ZKWatcherExists {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper = null;
@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接zookeeper客户端
zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数!");
// 连接成功
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
countDownLatch.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
// 数据发生变化,打印watcher内容
@Test
public void watcherExists1() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:使用zooKeeper连接对象中的watcher
zooKeeper.exists("/watcher1", true);
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherExists2() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:自定义watcher对象
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherExists3() throws KeeperException, InterruptedException {
// watcher一次性,一次注册,一次通知,打印一次就没了
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
// 再次监听,一直监听
zooKeeper.exists("/watcher1", this);
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
zooKeeper.exists("/watcher1", watcher);
Thread.sleep(80000);
System.out.println("结束");
}
@Test
public void watcherExists4() throws KeeperException, InterruptedException {
// 注册多个监听器对象
// 数据变化,打印2次watcher
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("1");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("2");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
Thread.sleep(80000);
System.out.println("结束");
}
}
getData方法
查看节点
// 使用连接对象的监视器
getData(String path, boolean b, Stat stat)
// 自定义监视器
getData(String path, Watcher w, Stat stat)
// NodeDeleted:节点删除
// NodeDataChanged:节点内容发生变化
- path- znode路径。
- b- 是否使用连接对象中注册的监视器。
- w-监视器对象。
- stat- 返回znode的元数据。
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZKWatcherGetData {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper = null;
@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接zookeeper客户端
zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数!");
// 连接成功
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
countDownLatch.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
@Test
public void watcherGetData1() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:使用连接对象中的watcher
zooKeeper.getData("/watcher2", true, null);
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherGetData2() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:自定义watcher对象
zooKeeper.getData("/watcher2", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
}, null);
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherGetData3() throws KeeperException, InterruptedException {
// 一次性
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if(event.getType()==Event.EventType.NodeDataChanged) {
zooKeeper.getData("/watcher2", this, null);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
zooKeeper.getData("/watcher2", watcher, null);
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherGetData4() throws KeeperException, InterruptedException {
// 注册多个监听器对象
zooKeeper.getData("/watcher2", new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("1");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if(event.getType()==Event.EventType.NodeDataChanged) {
zooKeeper.getData("/watcher2", this, null);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
},null);
zooKeeper.getData("/watcher2", new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("2");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if(event.getType()==Event.EventType.NodeDataChanged) {
zooKeeper.getData("/watcher2", this, null);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
},null);
Thread.sleep(50000);
System.out.println("结束");
}
}
getChildren方法
查看子节点
// 使用连接对象的监视器
getChildren(String path, boolean b)
// 自定义监视器
getChildren(String path, Watcher w)
// NodeChildrenChanged:子节点发生变化
// NodeDeleted:节点删除
- path- znode路径。
- b- 是否使用连接对象中注册的监视器。
- w-监视器对象。
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZKWatcherGetChild {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper = null;
@Before
public void before() throws IOException, InterruptedException {
CountDownLatch connectedSemaphore = new CountDownLatch(1);
// 连接zookeeper客户端
zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数!");
// 连接成功
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSemaphore.countDown();
}
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
connectedSemaphore.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
@Test
public void watcherGetChild1() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:使用连接对象中的watcher
zooKeeper.getChildren("/watcher3", true);
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherGetChild2() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:自定义watcher
zooKeeper.getChildren("/watcher3", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherGetChild3() throws KeeperException, InterruptedException {
// 一次性
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
zooKeeper.getChildren("/watcher3", this);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
zooKeeper.getChildren("/watcher3", watcher);
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherGetChild4() throws KeeperException, InterruptedException {
// 多个监视器对象
zooKeeper.getChildren("/watcher3", new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("1");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
zooKeeper.getChildren("/watcher3", this);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
});
zooKeeper.getChildren("/watcher3", new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("2");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
zooKeeper.getChildren("/watcher3", this);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
});
Thread.sleep(50000);
System.out.println("结束");
}
}
场景案例
配置中心
工作中有这样的一个场景: 数据库用户名和密码信息放在一个配置文件中,应用读取该配置文件,配置文件信息放入缓存。
若数据库的用户名和密码改变的时候,还需要重新加载缓存,比较麻烦,通过ZooKeeper可以轻松完成,当数据库发生变化时自动完成缓存同步。
设计思路:
- 连接zookeeper服务器
- 读取zookeeper中的配置信息,注册watcher监听器,存入本地变量
- 当zookeeper中的配置信息发生变化时,通过watcher的回调方法捕获数据变化事件
- 重新获取配置信息
import java.util.concurrent.CountDownLatch;
import com.wzq.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
/**
* Title:zookeeper实现配置中心
* Description:
* @author WZQ
* @version 1.0.0
* @date 2021/2/4
*/
public class MyConfigCenter implements Watcher {
// zk的连接串
String IP = "192.168.60.130:2181";
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接对象
static ZooKeeper zooKeeper;
// 用于本地化存储配置信息
private String url;
private String username;
private String password;
@Override
public void process(WatchedEvent event) {
try {
// 捕获事件状态
if (event.getType() == EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("连接断开!");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("连接超时!");
// 超时后服务器端已经将连接释放,需要重新连接服务器端
zooKeeper = new ZooKeeper("192.168.60.130:2181", 6000,
new ZKConnectionWatcher());
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("验证失败!");
}
// 当配置信息发生变化时,再次读取
} else if (event.getType() == EventType.NodeDataChanged) {
initValue();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 构造方法
public MyConfigCenter() {
initValue();
}
// 连接zookeeper服务器,读取配置信息
public void initValue() {
try {
// 创建连接对象
zooKeeper = new ZooKeeper(IP, 5000, this);
// 阻塞线程,等待连接的创建成功
countDownLatch.await();
// 读取配置信息
this.url = new String(zooKeeper.getData("/config/url", true, null));
this.username = new String(zooKeeper.getData("/config/username", true, null));
this.password = new String(zooKeeper.getData("/config/password", true, null));
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
try {
MyConfigCenter myConfigCenter = new MyConfigCenter();
for (int i = 1; i <= 20; i++) {
Thread.sleep(5000);
System.out.println("url:"+myConfigCenter.getUrl());
System.out.println("username:"+myConfigCenter.getUsername());
System.out.println("password:"+myConfigCenter.getPassword());
System.out.println("########################################");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
分布式唯一id
在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条记录生成一个唯一的ID。但是分库分表后,就无法在依靠数据库的auto_increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环境下生成全局唯一ID。
设计思路:
- 连接zookeeper服务器
- 指定路径生成临时有序节点
- 取序列号及为分布式环境下的唯一ID
import java.util.concurrent.CountDownLatch;
import com.wzq.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class GloballyUniqueId implements Watcher {
// zk的连接串
String IP = "192.168.60.130:2181";
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
// 用户生成序号的节点
String defaultPath = "/uniqueId";
// 连接对象
ZooKeeper zooKeeper;
@Override
public void process(WatchedEvent event) {
try {
// 捕获事件状态
if (event.getType() == Event.EventType.None) {
if (event.getState() == KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
} else if (event.getState() == KeeperState.Disconnected) {
System.out.println("连接断开!");
} else if (event.getState() == KeeperState.Expired) {
System.out.println("连接超时!");
// 超时后服务器端已经将连接释放,需要重新连接服务器端
zooKeeper = new ZooKeeper(IP, 6000,
new ZKConnectionWatcher());
} else if (event.getState() == KeeperState.AuthFailed) {
System.out.println("验证失败!");
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 构造方法
public GloballyUniqueId() {
try {
//打开连接
zooKeeper = new ZooKeeper(IP, 5000, this);
// 阻塞线程,等待连接的创建成功
countDownLatch.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
// 生成id的方法
public String getUniqueId() {
String path = "";
try {
//创建临时有序节点
path = zooKeeper.create(defaultPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception ex) {
ex.printStackTrace();
}
// /uniqueId0000000001
return path.substring(9);
}
public static void main(String[] args) {
GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
for (int i = 1; i <= 5; i++) {
String id = globallyUniqueId.getUniqueId();
System.out.println(id);
}
}
}
分布式锁
分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同工具ZooKeeper,当然也有着标准的实现方式。下面介绍在zookeeper中如何实现排他锁。
设计思路:
- 每个客户端往/Locks下创建临时有序节点/Locks/Lock_,创建成功后/Locks下面会有每个客户端对应的节点,如/Locks/Lock000000001
- 客户端取得/Locks下子节点,并进行排序,判断排在最前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功
- 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点Lock000000002,那么则监听Lock000000001
- 当前一位锁节点(Lock000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(Lock000000002)的逻辑
- 监听客户端重新执行第2步逻辑,判断自己是否获得了锁
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* Title:Zookeeper实现分布式锁
* Description:
* @author WZQ
* @version 1.0.0
* @date 2021/2/4
*/
public class MyLock {
// zk的连接串
String IP = "192.168.60.130:2181";
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//ZooKeeper配置信息
ZooKeeper zooKeeper;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
private String lockPath;
// 打开zookeeper连接
public MyLock() {
try {
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功!");
countDownLatch.countDown();
}
}
}
});
countDownLatch.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}
//获取锁
public void acquireLock() throws Exception {
//创建锁节点
createLock();
//尝试获取锁
attemptLock();
}
//创建锁节点
private void createLock() throws Exception {
//判断Locks是否存在,不存在创建
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建临时有序节点
lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("节点创建成功:" + lockPath);
}
//监视器对象,监视上一个节点是否被删除
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
synchronized (this) {
notifyAll();
}
}
}
};
//尝试获取锁
private void attemptLock() throws Exception {
// 获取Locks节点下的所有子节点
List<String> list = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
// 对子节点进行排序
Collections.sort(list);
// /Locks/Lock_000000001
int index = list.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
if (index == 0) {
System.out.println("获取锁成功!");
return;
} else {
// 上一个节点的路径
String path = list.get(index - 1);
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
if (stat == null) {
attemptLock();
} else {
synchronized (watcher) {
watcher.wait();
}
attemptLock();
}
}
}
//释放锁
public void releaseLock() throws Exception {
//删除临时有序节点
zooKeeper.delete(this.lockPath, -1);
zooKeeper.close();
System.out.println("锁已经释放:" + this.lockPath);
}
public static void main(String[] args) {
try {
MyLock myLock = new MyLock();
myLock.createLock();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
/**
* Title:售票案例,测试分布式锁
* Description:
* @author WZQ
* @version 1.0.0
* @date 2021/2/4
*/
public class TicketSeller {
private void sell(){
System.out.println("售票开始");
// 线程随机休眠数毫秒,模拟现实中的费时操作
int sleepMillis = 5000;
try {
//代表复杂逻辑执行了一段时间
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("售票结束");
}
public void sellTicketWithLock() throws Exception {
MyLock lock = new MyLock();
// 获取锁
lock.acquireLock();
sell();
//释放锁
lock.releaseLock();
}
public static void main(String[] args) throws Exception {
//多线程
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
TicketSeller ticketSeller = new TicketSeller();
ticketSeller.sellTicketWithLock();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
集群搭建
单机环境下,jdk、zookeeper 安装完毕,基于一台虚拟机,进行zookeeper伪集群搭建,zookeeper集群中包含3个节点,节点对外提供服务端口号分别为2181、2182、2183
-
基于zookeeper-3.4.10复制三份zookeeper安装好的服务器文件,目录名称分别为zookeeper2181、zookeeper2182、zookeeper2183
cp ‐r zookeeper‐3.4.10 zookeeper2181 cp ‐r zookeeper‐3.4.10 zookeeper2182 cp ‐r zookeeper‐3.4.10 zookeeper2183
-
修改zookeeper2181服务器对应配置文件
#服务器对应端口号 clientPort=2181 #数据快照文件所在路径 dataDir=/home/zookeeper/zookeeper2181/data #集群配置信息 #server.A=B:C:D #A:是一个数字,表示这个是服务器的编号 #B:是这个服务器的ip地址 #C:Zookeeper服务器之间的通信端口 #D:Leader选举的端口,投票 server.1=192.168.60.130:2287:3387 server.2=192.168.60.130:2288:3388 server.3=192.168.60.130:2289:3389
-
在上一步dataDir 指定的目录下,创建 myid 文件,然后在该文件添加上一步server 配置的对应 A 数字。
#zookeeper2181对应的数字为1 #/home/zookeeper/zookeeper2181/data目录下执行命令 echo "1" > myid
-
zookeeper2182、zookeeper2183参照步骤2/3进行相应配置
-
分别启动三台服务器,检验集群状态
登录命令:
./zkCli.sh ‐server 192.168.60.130:2181 ./zkCli.sh ‐server 192.168.60.130:2182 ./zkCli.sh ‐server 192.168.60.130:2183
API连接集群
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
- connectionString - zooKeeper集合主机。
- sessionTimeout - 会话超时(以毫秒为单位)。
- watcher - 实现“监视器”界面的对象。ZooKeeper集合通过监视器对象返回连接状态。
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class ZookeeperConnection {
public static void main(String[] args) {
try {
// 计数器对象
CountDownLatch countDownLatch=new CountDownLatch(1);
// arg1:服务器的ip和端口
// arg2:客户端与服务器之间的会话超时时间 以毫秒为单位的
// arg3:监视器对象
ZooKeeper zooKeeper=new ZooKeeper("192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
// 会话编号
System.out.println(zooKeeper.getSessionId());
zooKeeper.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
zab协议
一致性协议**:zab**协议,跟redis数据区分,redis主节点写数据后返回确认给客户端,AP,之后再同步,Zookeeper则是半数以上的从节点写成功后才返回确认给客户端,CP。
zab协议 的全称是 Zookeeper Atomic Broadcast (zookeeper原子广播)。zookeeper 是通过 zab协议来保证分布式事务的最终一致性
基于zab协议,zookeeper集群中的角色主要有以下三类,如下表所示:
zab广播模式工作原理,通过类似两阶段提交协议的方式解决数据一致性:
- leader从客户端收到一个写请求
- leader生成一个新的事务并为这个事务生成一个唯一的ZXID
- leader将这个事务提议(propose)发送给所有的follows节点
- follower节点将收到的事务请求加入到历史队列(history queue)中,并发送ack给leader
- 当leader收到大多数follower(半数以上节点)的ack消息,leader会发送commit请求
- 当follower收到commit请求时,从历史队列中将事务请求commit
读数据则是每个节点都有一个数据副本,都可以读,都最新。
#bin目录下查看节点的身份
./zkServer.sh status
Mode leader
Mode follower
leader选举
服务器状态
- looking:寻找leader状态。当服务器处于该状态时,它会认为当前集群中没有leader,因此需要进入leader选举状态。
- leading: 领导者状态。表明当前服务器角色是leader。
- following: 跟随者状态。表明当前服务器角色是follower。
- observing:观察者状态。表明当前服务器角色是observer。
服务器启动时期的leader选举
在集群初始化阶段,当有一台服务器server1启动时,其单独无法进行和完成leader选举,当第二台服务器server2启动时,此时两台机器可以相互通信,每台机器都试图找到leader,于是进入leader选举过程。选举过程如下:
-
每个server发出一个投票。由于是初始情况,server1和server2都会将自己作为leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid,使用 (myid, zxid)来表示,此时server1的投票为(1, 0),server2的投票为(2, 0),然后各自将这个投票发给集群中其他机器。
-
集群中的每台服务器接收来自集群中各个服务器的投票。
-
处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行pk,pk规则如下:
3-1,优先检查zxid。zxid比较大的服务器优先作为leader。
3-2,如果zxid相同,那么就比较myid。myid较大的服务器作为leader服务器。
对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较两者的zxid,均为0,再比较myid,此时server2的myid最大,于是更新自己的投票为(2, 0),然后重新投票,对于server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
-
统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于server1、server2而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了leader
-
改变服务器状态。一旦确定了leader,每个服务器就会更新自己的状态,如果是follower,那么就变更为following,如果是leader,就变更为leading。
服务器运行时期的Leader选举
在zookeeper运行期间,leader与非leader服务器各司其职,即便当有非leader服务器宕机或新加入,此时也不会影响leader,但是一旦leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮leader选举,其过程和启动时期的Leader选举过程基本一致。
假设正在运行的有server1、server2、server3三台服务器,当前leader是server2,若某一时刻leader挂了,此时便开始Leader选举。选举过程如下:
- 变更状态。leader挂后,余下的服务器都会将自己的服务器状态变更为looking,然后开始进入leader选举过程。
- 每个server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定server1的zxid为122,server3的zxid为122,在第一轮投票中,server1和server3都会投自己,产生投票(1, 122),(3, 122),然后各自将投票发送给集群中所有机器。
- 接收来自各个服务器的投票。与启动时过程相同。
- 处理投票。与启动时过程相同,此时,server3将会成为leader。
- 统计投票。与启动时过程相同。
- 改变服务器的状态。与启动时过程相同。
observer角色及其配置
observer角色特点:
- 不参与集群的leader选举
- 不参与集群中写数据时的ack反馈
为了使用observer角色,在任何想变成observer角色的配置文件中加入如下配置:
peerType=observer
并在所有server的配置文件中,配置成observer模式的server的那行配置追加:observer,例如:
server.3=192.168.60.130:2289:3389:observer