前置知识:
Zookeeper学习笔记(1)—— 基础知识-CSDN博客
Zookeeper学习笔记(2)—— Zookeeper API简单操作-CSDN博客
Zookeeper 服务器动态上下线监听案例
需求分析
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线
具体实现
首先创建节点servers:create /servers "servers"
服务器向zookeeper注册的代码
package com.why.zkCase;
import org.apache.zookeeper.*;
import org.junit.Before;
import java.io.IOException;
//服务端向zookeeper注册
public class DistributeServer {
private static String connetString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; //客户端连接ip
private static int sessionTimeout = 2000; //超时时间
private ZooKeeper zkClient = null; //客户端对象
private String parentNode = "/servers"; //父节点路径
@Before
public void getConnect() throws IOException {
zkClient = new ZooKeeper(connetString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//收到事件通知后的回调函数
System.out.println("事件类型:" + watchedEvent.getType());
System.out.println("事件路径:" + watchedEvent.getPath());
}
});
}
//注册服务器
public void registServer(String hostname) throws InterruptedException, KeeperException {
String create = zkClient.create(parentNode + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + "is online" + create);
}
//业务逻辑
public void business(String hostname) throws Exception {
System.out.println(hostname + " is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeServer server = new DistributeServer();
server.getConnect();
// 2 利用 zk 连接注册服务器信息
server.registServer(args[0]);
// 3 启动业务功能
server.business(args[0]);
}
}
客户端代码
package com.why.zkCase;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
//客户端
public class DistributeClient {
private static String connetString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; //客户端连接ip
private static int sessionTimeout = 2000; //超时时间
private ZooKeeper zkClient = null; //客户端对象
private String parentNode = "/servers"; //父节点路径
//创建到zk的客户端连接
@Before
public void getConnect() throws IOException {
zkClient = new ZooKeeper(connetString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//收到事件通知后的回调函数
System.out.println("事件类型:" + watchedEvent.getType());
System.out.println("事件路径:" + watchedEvent.getPath());
// 再次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
//获取服务器列表信息
public void getServerList() throws InterruptedException, KeeperException {
//获取服务器子节点信息,并对父节点进行监听
List<String> children = zkClient.getChildren(parentNode, true);
//存储服务器信息列表
ArrayList<String> servers = new ArrayList<>();
//遍历所有节点,获取主机名称信息
for (String child : children)
{
byte[] data = zkClient.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}
//打印服务器列表信息
System.out.println(servers);
}
// 业务功能
public void business() throws Exception{
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeClient client = new DistributeClient();
client.getConnect();
// 2 获取 servers 的子节点信息,从中获取服务器信息列表
client.getServerList();
// 3 业务进程启动
client.business();
}
}
测试
命令行操作
启动DistributeClient客户端
在zk的命令行中新建节点:create -e -s /servers/hadoop103 "hadoop103"
在idea的控制台可以看到:
删除hadoop103:delete /servers/hadoop1030000000001
可以看到:
idea操作
启动 DistributeClient 客户端
启动 DistributeServer 服务:
添加参数:
然后启动;
可以看到:
同时client也可以监听到服务器上线通知: