1. Zookeeper原生客户端库存在的缺点
复杂性高 :原生客户端库提供了底层的 API,需要开发者手动处理很多细节,如连接管理、会话管理、异常处理等。这增加了开发的复杂性,容易出错。连接管理繁琐 :使用原生客户端库时,开发者需要手动管理与 ZooKeeper 的连接。这包括连接的建立、重连、会话超时处理等。异常处理复杂 :原生客户端库的 API 抛出多种异常,如 KeeperException
、InterruptedException
等。开发者需要手动处理这些异常,增加了代码的复杂性。缺少高级功能 :原生客户端库缺少一些高级功能,如连接池管理、自动重试、负载均衡等。这些功能在实际应用中非常有用,但需要开发者自己实现或使用第三方库。缺少封装和抽象 :原生客户端库提供了底层的 API,缺少更高层次的封装和抽象。开发者需要自己编写大量的代码来实现常见的功能,如分布式锁、配置管理等。性能调优困难 :原生客户端库的性能调优需要开发者手动进行,如调整连接超时时间、会话超时时间等。这需要对 ZooKeeper 的工作原理有深入的理解。缺少社区支持 :相比于一些更高级的客户端库(如 Curator),原生客户端库的社区支持相对较少。开发者在使用过程中遇到问题时,可能难以找到解决方案。
2. Apache Curator介绍
2.1 基本概述
定义 :Apache Curator是专为Apache ZooKeeper设计的Java/JVM客户端库,通过提供高级API框架及一系列实用工具,大幅降低使用ZooKeeper的复杂度并提升应用的可靠性。开发背景 :Curator最初由Netflix公司开源,目前是Apache的顶级项目。
2.2 核心功能
高可用性连接管理 :自动处理与ZooKeeper服务器的连接断开和重新连接,确保连接的稳定性和可靠性。易于使用的API :封装复杂的ZooKeeper原语,提供更直观、简洁的使用方式,降低开发难度。模式(Recipes) :预置了一系列常见的分布式计算模式,如leader选举、分布式锁、缓存机制等,开发者可以快速实现这些分布式系统经典难题。服务发现与负载均衡 :支持动态的服务注册与发现,便于构建云原生应用,提高系统的可扩展性和灵活性。异步DSL :针对Java 8及以上版本提供了异步编程的支持,提高了响应速度和程序效率。
3. 使用指南
3.1 添加 Maven 依赖
< dependency>
< groupId> org.apache.curator</ groupId>
< artifactId> curator-framework</ artifactId>
< version> 2.12.0</ version>
</ dependency>
< dependency>
< groupId> org.apache.curator</ groupId>
< artifactId> curator-recipes</ artifactId>
< version> 2.12.0</ version>
</ dependency>
3.2 创建 Curator 客户端
import org. apache. curator. framework. CuratorFramework ;
import org. apache. curator. framework. CuratorFrameworkFactory ;
import org. apache. curator. retry. ExponentialBackoffRetry ;
import org. apache. zookeeper. CreateMode ;
public class CuratorExample {
public static void main ( String [ ] args) throws Exception {
String connectString = "192.168.200.138:2181" ;
String path = "/curator1" ;
byte [ ] data = "myData" . getBytes ( ) ;
ExponentialBackoffRetry retry = new ExponentialBackoffRetry ( 5000 , 10 ) ;
CuratorFramework client = CuratorFrameworkFactory . newClient ( connectString, retry) ;
client. start ( ) ;
client. create ( ) . withMode ( CreateMode . PERSISTENT ) . forPath ( path, data) ;
byte [ ] retrievedData = client. getData ( ) . forPath ( path) ;
System . out. println ( "Retrieved data: " + new String ( retrievedData) ) ;
client. close ( ) ;
}
}
3.3 增删改查操作及Watcher监听
import org. apache. curator. framework. CuratorFramework ;
import org. apache. curator. framework. CuratorFrameworkFactory ;
import org. apache. curator. framework. api. CuratorEvent ;
import org. apache. curator. retry. ExponentialBackoffRetry ;
import org. apache. zookeeper. CreateMode ;
import org. apache. zookeeper. WatchedEvent ;
import org. apache. zookeeper. Watcher ;
public class CuratorExample {
public static void main ( String [ ] args) throws Exception {
String connectString = "192.168.200.138:2181" ;
String path = "/curator1" ;
byte [ ] data1 = "myData1" . getBytes ( ) ;
byte [ ] data2 = "myData2" . getBytes ( ) ;
ExponentialBackoffRetry retry = new ExponentialBackoffRetry ( 5000 , 10 ) ;
CuratorFramework client = CuratorFrameworkFactory . newClient ( connectString, retry) ;
client. start ( ) ;
client. getCuratorListenable ( ) . addListener ( ( CuratorFramework c, CuratorEvent event) -> {
switch ( event. getType ( ) ) {
case WATCHED :
WatchedEvent watchedEvent = event. getWatchedEvent ( ) ;
if ( watchedEvent. getType ( ) == Watcher. Event. EventType. NodeDataChanged ) {
System . out. println ( "监听的数据变化为: " + new String ( c. getData ( ) . forPath ( path) ) ) ;
System . out. println ( "触发事件" ) ;
}
}
} ) ;
client. create ( ) . withMode ( CreateMode . PERSISTENT ) . forPath ( path, data1) ;
byte [ ] retrievedData = client. getData ( ) . watched ( ) . forPath ( path) ;
System . out. println ( "原始数据: " + new String ( retrievedData) ) ;
client. setData ( ) . forPath ( path, data2) ;
Thread . sleep ( 2000 ) ;
client. delete ( ) . forPath ( path) ;
Thread . sleep ( 2000 ) ;
}
}
3.4 进行永久监听
import org. apache. curator. framework. CuratorFramework ;
import org. apache. curator. framework. CuratorFrameworkFactory ;
import org. apache. curator. framework. recipes. cache. ChildData ;
import org. apache. curator. framework. recipes. cache. NodeCache ;
import org. apache. curator. retry. ExponentialBackoffRetry ;
import org. apache. zookeeper. CreateMode ;
public class PermanentWatcherExample {
public static void main ( String [ ] args) throws Exception {
String connectString = "192.168.200.138:2181" ;
String path = "/curator1" ;
byte [ ] data1 = "myData1" . getBytes ( ) ;
byte [ ] data2 = "myData2" . getBytes ( ) ;
byte [ ] data3 = "myData3" . getBytes ( ) ;
ExponentialBackoffRetry retry = new ExponentialBackoffRetry ( 5000 , 10 ) ;
CuratorFramework client = CuratorFrameworkFactory . newClient ( connectString, retry) ;
client. start ( ) ;
client. create ( ) . withMode ( CreateMode . PERSISTENT ) . forPath ( path, data1) ;
NodeCache nodeCache = new NodeCache ( client, path) ;
nodeCache. start ( ) ;
nodeCache. getListenable ( ) . addListener ( ( ) -> {
ChildData currentData = nodeCache. getCurrentData ( ) ;
if ( currentData != null ) {
System . out. println ( "触发了永久监听的回调,当前值为:" + new String ( currentData. getData ( ) ) ) ;
}
} ) ;
client. setData ( ) . forPath ( path, data1) ;
Thread . sleep ( 2000 ) ;
client. setData ( ) . forPath ( path, data2) ;
Thread . sleep ( 2000 ) ;
client. setData ( ) . forPath ( path, data3) ;
Thread . sleep ( 2000 ) ;
client. delete ( ) . forPath ( path) ;
}
}
3.5 使用分布式锁
import org. apache. curator. framework. CuratorFramework ;
import org. apache. curator. framework. CuratorFrameworkFactory ;
import org. apache. curator. framework. recipes. locks. InterProcessMutex ;
import org. apache. curator. retry. ExponentialBackoffRetry ;
public class DistributedLockExample {
public static void main ( String [ ] args) throws Exception {
String connectString = "192.168.200.138:2181" ;
String path = "/myLock" ;
ExponentialBackoffRetry retry = new ExponentialBackoffRetry ( 5000 , 10 ) ;
CuratorFramework client = CuratorFrameworkFactory . newClient ( connectString, retry) ;
client. start ( ) ;
InterProcessMutex lock = new InterProcessMutex ( client, path) ;
lock. acquire ( ) ;
try {
System . out. println ( "Lock acquired, executing critical section..." ) ;
Thread . sleep ( 2000 ) ;
} finally {
lock. release ( ) ;
System . out. println ( "Lock released." ) ;
}
client. close ( ) ;
}
}