zk基础—5.Curator的使用与剖析一

news2025/4/9 16:32:31

大纲

1.基于Curator进行基本的zk数据操作

2.基于Curator实现集群元数据管理

3.基于Curator实现HA主备自动切换

4.基于Curator实现Leader选举

5.基于Curator实现分布式Barrier

6.基于Curator实现分布式计数器

7.基于Curator实现zk的节点和子节点监听机制

8.基于Curator创建客户端实例的源码分析

9.Curator在启动时是如何跟zk建立连接的

10.基于Curator进行增删改查节点的源码分析

11.基于Curator的节点监听回调机制的实现源码

12.基于Curator的Leader选举机制的实现源码

1.基于Curator进行基本的zk数据操作

Guava is to Java what Curator is to ZooKeeper,引入依赖如下:

<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>
</dependencies>

Curator实现对znode进行增删改查的示例如下,其中CuratorFramework代表一个客户端实例。注意:可以通过creatingParentsIfNeeded()方法进行指定节点的级联创建。

public class CrudDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();//启动客户端并建立连接
     
        System.out.println("已经启动Curator客户端");

        client.create()
            .creatingParentsIfNeeded()//进行级联创建
            .withMode(CreateMode.PERSISTENT)//指定节点类型
            .forPath("/my/path", "10".getBytes());//增

        byte[] dataBytes = client.getData().forPath("/my/path");//查
        System.out.println(new String(dataBytes));

        client.setData().forPath("/my/path", "11".getBytes());//改
        dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));

        List<String> children = client.getChildren().forPath("/my");//查
        System.out.println(children);

        client.delete().forPath("/my/path");//删
        Thread.sleep(Integer.MAX_VALUE);
    }
}

2.基于Curator实现集群元数据管理

Curator可以操作zk。比如自研了一套分布式系统类似于Kafka、Canal,想把集群运行的核心元数据都放到zk里去。此时就可以通过Curator创建一些znode,往里面写入对应的值。

写入的值推荐用json格式,比如Kafka就是往zk写json格式数据。这样,其他客户端在需要的时候,就可以从里面读取出集群元数据了。

3.基于Curator实现HA主备自动切换

HDFS、Kafka、Canal都使用了zk进行Leader选举,所以可以基于Curator实现HA主备自动切换。

HDFS的NameNode是可以部署HA架构的,有主备两台机器。如果主机器宕机了,备用的机器可以感知到并选举为Leader,这样备用的机器就可以作为新的NameNode对外提供服务。

Kafka里的Controller负责管理整个集群的协作,Kafka中任何一个Broker都可以变成Controller,类似于Leader的角色。

Canal也会部署主备两台机器,主机器挂掉了,备用机器就可以跟上去。

4.基于Curator实现Leader选举

(1)Curator实现Leader选举的第一种方式之LeaderLatch

(2)Curator实现Leader选举的第二种方式之LeaderSelector

(1)Curator实现Leader选举的第一种方式之LeaderLatch

通过Curator的LeaderLatch来实现Leader选举:

public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();
        
        //"/leader/latch"这其实是一个znode顺序节点
        LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
        leaderLatch.start();
        leaderLatch.await();//直到等待他成为Leader再往后执行

        //类似于HDFS里,两台机器,其中一台成为了Leader就开始工作
        //另外一台机器可以通过await阻塞在这里,直到Leader挂了,自己就会成为Leader继续工作
        Boolean hasLeaderShip = leaderLatch.hasLeadership();//判断是否成为Leader
        System.out.println("是否成为leader:" + hasLeaderShip);

        Thread.sleep(Integer.MAX_VALUE);
    }
}

(2)Curator实现Leader选举的第二种方式之LeaderSelector

通过Curator的LeaderSelector来实现Leader选举如下:其中,LeaderSelector有两个监听器,可以关注连接状态。

public class LeaderSelectorDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        LeaderSelector leaderSelector = new LeaderSelector(
            client,
            "/leader/election",
            new LeaderSelectorListener() {
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println("你已经成为了Leader......");
                    //在这里干Leader所有的事情,此时方法不能退出
                    Thread.sleep(Integer.MAX_VALUE);
                }
                
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    System.out.println("连接状态的变化,已经不是Leader......");
                    if (connectionState.equals(ConnectionState.LOST)) {
                        throw new CancelLeadershipException();
                    }
                }
            }
        );
        leaderSelector.start();//尝试和其他节点在节点"/leader/election"上进行竞争成为Leader
        Thread.sleep(Integer.MAX_VALUE);
    }
}

5.基于Curator实现的分布式Barrier

(1)分布式Barrier

(2)分布式双重Barrier

(1)分布式Barrier

很多台机器都可以创建一个Barrier,此时它们都被阻塞了。除非满足一个条件(setBarrier()或removeBarrier()),才能不再阻塞它们。

//DistributedBarrier
public class DistributedBarrierDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
        barrier.waitOnBarrier();
    }
}

(2)分布式双重Barrier

//DistributedDoubleBarrier
public class DistributedDoubleBarrierDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        DistributedDoubleBarrier doubleBarrier = new DistributedDoubleBarrier(client, "/barrier/double", 10);
        doubleBarrier.enter();//每台机器都会阻塞在enter这里
        //直到10台机器都调用了enter,就会从enter这里往下执行
        //此时可以做一些计算任务

        doubleBarrier.leave();//每台机器都会阻塞在leave这里,直到10台机器都调用了leave
        //此时就可以继续往下执行
    }
}

6.基于Curator实现分布式计数器

如果真的要实现分布式计数器,最好用Redis来实现。因为Redis的并发量更高,性能更好,功能更加的强大,而且还可以使用lua脚本嵌入进去实现复杂的业务逻辑。但是Redis天生的异步同步机制,存在机器宕机导致的数据不同步风险。然而zk在ZAB协议下的数据同步机制,则不会出现宕机导致数据不同步的问题。

//SharedCount:通过一个节点的值来实现
public class SharedCounterDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        SharedCount sharedCount = new SharedCount(client, "/shared/count", 0);
        sharedCount.start();

        sharedCount.addListener(new SharedCountListener() {
            public void countHasChanged(SharedCountReader sharedCountReader, int i) throws Exception {
                System.out.println("分布式计数器变化了......");
            }
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                System.out.println("连接状态变化了.....");
            }
        });

        Boolean result = sharedCount.trySetCount(1);
        System.out.println(sharedCount.getCount());
    }
}

7.基于Curator实现zk的节点和子节点监听机制

(1)基于Curator实现zk的子节点监听机制

(2)基于Curator实现zk的节点数据监听机制

我们使用zk主要用于:

一.对元数据进行增删改查、监听元数据的变化

二.进行Leader选举

有三种类型的节点可以监听:

一.子节点监听PathCache

二.节点监听NodeCache

三.整个节点以下的树监听TreeCache

(1)基于Curator实现zk的子节点监听机制

下面是PathCache实现的子节点监听示例:

public class PathCacheDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
        //cache就是把zk里的数据缓存到客户端里来
        //可以针对这个缓存的数据加监听器,去观察zk里的数据的变化
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {

            }
        });
        pathChildrenCache.start();
    }
}

(2)基于Curator实现zk的节点数据监听机制

下面是NodeCache实现的节点监听示例:

public class NodeCacheDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        final NodeCache nodeCache = new NodeCache(client, "/cluster");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/cluster");
                if (stat == null) {
                    
                } else {
                    nodeCache.getCurrentData();
                }
            }
        });
        nodeCache.start();
    }
}

8.基于Curator创建客户端实例的源码分析

(1)创建CuratorFramework实例使用了构造器模式

(2)创建CuratorFramework实例会初始化CuratorZooKeeperClient实例

(1)创建CuratorFramework实例使用了构造器模式

CuratorFrameworkFactory.newClient()方法使用了构造器模式。首先通过builder()方法创建出Builder实例对象,然后把参数都设置成Builder实例对象的属性,最后通过build()方法把Builder实例对象传入目标类的构造方法中。

public class Demo {
    public static void main(String[] args) throws Exception {
      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
      CuratorFramework client = CuratorFrameworkFactory.newClient(
              "127.0.0.1:2181",//zk的地址
              5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
              3000,//连接zk时的超时时间
              retryPolicy
      );
      client.start();
      System.out.println("已经启动Curator客户端");
    }
}

public class CuratorFrameworkFactory {
    //创建CuratorFramework实例使用了构造器模式
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }
    ...
    public static Builder builder() {
        return new Builder();
    }
    
    public static class Builder {
        ...
        private EnsembleProvider ensembleProvider;
        private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
        private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
        private RetryPolicy retryPolicy;
        ...
        public Builder connectString(String connectString) {
            ensembleProvider = new FixedEnsembleProvider(connectString);
            return this;
        }
        
        public Builder sessionTimeoutMs(int sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
            return this;
        }
        
        public Builder connectionTimeoutMs(int connectionTimeoutMs) {
            this.connectionTimeoutMs = connectionTimeoutMs;
            return this;
        }
        
        public Builder retryPolicy(RetryPolicy retryPolicy) {
            this.retryPolicy = retryPolicy;
            return this;
        }
        ...
        public CuratorFramework build() {
            return new CuratorFrameworkImpl(this);
        }
    }
    ...
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
        this.client = new CuratorZookeeperClient(
            localZookeeperFactory,
            builder.getEnsembleProvider(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionTimeoutMs(),
            builder.getWaitForShutdownTimeoutMs(),
            new Watcher() {//这里注册了一个zk的watcher
                @Override
                public void process(WatchedEvent watchedEvent) {
                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                    processEvent(event);
                }
            },
            builder.getRetryPolicy(),
            builder.canBeReadOnly(),
            builder.getConnectionHandlingPolicy()
        );
        ...
    }
    ...
}

(2)创建CuratorFramework实例会初始化CuratorZooKeeperClient实例

CuratorFramework实例代表了一个zk客户端,CuratorFramework初始化时会初始化一个CuratorZooKeeperClient实例。

CuratorZooKeeperClient是Curator封装ZooKeeper的客户端。

初始化CuratorZooKeeperClient时会传入一个Watcher监听器。

所以CuratorFrameworkFactory的newClient()方法的主要工作是:初始化CuratorFramework -> 初始化CuratorZooKeeperClient -> 初始化ZookeeperFactory + 注册一个Watcher。

客户端发起与zk的连接,以及注册Watcher监听器,则是由CuratorFramework的start()方法触发的。

9.Curator启动时是如何跟zk建立连接的

ConnectionStateManager的start()方法会启动一个线程处理eventQueue。eventQueue里存放了与zk的网络连接变化事件,eventQueue收到这种事件便会通知ConnectionStateListener。

CuratorZookeeperClient的start()方法会初始化好原生zk客户端,和zk服务器建立一个TCP长连接,而且还会注册一个ConnectionState类型的Watcher监听器,以便能收到zk服务端发送的通知事件。

public class CuratorFrameworkImpl implements CuratorFramework {
    private final CuratorZookeeperClient client;
    private final ConnectionStateManager connectionStateManager;
    private volatile ExecutorService executorService;
    ...
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
        ...
        this.client = new CuratorZookeeperClient(...);
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), 
            builder.getSessionTimeoutMs(), 
            builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), 
            builder.getConnectionStateListenerDecorator()
        );
        ...
    }
    ...
    @Override
    public void start() {
        log.info("Starting");
        if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        ...
        //1.启动一个线程监听和zk网络连接的变化事件
        connectionStateManager.start();
        //2.添加一个监听器监听和zk网络连接的变化
        final ConnectionStateListener listener = new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState) {
                    logAsErrorConnectionErrors.set(true);
                }
            }
            @Override
            public boolean doNotDecorate() {
                return true;
            }
        };
        this.getConnectionStateListenable().addListener(listener);
        //3.创建原生zk客户端
        client.start();
        //4.创建一个线程池,执行后台的操作
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                backgroundOperationsLoop();
                return null;
            }
        });
        if (ensembleTracker != null) {
            ensembleTracker.start();
        }
        log.info(schemaSet.toDocumentation());
    }
    ...
}

public class ConnectionStateManager implements Closeable {
    private final ExecutorService service;
    private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
    ...
    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator) {
        ...
        service = Executors.newSingleThreadExecutor(threadFactory);
        ...
    }
    ...
    public void start() {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        //启动一个线程
        service.submit(
            new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    processEvents();
                    return null;
                }
            }
        );
    }
    
    private void processEvents() {
        while (state.get() == State.STARTED) {
            int useSessionTimeoutMs = getUseSessionTimeoutMs();
            long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
            long pollMaxMs = useSessionTimeoutMs - elapsedMs;

            final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
            if (newState != null) {
                if (listeners.size() == 0) {
                    log.warn("There are no ConnectionStateListeners registered.");
                }
                listeners.forEach(listener -> listener.stateChanged(client, newState));
            } else if (sessionExpirationPercent > 0) {
                synchronized(this) {
                    checkSessionExpiration();
                }
            }
        }
    }
    ...
}

public class CuratorZookeeperClient implements Closeable {
    private final ConnectionState state;
    ...
    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
            int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
            RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
        ...
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
        ...
    }
    ...
    public void start() throws Exception {
        log.debug("Starting");
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Already started");
        }
        state.start();
    }
    ...
}

class ConnectionState implements Watcher, Closeable {
    private final HandleHolder zooKeeper;
    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, 
            int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, 
            AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if (parentWatcher != null) {
            parentWatchers.offer(parentWatcher);
        }
        //把自己作为Watcher注册给HandleHolder
        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }
    ...
    void start() throws Exception {
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }
    
    synchronized void reset() throws Exception {
        log.debug("reset");
        instanceIndex.incrementAndGet();
        isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        //创建客户端与zk的连接
        zooKeeper.closeAndReset();
        zooKeeper.getZooKeeper();//initiate connection
    }
    ...
}

class HandleHolder {
    private final ZookeeperFactory zookeeperFactory;
    private final Watcher watcher;
    private final EnsembleProvider ensembleProvider;
    private final int sessionTimeout;
    private final boolean canBeReadOnly;
    private volatile Helper helper;
    ...
    HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly) {
        this.zookeeperFactory = zookeeperFactory;
        this.watcher = watcher;
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeout = sessionTimeout;
        this.canBeReadOnly = canBeReadOnly;
    }
    
    private interface Helper {
        ZooKeeper getZooKeeper() throws Exception;
        String getConnectionString();
        int getNegotiatedSessionTimeoutMs();
    }
    
    ZooKeeper getZooKeeper() throws Exception {
        return (helper != null) ? helper.getZooKeeper() : null;
    }
    
    void closeAndReset() throws Exception {
        internalClose(0);
        helper = new Helper() {
            private volatile ZooKeeper zooKeeperHandle = null;
            private volatile String connectionString = null;
            @Override
            public ZooKeeper getZooKeeper() throws Exception {
                synchronized(this) {
                    if (zooKeeperHandle == null) {
                        connectionString = ensembleProvider.getConnectionString();
                        //创建和zk的连接,初始化变量zooKeeperHandle
                        zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }
                    ...
                    return zooKeeperHandle;
                }
            }
            @Override
            public String getConnectionString() {
                return connectionString;
            }
            @Override
            public int getNegotiatedSessionTimeoutMs() {
                return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
            }
        };
    }
    ...
}

//创建客户端与zk的连接
public class DefaultZookeeperFactory implements ZookeeperFactory {
    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}

10.基于Curator进行增删改查节点的源码分析

(1)基于Curator创建znode节点

(2)基于Curator查询znode节点

(3)基于Curator修改znode节点

(4)基于Curator删除znode节点

Curator的CURD操作,底层都是通过调用zk原生的API来完成的。

(1)基于Curator创建znode节点

创建节点也使用了构造器模式:首先通过CuratorFramework的create()方法创建一个CreateBuilder实例,然后通过CreateBuilder的withMode()等方法设置CreateBuilder的变量,最后通过CreateBuilder的forPath()方法 + 重试调用来创建znode节点。

创建节点时会调用CuratorFramework的getZooKeeper()方法获取zk客户端实例,之后就是通过原生zk客户端的API去创建节点了。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");
        //创建节点
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/my/path", "100".getBytes());
    }
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public CreateBuilder create() {
        checkState();
        return new CreateBuilderImpl(this);
    }
    ...
}

public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String> { 
    private final CuratorFrameworkImpl client;
    private CreateMode createMode;
    private Backgrounding backgrounding;
    private boolean createParentsIfNeeded;
    ...
    CreateBuilderImpl(CuratorFrameworkImpl client) {
        this.client = client;
        createMode = CreateMode.PERSISTENT;
        backgrounding = new Backgrounding();
        acling = new ACLing(client.getAclProvider());
        createParentsIfNeeded = false;
        createParentsAsContainers = false;
        compress = false;
        setDataIfExists = false;
        storingStat = null;
        ttl = -1;
    }
    
    @Override
    public String forPath(final String givenPath, byte[] data) throws Exception {
        if (compress) {
            data = client.getCompressionProvider().compress(givenPath, data);
        }

        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
        List<ACL> aclList = acling.getAclList(adjustedPath);
        client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

        String returnPath = null;
        if (backgrounding.inBackground()) {
            pathInBackground(adjustedPath, data, givenPath);
        } else {
            //创建节点
            String path = protectedPathInForeground(adjustedPath, data, aclList);
            returnPath = client.unfixForNamespace(path);
        }
        return returnPath;
    }
    
    private String protectedPathInForeground(String adjustedPath, byte[] data, List<ACL> aclList) throws Exception {
        return pathInForeground(adjustedPath, data, aclList);
    }
    
    private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Foreground");
        final AtomicBoolean firstTime = new AtomicBoolean(true);
        //重试调用
        String returnPath = RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<String>() {
                @Override
                public String call() throws Exception {
                    boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
                    protectedMode.checkSetSessionId(client, createMode);
                    String createdPath = null;
                    if (!localFirstTime && protectedMode.doProtected()) {
                        debugForceFindProtectedNode = false;
                        createdPath = findProtectedNodeInForeground(path);
                    }
                    if (createdPath == null) {
                        //在创建znode节点的时候,首先会调用CuratorFramework.getZooKeeper()获取zk客户端实例
                        //之后就是通过原生zk客户端的API去创建节点了
                        try {
                            if (client.isZk34CompatibilityMode()) {
                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
                            } else {
                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
                            }
                        } catch (KeeperException.NoNodeException e) {
                            if (createParentsIfNeeded) {
                                //这就是级联创建节点的实现
                                ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
                                if (client.isZk34CompatibilityMode()) {
                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                                } else {
                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
                                }
                            } else {
                                throw e;
                            }
                        } catch (KeeperException.NodeExistsException e) {
                            if (setDataIfExists) {
                                Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
                                if (storingStat != null) {
                                    DataTree.copyStat(setStat, storingStat);
                                }
                                createdPath = path;
                            } else {
                                throw e;
                            }
                        }
                    }
                    if (failNextCreateForTesting) {
                        failNextCreateForTesting = false;
                        throw new KeeperException.ConnectionLossException();
                    }
                    return createdPath;
                }
            }
        );
        trace.setRequestBytesLength(data).setPath(path).commit();
        return returnPath;
    }
    ...
}

public class CuratorFrameworkImpl implements CuratorFramework {
    private final CuratorZookeeperClient client;
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
        this.client = new CuratorZookeeperClient(
            localZookeeperFactory,
            builder.getEnsembleProvider(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionTimeoutMs(),
            builder.getWaitForShutdownTimeoutMs(),
            new Watcher() {
                ...
            },
            builder.getRetryPolicy(),
            builder.canBeReadOnly(),
            builder.getConnectionHandlingPolicy()
        );
        ...
    }
    ...
    ZooKeeper getZooKeeper() throws Exception {
        return client.getZooKeeper();
    }
    ...
}

public class CuratorZookeeperClient implements Closeable {
    private final ConnectionState state;
    ...
    public ZooKeeper getZooKeeper() throws Exception {
        Preconditions.checkState(started.get(), "Client is not started");
        return state.getZooKeeper();
    }
    ...
}

class ConnectionState implements Watcher, Closeable {
    private final HandleHolder zooKeeper;
    ...
    ZooKeeper getZooKeeper() throws Exception {
        if (SessionFailRetryLoop.sessionForThreadHasFailed()) {
            throw new SessionFailRetryLoop.SessionFailedException();
        }
        Exception exception = backgroundExceptions.poll();
        if (exception != null) {
            new EventTrace("background-exceptions", tracer.get()).commit();
            throw exception;
        }
        boolean localIsConnected = isConnected.get();
        if (!localIsConnected) {
            checkTimeouts();
        }
        //通过HandleHolder获取ZooKeeper实例
        return zooKeeper.getZooKeeper();
    }
    ...
}

(2)基于Curator查询znode节点

查询节点也使用了构造器模式:首先通过CuratorFramework的getData()方法创建一个GetDataBuilder实例,然后通过GetDataBuilder的forPath()方法 + 重试调用来查询znode节点。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");

        //查询节点
        byte[] dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));
        //查询子节点
        List<String> children = client.getChildren().forPath("/my");
        System.out.println(children);
    }
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public GetDataBuilder getData() {
        checkState();
        return new GetDataBuilderImpl(this);
    }
    
    @Override
    public GetChildrenBuilder getChildren() {
        checkState();
        return new GetChildrenBuilderImpl(this);
    }
    ...
}

public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, ErrorListenerPathable<byte[]> {
    private final CuratorFrameworkImpl  client;
    ...
    @Override
    public byte[] forPath(String path) throws Exception {
        client.getSchemaSet().getSchema(path).validateWatch(path, watching.isWatched() || watching.hasWatcher());
        path = client.fixForNamespace(path);
        byte[] responseData = null;
        if (backgrounding.inBackground()) {
            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
        } else {
            //查询节点
            responseData = pathInForeground(path);
        }
        return responseData;
    }
    
    private byte[] pathInForeground(final String path) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");
        //重试调用
        byte[] responseData = RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    byte[] responseData;
                    //通过CuratorFramework获取原生zk客户端实例,然后调用其getData()获取节点
                    if (watching.isWatched()) {
                        responseData = client.getZooKeeper().getData(path, true, responseStat);
                    } else {
                        responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat);
                        watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);
                    }
                    return responseData;
                }
            }
        );
        trace.setResponseBytesLength(responseData).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(responseStat).commit();
        return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
    }
    ...
}

(3)基于Curator修改znode节点

修改节点也使用了构造器模式:首先通过CuratorFramework的setData()方法创建一个SetDataBuilder实例,然后通过SetDataBuilder的forPath()方法 + 重试调用来修改znode节点。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");

        //修改节点
        client.setData().forPath("/my/path", "110".getBytes());
        byte[] dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));
    }
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public SetDataBuilder setData() {
        checkState();
        return new SetDataBuilderImpl(this);
    }
    ...
}

public class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<Stat> {
    private final CuratorFrameworkImpl client;
    ...
    @Override
    public Stat forPath(String path, byte[] data) throws Exception {
        client.getSchemaSet().getSchema(path).validateGeneral(path, data, null);
        if (compress) {
            data = client.getCompressionProvider().compress(path, data);
        }
        path = client.fixForNamespace(path);
        Stat resultStat = null;
        if (backgrounding.inBackground()) {
            client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext(), null), null);
        } else {
            //修改节点
            resultStat = pathInForeground(path, data);
        }
        return resultStat;
    }
    
    private Stat pathInForeground(final String path, final byte[] data) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");
        //重试调用
        Stat resultStat = RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<Stat>() {
                @Override
                public Stat call() throws Exception {
                    //通过CuratorFramework获取原生zk客户端实例,然后调用其setData()修改节点
                    return client.getZooKeeper().setData(path, data, version);
                }
            }
        );
        trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();
        return resultStat;
    }
    ...
}

(4)基于Curator删除znode节点

删除节点也使用了构造器模式:首先通过CuratorFramework的delete()方法创建一个DeleteBuilder实例,然后通过DeleteBuilder的forPath()方法 + 重试调用来删除znode节点。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");

        //删除节点
        client.delete().forPath("/my/path");
    }
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public DeleteBuilder delete() {
        checkState();
        return new DeleteBuilderImpl(this);
    }
    ...
}

public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, ErrorListenerPathable<Void> {
    private final CuratorFrameworkImpl client;
    ...
    @Override
    public Void forPath(String path) throws Exception {
        client.getSchemaSet().getSchema(path).validateDelete(path);
        final String unfixedPath = path;
        path = client.fixForNamespace(path);
        if (backgrounding.inBackground()) {
            OperationAndData.ErrorCallback<String> errorCallback = null;
            if (guaranteed) {
                errorCallback = new OperationAndData.ErrorCallback<String>() {
                    @Override
                    public void retriesExhausted(OperationAndData<String> operationAndData) {
                        client.getFailedDeleteManager().addFailedOperation(unfixedPath);
                    }
                };
            }
            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext(), null), null);
        } else {
            //删除节点
            pathInForeground(path, unfixedPath);
        }
        return null;
    }

    private void pathInForeground(final String path, String unfixedPath) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("DeleteBuilderImpl-Foreground");
        //重试调用
        RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    try {
                        //通过CuratorFramework获取原生zk客户端实例,然后调用其delete()删除节点
                        client.getZooKeeper().delete(path, version);
                    } catch (KeeperException.NoNodeException e) {
                        if (!quietly) {
                            throw e;
                        }
                    } catch (KeeperException.NotEmptyException e) {
                        if (deletingChildrenIfNeeded) {
                            ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
                        } else {
                            throw e;
                        }
                    }
                    return null;
                }
            }
        );
        trace.setPath(path).commit();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2329758.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

每日一题(小白)回溯篇4

深度优先搜索题&#xff1a;找到最长的路径&#xff0c;计算这样的路径有多少条&#xff08;使用回溯&#xff09; 分析题意可以得知&#xff0c;每次向前后左右走一步&#xff0c;直至走完16步就算一条走通路径。要求条件是不能超出4*4的范围&#xff0c;不能重复之前的路径。…

k8s进阶之路:本地集群环境搭建

概述 文章将带领大家搭建一个 master 节点&#xff0c;两个 node 节点的 k8s 集群&#xff0c;容器基于 docker&#xff0c;k8s 版本 v1.32。 一、系统安装 安装之前请大家使用虚拟机将 ubuntu24.04 系统安装完毕&#xff0c;我是基于 mac m1 的系统进行安装的&#xff0c;所…

C++ STL 详解 ——list 的深度解析与实践指南

在 C 的标准模板库&#xff08;STL&#xff09;中&#xff0c;list作为一种重要的序列式容器&#xff0c;以其独特的双向链表结构和丰富的操作功能&#xff0c;在许多编程场景下发挥着关键作用。深入理解list的特性与使用方法&#xff0c;能帮助开发者编写出更高效、灵活的代码…

按键切换LCD显示后,显示总在第二阶段,而不在第一阶段的问题

这是一个密码锁的程序&#xff0c;当在输入密码后&#xff0c;原本是要重置密码&#xff0c;但是程序总是在输入密码正确后总是跳转置设置第二个密码&#xff0c;而第一个密码总是跳过。 不断修改后&#xff0c; 解决方法 将if语句换成switch语句&#xff0c;这样就可以分离程序…

护网蓝初面试题

《网安面试指南》https://mp.weixin.qq.com/s/RIVYDmxI9g_TgGrpbdDKtA?token1860256701&langzh_CN 5000篇网安资料库https://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247486065&idx2&snb30ade8200e842743339d428f414475e&chksmc0e4732df793fa3bf39…

C++11: 智能指针

C11: 智能指针 &#xff08;一&#xff09;智能指针原理1.RAll2.智能指针 (二)C11 智能指针1. auto_ptr2. unique_ptr3. shared_ptr4. weak_ptr &#xff08;三&#xff09;shared_ptr中存在的问题std::shared_ptr的循环引用 &#xff08;四&#xff09;删除器&#xff08;五&a…

从零实现本地大模型RAG部署

1. RAG概念 RAG&#xff08;Retrieval-Augmented Generation&#xff09;即检索增强生成&#xff0c;是一种结合信息检索与大型语言模型&#xff08;大模型&#xff09;的技术。从外部知识库&#xff08;如文档、数据库或网页&#xff09;中实时检索相关信息&#xff0c;并将其…

【Linux系统篇】:探索文件系统原理--硬件磁盘、文件系统与链接的“三体宇宙”

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨ 文章所属专栏&#xff1a;Linux篇–CSDN博客 文章目录 一.认识硬件--磁盘物理存储结构1.存储介质类型2.物理存储单元3…

Tracing the thoughts of a large language model 简单理解

Tracing the thoughts of a large language model 这篇论文通过电路追踪方法(Circuit Tracing)揭示了大型语言模型Claude 3.5 Haiku的内部机制,其核心原理可归纳为以下几个方面: 1. 方法论核心:归因图与替换模型 替换模型(Replacement Model) 使用跨层转码器(CLT)将原…

OpenCV边缘检测技术详解:原理、实现与应用

概述 边缘检测是计算机视觉和图像处理中最基本也是最重要的技术之一&#xff0c;它通过检测图像中亮度或颜色急剧变化的区域来识别物体的边界。边缘通常对应着场景中物体的物理边界、表面方向的变化或深度不连续处。 分类 OpenCV提供了多种边缘检测算法&#xff0c;下面我们介…

BN 层做预测的时候, 方差均值怎么算

✅ 一、Batch Normalization&#xff08;BN&#xff09;回顾 BN 层在训练和推理阶段的行为是不一样的&#xff0c;核心区别就在于&#xff1a; 训练时用 mini-batch 里的均值方差&#xff0c;预测时用全局的“滑动平均”均值方差。 &#x1f9ea; 二、训练阶段&#xff08;Trai…

JS 其他事件类型

页面加载 事件 window.addEvent() window.addEventListener(load,function(){const btn document.querySelector(button)btn.addEventListener(click,function(){alert(按钮)})})也可以给其他标签加该事件 HTML加载事件 找html标签 也可以给页面直接赋值

AI Agent设计模式五:Orchestrator

概念 &#xff1a;中央任务调度中枢 ✅ 优点&#xff1a;全局资源协调&#xff0c;确保任务执行顺序❌ 缺点&#xff1a;单点故障风险&#xff0c;可能成为性能瓶颈 import operator import osfrom langchain.schema import SystemMessage, HumanMessage from langchain_opena…

MySQL基础 [三] - 数据类型

目录 数据类型分类 ​编辑 数值类型 tinyint bit 浮点类型 float decimal 字符串类型 char varchar varchar和char的比较和选择 日期和时间类型 enum和set enum类型 set类型 enum和set的类型查找 数据类型分类 数值类型 tinyint TINYINT[(M)] [UNSIGNED]是 …

不用训练,集成多个大模型产生更优秀的输出

论文标题 Collab: Controlled Decoding using Mixture of Agents for LLM Alignment 论文地址 https://arxiv.org/pdf/2503.21720 作者背景 JP摩根&#xff0c;马里兰大学帕克分校&#xff0c;普林斯顿大学 动机 大模型对齐&#xff08;alignment&#xff09;的主要目的…

随笔1 认识编译命令

1.认识编译命令 1.1 解释gcc编译命令: gcc test1.cpp -o test1 pkg-config --cflags --libs opencv 命令解析&#xff1a; gcc&#xff1a;GNU C/C 编译器&#xff0c;用于编译C/C代码。 test1.cpp&#xff1a;源代码文件。 -o test1&#xff1a;指定输出的可执行文件名为t…

Hyperlane 框架路由功能详解:静态与动态路由全掌握

Hyperlane 框架路由功能详解&#xff1a;静态与动态路由全掌握 Hyperlane 框架提供了强大而灵活的路由功能&#xff0c;支持静态路由和动态路由两种模式&#xff0c;让开发者能够轻松构建各种复杂的 Web 应用。本文将详细介绍这两种路由的使用方法。 静态路由&#xff1a;简单…

铰链损失函数 Hinge Loss和Keras 实现

一、说明 在为了了解 Keras 深度学习框架的来龙去脉&#xff0c;本文介绍铰链损失函数&#xff0c;然后使用 Keras 实现它们以进行练习并了解它们的行为方式。在这篇博客中&#xff0c;您将首先找到两个损失函数的简要介绍&#xff0c;以确保您在我们继续实现它们之前直观地理解…

瑞数信息发布《BOTS自动化威胁报告》,揭示AI时代网络安全新挑战

近日&#xff0c;瑞数信息正式发布《BOTS自动化威胁报告》&#xff0c;力求通过全景式观察和安全威胁的深度分析&#xff0c;为企业在AI时代下抵御自动化攻击提供安全防护策略&#xff0c;从而降低网络安全事件带来的影响&#xff0c;进一步增强业务韧性和可持续性。 威胁一&am…

FLV格式:流媒体视频的经典选择

FLV格式&#xff1a;流媒体视频的经典选择 FLV&#xff08;Flash Video&#xff09;格式曾经是流媒体视频的主力军&#xff0c;在互联网视频的早期时代广泛应用于视频网站和多媒体平台。凭借其高效的压缩和较小的文件体积&#xff0c;FLV成为了许多视频内容创作者和平台的首选…