Zookeeper详解(二)——API 事件监听

news2025/1/11 1:20:36

Java API

znode是zooKeeper集合的核心组件,zookeeper API提供了一小组方法使用zookeeper集合来操纵znode的所有细节。

客户端应该遵循以下步骤,与zookeeper服务器进行清晰和干净的交互。

  • 连接到zookeeper服务器。zookeeper服务器为客户端分配会话ID。
  • 定期向服务器发送心跳。否则,zookeeper服务器将过期会话ID,客户端需要重新连接。
  • 只要会话ID处于活动状态,就可以获取/设置znode。
  • 所有任务完成后,断开与zookeeper服务器的连接。如果客户端长时间不活动,则zookeeper服务器将自动断开客户端。

pom.xml

	<properties>
        <java.version>1.8</java.version>
        <!-- zookeeper -->
        <zookeeper.version>3.4.14</zookeeper.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Begin -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring Boot End -->

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>

        <!--添加zookeeper3.4.9版本-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <exclusions>
                <!--排除zookeeper自带的slf4j,不然有引用slf4j会jar冲突-->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
            <version>${zookeeper.version}</version>
        </dependency>
    </dependencies>

连接

ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
  • connectionString - zookeeper主机
  • sessionTimeout - 会话超时(以毫秒为单位)
  • watcher - 实现“监视器”对象。zookeeper集合通过监视器对象返回连接状态。
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;

/**
 * Title:Zookeeper连接
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2021/2/3
 */
public class ZookeeperConnection {

    public static void main(String[] args) {
        try {
            // 计数器对象,countDown一次放行
            CountDownLatch countDownLatch = new CountDownLatch(1);
            // arg1:服务器的ip和端口
            // arg2:客户端与服务器之间的会话超时时间 以毫秒为单位的
            // arg3:监视器对象
            ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState()==Event.KeeperState.SyncConnected) {
                        System.out.println("连接创建成功!");
                        countDownLatch.countDown();
                    }
                }
            });
            // 主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
            // 会话编号
            System.out.println(zooKeeper.getSessionId());
            zooKeeper.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

新增节点

// 同步方式 
create(String path, byte[] data, List<ACL> acl, CreateMode createMode) 
// 异步方式 
create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)
  • path - znode路径。例如,/node1 /node1/node11
  • data - 要存储在指定znode路径中的数据
  • acl - 要创建的节点的访问控制列表。zookeeper API提供了一个静态接口
  • ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE,返回打开znode的acl列表。
  • createMode - 节点的类型,这是一个枚举。
  • callBack - 异步回调接口
  • **ctx **- 传递上下文参数
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZKCreate {

    String IP="192.168.60.130:2181";
    ZooKeeper zooKeeper;

    @Before
    public void before()throws Exception{
        // 计数器对象
        CountDownLatch countDownLatch=new CountDownLatch(1);
        // arg1:服务器的ip和端口
        // arg2:客户端与服务器之间的会话超时时间  以毫秒为单位的
        // arg3:监视器对象
        zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState()==Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功!");
                    countDownLatch.countDown();
                }
            }
        });
        // 主线程阻塞等待连接对象的创建成功
        countDownLatch.await();
    }

    @After
    public void after()throws Exception{
        zooKeeper.close();
    }

    @Test
    public void create1()throws Exception{
        // arg1:节点的路径
        // arg2:节点的数据
        // arg3:权限列表  world:anyone:cdrwa
        // arg4:节点类型  持久化节点
        zooKeeper.create("/create/node1","node1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    @Test
    public void create2() throws Exception {
        // Ids.READ_ACL_UNSAFE world:anyone:r
        zooKeeper.create("/create/node2", "node2".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    @Test
    public void create3() throws Exception {
        // world授权模式
        // 权限列表
        List<ACL> acls = new ArrayList<ACL>();
        // 授权模式和授权对象
        Id id = new Id("world", "anyone");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.READ, id));
        acls.add(new ACL(ZooDefs.Perms.WRITE, id));
        zooKeeper.create("/create/node3", "node3".getBytes(), acls, CreateMode.PERSISTENT);
    }

    @Test
    public void create4() throws Exception {
        // ip授权模式
        // 权限列表
        List<ACL> acls = new ArrayList<ACL>();
        // 授权模式和授权对象
        Id id = new Id("ip", "192.168.60.130");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        zooKeeper.create("/create/node4", "node4".getBytes(), acls, CreateMode.PERSISTENT);
    }

    @Test
    public void create5() throws Exception {
        // auth授权模式
        // 添加授权用户
        zooKeeper.addAuthInfo("digest", "itcast:123456".getBytes());
        zooKeeper.create("/create/node5", "node5".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    }

    @Test
    public void create6() throws Exception {
        // auth授权模式
        // 添加授权用户
        zooKeeper.addAuthInfo("digest", "itcast:123456".getBytes());
        // 权限列表
        List<ACL> acls = new ArrayList<ACL>();
        // 授权模式和授权对象
        Id id = new Id("auth", "itcast");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.READ, id));
        zooKeeper.create("/create/node6", "node6".getBytes(), acls, CreateMode.PERSISTENT);
    }

    @Test
    public void create7() throws Exception {
        // digest授权模式
        // 权限列表
        List<ACL> acls = new ArrayList<ACL>();
        // 授权模式和授权对象
        Id id = new Id("digest", "itheima:qlzQzCLKhBROghkooLvb+Mlwv4A=");
        // 权限设置
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        zooKeeper.create("/create/node7", "node7".getBytes(), acls, CreateMode.PERSISTENT);
    }

    @Test
    public void create8() throws Exception {
        // 持久化顺序节点
        // Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
        String result = zooKeeper.create("/create/node8", "node8".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println(result);
    }

    @Test
    public void create9() throws Exception {
        //  临时节点
        // Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
        String result = zooKeeper.create("/create/node9", "node9".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(result);
    }

    @Test
    public void create10() throws Exception {
        // 临时顺序节点
        // Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
        String result = zooKeeper.create("/create/node10", "node10".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(result);
    }

    @Test
    public void create11() throws Exception {
        // 异步方式创建节点
        zooKeeper.create("/create/node11", "node11".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                // 0 代表创建成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 节点的路径
                System.out.println(name);
                // 上下文参数
                System.out.println(ctx);

            }
        }, "I am context");
        Thread.sleep(10000);
        System.out.println("结束");
    }
}

更新节点

// 同步方式 
setData(String path, byte[] data, int version) 
// 异步方式 
setData(String path, byte[] data, int version,AsyncCallback.StatCallback callBack, Object ctx)
  • path- znode路径
  • data - 要存储在指定znode路径中的数据。
  • version- znode的当前版本。每当数据更改时,ZooKeeper会更新znode的版本号。
  • callBack-异步回调接口
  • ctx-传递上下文参数
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

public class ZKSet {

    String IP = "192.168.60.130:2181";
    ZooKeeper zooKeeper;

    @Before
    public void before() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // arg1:zookeeper服务器的ip地址和端口号
        // arg2:连接的超时时间  以毫秒为单位
        // arg3:监听器对象
        zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功!");
                    countDownLatch.countDown();
                }
            }
        });
        // 使主线程阻塞等待
        countDownLatch.await();
    }

    @After
    public void after() throws Exception {
        zooKeeper.close();
    }

    @Test
    public void set1() throws Exception {
        // arg1:节点的路径
        // arg2:修改的数据
        // arg3:数据版本号 -1代表版本号不参与更新,写其他数字版本号则必须是zooKeeper的dataVersion,不会无法更新成功
        Stat stat = zooKeeper.setData("/set/node1", "node13".getBytes(), -1);
        // 当前节点的版本号
        System.out.println(stat.getVersion());

    }

    @Test
    public void set2() throws Exception {
        zooKeeper.setData("/set/node1", "node14".getBytes(), -1, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                // 0代表修改成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文参数对象
                System.out.println(ctx);
                // 属性描述对象
                System.out.println(stat.getVersion());
            }
        }, "I am Context");
        Thread.sleep(10000);
        System.out.println("结束");
    }
}

删除节点

// 同步方式 
delete(String path, int version)
// 异步方式 
delete(String path, int version, AsyncCallback.VoidCallback callBack, Object ctx)
  • path - znode路径。
  • version - znode的当前版本
  • **callBack **- 异步回调接口
  • ctx - 传递上下文参数
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;

public class ZKDelete {
    String IP = "192.168.60.130:2181";
    ZooKeeper zooKeeper;

    @Before
    public void before() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // arg1:zookeeper服务器的ip地址和端口号
        // arg2:连接的超时时间  以毫秒为单位
        // arg3:监听器对象
        zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功!");
                    countDownLatch.countDown();
                }
            }
        });
        // 使主线程阻塞等待
        countDownLatch.await();
    }

    @After
    public void after() throws Exception {
        zooKeeper.close();
    }

    @Test
    public void delete1() throws Exception {
        // arg1:删除节点的节点路径
        // arg2:数据版本信息 -1代表删除节点时不考虑版本信息
        zooKeeper.delete("/delete/node1",-1);
    }

    @Test
    public void delete2() throws Exception {
        // 异步使用方式
        zooKeeper.delete("/delete/node2", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                // 0代表删除成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文参数对象
                System.out.println(ctx);
            }
        },"I am Context");
        Thread.sleep(10000);
        System.out.println("结束");
    }
}

查看节点

// 同步方式 
getData(String path, boolean b, Stat stat) 
// 异步方式
getData(String path, boolean b,AsyncCallback.DataCallback callBack, Object ctx)
  • path - znode路径。
  • b- 是否使用连接对象中注册的监视器。
  • stat - 返回znode的元数据。
  • callBack-异步回调接口
  • ctx-传递上下文参数
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

public class ZKGet {

    String IP = "192.168.60.130:2181";
    ZooKeeper zooKeeper;

    @Before
    public void before() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // arg1:zookeeper服务器的ip地址和端口号
        // arg2:连接的超时时间  以毫秒为单位
        // arg3:监听器对象
        zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功!");
                    countDownLatch.countDown();
                }
            }
        });
        // 使主线程阻塞等待
        countDownLatch.await();
    }

    @After
    public void after() throws Exception {
        zooKeeper.close();
    }

    @Test
    public void get1() throws Exception {
        // arg1:节点的路径
        // arg3:读取节点属性的对象
        Stat stat=new Stat();
        byte [] bys=zooKeeper.getData("/get/node1",false,stat);
        // 打印数据
        System.out.println(new String(bys));
        // 版本信息
        System.out.println(stat.getVersion());
    }

    @Test
    public void get2() throws Exception {
        //异步方式
        zooKeeper.getData("/get/node1", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                // 0代表读取成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文参数对象
                System.out.println(ctx);
                // 数据
                System.out.println(new String(data));
                // 属性对象
                System.out.println(stat.getVersion());
            }
        },"I am Context");
        Thread.sleep(10000);
        System.out.println("结束");
    }
}

查看子节点

// 同步方式 
getChildren(String path, boolean b) 
// 异步方式 
getChildren(String path, boolean b,AsyncCallback.ChildrenCallback callBack,Object ctx)
  • path - Znode路径。
  • b- 是否使用连接对象中注册的监视器。
  • callBack - 异步回调接口。
  • ctx-传递上下文参数
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZKGetChid {
    
    String IP = "192.168.60.130:2181";
    ZooKeeper zooKeeper;

    @Before
    public void before() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // arg1:zookeeper服务器的ip地址和端口号
        // arg2:连接的超时时间  以毫秒为单位
        // arg3:监听器对象
        zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功!");
                    countDownLatch.countDown();
                }
            }
        });
        // 使主线程阻塞等待
        countDownLatch.await();
    }

    @After
    public void after() throws Exception {
        zooKeeper.close();
    }

    @Test
    public void get1() throws Exception {
        // arg1:节点的路径
        List<String> list = zooKeeper.getChildren("/get", false);
        for (String str : list) {
            System.out.println(str);
        }
    }

    @Test
    public void get2() throws Exception {
        // 异步用法
        zooKeeper.getChildren("/get", false, new AsyncCallback.ChildrenCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children) {
                // 0代表读取成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文参数对象
                System.out.println(ctx);
                // 子节点信息
                for (String str : children) {
                    System.out.println(str);
                }
            }
        },"I am Context");
        Thread.sleep(10000);
        System.out.println("结束");
    }
}

检查节点是否存在

// 同步方法 
exists(String path, boolean b) 
// 异步方法 
exists(String path, boolean b,AsyncCallback.StatCallback callBack,Object ctx)
  • path- znode路径。
  • b- 是否使用连接对象中注册的监视器。
  • callBack - 异步回调接口。
  • ctx-传递上下文参数
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

public class ZKExists {
    
    String IP = "192.168.60.130:2181";
    ZooKeeper zooKeeper;

    @Before
    public void before() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // arg1:zookeeper服务器的ip地址和端口号
        // arg2:连接的超时时间  以毫秒为单位
        // arg3:监听器对象
        zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功!");
                    countDownLatch.countDown();
                }
            }
        });
        // 使主线程阻塞等待
        countDownLatch.await();
    }

    @After
    public void after() throws Exception {
        zooKeeper.close();
    }

    @Test
    public void exists1() throws Exception {
        // arg1:节点的路径
        Stat stat=zooKeeper.exists("/exists1",false);
        // 节点的版本信息
        System.out.println(stat.getVersion());
    }

    @Test
    public void exists2() throws Exception {
        // 异步方式
        zooKeeper.exists("/exists1", false, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                // 0 代表方式执行成功
                System.out.println(rc);
                // 节点的路径
                System.out.println(path);
                // 上下文参数
                System.out.println(ctx);
                // 节点的版本信息
                System.out.println(stat.getVersion());
            }
        },"I am Context");
        Thread.sleep(10000);
        System.out.println("结束");
    }
}

事件监听机制

事件监听机制watcher

zookeeper提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者 .

zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力。

watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式

Watcher实现由三个部分组成:

  • Zookeeper服务端
  • Zookeeper客户端
  • 客户端的ZKWatchManager对象

客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端, 接着客户端的Watch管理器会触发相关Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程

在这里插入图片描述

watcher特性:

特性说明
一次性watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册
客户端顺序回watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行
轻量级WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容
时效性watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知

watcher接口设计

Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举类:KeeperState、EventType

在这里插入图片描述

  • Watcher通知状态**(KeeperState)**

    KeeperState是客户端与服务端连接状态发生变化时对应的通知类型。路径为org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,其枚举属性如下:

    枚举属性说明
    SyncConnected客户端与服务器正常连接时
    Disconnected客户端与服务器断开连接时
    Expired会话session失效时
    AuthFailed身份认证失败时
  • Watcher事件类型**(EventType)**

    EventType是数据节点(znode)发生变化时对应的通知类型。EventType变化时KeeperState永远处于SyncConnected通知状态下;当KeeperState发生变化时,EventType永远为None。其路径为org.apache.zookeeper.Watcher.Event.EventType,是一个枚举类,枚举属性如下:

    枚举属性说明
    None
    NodeCreatedWatcher监听的数据节点被创建时
    NodeDeletedWatcher监听的数据节点被删除时
    NodeDataChangedWatcher监听的数据节点内容发生变更时(无论内容数据是否变化)
    NodeChildrenChangedWatcher监听的数据节点的子节点列表发生变更时

注:客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用get等方法重新获取;

捕获相应的事件:

zookeeper客户端连接的状态和zookeeper对znode节点监听的事件类型,下面我们来讲解如何建立zookeeper的watcher监听。在zookeeper中采用zk.getChildren(path, watch)、zk.exists(path, watch)、zk.getData(path, watcher, stat)这样的方式为某个znode注册监听。

表以node-x节点为例,说明调用的注册方法和可监听事件间的关系:

注册方式CreatedChildrenChangedChangedDeleted
zk.exists(“/node-x”,watcher)可监控可监控可监控
zk.getData(“/node-x”,watcher)可监控可监控
zk.getChildren(“/node-x”,watcher)可监控可监控

案例代码

连接状态

客服端与服务器的连接状态

KeeperState通知状态 
SyncConnected:客户端与服务器正常连接时 
Disconnected:客户端与服务器断开连接时
Expired:会话session失效时 
AuthFailed:身份认证失败时 

事件类型为:None

代码

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

/**
 * Title:自定义Watcher监听类
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2021/2/4
 */
public class ZKConnectionWatcher implements Watcher {

    // 计数器对象
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    // 连接对象
    static ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent event) {
        try {
            // 事件类型
            if (event.getType() == Event.EventType.None) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功!");
                    //连接成功放行
                    countDownLatch.countDown();
                } else if (event.getState() == Event.KeeperState.Disconnected) {
                    System.out.println("断开连接!");
                } else if (event.getState() == Event.KeeperState.Expired) {
                    System.out.println("会话超时!");
                    //超时重连
                    zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
                } else if (event.getState() == Event.KeeperState.AuthFailed) {
                    System.out.println("认证失败!");
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

    }

    public static void main(String[] args) {
        try {
            zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
            // 阻塞线程等待连接的创建
            countDownLatch.await();
            // 会话id
            System.out.println(zooKeeper.getSessionId());
            // 添加授权用户
            zooKeeper.addAuthInfo("digest1","itcast1:1234561".getBytes());
            byte[] bs=zooKeeper.getData("/node1",false,null);
            System.out.println(new String(bs));
            Thread.sleep(50000);
            zooKeeper.close();
            System.out.println("结束");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}

exists方法

检查节点是否存在

// 使用连接对象ZooKeeper的监视器
exists(String path, boolean b) 
// 自定义监视器 
exists(String path, Watcher w) 
    
// NodeCreated:节点创建 
// NodeDeleted:节点删除 
// NodeDataChanged:节点内容发生变化
  • path- znode路径。
  • b- 是否使用连接对象中注册的监视器。
  • w-监视器对象。
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * Title:exists
 * Description:数据发生变化,打印watcher内容
 * @author WZQ
 * @version 1.0.0
 * @date 2021/2/4
 */
public class ZKWatcherExists {

    String IP = "192.168.60.130:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接zookeeper客户端
        zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("连接对象的参数!");
                // 连接成功
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }

    // 数据发生变化,打印watcher内容
    @Test
    public void watcherExists1() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:使用zooKeeper连接对象中的watcher
        zooKeeper.exists("/watcher1", true);
        Thread.sleep(50000);
        System.out.println("结束");
    }


    @Test
    public void watcherExists2() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:自定义watcher对象
        zooKeeper.exists("/watcher1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("自定义watcher");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherExists3() throws KeeperException, InterruptedException {
        // watcher一次性,一次注册,一次通知,打印一次就没了
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("自定义watcher");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    // 再次监听,一直监听
                    zooKeeper.exists("/watcher1", this);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        zooKeeper.exists("/watcher1", watcher);
        Thread.sleep(80000);
        System.out.println("结束");
    }

    @Test
    public void watcherExists4() throws KeeperException, InterruptedException {
        // 注册多个监听器对象
        // 数据变化,打印2次watcher
        zooKeeper.exists("/watcher1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("1");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        zooKeeper.exists("/watcher1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("2");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        Thread.sleep(80000);
        System.out.println("结束");
    }
}

getData方法

查看节点

// 使用连接对象的监视器 
getData(String path, boolean b, Stat stat) 
// 自定义监视器 
getData(String path, Watcher w, Stat stat) 

// NodeDeleted:节点删除 
// NodeDataChanged:节点内容发生变化
  • path- znode路径。
  • b- 是否使用连接对象中注册的监视器。
  • w-监视器对象。
  • stat- 返回znode的元数据。
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZKWatcherGetData {

    String IP = "192.168.60.130:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接zookeeper客户端
        zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("连接对象的参数!");
                // 连接成功
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void watcherGetData1() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:使用连接对象中的watcher
        zooKeeper.getData("/watcher2", true, null);
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherGetData2() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:自定义watcher对象
        zooKeeper.getData("/watcher2", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("自定义watcher");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        }, null);
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherGetData3() throws KeeperException, InterruptedException {
        // 一次性
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("自定义watcher");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    if(event.getType()==Event.EventType.NodeDataChanged) {
                        zooKeeper.getData("/watcher2", this, null);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        zooKeeper.getData("/watcher2", watcher, null);
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherGetData4() throws KeeperException, InterruptedException {
        // 注册多个监听器对象
        zooKeeper.getData("/watcher2", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("1");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    if(event.getType()==Event.EventType.NodeDataChanged) {
                        zooKeeper.getData("/watcher2", this, null);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        },null);
        zooKeeper.getData("/watcher2", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("2");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    if(event.getType()==Event.EventType.NodeDataChanged) {
                        zooKeeper.getData("/watcher2", this, null);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        },null);
        Thread.sleep(50000);
        System.out.println("结束");
    }
}

getChildren方法

查看子节点

// 使用连接对象的监视器 
getChildren(String path, boolean b) 
// 自定义监视器 
getChildren(String path, Watcher w) 
    
// NodeChildrenChanged:子节点发生变化
// NodeDeleted:节点删除
  • path- znode路径。
  • b- 是否使用连接对象中注册的监视器。
  • w-监视器对象。
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZKWatcherGetChild {
    
    String IP = "192.168.60.130:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
        CountDownLatch connectedSemaphore = new CountDownLatch(1);
        // 连接zookeeper客户端
        zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("连接对象的参数!");
                // 连接成功
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectedSemaphore.countDown();
                }
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        connectedSemaphore.await();
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void watcherGetChild1() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:使用连接对象中的watcher
        zooKeeper.getChildren("/watcher3", true);
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherGetChild2() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:自定义watcher
        zooKeeper.getChildren("/watcher3", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("自定义watcher");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherGetChild3() throws KeeperException, InterruptedException {
        // 一次性
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("自定义watcher");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        zooKeeper.getChildren("/watcher3", this);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        zooKeeper.getChildren("/watcher3", watcher);
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherGetChild4() throws KeeperException, InterruptedException {
        // 多个监视器对象
        zooKeeper.getChildren("/watcher3", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("1");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        zooKeeper.getChildren("/watcher3", this);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });

        zooKeeper.getChildren("/watcher3", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("2");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        zooKeeper.getChildren("/watcher3", this);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });
        Thread.sleep(50000);
        System.out.println("结束");
    }
}

场景案例

配置中心

工作中有这样的一个场景: 数据库用户名和密码信息放在一个配置文件中,应用读取该配置文件,配置文件信息放入缓存。

若数据库的用户名和密码改变的时候,还需要重新加载缓存,比较麻烦,通过ZooKeeper可以轻松完成,当数据库发生变化时自动完成缓存同步。

设计思路:

  1. 连接zookeeper服务器
  2. 读取zookeeper中的配置信息,注册watcher监听器,存入本地变量
  3. 当zookeeper中的配置信息发生变化时,通过watcher的回调方法捕获数据变化事件
  4. 重新获取配置信息
import java.util.concurrent.CountDownLatch;

import com.wzq.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

/**
 * Title:zookeeper实现配置中心
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2021/2/4
 */
public class MyConfigCenter implements Watcher {

    // zk的连接串
    String IP = "192.168.60.130:2181";
    // 计数器对象
    CountDownLatch countDownLatch = new CountDownLatch(1);
    // 连接对象
    static ZooKeeper zooKeeper;

    // 用于本地化存储配置信息
    private String url;
    private String username;
    private String password;

    @Override
    public void process(WatchedEvent event) {
        try {
            // 捕获事件状态
            if (event.getType() == EventType.None) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接成功");
                    countDownLatch.countDown();
                } else if (event.getState() == Event.KeeperState.Disconnected) {
                    System.out.println("连接断开!");
                } else if (event.getState() == Event.KeeperState.Expired) {
                    System.out.println("连接超时!");
                    // 超时后服务器端已经将连接释放,需要重新连接服务器端
                    zooKeeper = new ZooKeeper("192.168.60.130:2181", 6000,
                            new ZKConnectionWatcher());
                } else if (event.getState() == Event.KeeperState.AuthFailed) {
                    System.out.println("验证失败!");
                }
                // 当配置信息发生变化时,再次读取
            } else if (event.getType() == EventType.NodeDataChanged) {
                initValue();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // 构造方法
    public MyConfigCenter() {
        initValue();
    }
    
    // 连接zookeeper服务器,读取配置信息
    public void initValue() {
        try {
            // 创建连接对象
            zooKeeper = new ZooKeeper(IP, 5000, this);
            // 阻塞线程,等待连接的创建成功
            countDownLatch.await();
            // 读取配置信息
            this.url = new String(zooKeeper.getData("/config/url", true, null));
            this.username = new String(zooKeeper.getData("/config/username", true, null));
            this.password = new String(zooKeeper.getData("/config/password", true, null));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        try {
            MyConfigCenter myConfigCenter = new MyConfigCenter();
            for (int i = 1; i <= 20; i++) {
                Thread.sleep(5000);
                System.out.println("url:"+myConfigCenter.getUrl());
                System.out.println("username:"+myConfigCenter.getUsername());
                System.out.println("password:"+myConfigCenter.getPassword());
                System.out.println("########################################");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    
    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
    
}

分布式唯一id

在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条记录生成一个唯一的ID。但是分库分表后,就无法在依靠数据库的auto_increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环境下生成全局唯一ID。

设计思路:

  1. 连接zookeeper服务器
  2. 指定路径生成临时有序节点
  3. 取序列号及为分布式环境下的唯一ID
import java.util.concurrent.CountDownLatch;

import com.wzq.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class GloballyUniqueId implements Watcher {
    
    // zk的连接串
    String IP = "192.168.60.130:2181";
    // 计数器对象
    CountDownLatch countDownLatch = new CountDownLatch(1);
    // 用户生成序号的节点
    String defaultPath = "/uniqueId";
    // 连接对象
    ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent event) {
        try {
            // 捕获事件状态
            if (event.getType() == Event.EventType.None) {
                if (event.getState() == KeeperState.SyncConnected) {
                    System.out.println("连接成功");
                    countDownLatch.countDown();
                } else if (event.getState() == KeeperState.Disconnected) {
                    System.out.println("连接断开!");
                } else if (event.getState() == KeeperState.Expired) {
                    System.out.println("连接超时!");
                    // 超时后服务器端已经将连接释放,需要重新连接服务器端
                    zooKeeper = new ZooKeeper(IP, 6000,
                            new ZKConnectionWatcher());
                } else if (event.getState() == KeeperState.AuthFailed) {
                    System.out.println("验证失败!");
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // 构造方法
    public GloballyUniqueId() {
        try {
            //打开连接
            zooKeeper = new ZooKeeper(IP, 5000, this);
            // 阻塞线程,等待连接的创建成功
            countDownLatch.await();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // 生成id的方法
    public String getUniqueId() {
        String path = "";
        try {
            //创建临时有序节点
            path = zooKeeper.create(defaultPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        // /uniqueId0000000001
        return path.substring(9);
    }

    public static void main(String[] args) {
        GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
        for (int i = 1; i <= 5; i++) {
            String id = globallyUniqueId.getUniqueId();
            System.out.println(id);
        }
    }

}

分布式锁

分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同工具ZooKeeper,当然也有着标准的实现方式。下面介绍在zookeeper中如何实现排他锁。

设计思路:

  1. 每个客户端往/Locks下创建临时有序节点/Locks/Lock_,创建成功后/Locks下面会有每个客户端对应的节点,如/Locks/Lock000000001
  2. 客户端取得/Locks下子节点,并进行排序,判断排在最前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功
  3. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点Lock000000002,那么则监听Lock000000001
  4. 当前一位锁节点(Lock000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(Lock000000002)的逻辑
  5. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Title:Zookeeper实现分布式锁
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2021/2/4
 */
public class MyLock {

    //  zk的连接串
    String IP = "192.168.60.130:2181";
    //  计数器对象
    CountDownLatch countDownLatch = new CountDownLatch(1);
    //ZooKeeper配置信息
    ZooKeeper zooKeeper;

    private static final String LOCK_ROOT_PATH = "/Locks";
    private static final String LOCK_NODE_NAME = "Lock_";
    private String lockPath;

    // 打开zookeeper连接
    public MyLock() {
        try {
            zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.None) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            System.out.println("连接成功!");
                            countDownLatch.countDown();
                        }
                    }
                }
            });
            countDownLatch.await();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    //获取锁
    public void acquireLock() throws Exception {
        //创建锁节点
        createLock();
        //尝试获取锁
        attemptLock();
    }

    //创建锁节点
    private void createLock() throws Exception {
        //判断Locks是否存在,不存在创建
        Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
        if (stat == null) {
            zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // 创建临时有序节点
        lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("节点创建成功:" + lockPath);
    }

    //监视器对象,监视上一个节点是否被删除
    Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDeleted) {
                synchronized (this) {
                    notifyAll();
                }
            }
        }
    };

    //尝试获取锁
    private void attemptLock() throws Exception {
        // 获取Locks节点下的所有子节点
        List<String> list = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
        // 对子节点进行排序
        Collections.sort(list);
        // /Locks/Lock_000000001
        int index = list.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
        if (index == 0) {
            System.out.println("获取锁成功!");
            return;
        } else {
            // 上一个节点的路径
            String path = list.get(index - 1);
            Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
            if (stat == null) {
                attemptLock();
            } else {
                synchronized (watcher) {
                    watcher.wait();
                }
                attemptLock();
            }
        }

    }

    //释放锁
    public void releaseLock() throws Exception {
        //删除临时有序节点
        zooKeeper.delete(this.lockPath, -1);
        zooKeeper.close();
        System.out.println("锁已经释放:" + this.lockPath);
    }

    public static void main(String[] args) {
        try {
            MyLock myLock = new MyLock();
            myLock.createLock();
        } catch (Exception ex) {
            ex.printStackTrace();
        }

    }
}
/**
 * Title:售票案例,测试分布式锁
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2021/2/4
 */
public class TicketSeller {

    private void sell(){
        System.out.println("售票开始");
        // 线程随机休眠数毫秒,模拟现实中的费时操作
        int sleepMillis = 5000;
        try {
            //代表复杂逻辑执行了一段时间
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("售票结束");
    }

    public void sellTicketWithLock() throws Exception {
        MyLock lock = new MyLock();
        // 获取锁
        lock.acquireLock();
        sell();
        //释放锁
        lock.releaseLock();
    }

    public static void main(String[] args) throws Exception {
        //多线程
        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                try {
                    TicketSeller ticketSeller = new TicketSeller();
                    ticketSeller.sellTicketWithLock();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

集群搭建

单机环境下,jdk、zookeeper 安装完毕,基于一台虚拟机,进行zookeeper伪集群搭建,zookeeper集群中包含3个节点,节点对外提供服务端口号分别为2181、2182、2183

  1. 基于zookeeper-3.4.10复制三份zookeeper安装好的服务器文件,目录名称分别为zookeeper2181、zookeeper2182、zookeeper2183

    cp ‐r zookeeper‐3.4.10 zookeeper2181
    cp ‐r zookeeper‐3.4.10 zookeeper2182 
    cp ‐r zookeeper‐3.4.10 zookeeper2183
    
  2. 修改zookeeper2181服务器对应配置文件

    #服务器对应端口号 
    clientPort=2181 
    #数据快照文件所在路径 
    dataDir=/home/zookeeper/zookeeper2181/data 
    #集群配置信息 
    #server.A=B:C:D 
    #A:是一个数字,表示这个是服务器的编号
    #B:是这个服务器的ip地址 
    #C:Zookeeper服务器之间的通信端口 
    #D:Leader选举的端口,投票
    server.1=192.168.60.130:2287:3387 
    server.2=192.168.60.130:2288:3388 
    server.3=192.168.60.130:2289:3389
    
  3. 在上一步dataDir 指定的目录下,创建 myid 文件,然后在该文件添加上一步server 配置的对应 A 数字。

    #zookeeper2181对应的数字为1 
    #/home/zookeeper/zookeeper2181/data目录下执行命令 
    echo "1" > myid
    
  4. zookeeper2182、zookeeper2183参照步骤2/3进行相应配置

  5. 分别启动三台服务器,检验集群状态

    登录命令:

    ./zkCli.sh ‐server 192.168.60.130:2181 
    ./zkCli.sh ‐server 192.168.60.130:2182 
    ./zkCli.sh ‐server 192.168.60.130:2183
    

API连接集群

ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
  • connectionString - zooKeeper集合主机。
  • sessionTimeout - 会话超时(以毫秒为单位)。
  • watcher - 实现“监视器”界面的对象。ZooKeeper集合通过监视器对象返回连接状态。
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;

public class ZookeeperConnection {
    
    public static void main(String[] args) {
        try {
            // 计数器对象
            CountDownLatch countDownLatch=new CountDownLatch(1);
            // arg1:服务器的ip和端口
            // arg2:客户端与服务器之间的会话超时时间  以毫秒为单位的
            // arg3:监视器对象
            ZooKeeper zooKeeper=new ZooKeeper("192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState()==Event.KeeperState.SyncConnected) {
                        System.out.println("连接创建成功!");
                        countDownLatch.countDown();
                    }
                }
            });
            // 主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
            // 会话编号
            System.out.println(zooKeeper.getSessionId());
            zooKeeper.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

zab协议

一致性协议**:zab**协议,跟redis数据区分,redis主节点写数据后返回确认给客户端,AP,之后再同步,Zookeeper则是半数以上的从节点写成功后才返回确认给客户端,CP。

zab协议 的全称是 Zookeeper Atomic Broadcast (zookeeper原子广播)。zookeeper 是通过 zab协议来保证分布式事务的最终一致性

基于zab协议,zookeeper集群中的角色主要有以下三类,如下表所示:

在这里插入图片描述

zab广播模式工作原理,通过类似两阶段提交协议的方式解决数据一致性:

在这里插入图片描述

  1. leader从客户端收到一个写请求
  2. leader生成一个新的事务并为这个事务生成一个唯一的ZXID
  3. leader将这个事务提议(propose)发送给所有的follows节点
  4. follower节点将收到的事务请求加入到历史队列(history queue)中,并发送ack给leader
  5. 当leader收到大多数follower(半数以上节点)的ack消息,leader会发送commit请求
  6. 当follower收到commit请求时,从历史队列中将事务请求commit

读数据则是每个节点都有一个数据副本,都可以读,都最新。

#bin目录下查看节点的身份
./zkServer.sh status
Mode leader
Mode follower

leader选举

服务器状态

  • looking:寻找leader状态。当服务器处于该状态时,它会认为当前集群中没有leader,因此需要进入leader选举状态。
  • leading: 领导者状态。表明当前服务器角色是leader。
  • following: 跟随者状态。表明当前服务器角色是follower。
  • observing:观察者状态。表明当前服务器角色是observer。

服务器启动时期的leader选举

在集群初始化阶段,当有一台服务器server1启动时,其单独无法进行和完成leader选举,当第二台服务器server2启动时,此时两台机器可以相互通信,每台机器都试图找到leader,于是进入leader选举过程。选举过程如下:

  1. 每个server发出一个投票。由于是初始情况,server1和server2都会将自己作为leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid,使用 (myid, zxid)来表示,此时server1的投票为(1, 0),server2的投票为(2, 0),然后各自将这个投票发给集群中其他机器。

  2. 集群中的每台服务器接收来自集群中各个服务器的投票。

  3. 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行pk,pk规则如下:

    3-1,优先检查zxid。zxid比较大的服务器优先作为leader。

    3-2,如果zxid相同,那么就比较myid。myid较大的服务器作为leader服务器。

    对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较两者的zxid,均为0,再比较myid,此时server2的myid最大,于是更新自己的投票为(2, 0),然后重新投票,对于server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。

  4. 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于server1、server2而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了leader

  5. 改变服务器状态。一旦确定了leader,每个服务器就会更新自己的状态,如果是follower,那么就变更为following,如果是leader,就变更为leading。

服务器运行时期的Leader选举

在zookeeper运行期间,leader与非leader服务器各司其职,即便当有非leader服务器宕机或新加入,此时也不会影响leader,但是一旦leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮leader选举,其过程和启动时期的Leader选举过程基本一致。

假设正在运行的有server1、server2、server3三台服务器,当前leader是server2,若某一时刻leader挂了,此时便开始Leader选举。选举过程如下:

  1. 变更状态。leader挂后,余下的服务器都会将自己的服务器状态变更为looking,然后开始进入leader选举过程。
  2. 每个server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定server1的zxid为122,server3的zxid为122,在第一轮投票中,server1和server3都会投自己,产生投票(1, 122),(3, 122),然后各自将投票发送给集群中所有机器。
  3. 接收来自各个服务器的投票。与启动时过程相同。
  4. 处理投票。与启动时过程相同,此时,server3将会成为leader。
  5. 统计投票。与启动时过程相同。
  6. 改变服务器的状态。与启动时过程相同。

observer角色及其配置

observer角色特点:

  1. 不参与集群的leader选举
  2. 不参与集群中写数据时的ack反馈

为了使用observer角色,在任何想变成observer角色的配置文件中加入如下配置:

peerType=observer

并在所有server的配置文件中,配置成observer模式的server的那行配置追加:observer,例如:

server.3=192.168.60.130:2289:3389:observer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/137573.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp 之 接入小程序客服

目录 前言 小程序客服 代码只需要一步 配置也需要一步​​​​​​​ 前言 小程序客服 因老大 看到别人家有在线客服这个功能&#xff0c;就让我也做一个&#xff0c;这个功能很简单 效果图1 代码只需要一步 <button type"default" open-type"con…

MATLAB | 绘图复刻(六) | 分组环形热图

有粉丝问我Ecology Letters, (2021) 24: 1018–1028 Soil carbon persistence governed by plant input and mineral protection at regional and global scales 这篇文章中的Figure 2咋画&#xff0c;原图长这样&#xff1a; 复刻效果&#xff1a; 完整步骤 0 数据定义 按…

node.js创建网站实例1

1.node.js安装 我的电脑环境&#xff1a;win10 网址&#xff1a;https://nodejs.org/en/ 我下载了18.12.1版本 一路next默认安装&#xff0c;安装完成后&#xff0c;运行cmd&#xff0c;查看版本号 会同时安装npm&#xff0c;也可以同时查看版本号 2.创建第一个网站实例hell…

内卷对于2022是一种无奈,也是一种修行

其实我们谁也不知道2023年对于我们普通的开发人员来说会有什么样的试炼&#xff0c;因为2022年身边有太多的人&#xff0c;为了工作&#xff0c;为了生活&#xff0c;为了家庭&#xff0c;为了理想&#xff0c;不得不选择走向别人看似很卷的那条路。 对于我们周围的人来说&…

【Vim】基本操作及命令集详解

概述 Vim 是从 vi 发展出来的一个文本编辑器。vi 内置在Linux系统中&#xff0c;是vim的简化版编辑器&#xff0c;vim则需要进行安装使用。Vim代码补全、编译及错误跳转等方便编程的功能特别丰富&#xff0c;可以实现高效率移动和高效的输入&#xff0c;在程序员中被广泛使用。…

CPT203-Software Engineering(3)

文章目录9. Software Design9.1 Architecture Design9.1.1 Architectural patterns9.2 Component-level Design9.2.1 Component9.2.2 Views of component9.2.3 Component-level design process9.3 User Interface Design9.3.1 Interface Design Process9.3.2 Interface Design …

蓝桥杯Python练习题16-最大最小公倍数

资源限制   内存限制&#xff1a;256.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 问题描述   已知一个正整数N&#xff0c;问从1~N中任选出三个数&#xff0c;他们的最小公倍数最大可以为多少。 输入格式   输入一…

三维数学(一)

视频教程&#xff1a;https://www.bilibili.com/video/BV12s411g7gU?p155 向量 一个数字列表&#xff0c;表示各个维度上的有向位移&#xff1b;同时也是一个有大小有方向的物理量&#xff0c;大小及向量的模长&#xff0c;而方向即空间中向量的指向&#xff0c;可以表示物体…

TikTok Shop 越南站点收入已达Lazada 的 80%

让我们一起来看看今日都有哪些新鲜事吧&#xff01;01 TikTok Shop 越南站点收入已达Lazada 的 80% 据越南电商平台数据分析软件Metric.vn 统计&#xff0c;Shopee、Lazada、Tiki 和 Sendo 仍然主导着越南电子商务市场&#xff0c;1-11 月&#xff0c;共销售了 13 亿件产品。其…

简化开发小技巧-Mybatis-Plus的使用和常用操作

目录 简介 快速使用 pom 代码 mapper service 使用 常用操作 简单或操作查询 多条件或查询 更新字段为null 方法一&#xff0c;如果要更新的字段是String类型&#xff0c; 方法二&#xff0c; 使用mybatis-plus的字段注入。 方法三&#xff0c;使用UpdateWrapper…

基于R的Bilibili视频数据建模及分析——预处理篇

基于R的Bilibili视频数据建模及分析——预处理篇 文章目录基于R的Bilibili视频数据建模及分析——预处理篇0、写在前面1、项目介绍1.1 项目背景1.2 数据来源1.3 数据集展示2、数据预处理2.1 删除空数据2.2 增加id字段2.3 处理数值字段3、参考资料0、写在前面 实验环境 Python版…

Stable Diffusion背后原理(Latent Diffusion Models)

前言 2023年第一篇博客&#xff0c;大家新年好呀~ 这次来关注一下Stable Diffusion背后的原理&#xff0c;即 High-Resolution Image Synthesis with Latent Diffusion Models 这篇论文。 之前关注的那些工作只能工作到 256256256 \times 256256256 像素(resize成这个后才输…

设计模式简介

一、设计模式简介 编写软件过程中&#xff0c;程序员面临着来自耦合性&#xff0c;内聚性 以及可维护性&#xff0c;可扩展性&#xff0c;重用性&#xff0c;灵活性等多方面的挑战&#xff0c;设计模式是为了让程序&#xff08;软件&#xff09;&#xff0c;具有更好的&#xf…

04SpringCloudAlibaba服务注册中心—Consul

目录 Consul简介 Consul是什么What is Consul? | Consul by HashiCorp Consul能做什么 Consul下载&#xff1a;Downloads | Consul by HashiCorp Consul使用&#xff1a;Spring Cloud Consul 中文文档 参考手册 中文版 安装并运行Consul 1、官网安装说明&#xff1a;In…

开发板测试手册——系统启动、文件传送操作步骤详解(1)

目 录 前 言 4 1 评估板快速测试 5 1.1 系统启动测试 5 1.2 文件传送测试 11 1.2.1 通过 Linux 系统启动卡 11 1.2.2 通过 OpenSSH 12 1.3 LED 测试 15 1.4 KEY 测试 15 1.5 DDR 读写测试 16 1.6 SD 卡读写测试 17 1.7 eMMC 读写测试 18 前 言 本指导文档适用开发…

2022 CSDN年度报告已出炉

2022年已过&#xff0c;我们迎来了2023年&#xff0c;那么在2022年&#xff0c;你在CSDN平台都做了些什么&#xff0c;收获了什么呢&#xff1f;2022 CSDN年度报告已出炉&#xff0c;来看看你的2022年度报告吧。 点此查看2022年度报告 或扫码查看你的2022 CSDN年度报告哦&…

23种设计模式(三)——策略模式【组件协作】

文章目录意图什么时候使用策略真实世界类比策略模式的实现策略模式的优缺点亦称&#xff1a;Strategy 意图 定义了一组策略&#xff0c;分别在不同类中封装起来&#xff0c;每种策略都可以根据当前场景相互替换&#xff0c;从而使策略的变化可以独立于操作者。比如我们要去某个…

Type-challenges: tuple to object / union

type-challenges/README.zh-CN.md at main type-challenges/type-challenges GitHub https://github.com/TIMPICKLE/type-challenges/blob/main/questions/00010-medium-tuple-to-union/README.zh-CN.md lets do it , mate! 首先概念澄清&#xff1a; JS&#xff1a; con…

TKE 注册节点,IDC 轻量云原生上云的最佳路径

林顺利&#xff0c;腾讯云原生产品经理&#xff0c;负责分布式云产品迭代和注册节点客户扩展&#xff0c;专注于云原生混合云新形态的推广实践。 背景 企业在持续业务运维过程中&#xff0c;感受到腾讯云 TKE 带来的便捷性和极致的使用体验&#xff0c;将新业务的发布以及老业…

回答问题】ChatGPT上线了!给我推荐20个比较流行的图像全景分割模型

给我推荐20个比较流行的图像全景分割模型 好的&#xff0c;这是 20 个比较流行的图像全景分割模型&#xff1a;DeepLabv3 PSPNet FCN U-Net Mask R-CNN UNet SegNet ENet ICNet ENet GCN DANet BiSeNet DenseASPP DenseASPPv2 DenseASPPv3 DenseASPPv4 DenseASPPv5 DenseASPPv…