zk基础—5.Curator的使用与剖析二

news2025/4/7 17:46:21

大纲

1.基于Curator进行基本的zk数据操作

2.基于Curator实现集群元数据管理

3.基于Curator实现HA主备自动切换

4.基于Curator实现Leader选举

5.基于Curator实现分布式Barrier

6.基于Curator实现分布式计数器

7.基于Curator实现zk的节点和子节点监听机制

8.基于Curator创建客户端实例的源码分析

9.Curator在启动时是如何跟zk建立连接的

10.基于Curator进行增删改查节点的源码分析

11.基于Curator的节点监听回调机制的实现源码

12.基于Curator的Leader选举机制的实现源码

11.Curator节点监听回调机制的实现源码

(1)PathCache子节点监听机制的实现源码

(2)NodeCache节点监听机制的实现源码

(3)getChildren()方法对子节点注册监听器和后台异步回调说明

(4)PathCache实现自动重复注册监听器的效果

(5)NodeCache实现节点变化事件监听的效果

(1)PathCache子节点监听机制的实现源码

PathChildrenCache会调用原生zk客户端对象的getChildren()方法,并往该方法传入一个监听器childrenWatcher。当子节点发生事件,就会通知childrenWatcher这个原生的Watcher,然后该Watcher便会调用注册到PathChildrenCache的Listener。注意:在传入的监听器Watcher中会实现重复注册Watcher。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");

        //PathCache,监听/cluster下的子节点变化
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                ...
            }
        });
        pathChildrenCache.start();
    }
}

public class PathChildrenCache implements Closeable {
    private final WatcherRemoveCuratorFramework client;
    private final String path;
    private final boolean cacheData;
    private final boolean dataIsCompressed;
    private final CloseableExecutorService executorService;
    private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
    ...
    //初始化
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {
        this.client = client.newWatcherRemoveCuratorFramework();
        this.path = PathUtils.validatePath(path);
        this.cacheData = cacheData;
        this.dataIsCompressed = dataIsCompressed;
        this.executorService = executorService;
        ensureContainers = new EnsureContainers(client, path);
    }
    
    //获取用来存放Listener的容器listeners
    public ListenerContainer<PathChildrenCacheListener> getListenable() {
        return listeners;
    }
    
    //启动对子节点的监听
    public void start() throws Exception {
        start(StartMode.NORMAL);
    }
    
    private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            //处理连接状态的变化
            handleStateChange(newState);
        }
    };
    
    public void start(StartMode mode) throws Exception {
        ...
        //对建立的zk连接添加Listener
        client.getConnectionStateListenable().addListener(connectionStateListener);
        ...
        //把PathChildrenCache自己传入RefreshOperation中
        //下面的代码其实就是调用PathChildrenCache的refresh()方法
        offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
        ...
    }
    
    //提交一个任务到线程池进行处理
    void offerOperation(final Operation operation) {
        if (operationsQuantizer.add(operation)) {
            submitToExecutor(
                new Runnable() {
                    @Override
                    public void run() {
                        ...
                        operationsQuantizer.remove(operation);
                        //其实就是调用PathChildrenCache的refresh()方法
                        operation.invoke();
                        ...
                    }
                }
            );
        }
    }
    
    private synchronized void submitToExecutor(final Runnable command) {
        if (state.get() == State.STARTED) {
            //提交一个任务到线程池进行处理
            executorService.submit(command);
        }
    }
    ...
}

class RefreshOperation implements Operation {
    private final PathChildrenCache cache;
    private final PathChildrenCache.RefreshMode mode;
    
    RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {
        this.cache = cache;
        this.mode = mode;
    }
    
    @Override
    public void invoke() throws Exception {
        //调用PathChildrenCache的refresh方法,也就是发起对子节点的监听
        cache.refresh(mode);
    }
    ...
}

public class PathChildrenCache implements Closeable {
    ...
    private volatile Watcher childrenWatcher = new Watcher() {
        //重复注册监听器
        //当子节点发生变化事件时,该方法就会被触发调用
        @Override
        public void process(WatchedEvent event) {
            //下面的代码其实依然是调用PathChildrenCache的refresh()方法
            offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
        }
    };
    
    void refresh(final RefreshMode mode) throws Exception {
        ensurePath();
        //创建一个回调,在下面执行client.getChildren()成功时会触发执行该回调
        final BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if (reRemoveWatchersOnBackgroundClosed()) {
                    return;
                }
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    //处理子节点数据
                    processChildren(event.getChildren(), mode);
                } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                    if (mode == RefreshMode.NO_NODE_EXCEPTION) {
                        log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing. Path: [{}]", path);
                        ensureContainers.reset();
                    } else {
                        log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers. Path: [{}]", path);
                        ensureContainers.reset();
                        offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION));
                    }
                }
            }
        };
        //下面的代码最后会调用到原生zk客户端的getChildren方法发起对子节点的监听
        //并且添加一个叫childrenWatcher的监听,一个叫callback的后台异步回调
        client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
    }
    ...
}

//子节点发生变化事件时,最后都会触发执行EventOperation的invoke()方法
class EventOperation implements Operation {
    private final PathChildrenCache cache;
    private final PathChildrenCacheEvent event;
    
    EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {
        this.cache = cache;
        this.event = event;
    }
    
    @Override
    public void invoke() {
        //调用PathChildrenCache的Listener
        cache.callListeners(event);
    }
    ...
}

(2)NodeCache节点监听机制的实现源码

NodeCache会调用原生zk客户端对象的exists()方法,并往该方法传入一个监听器watcher。当子节点发生事件,就会通知watcher这个原生的Watcher,然后该Watcher便会调用注册到NodeCache的Listener。注意:在传入的监听器Watcher中会实现重复注册Watcher。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");

        //NodeCache
        final NodeCache nodeCache = new NodeCache(client, "/cluster");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/cluster");
                if (stat == null) {
                } else {
                    nodeCache.getCurrentData();
                }
            }
        });
        nodeCache.start();
    }
}

public class NodeCache implements Closeable {
    private final WatcherRemoveCuratorFramework client;
    private final String path;
    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
    ...
    private ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
                if (isConnected.compareAndSet(false, true)) {
                    reset();
                }
            } else {
                isConnected.set(false);
            }
        }
    };
    
    //初始化一个Watcher,作为监听器添加到下面reset()方法执行的client.checkExists()方法中
    private Watcher watcher = new Watcher() {
        //重复注册监听器
        @Override
        public void process(WatchedEvent event) {
            reset();
        }
    };
    
    //初始化一个回调,在下面reset()方法执行client.checkExists()成功时会触发执行该回调
    private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            processBackgroundResult(event);
        }
    };
    
    //初始化NodeCache
    public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) {
        this.client = client.newWatcherRemoveCuratorFramework();
        this.path = PathUtils.validatePath(path);
        this.dataIsCompressed = dataIsCompressed;
    }
    
    //获取存放Listener的容器ListenerContainer
    public ListenerContainer<NodeCacheListener> getListenable() {
        Preconditions.checkState(state.get() != State.CLOSED, "Closed");
        return listeners;
    }
    
    //启动对节点的监听
    public void start() throws Exception {
        start(false);
    }
    
    public void start(boolean buildInitial) throws Exception {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        //对建立的zk连接添加Listener
        client.getConnectionStateListenable().addListener(connectionStateListener);
        if (buildInitial) {
            //调用原生的zk客户端的exists()方法,对节点进行监听
            client.checkExists().creatingParentContainersIfNeeded().forPath(path);
            internalRebuild();
        }
        reset();
    }
    
    private void reset() throws Exception {
        if ((state.get() == State.STARTED) && isConnected.get()) {
            //下面的代码最后会调用原生的zk客户端的exists()方法,对节点进行监听
            //并且添加一个叫watcher的监听,一个叫backgroundCallback的后台异步回调
            client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
        }
    }
    
    private void processBackgroundResult(CuratorEvent event) throws Exception {
        switch (event.getType()) {
            case GET_DATA: {
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    ChildData childData = new ChildData(path, event.getStat(), event.getData());
                    setNewData(childData);
                }
                break;
            }
            case EXISTS: {
                if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                    setNewData(null);
                } else if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    if (dataIsCompressed) {
                        client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    } else {
                        client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    }
                }
                break;
            }
        }
    }
    ...
}

(3)getChildren()方法对子节点注册监听器和后台异步回调说明

getChildren()方法注册的Watcher只有一次性,其注册的回调是一个异步回调。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test", "10".getBytes());
        System.out.println("创建节点'/test");

        client.getChildren().usingWatcher(new CuratorWatcher() {
            public void process(WatchedEvent event) throws Exception {
                //只要通知过一次zk节点的变化,这里就不会再被通知了
                //也就是第一次的通知才有效,这里被执行过一次后,就不会再被执行
                System.out.println("收到一个zk的通知: " + event);
            }
        }).inBackground(new BackgroundCallback() {
            //后台回调通知,表示会让zk.getChildren()在后台异步执行
            //后台异步执行client.getChildren()方法完毕,便会回调这个方法进行通知
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("收到一个后台回调通知: " + event);
            }
        }).forPath("/test");
    }
}

(4)PathCache实现自动重复注册监听器的效果

每当节点发生变化时,就会触发childEvent()方法的调用。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/test", true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            //只要子节点发生变化,无论变化多少次,每次变化都会触发这里childEvent的调用
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("监听的子节点发生变化,收到了事件通知:" + pathChildrenCacheEvent);
            }
        });
        pathChildrenCache.start();
        System.out.println("完成子节点的监听和启动");
    }
}

(5)NodeCache实现节点变化事件监听的效果

每当节点发生变化时,就会触发nodeChanged()方法的调用。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        final NodeCache nodeCache = new NodeCache(client, "/test/child/id");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            //只要节点发生变化,无论变化多少次,每次变化都会触发这里nodeChanged的调用
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/test/child/id");
                if (stat != null) {
                    byte[] dataBytes = client.getData().forPath("/test/child/id");
                    System.out.println("节点数据发生了变化:" + new String(dataBytes));
                } else {
                    System.out.println("节点被删除");
                }
            }
        });
        nodeCache.start();
    }
}

12.基于Curator的Leader选举机制的实现源码

(1)第一种Leader选举机制LeaderLatch的源码

(2)第二种Leader选举机制LeaderSelector的源码

利用Curator的CRUD+ 监听回调机制,就能满足大部分系统使用zk的场景了。需要注意的是:如果使用原生的zk去注册监听器来监听节点或者子节点,当节点或子节点发生了对应的事件,会通知客户端一次,但是下一次再有对应的事件就不会通知了。使用zk原生的API时,客户端需要每次收到事件通知后,重新注册监听器。然而Curator的PathCache + NodeCache,会自动重新注册监听器。

(1)第一种Leader选举机制LeaderLatch的源码

Curator客户端会通过创建临时顺序节点的方式来竞争成为Leader的,LeaderLatch这种Leader选举的实现方式与分布式锁的实现几乎一样。

每个Curator客户端创建完临时顺序节点后,就会对/leader/latch目录调用getChildren()方法来获取里面所有的子节点,调用getChildren()方法的结果会通过backgroundCallback回调进行通知,接着客户端便对获取到的子节点进行排序来判断自己是否是第一个子节点。

如果客户端发现自己是第一个子节点,那么就是Leader。如果客户端发现自己不是第一个子节点,就对上一个节点添加一个监听器。在添加监听器时,会使用getData()方法获取自己的上一个节点,getData()方法执行成功后会调用backgrondCallback回调。

当上一个节点对应的客户端释放了Leader角色,上一个节点就会消失,此时就会通知第二个节点对应的客户端,执行getData()方法添加的监听器。

所以如果getData()方法的监听器被触发了,即发现上一个节点不存在了,客户端会调用getChildren()方法重新获取子节点列表,判断是否是Leader。

注意:使用getData()代替exists(),可以避免不必要的Watcher造成的资源泄露。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                switch (newState) {
                    case LOST:
                        //当Leader与zk断开时,需要暂停当前Leader的工作
                }
            }
        });
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
        leaderLatch.start();
        leaderLatch.await();//阻塞等待直到当前客户端成为Leader
        Boolean hasLeaderShip = leaderLatch.hasLeadership();
        System.out.println("是否成为Leader: " + hasLeaderShip);
    }
}

public class LeaderLatch implements Closeable {
    private final WatcherRemoveCuratorFramework client;
    private final ConnectionStateListener listener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            handleStateChange(newState);
        }
    };
    ...
    //Add this instance to the leadership election and attempt to acquire leadership.
    public void start() throws Exception {
        ...
        //对建立的zk连接添加Listener
        client.getConnectionStateListenable().addListener(listener);
        reset();
        ...
    }
    
    @VisibleForTesting
    void reset() throws Exception {
        setLeadership(false);
        setNode(null);
        //callback作为成功创建临时顺序节点后的回调
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                ...
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    setNode(event.getName());
                    if (state.get() == State.CLOSED) {
                        setNode(null);
                    } else {
                        //成功创建临时顺序节点,需要通过getChildren()再去获取子节点列表
                        getChildren();
                    }
                } else {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        //创建临时顺序节点
        client.create().creatingParentContainersIfNeeded().withProtection()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback)
            .forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }
    
    //获取子节点列表
    private void getChildren() throws Exception {
        //callback作为成功获取子节点列表后的回调
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    checkLeadership(event.getChildren());
                }
            }
        };
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }
    
    //检查自己是否是第一个节点
    private void checkLeadership(List<String> children) throws Exception {
        if (debugCheckLeaderShipLatch != null) {
            debugCheckLeaderShipLatch.await();
        }
        final String localOurPath = ourPath.get();
        //对获取到的节点进行排序
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if (ourIndex < 0) {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        } else if (ourIndex == 0) {
            //如果自己是第一个节点,则标记自己为Leader
            setLeadership(true);
        } else {
            //如果自己不是第一个节点,则对前一个节点添加监听
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if ((state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null)) {
                        //重新获取子节点列表
                        getChildren();
                    }
                }
            };
            BackgroundCallback callback = new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                        reset();
                    }
                }
            };
            //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            //使用getData()代替exists(),可以避免不必要的Watcher造成的资源泄露
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }
    ...
    //阻塞等待直到成为Leader
    public void await() throws InterruptedException, EOFException {
        synchronized(this) {
            while ((state.get() == State.STARTED) && !hasLeadership.get()) {
                wait();//Objetc对象的wait()方法,阻塞等待
            }
        }
        if (state.get() != State.STARTED) {
            throw new EOFException();
        }
    }
    
    //设置当前客户端成为Leader,并进行notifyAll()通知之前阻塞的线程
    private synchronized void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) { // Lost leadership, was true, now false
            listeners.forEach(new Function<LeaderLatchListener, Void>() {
                @Override
                public Void apply(LeaderLatchListener listener) {
                    listener.notLeader();
                    return null;
                }
            });
        } else if (!oldValue && newValue) { // Gained leadership, was false, now true
            listeners.forEach(new Function<LeaderLatchListener, Void>() {
                @Override
                public Void apply(LeaderLatchListener input) {
                    input.isLeader();
                    return null;
                }
            });
        }
        notifyAll();//唤醒之前执行了wait()方法的线程
    }
}

(2)第二种Leader选举机制LeaderSelector的源码

通过判断是否成功获取到分布式锁,来判断是否竞争成为Leader。正因为是通过持有分布式锁来成为Leader,所以LeaderSelector.takeLeadership()方法不能退出,否则就会释放锁。而一旦释放了锁,其他客户端就会竞争锁成功而成为新的Leader。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        LeaderSelector leaderSelector = new LeaderSelector(
            client,
            "/leader/election",
            new LeaderSelectorListener() {
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println("你已经成为了Leader......");
                    //在这里干Leader所有的事情,此时方法不能退出
                    Thread.sleep(Integer.MAX_VALUE);
                }
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    System.out.println("连接状态的变化,已经不是Leader......");
                    if (connectionState.equals(ConnectionState.LOST)) {
                        throw new CancelLeadershipException();
                    }
                }
            }
        );
        leaderSelector.start();//尝试和其他节点在节点"/leader/election"上进行竞争成为Leader
        Thread.sleep(Integer.MAX_VALUE);
    }
}

public class LeaderSelector implements Closeable {
    private final CuratorFramework client;
    private final LeaderSelectorListener listener;
    private final CloseableExecutorService executorService;
    private final InterProcessMutex mutex;
    ...
    public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener) {
        Preconditions.checkNotNull(client, "client cannot be null");
        PathUtils.validatePath(leaderPath);
        Preconditions.checkNotNull(listener, "listener cannot be null");

        this.client = client;
        this.listener = new WrappedListener(this, listener);
        hasLeadership = false;
        this.executorService = executorService;
        //初始化一个分布式锁
        mutex = new InterProcessMutex(client, leaderPath) {
            @Override
            protected byte[] getLockNodeBytes() {
                return (id.length() > 0) ? getIdBytes(id) : null;
            }
        };
    }
    
    public void start() {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        Preconditions.checkState(!executorService.isShutdown(), "Already started");
        Preconditions.checkState(!hasLeadership, "Already has leadership");
        client.getConnectionStateListenable().addListener(listener);
        requeue();
    }
    
    public boolean requeue() {
        Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
        return internalRequeue();
    }
    
    private synchronized boolean internalRequeue() {
        if (!isQueued && (state.get() == State.STARTED)) {
            isQueued = true;
            //将选举的工作作为一个任务交给线程池执行
            Future<Void> task = executorService.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    ...
                    doWorkLoop();
                    ...
                    return null;
                }
            });
            ourTask.set(task);
            return true;
        }
        return false;
    }
    
    private void doWorkLoop() throws Exception {
        ...
        doWork();
        ...
    }
    
    @VisibleForTesting
    void doWork() throws Exception {
        hasLeadership = false;
        try {
            //尝试获取一把分布式锁,获取失败会进行阻塞
            mutex.acquire();
            //执行到这一行代码,说明获取分布式锁成功
            hasLeadership = true;
            try {
                if (debugLeadershipLatch != null) {
                    debugLeadershipLatch.countDown();
                }
                if (debugLeadershipWaitLatch != null) {
                    debugLeadershipWaitLatch.await();
                }
                //回调用户重写的takeLeadership()方法
                listener.takeLeadership(client);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            } catch (Throwable e) {
                ThreadUtils.checkInterrupted(e);
            } finally {
                clearIsQueued();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } finally {
            if (hasLeadership) {
                hasLeadership = false;
                boolean wasInterrupted = Thread.interrupted();  // clear any interrupted tatus so that mutex.release() works immediately
                try {
                    //释放锁
                    mutex.release();
                } catch (Exception e) {
                    if (failedMutexReleaseCount != null) {
                        failedMutexReleaseCount.incrementAndGet();
                    }
                    ThreadUtils.checkInterrupted(e);
                    log.error("The leader threw an exception", e);
                } finally {
                    if (wasInterrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    ...
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2329318.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android使用OpenGL和MediaCodec录制

目录 一,什么是opengl 二,什么是Android OpenGL ES 三, OpenGL 绘制流程 四, OpenGL坐标系 五, OpenGL 着色器 六, GLSL编程语言 七,使用MediaCodec录制在Opengl中渲染架构 八,代码实现 8.1 自定义渲染view继承GLSurfaceView 8.2 自定义渲染器TigerRender 8.3 创建编…

《如何避免虚无》速读笔记

文章目录 书籍信息概览躺派&#xff08;出世&#xff09;卷派&#xff08;入世&#xff09;虚无篇&#xff1a;直面虚无自我篇&#xff1a;认识自我孤独篇&#xff1a;应对孤独幸福篇&#xff1a;追寻幸福超越篇&#xff1a;超越自我 书籍信息 书名&#xff1a;《如何避免虚无…

哈尔滨工业大学:大模型时代的具身智能

大家好&#xff0c;我是樱木。 机器人在工业领域&#xff0c;已经逐渐成熟。具身容易&#xff0c;智能难。 机器人-》智能机器人&#xff0c;需要自主能力&#xff0c;加上通用能力。 智能机器人-》人类&#xff0c;这个阶段就太有想象空间了。而最受关注的-类人机器人。 如何…

理解OSPF 特殊区域NSSA和各类LSA特点

本文基于上文 理解OSPF Stub区域和各类LSA特点 在理解了Stub区域之后&#xff0c;我们再来理解一下NSSA区域&#xff0c;NSSA区域用于需要引入少量外部路由&#xff0c;同时又需要保持Stub区域特性的情况 一、 网络总拓扑图 我们在R1上配置黑洞路由&#xff0c;来模拟NSSA区域…

如何通过优化HMI设计大幅提升产品竞争力?

一、HMI设计的重要性与竞争力提升 HMI&#xff08;人机交互界面&#xff09;设计在现代产品开发中扮演着至关重要的角色。良好的HMI设计不仅能够提升用户体验&#xff0c;还能显著增强产品的竞争力。在功能趋同的市场环境中&#xff0c;用户体验成为产品竞争的关键。HMI设计通…

Linux信号——信号的处理(3)

信号是什么时候被处理&#xff1f; 进程从内核态&#xff0c;切换到用户态的时候&#xff0c;信号会被检测处理。 内核态&#xff1a;操作系统的状态&#xff0c;权限级别高 用户态&#xff1a;你自己的状态 内核态和用户态 进程地址空间第三次 所谓的系统调用本质其实是一堆…

Pod的调度

在默认情况下&#xff0c;一个Pod在哪个Node节点上运行&#xff0c;是由Scheduler组件采用相应的算法计算出来的&#xff0c;这个过程是不受人工控制的。但是在实际使用中&#xff0c;这并不满足的需求&#xff0c;因为很多情况下&#xff0c;我们想控制某些Pod到达某些节点上&…

LabVIEW面向对象编程设计方法

一、概述 面向对象编程&#xff08;OOP&#xff09;在软件开发中占据重要地位&#xff0c;尤其是在大规模软件项目中。它与小型程序开发思路不同&#xff0c;更注重未来功能的升级与扩展。在设计阶段&#xff0c;需思考如何构建既灵活又稳定的系统&#xff0c;这涉及众多设计方…

Dify票据识别遇到的分支判断不准确问题

已测试这篇文章中 https://zhuanlan.zhihu.com/p/5465385787 使用多分支条件判断使用不同的大模型识别图片内容 发现了细节问题。在使用时若不注意&#xff0c;分支会出现走向不准的问题。 需要关注部分 下方红框处。1&#xff0c;2后不能跟点。否则会出问。除此之外&#xff0…

《全栈+双客户端Turnkey方案》架构设计图

今天分享一些全栈双客户端Turnkey方案的架构与结构图。 1&#xff1a;三种分布式部署方案:网关方案&#xff0c;超级服务器单服方案&#xff0c;直连逻辑服方案 2: 单服多线程核心架构: 系统服务逻辑服服务 3: 系统服务的多线程池调度设计 4:LogicServer Update与ECS架构&…

某碰瓷国赛美赛,号称第三赛事的数模竞赛

首先我非常不能理解的就是怎么好意思自称第三赛事的呢&#xff1f;下面我们进行一个简单讨论&#xff0c;当然这里不对国赛和美赛进行讨论。首先我们来明确一点&#xff0c;比赛的含金量由什么来定&#xff1f;这个可能大家的评价指标可能不唯一&#xff0c;我通过DeepSeek选取…

【大模型深度学习】如何估算大模型需要的显存

一、模型参数量 参数量的单位 参数量指的是模型中所有权重和偏置的数量总和。在大模型中&#xff0c;参数量的单位通常以“百万”&#xff08;M&#xff09;或“亿”&#xff08;B&#xff0c;也常说十亿&#xff09;来表示。 百万&#xff08;M&#xff09;&#xff1a;表示…

Mysql 数据库编程技术01

一、数据库基础 1.1 认识数据库 为什么学习数据库 瞬时数据&#xff1a;比如内存中的数据&#xff0c;是不能永久保存的。持久化数据&#xff1a;比如持久化至数据库中或者文档中&#xff0c;能够长久保存。 数据库是“按照数据结构来组织、存储和管理数据的仓库”。是一个长…

Mysql慢查询设置 和 建立索引

1 .mysql慢查询的设置 slow_query_log ON //或 slow_query_log_file /usr/local/mysql/data/slow.log long_query_time 2 修改后重启动mysql 1.1 查看设置后的参数 mysql> show variables like slow_query%; --------------------------------------------------…

【Android】界面布局-相对布局RelativeLayout-例子

题目 完成下面相对布局&#xff0c;要求&#xff1a; 中间的button在整个屏幕的中央&#xff0c;其他的以它为基准排列。Hints&#xff1a;利用layout_toEndof,_toRightof,_toLeftof,_toStartof完成。 结果演示 代码实现 <?xml version"1.0" encoding"u…

Spring Boot 中使用 Redis:从入门到实战

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

7-1 素数求和(线性筛实现)

7-1 素数求和。 分数 10 中等 全屏浏览 切换布局 作者 魏英 单位 浙江科技大学 输入两个正整数m和n&#xff08;1<m<n<500&#xff09;统计并输出m和n之间的素数个数以及这些素数的和。 输入格式: 输入两个正整数m和n&#xff08;1<m<n<500&#xff0…

ZKmall开源商城多云高可用架构方案:AWS/Azure/阿里云全栈实践

随着企业数字化转型的加速&#xff0c;云计算服务已成为IT战略中的核心部分。ZKmall开源商城作为一款高性能的开源商城系统&#xff0c;其在多云环境下的高可用架构方案备受关注。下面将结合AWS、Azure和阿里云三大主流云平台&#xff0c;探讨ZKmall的多云高可用架构全栈实践。…

leetcode二叉树刷题调试不方便的解决办法

1. 二叉树不易构建 在leetcode中刷题时&#xff0c;如果没有会员就需要将代码拷贝到本地的编译器进行调试。但是leetcode中有一类题可谓是毒瘤&#xff0c;那就是二叉树的题。 要调试二叉树有关的题需要根据测试用例给出的前序遍历&#xff0c;自己构建一个二叉树&#xff0c;…

颜色性格测试:探索你的内在性格色彩

颜色性格测试&#xff1a;探索你的内在性格色彩 在我们的日常生活中&#xff0c;颜色无处不在&#xff0c;而我们对颜色的偏好往往能反映出我们内在的性格特质。今天我要分享一个有趣的在线工具 —— 颜色性格测试&#xff0c;它能通过你最喜欢的颜色来分析你的性格倾向。 &…