数据的发布和订阅:
(1)数据的发布与订阅是一个一对多的关系。多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态发生变化时,会通知所有的订阅者对象,使它们能够自动的更新自己的状态。发布和订阅可以让发布方和订阅放独立封装。
(2)当一个对象改变的时候,需要同时改变其它的对象,而且不知道多少个对象需要改变时,那么就可以使用发布和订阅模式。数据的发布和订阅在分布式中的应用主要有配置管理,和服务发现。
(3)配置管理是指如果集群中的机器拥有某些相同的配置时,并且这些配置信息需要动态的改变,我们就可以使用数据的发布和订阅模式把配置做统一的管理。让这些机器各自订阅配置信息的改变。当配置发生改变的时候,这些机器就可以得到通知。并且更新为最新的配置。
(4)服务发现是指,对集群中的服务上下线做统一的管理,每台服务器都可以作为数据的发布方,向集群注册自己的基本信息。而让某些监控服务器作为订阅工作服务器的基本信息。
(5)当工作服务器的基本信息发生改变的时候,比如说上下线,服务器角色改变,服务范围的变更,那么监控服务器可以得到通知,并且响应这些变化。
基本模型如下
左侧浅紫色的区域代表的是zk集群,右侧的方块代表的是工作服务器集群。其中,前3个方块代表的是工作服务器。绿色的方块代表的是管理服务器。最下面的方块代表的是控制服务器。Zk中有三类的节点,首先是config节点,它用于我们的配置管理,manageServer可以通过config来下发配置信息。workServer可以通过订阅config来改变更新自己的配置信息。
Servers节点用于服务发现,每个workServer在启动的时候,都会在Servers下创建一个临时节点,manager节点充当的monitor,监控servers节点下的子节点的改变,来更新工作服务器的列表信息。最后我们可以通过control Server,由command节点作为中介向manageServer发送控制指令。controlServer向command节点写入控制信息,manageServer订阅command节点的数据改变,来监听并且执行命令。
代码基本流程图
1.manage server 程序主体工作流程
2.work server 程序主体流程
3.系统核心类基本模型
serverConfig 用来记录workServer的配置信息
serverData 用来记录workServer的基本信息
subscribeZkClient 作为整个类的入口,用来启动workServer和manageServer.
demo 如下:
/**
* 下面demo就是一个典型的发布订阅系统:
* 集群中每台机器在启动阶段,都会到该节点上获取数据库的配置信息,同时客户端还需要在在节
* 点注册一个数据变更的watcher监听,一旦该数据节点发生变更,就会受到通知信息。
*/
public class ConfigTest {
/**
* 配置中心父节点
*/
private static final String PATH = "/server/database_config";
private static final String zkAddress = "127.0.0.1:2181";
private static final int timeout = 1000;
private static CuratorFramework client = null;
private static CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* 客户端的连接状态监听
*/
static ConnectionStateListener clientListener = new ConnectionStateListener() {
public void stateChanged(CuratorFramework client,
ConnectionState newState) {
if (newState == ConnectionState.CONNECTED) {
System.out.println("connected established");
countDownLatch.countDown();
} else if (newState == ConnectionState.LOST) {
System.out.println("connection lost,waiting for reconection");
try {
System.out.println("reinit---");
reinit();
System.out.println("inited---");
} catch (Exception e) {
System.err.println("re-inited failed");
}
} else if (newState == ConnectionState.SUSPENDED) {
System.out.println("suspended");
} else {
System.out.println(newState);
}
}
};
public static void main(String[] args) throws Exception {
//1、初始化curator
init();
//2、判断父节点是否存在,不在的话创建该节点(这个节点本身应在服务器手动添加的)
Stat stat = client.checkExists().forPath(PATH);
if (stat == null) {
client.create().creatingParentsIfNeeded().forPath(PATH);
}
//3、对path的变更进行监听
watcherPath(PATH, pathWatcher);
//4、模拟阻塞场景,可以客户端改变数据测试
Thread.sleep(Integer.MAX_VALUE);
}
public static void init() throws Exception {
client = CuratorFrameworkFactory.builder().connectString(zkAddress)
.sessionTimeoutMs(timeout)
.retryPolicy(new RetryNTimes(5, 5000)).build();
// 客户端注册连接状态监听器,进行连接配置(客户端连接的状态会被相应 的监听器监听)
client.getConnectionStateListenable().addListener(clientListener);
client.start();
// 连接成功后,才进行下一步的操作(连接成功会触发监听器中的countDownLatch.await())
countDownLatch.await();
}
public static void reinit() {
try {
unregister();
init();
} catch (Exception e) {
// TODO: handle exception
}
}
public static void unregister() {
try {
if (client != null) {
client.close();
client = null;
}
} catch (Exception e) {
System.out.println("unregister failed");
}
}
/**
* 对path进行监听配置
*
* @param path
* @param watcher
* @return
* @throws Exception
*/
public static String watcherPath(String path, CuratorWatcher watcher)
throws Exception {
//只是改变数据时候会反应(或者是使用getChildren、exist。或者是在创建客户端构造函数进行watcher监听)
byte[] buffer = client.getData().usingWatcher(watcher).forPath(path);
System.out.println("获取节点的信息:" + new String(buffer));
return new String(buffer);
}
/**
* 读取path数据
*
* @param path
* @return
* @throws Exception
*/
public static String readPath(String path) throws Exception {
byte[] buffer = client.getData().forPath(path);
return new String(buffer);
}
/**
*
* 对path进行改变监听的watcher
*/
private static CuratorWatcher pathWatcher = new CuratorWatcher() {
public void process(WatchedEvent event) throws Exception {
System.out.println(event.getType());
// 当数据变化后,重新获取数据信息
if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
//获取更改后的数据,进行相应的业务处理
String value = readPath(event.getPath());
System.out.println(value);
}
}
};
}```