SpringBoot - ZooKeeper
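- 1. Integrating the Curator framework
- 2. Building the ZooKeeper client
- 3. Master election
- 3.1. LeaderSelector
- 3.2. LeaderLatch
- 4. Group membership registration
- 5. Node listening
1. Integrating the Curator framework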
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
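2. Building the ZooKeeper client
import lombok.extern.log4j.Log4j2;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Log4j2
@Configuration
public class ZookeeperConfig implements DisposableBean {
    private CuratorFramework client = null;

    /**
     * Builds the ZooKeeper connection client.
     *
     * @param zookeeperProperties ZooKeeper settings (unused here; the values below are hard-coded)
     * @return the started Curator client
     */
    @Bean
    public CuratorFramework curatorFramework(ZookeeperProperties zookeeperProperties) {
        // Assign to the field rather than a local variable so destroy() can close it
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.xx.xxx:2181,192.168.xx.xxx:2181,192.168.xx.xxx:2181")
                // Root node (namespace) prepended to every path
                .namespace("zookeeper")
                // Connection retry policy: 1000 ms base sleep, at most 3 retries
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        // Connect to the server
        client.start();
        return client;
    }

    @Override
    public void destroy() {
        // Guard against the bean never having been created
        if (client != null) {
            client.close();
        }
    }
}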
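To make the retry policy more concrete, here is a minimal stdlib-only sketch of how an exponential backoff with random jitter can pick its sleep time. It is modeled on the behavior of `ExponentialBackoffRetry(1000, 3)` but is not Curator's code; the class `BackoffSketch` and its method names are hypothetical.

```java
import java.util.Random;

class BackoffSketch {
    private final int baseSleepMs;
    private final int maxRetries;
    private final Random random = new Random();

    BackoffSketch(int baseSleepMs, int maxRetries) {
        this.baseSleepMs = baseSleepMs;
        this.maxRetries = maxRetries;
    }

    /** True while another attempt is allowed; retryCount starts at 0. */
    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    /** Sleep before the next attempt: base times a random factor in [1, 2^(retryCount+1) - 1]. */
    long sleepMs(int retryCount) {
        int factor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return (long) baseSleepMs * factor;
    }

    public static void main(String[] args) {
        BackoffSketch policy = new BackoffSketch(1000, 3);
        for (int retry = 0; policy.allowRetry(retry); retry++) {
            long upperBound = 1000L * ((1 << (retry + 1)) - 1);
            System.out.println("retry " + retry + ": picked " + policy.sleepMs(retry)
                    + " ms (upper bound " + upperBound + " ms)");
        }
    }
}
```

The upper bound of the wait grows roughly twofold with each retry, while the random factor keeps several clients from retrying in lockstep.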
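3. Master election
Curator offers two ways to implement it: LeaderSelector and LeaderLatch.
3.1. LeaderSelector
LeaderSelector competes for leadership through the InterProcessMutex distributed lock; whichever instance acquires the lock becomes the leader.
- Once an instance is elected leader, its takeLeadership method is called to run the business logic; leadership is released as soon as the method returns.
- Calling autoRequeue() ensures that the instance can gain leadership again after releasing it. It is off by default.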
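The two points above can be illustrated without a ZooKeeper ensemble. The sketch below is a local, in-process model and not Curator code: a fair `ReentrantLock` stands in for the InterProcessMutex, each thread is a candidate, and the loop plays the role of autoRequeue(). The class `ElectionSketch` and all of its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class ElectionSketch {
    // Stand-in for the distributed lock that candidates compete for.
    private final ReentrantLock mutex = new ReentrantLock(true);
    private final AtomicInteger activeLeaders = new AtomicInteger();
    private final AtomicInteger maxConcurrentLeaders = new AtomicInteger();
    private final AtomicInteger termsServed = new AtomicInteger();

    /** One candidate: win the lock, do the leader work, release, and requeue. */
    private void candidate(int rounds) {
        for (int i = 0; i < rounds; i++) {   // the loop models autoRequeue()
            mutex.lock();                     // blocks until this candidate wins
            try {
                takeLeadership();
            } finally {
                mutex.unlock();               // returning gives up leadership
            }
        }
    }

    private void takeLeadership() {
        int now = activeLeaders.incrementAndGet();
        maxConcurrentLeaders.accumulateAndGet(now, Math::max);
        termsServed.incrementAndGet();
        activeLeaders.decrementAndGet();
    }

    /** Runs the election and returns {terms served, max leaders seen at once}. */
    static int[] run(int candidates, int rounds) {
        ElectionSketch election = new ElectionSketch();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < candidates; i++) {
            Thread t = new Thread(() -> election.candidate(rounds));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for candidates", e);
            }
        }
        return new int[] {election.termsServed.get(), election.maxConcurrentLeaders.get()};
    }

    public static void main(String[] args) {
        int[] result = run(3, 4);
        System.out.println("terms=" + result[0] + ", max concurrent leaders=" + result[1]);
    }
}
```

With 3 candidates and 4 rounds each, 12 leadership terms are served and there is never more than one leader at a time. Without the loop, each candidate would lead once and then drop out, which corresponds to leaving autoRequeue() off.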
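import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class Master implements ApplicationListener<ApplicationReadyEvent> {
    @Autowired
    private CuratorFramework zkClient;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        LeaderSelector selector = new LeaderSelector(zkClient, "/masterLeader", new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                // This instance is now the leader; leadership ends when this method returns
                System.out.println("do leader work");
                Thread.sleep(5000);
                System.out.println("end work");
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("stateChanged:" + newState);
                // Give up leadership when the connection is suspended or lost
                if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                    throw new CancelLeadershipException();
                }
            }
        });
        // Allow this instance to run for master again after releasing leadership
        selector.autoRequeue();
        // Start the master election
        selector.start();
    }
}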
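3.2. LeaderLatch
- Leadership is held until close() is called explicitly; only after the leader releases it does the next election round take place.
- Add a listener to receive callbacks when this instance gains leadership (isLeader) or loses it (notLeader).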
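A small in-memory model can make these semantics concrete. It is a sketch, not Curator's implementation: participants queue in join order, the head of the queue is the leader, and close() hands leadership to the next participant. The model fires notLeader when the leader closes, which corresponds to closing a LeaderLatch with CloseMode.NOTIFY_LEADER; Curator's default close mode is silent. The names `LatchSketch`, `Listener`, `join`, and `leader` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class LatchSketch {
    /** Mirrors the two callbacks of a leader latch listener. */
    interface Listener {
        void isLeader(String id);
        void notLeader(String id);
    }

    private final Deque<String> queue = new ArrayDeque<>();
    private final Listener listener;

    LatchSketch(Listener listener) {
        this.listener = listener;
    }

    /** Joins the election; the first participant becomes leader immediately. */
    void join(String id) {
        queue.addLast(id);
        if (queue.size() == 1) {
            listener.isLeader(id);
        }
    }

    /** Leaves the election; if the leader leaves, the next in line is promoted. */
    void close(String id) {
        boolean wasLeader = id.equals(queue.peekFirst());
        if (!queue.remove(id)) {
            return; // not a participant
        }
        if (wasLeader) {
            listener.notLeader(id);
            if (!queue.isEmpty()) {
                listener.isLeader(queue.peekFirst());
            }
        }
    }

    /** Current leader, or null when nobody has joined. */
    String leader() {
        return queue.peekFirst();
    }

    /** Scripted run that records the callbacks in the order they fire. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        LatchSketch latch = new LatchSketch(new Listener() {
            @Override public void isLeader(String id) { events.add("isLeader:" + id); }
            @Override public void notLeader(String id) { events.add("notLeader:" + id); }
        });
        latch.join("a");
        latch.join("b");
        latch.join("c");
        latch.close("b"); // a follower leaves: no leadership change
        latch.close("a"); // the leader leaves: c is next in line
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In contrast to the LeaderSelector model, nothing here releases leadership on its own: "a" stays leader until it is closed.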
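import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class Master implements ApplicationListener<ApplicationReadyEvent> {
    @Autowired
    private CuratorFramework zkClient;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        startMaster();
    }

    private void startMaster() {
        LeaderLatch latch = new LeaderLatch(zkClient, "/masterLeader");
        // Define the listener
        LeaderLatchListener listener = new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // Called when this instance gains leadership
                System.out.println("leader...");
            }

            @Override
            public void notLeader() {
                // Called when this instance loses leadership
                System.out.println("notLeader...");
            }
        };
        // Register the listener
        latch.addListener(listener);
        try {
            // Start the election
            latch.start();
        } catch (Exception e) {
            System.out.println("Failed to start the leader election...");
        }
    }
}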
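4. Group membership registration
import java.util.UUID;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.nodes.GroupMember;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class MemberGroup implements ApplicationListener<ApplicationReadyEvent> {
    @Autowired
    private CuratorFramework zkClient;
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // Join the group at /member under a random unique ID
        GroupMember memberGroup = new GroupMember(zkClient, "/member", UUID.randomUUID().toString());
        memberGroup.start();
    }
}
Each instance registers itself as a cluster node; the result is shown below: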
5. Node listening
// zkClient is the injected CuratorFramework; the third argument (true) also caches node data
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, "/member/rules", true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        switch (event.getType()) {
            // A child node was added
            case CHILD_ADDED:
                break;
            // A child node was removed
            case CHILD_REMOVED:
                break;
            // Any other event
            default:
                break;
        }
    }
});
// Start listening for child node changes
pathChildrenCache.start();
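Conceptually, a children cache keeps a local snapshot of the child nodes and turns the differences between snapshots into events. The stdlib-only sketch below illustrates that idea; it is not how PathChildrenCache is implemented internally, and `ChildDiffSketch`, its `EventType` enum, and `diff` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class ChildDiffSketch {
    enum EventType { CHILD_ADDED, CHILD_REMOVED }

    /** Compares two snapshots of child names and lists the resulting events. */
    static List<String> diff(Set<String> before, Set<String> after) {
        List<String> events = new ArrayList<>();
        // TreeSet gives a stable, sorted iteration order for the output.
        for (String child : new TreeSet<>(after)) {
            if (!before.contains(child)) {
                events.add(EventType.CHILD_ADDED + ":" + child);
            }
        }
        for (String child : new TreeSet<>(before)) {
            if (!after.contains(child)) {
                events.add(EventType.CHILD_REMOVED + ":" + child);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Set<String> before = new TreeSet<>(Arrays.asList("rule-1", "rule-2"));
        Set<String> after = new TreeSet<>(Arrays.asList("rule-2", "rule-3"));
        // prints [CHILD_ADDED:rule-3, CHILD_REMOVED:rule-1]
        System.out.println(diff(before, after));
    }
}
```

A listener like the one above would then receive one callback per entry in the returned list.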