Zookeeper基础操作

news2024/11/16 9:44:22

搭建Zookeeper服务器

windows下部署

下载地址: https://mirrors.cloud.tencent.com/apache/zookeeper/zookeeper-3.7.1/

修改配置文件
在这里插入图片描述

  • 打开conf目录,将 zoo_sample.cfg复制一份,命名为 zoo.cfg
  • 打开 zoo.cfg,修改 dataDir路径,新增日志 dataLogDir路径

dataDir=…/data
dataLogDir=…/log

zoo.cfg 配置文件说明

 # zookeeper时间配置中的基本单位 (毫秒)
 tickTime=2000
 # 允许follower初始化连接到leader最大时长,它表示tickTime时间倍数 即:initLimit*tickTime
 initLimit=10
 # 允许follower与leader数据同步最大时长,它表示tickTime时间倍数 
 syncLimit=5
 #zookeper 数据存储目录及日志保存目录(如果没有指明dataLogDir,则日志也保存在这个文件中)
 dataDir=/tmp/zookeeper
 #对客户端提供的端口号
 clientPort=2181
 #单个客户端与zookeeper最大并发连接数
 maxClientCnxns=60
 # 保存的数据快照数量,之外的将会被清除
 autopurge.snapRetainCount=3
 #自动触发清除任务时间间隔,小时为单位。默认为0,表示不自动清除。
 autopurge.purgeInterval=1

启动Zookeeper
在这里插入图片描述

linux下部署

前提:由于zookeeper是使用java语言开发的,所以,在安装zookeeper之前务必先在本机安装配置好java环境!

  • 上传zookeeper
    在这里插入图片描述
  • 解压zookeeper
    在这里插入图片描述
  • 配置conf
    与windows的差不多

#zookeeper内部的基本单位,单位是毫秒,这个表示一个tickTime为2000毫秒,在zookeeper的其他配置中,都是基于tickTime来做换算的
tickTime=2000
#集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量)。
initLimit=10
#syncLimit:集群中的follower服务器(F)与leader服务器(L)之间 请求和应答 之间能容忍的最多心跳数(tickTime的数量)
syncLimit=5
#数据存放文件夹,zookeeper运行过程中有两个数据需要存储,一个是快照数据(持久化数据)另一个是事务日志
dataDir=/tmp/zookeeper
#客户端访问端口
clientPort=2181

配置环境变量
vim /etc/profile

export ZOOKEEPER_PREFIX=/root/software/apache-zookeeper-3.7.1-bin
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin

执行下面的命令,使配置生效

source profile

启动服务

zkServer.sh start

可以看到我们的zkServer以及启动好了。
可以查看下启动状态:

zkServer.sh status

客户端连接

zkCli.sh

根目录下有一个自带的/zookeeper子节点,它来保存Zookeeper的配额管理信息,不要轻易删除。
在这里插入图片描述

Zookeeper命令操作

Zookeeper 数据模型
ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。
在这里插入图片描述
Zookeeper这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。
在这里插入图片描述
节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。

节点可以分为四大类:

  • PERSISTENT 持久化节点
  • EPHEMERAL 临时节点 :-e
  • PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
  • EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es

Zookeeper服务端常用命令
在这里插入图片描述
•启动 ZooKeeper 服务

./zkServer.sh start

•查看 ZooKeeper 服务状态

./zkServer.sh status

•停止 ZooKeeper 服务

./zkServer.sh stop 

•重启 ZooKeeper 服务

./zkServer.sh restart 

Zookeeper客户端常用命令

基本CRUD

  • 连接Zookeeper客户端
# 本地连接
zkCli.sh
# 远程连接
zkCli.sh -server ip:2181
  • 断开连接
quit
  • 查看命令帮助
help
  • 显示制定目录下节点
# ls 目录
ls /
  • 创建节点
# create /节点path value
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] create /app1 yuyang123
Created /app1
[zk: localhost:2181(CONNECTED) 2] ls /
[app1, zookeeper]
[zk: localhost:2181(CONNECTED) 3] create /app2
Created /app2
[zk: localhost:2181(CONNECTED) 4] ls /
[app1, app2, zookeeper]
  • 获取节点值
# get /节点path
[zk: localhost:2181(CONNECTED) 15] get /app1
yuyang123
[zk: localhost:2181(CONNECTED) 16] get /app2
null
  • 设置节点值
# set /节点path value
[zk: localhost:2181(CONNECTED) 17] set /app2 yuyang456
[zk: localhost:2181(CONNECTED) 18] get /app2
yuyang456
  • 删除单个节点
# delete /节点path
[zk: localhost:2181(CONNECTED) 19] delete /app2
[zk: localhost:2181(CONNECTED) 20] get /app2
Node does not exist: /app2
[zk: localhost:2181(CONNECTED) 21] ls /
[app1, zookeeper]
  • 删除带有子节点的节点
# deleteall /节点path
[zk: localhost:2181(CONNECTED) 22] create /app1
Node already exists: /app1
[zk: localhost:2181(CONNECTED) 23] create /app1/p1
Created /app1/p1
[zk: localhost:2181(CONNECTED) 24] create /app1/p2
Created /app1/p2
[zk: localhost:2181(CONNECTED) 25] delete /app1
Node not empty: /app1
[zk: localhost:2181(CONNECTED) 26] deleteall /app1
[zk: localhost:2181(CONNECTED) 27] ls /
[zookeeper]

创建临时&顺序节点

  • 创建临时节点 (-e)
    • 临时节点是在会话结束后,自动被删除的
# create -e /节点path value
[zk: localhost:2181(CONNECTED) 29] create -e /app1 yuyang123
Created /app1
[zk: localhost:2181(CONNECTED) 30] get /app1
yuyang123
[zk: localhost:2181(CONNECTED) 31] quit

# 退出后再次连接,临时节点已经删除
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
  • 创建顺序节点 (-s)
    • 创建出的节点,根据先后顺序,会在节点之后带上一个数值,越后执行数值越大,适用于分布式锁的应用场景- 单调递增.
# create -s /节点path value
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] create -s /app2
Created /app20000000003
[zk: localhost:2181(CONNECTED) 2] ls /
[app20000000003, zookeeper]
[zk: localhost:2181(CONNECTED) 3] create -s /app2 
Created /app20000000004
[zk: localhost:2181(CONNECTED) 4] ls /
[app20000000003, app20000000004, zookeeper]
[zk: localhost:2181(CONNECTED) 5] create -s /app2 
Created /app20000000005
[zk: localhost:2181(CONNECTED) 6] ls /
[app20000000003, app20000000004, app20000000005, zookeeper]

# 创建临时顺序节点
[zk: localhost:2181(CONNECTED) 7] create -es /app3
Created /app30000000006
[zk: localhost:2181(CONNECTED) 8] ls /
[app20000000003, app20000000004, app20000000005, app30000000006, zookeeper]
# 退出
[zk: localhost:2181(CONNECTED) 9] quit

# 重新链接,临时顺序节点已经被删除
[zk: localhost:2181(CONNECTED) 0] ls /
[app20000000003, app20000000004, app20000000005, zookeeper]
  • 查询节点详细信息
# ls –s /节点path 
[zk: localhost:2181(CONNECTED) 5] ls / -s
[app20000000003, app20000000004, app20000000005, zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x14
cversion = 10
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 4
  • czxid:节点被创建的事务ID
  • ctime: 创建时间
  • mzxid: 最后一次被更新的事务ID
  • mtime: 修改时间
  • pzxid:子节点列表最后一次被更新的事务ID
  • cversion:子节点的版本号
  • dataversion:数据版本号
  • aclversion:权限版本号
  • ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
  • dataLength:节点存储的数据的长度
  • numChildren:当前节点的子节点个数

Zookeeper JavaAPI操作

Curator介绍
Curator是Netflix公司开源的一套zookeeper客户端框架,Curator是对Zookeeper支持最好的客户端框架。Curator封装了大部分Zookeeper的功能,比如Leader选举、分布式锁等,减少了技术人员在使用Zookeeper时的底层细节开发工作。

Curator框架主要解决了三类问题:

  • 封装ZooKeeper Client与ZooKeeper Server之间的连接处理(提供连接重试机制等)。
  • 提供了一套Fluent风格的API,并且在Java客户端原生API的基础上进行了增强(创捷多层节点、删除多层节点等)。
  • 提供ZooKeeper各种应用场景(分布式锁、leader选举、共享计数器、分布式队列等)的抽象封装。

引入Curator

<!--curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>

建立连接

方式1

public class CuratorTest {

    /**
     * 建立连接
     */
    @Test
    public void testConnect(){

        /**
         * String connectString     连接字符串。 zk地址和端口: "192.168.58.100:2181,192.168.58.101:2181"
         * int sessionTimeoutMs     会话超时时间 单位ms
         * int connectionTimeoutMs  连接超时时间 单位ms
         * RetryPolicy retryPolicy  重试策略
         */
        //1. 第一种方式

        //重试策略 baseSleepTimeMs 重试之间等待的初始时间,maxRetries 重试的最大次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);

        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.58.100:2181", 60 * 1000,
                15 * 1000, retryPolicy);

        //开启连接
        client.start();

    }
}

重试策略

  • RetryNTimes: 重试没有次数限制
  • RetryOneTime:只重试没有次数限制,一般也不常用
  • ExponentialBackoffRetry: 只重试一次的重试策略

方式2

public class CuratorTest {

    private CuratorFramework client;

    /**
     * 建立连接
     */
    @Test
    public void testConnect(){

        /**
         * String connectString     连接字符串。 zk地址和端口: "192.168.58.100:2181,192.168.58.101:2181"
         * int sessionTimeoutMs     会话超时时间 单位ms
         * int connectionTimeoutMs  连接超时时间 单位ms
         * RetryPolicy retryPolicy  重试策略
         */
        //1. 第一种方式

        //重试策略 baseSleepTimeMs 重试之间等待的初始时间,maxRetries 重试的最大次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);

//      client   = CuratorFrameworkFactory.newClient("192.168.58.100:2181", 60 * 1000,
//                15 * 1000, retryPolicy);

        //2. 第二种方式,建造者方式创建
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.58.100:2181")
                .sessionTimeoutMs(60*1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("yuyang")  //根节点名称设置
                .build();

        //开启连接
        client.start();
    }
}
  • 添加节点
    修改testConnect注解,@Before
 /**
     * 建立连接
     */
    @Before
    public void testConnect()

创建节点:create 持久 临时 顺序 数据

public class CuratorTest {
    /**
     * 创建节点 create 持久 临时 顺序 数据
     */
    //1.创建节点
    @Test
    public void testCreate1() throws Exception {

        // 如果没有创建节点,没有指定数据,则默认将当前客户端的IP 作为数据存储
        String path = client.create().forPath("/app1");
        System.out.println(path);
    }

    @After
    public void close(){
        client.close();
    }
}

	//2.创建节点 带有数据
    @Test
    public void testCreate2() throws Exception {
        String path = client.create().forPath("/app2","hehe".getBytes());
        System.out.println(path);
    }
    
    //3.设置节点类型 默认持久化
    @Test
    public void testCreate3() throws Exception {
        //设置临时节点
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
        System.out.println(path);
    }
    
//1.查询数据 getData
@Test
public void testGet1() throws Exception {
    byte[] data = client.getData().forPath("/app1");
    System.out.println(new String(data));
}

//2.查询子节点 getChildren()
@Test
public void testGet2() throws Exception {
    List<String> path = client.getChildren().forPath("/");
    System.out.println(path);
}

//3.查询节点状态信息
@Test
public void testGet3() throws Exception {
    Stat status = new Stat();
    System.out.println(status);
    //查询节点状态信息: ls -s
    client.getData().storingStatIn(status).forPath("/app1");
    System.out.println(status);
}
  • 修改节点
    //1. 基本数据修改
    @Test
    public void testSet() throws Exception {
        client.setData().forPath("/app1","hahaha".getBytes());
    }

    //根据版本修改(乐观锁)
    @Test
    public void testSetVersion() throws Exception {
        //查询版本
        Stat status = new Stat();
        //查询节点状态信息: ls -s
        client.getData().storingStatIn(status).forPath("/app1");
        int version = status.getVersion();
        System.out.println(version);  //2

        client.setData().withVersion(version).forPath("/app1","hehe".getBytes());
    }
  • 删除节点
  //1.删除单个节点
    @Test
    public void testDelete1() throws Exception {

        client.delete().forPath("/app4");
    }

    //删除带有子节点的节点
    @Test
    public void testDelete2() throws Exception {

        client.delete().deletingChildrenIfNeeded().forPath("/app4");
    }

    //必须删除成功(超时情况下,重试删除)
    @Test
    public void testDelete3() throws Exception {

        client.delete().guaranteed().forPath("/app2");
    }

    //回调 删除完成后执行
    @Test
    public void testDelete4() throws Exception {

        client.delete().guaranteed().inBackground((curatorFramework, curatorEvent) -> {
            System.out.println("我被删除了");
            System.out.println(curatorEvent);
        }).forPath("/app1");
    }

Watch事件监听

ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
在这里插入图片描述
zkCli客户端使用watch
添加 -w 参数可实时监听节点与子节点的变化,并且实时收到通知。非常适用保障分布式情况下的数据一至性。

其使用方式如下

命令描述
ls -w path监听子节点的变化(增,删) [监听目录]
get -w path监听节点数据的变化
stat -w path监听节点属性的变化

Zookeeper事件类型

  • NodeCreated: 节点创建
  • NodeDeleted: 节点删除
  • NodeDataChanged:节点数据变化
  • NodeChildrenChanged:子节点列表变化
  • DataWatchRemoved:节点监听被移除
  • ChildWatchRemoved:子节点监听被移除

1)get -w path 监听节点数据变化
2) ls -w /path 监听子节点的变化(增,删) [监听目录]
3) ls -R -w /path 例子二 循环递归的监听

curator客户端使用watch

ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐。

Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

ZooKeeper提供了三种Watcher:

  • NodeCache : 只是监听某一个特定的节点
  • PathChildrenCache : 监控一个ZNode的子节点.
  • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

1)watch监听 NodeCache

public class CuratorWatchTest {
		/**
     * 演示 NodeCache : 给指定一个节点注册监听
     */
    @Test
    public void testNodeCache() throws Exception {
        //1. 创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client, "/app1");  //监听的是 /yuyang和其子目录app1

        //2. 注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点变化了。。。。。。");
                //获取修改节点后的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        //3. 设置为true,开启监听
        nodeCache.start(true);
        while(true){

        }
    } 
}

2)watch监听 PathChildrenCache

    /**
     * 演示 PathChildrenCache: 监听某个节点的所有子节点
     */
    @Test
    public void testPathChildrenCache() throws Exception {

        //1.创建监听器对象 (第三个参数表示缓存每次节点更新后的数据)
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);

        //2.绑定监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("子节点发生变化了。。。。。。");
                System.out.println(pathChildrenCacheEvent);

                if(PathChildrenCacheEvent.Type.CHILD_UPDATED == pathChildrenCacheEvent.getType()){
                    //更新子节点
                    System.out.println("子节点更新了!");
                    //在一个getData中有很多数据,我们只拿data部分
                    byte[] data = pathChildrenCacheEvent.getData().getData();
                    System.out.println("更新后的值为:" + new String(data));

                }else if(PathChildrenCacheEvent.Type.CHILD_ADDED == pathChildrenCacheEvent.getType()){
                    //添加子节点
                    System.out.println("添加子节点!");
                    String path = pathChildrenCacheEvent.getData().getPath();
                    System.out.println("子节点路径为: " + path);

                }else if(PathChildrenCacheEvent.Type.CHILD_REMOVED == pathChildrenCacheEvent.getType()){
                    //删除子节点
                    System.out.println("删除了子节点");
                    String path = pathChildrenCacheEvent.getData().getPath();
                    System.out.println("子节点路径为: " + path);
                }
            }
        });

        //3. 开启
        pathChildrenCache.start();

        while(true){

        }
    }

事件对象信息分析

PathChildrenCacheEvent{
	type=CHILD_UPDATED, 
	data=ChildData
	{
		path='/app2/m1', 
		stat=164,166,1670114647087,1670114698259,1,0,0,0,3,0,164, 
		data=[49, 50, 51]
	}
}

在这里插入图片描述
3)watch监听 TreeCache

TreeCache相当于NodeCache(只监听当前结点)+ PathChildrenCache(只监听子结点)的结合版,即监听当前和子结点。

  /**
     * 演示 TreeCache: 监听某个节点的所有子节点
     */
    @Test
    public void testCache() throws Exception {

        //1.创建监听器对象
        TreeCache treeCache = new TreeCache(client, "/app2");

        //2.绑定监听器
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("节点变化了");
                System.out.println(treeCacheEvent);

                if(TreeCacheEvent.Type.NODE_UPDATED == treeCacheEvent.getType()){
                    //更新节点
                    System.out.println("节点更新了!");
                    //在一个getData中有很多数据,我们只拿data部分
                    byte[] data = treeCacheEvent.getData().getData();
                    System.out.println("更新后的值为:" + new String(data));

                }else if(TreeCacheEvent.Type.NODE_ADDED == treeCacheEvent.getType()){
                    //添加子节点
                    System.out.println("添加节点!");
                    String path = treeCacheEvent.getData().getPath();
                    System.out.println("子节点路径为: " + path);

                }else if(TreeCacheEvent.Type.NODE_REMOVED == treeCacheEvent.getType()){
                    //删除子节点
                    System.out.println("删除节点");
                    String path = treeCacheEvent.getData().getPath();
                    System.out.println("删除节点路径为: " + path);
                }
            }
        });

        //3. 开启
        treeCache.start();

        while(true){

        }
    }

一次性监听方式:Watcher
利用 Watcher 来对节点进行监听操作,可以典型业务场景需要使用可考虑,但一般情况不推荐使用。

public class CuratorWatchTest {
	@Autowired
    private CuratorFramework client;

    /**
     * 建立连接
     */
    @Before
    public void testConnect(){

        /**
         * String connectString,  连接字符串 zk地址 端口: "192.168.58.100:2181,,,,"
         * int sessionTimeoutMs,  会话超时时间
         * int connectionTimeoutMs,  连接超时时间
         * RetryPolicy retryPolicy   重试策略
         */
        //1. 第一种方式
        RetryPolicy retryPolicy =new ExponentialBackoffRetry(3000,10);

        //2. 第二种方式
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.58.100:2181")
                .sessionTimeoutMs(60*1000)
                .connectionTimeoutMs(15*1000)
                .retryPolicy(retryPolicy)
                .namespace("yuyang")  //当前程序创建目录的根目录
                .build();

        client.start();
    }

    /**
     * 演示一次性监听
     */
    @Test
    public  void testOneListener() throws Exception {

        byte[] data = client.getData().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("监听器 watchedEvent: " + watchedEvent);
            }
        }).forPath("/test");

        System.out.println("监听节点内容:" + new String(data));

        while(true){

        }
    }
    @After
    public void close(){
        client.close();
    }
}

上面这段代码对 /test 节点注册了一个 Watcher 监听事件,并且返回当前节点的内容。后面进行两次数据变更,实际上第二次变更时,监听已经失效,无法再次获得节点变动事件了

Curator事件监听机制
ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐。

Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

ZooKeeper提供了三种Watcher:

  • NodeCache : 只是监听某一个特定的节点
  • PathChildrenCache : 监控一个ZNode的子节点.
  • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

事务&异步操作演示

CuratorFramework 的实例包含 inTransaction( ) 接口方法,调用此方法开启一个 ZooKeeper 事务。

可以复合create、 setData、 check、and/or delete 等操作然后调用 commit() 作为一个原子操作提交。

 /**
	* 事务操作
	*/	
@Test
public void TestTransaction() throws Exception {

  //1. 创建Curator对象,用于定义事务操作
  CuratorOp createOp = client.transactionOp().create().forPath("/app3", "app1-data".getBytes());
  CuratorOp setDataOp = client.transactionOp().setData().forPath("/app2", "app2-data".getBytes());
  CuratorOp deleteOp = client.transactionOp().delete().forPath("/app2");

  //2. 添加事务操
  Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp, setDataOp, deleteOp);

  //3. 遍历事务操作结果
  for (CuratorTransactionResult result : results) {
    System.out.println(result.getForPath() + " - " + result.getType());
  }
}

异步操作

前面提到的增删改查都是同步的,但是 Curator 也提供了异步接口,引入了 BackgroundCallback 接口用于处理异步接口调用之后服务端返回的结果信息。

BackgroundCallback 接口中一个重要的回调值为 CuratorEvent,里面包含事件类型、响应码和节点的详细信息。

// 异步操作
    @Test
    public void TestAsync() throws Exception {
        while(true){
            // 异步获取子节点列表
            GetChildrenBuilder builder = client.getChildren();
            builder.inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    System.out.println("子节点列表:" + curatorEvent.getChildren());
                }
            }).forPath("/");
            TimeUnit.SECONDS.sleep(5);
        }
    }

Zookeeper权限控制

zk权限控制介绍

Zookeeper作为一个分布式协调框架,内部存储了一些分布式系统运行时的状态的数据,比如master选举、比如分布式锁。对这些数据的操作会直接影响到分布式系统的运行状态。因此,为了保证zookeeper中的数据的安全性,避免误操作带来的影响。Zookeeper提供了一套ACL权限控制机制来保证数据的安全。

ACL权限控制,使用:scheme:id:perm来标识。

  • Scheme(权限模式),标识授权策略
  • ID(授权对象)
  • Permission:授予的权限

ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限,每个znode支持设置多种权限控制方案和多个权限,子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点。

Scheme 权限模式

Zookeeper提供以下权限模式,所谓权限模式,就是使用什么样的方式来进行授权。

  • world: 默认方式,相当于全部都能访问。

  • auth:代表已经认证通过的用户

    cli中可以通过 addauth digest user:pwd 来添加当前上下文中的授权用户

  • digest:即用户名:密码这种方式认证,这也是业务系统中最常用的。

    username:password 字符串来产生一个MD5串,然后该串被用来作为ACL ID。认证是通过明文发送username:password 来进行的,当用在ACL时,表达式为username:base64 ,base64是password的SHA1摘要的编码。

  • ip:通过ip地址来做权限控制

    比如 ip:192.168.1.1 表示权限控制都是针对这个ip地址的。也可以针对网段 ip:192.168.1.1/24,此时addr中的有效位与客户端addr中的有效位进行比对。

ID 授权对象

指权限赋予的用户或一个指定的实体,不同的权限模式下,授权对象不同。

Id ipId = new Id("ip", "192.168.58.100");
Id ANYONE_ID_UNSAFE = new Id("world", "anyone");

3.4 Permission权限类型

指通过权限检查后可以被允许的操作,create /delete /read/write/admin

  • Create 允许对子节点Create 操作
  • Read 允许对本节点GetChildren 和GetData 操作
  • Write 允许对本节点SetData 操作
  • Delete 允许对子节点Delete 操作
  • Admin 允许对本节点setAcl 操作

权限模式(Schema)和授权对象主要用来确认权限验证过程中使用的验证策略:

比如ip地址、digest:username:password,匹配到验证策略并验证成功后,再根据权限操作类型来决定当前客户端的访问权限。

在控制台实现操作

在Zookeeper中提供了ACL相关的命令

getAcl        getAcl <path>     读取ACL权限
setAcl        setAcl <path> <acl>     设置ACL权限
addauth      addauth <scheme> <auth>     添加认证用户

1)word方式
创建一个节点后默认就是world模式

[zk: localhost:2181(CONNECTED) 6] create /auth
Created /auth

[zk: localhost:2181(CONNECTED) 7] getAcl /auth
'world,'anyone
: cdrwa

[zk: localhost:2181(CONNECTED) 8] create /auth2
Created /auth2

[zk: localhost:2181(CONNECTED) 9] getAcl /auth2
'world,'anyone
: cdrwa

[zk: localhost:2181(CONNECTED) 10] 

其中, cdrwa,分别对应 create . delete read write admin

2)IP方式

在ip模式中,首先连接到zkServer的命令需要使用如下方式

zkCli.sh -server 127.0.0.1:2181 

接着按照IP的方式操作如下

[zk: 127.0.0.1:2181(CONNECTED) 0] create /ip-model
Created /ip-model

[zk: 127.0.0.1:2181(CONNECTED) 1] setAcl /ip-model ip:127.0.0.1:cdrwa

[zk: 127.0.0.1:2181(CONNECTED) 3] getAcl /ip-model
'ip,'127.0.0.1
: cdrwa

3) Auth模式

auth模式的操作如下。

[zk: 127.0.0.1:2181(CONNECTED) 5] create /spike
Created /spike

[zk: 127.0.0.1:2181(CONNECTED) 6] addauth digest spike:123456

[zk: 127.0.0.1:2181(CONNECTED) 9] setAcl /spike auth:spike:cdrwa

[zk: 127.0.0.1:2181(CONNECTED) 10] getAcl /spike
'digest,'spike:pPeKgz2N9Xc8Um6wwnzFUMteLxk=
: cdrwa

当我们退出当前的会话后,再次连接,执行如下操作,会提示没有权限

[zk: localhost:2181(CONNECTED) 0] get /spike
Insufficient permission : /spike 

这时候,我们需要重新授权。

[zk: localhost:2181(CONNECTED) 1] addauth digest spike:123456
[zk: localhost:2181(CONNECTED) 2] get /spike
null 

**4) Digest模式 **

使用语法,会发现使用方式和Auth模式相同

setAcl /digest digest:用户名:密码:权限

但是有一个不一样的点,密码需要用加密后的,否则无法被识别。

密码: 用户名和密码加密后的字符串。

使用下面程序生成密码

public class TestAcl {

    @Test
    public void createPw() throws NoSuchAlgorithmException {
        String up = "yuyang:yuyang";
        byte[] digest = MessageDigest.getInstance("SHA1").digest(up.getBytes());
        String encodeStr = Base64.getEncoder().encodeToString(digest);
        System.out.println(encodeStr);
    }
}

得到: 5FAC7McRhLdx0QUWsfEbK8pqwxc=

再回到client上进行如下操作

[zk: localhost:2181(CONNECTED) 14] create /digest
Created /digest

[zk: localhost:2181(CONNECTED) 15] setAcl /digest digest:yuyang:5FAC7McRhLdx0QUWsfEbK8pqwxc=:cdrwa

[zk: localhost:2181(CONNECTED) 16] getAcl /digest
'digest,'yuyang:5FAC7McRhLdx0QUWsfEbK8pqwxc=: cdrwa

当退出当前会话后,需要再次授权才能访问**/digest**节点

[zk: localhost:2181(CONNECTED) 0] get /digest
Insufficient permission : /digest

[zk: localhost:2181(CONNECTED) 1] addauth digest yuyang:yuyang

[zk: localhost:2181(CONNECTED) 2] get /digest
null

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/827568.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

0802|IO进程线程day5 作业(打印时钟在终端上,若终端输入quit,结束时钟)

作业1&#xff1a;守护进程 守护进程的创建&#xff08;5步&#xff09;&#xff1a; 创建孤儿进程&#xff1a;所有工作都在子进程中执行&#xff0c;从形式上脱离终端控制。 fork(), 退出父进程 创建新的会话组&#xff1a;使子进程完全独立出来&#xff0c;防止兄弟进程对其…

蓝桥杯上岸必背!!!(并查集补更)

蓝桥杯上岸必背&#xff01;&#xff01;&#xff01;(并查集补更) 大家好 我是寸铁&#x1f4aa; 冲刺蓝桥杯省一模板大全来啦 &#x1f525; 蓝桥杯4月8号就要开始了 &#x1f64f; 距离蓝桥杯省赛倒数第3天 ❗️ ❗️ ❗️ 还没背熟模板的伙伴们背起来 &#x1f4aa; &…

git 常用命令有哪些

Git 是我们开发工作中使用频率极高的工具&#xff0c;下面总结下他的基本指令有哪些&#xff0c;顺便温习一下。 前言 一般项目中长存2个分支&#xff1a; 主分支&#xff08;master&#xff09; 和开发分支&#xff08;develp&#xff09; 项目存在三种短期分支 &#xff1a…

大数据技术之Clickhouse---入门篇---SQL操作、副本

星光下的赶路人star的个人主页 积一勺以成江河&#xff0c;累微尘以崇峻极 文章目录 1、SQL操作1.1 Insert1.2 Update 和 Delete1.3 查询操作1.4 alter操作1.5 导出数据 2、副本2.1 副本写入流程2.2 配置步骤 1、SQL操作 基本上来说传统关系型数据库&#xff08;以 MySQL 为例…

忘记数据库密码如何处理

windows 5.6.51版本及以前 #当前账号设置密码 set password password(123456); #当前账号取消密码 set password ; &#xff08;1&#xff09;用管理员身份打开控制台输入 net stop m5&#xff08;我的电脑MySQL名字为m5&#xff0c;根据自己的更改&#xff09; &#xff08;…

爱尔眼科四川省区“同心博爱 光明工程”“西部健康公益行”炉霍站启动

8月1日&#xff0c;“同心博爱 光明工程”“西部健康公益行”炉霍站出征仪式在四川爱尔眼科医院隆重举行。 此次公益活动由民革成都市委会、中共锦江区委统战部指导&#xff0c;如意树爱心促进会主办&#xff0c;民革锦江区总支部、爱尔眼科四川省区支持&#xff0c;四川爱尔眼…

Linux系统CPU和磁盘性能进程分析工具pidstat

一、pidstat对CPU的分析 Linux 上的pidstat(1)工具按进程或线程打印CPU 用量&#xff0c;包括用户态和系统态时间的分解。默认情况下&#xff0c;仅循环输出活动进程的信息。例如&#xff1a; 这个例子捕捉到了系统备份&#xff0c;包含了tar(1)命令&#xff0c;从文件系统读取…

低通、高通、带通、阻通滤波器

目录 低通、高通、带通、阻通滤波器 低通、高通、带通、带阻滤波器的区别 通俗理解&#xff1a; 1、低通滤波器 2、高通滤波器 3、带通滤波器 4、带阻滤波器 5、全通滤波器 低通、高通、带通、阻通滤波器 低通、高通、带通、带阻滤波器的区别 低通滤波器&#xff1a;只…

运维高级--tomcat和jpress

1. 简述静态网页和动态网页的区别。 静态网页&#xff1a;事先创建好的网页&#xff0c;通常通过HTML、CSS和JavaScript等静态文件组成&#xff0c;不需要和服务器进行交互&#xff0c;加载速度快 动态网页&#xff1a;根据用户需求动态生成网页&#xff0c;动态网页通常使用…

3D软件性能的基准测试工具,侧重于测试处理器和显卡,赶紧试试

如果需要&#xff0c;Cinebench 可帮助您评估计算机执行渲染任务的性能。Cinebench 由 Maxon 开发&#xff0c;利用行业标准基准测试技术来评估您的系统在要求苛刻的图形任务中的性能。 它主要侧重于测试处理器和显卡&#xff0c;以确定其渲染复杂 3D 图像和动画的效率。Cineb…

Spring 容器原始 Bean 是如何创建的?

以下内容基于 Spring6.0.4。 这个话题其实非常庞大&#xff0c;我本来想从 getBean 方法讲起&#xff0c;但一想这样讲完估计很多小伙伴就懵了&#xff0c;所以我们还是一步一步来&#xff0c;今天我主要是想和小伙伴们讲讲 Spring 容器创建 Bean 最最核心的 createBeanInstan…

笔试编程题常用框架/方法

目录 考核方式 ACM模式 JavaScript(V8) JavaScript(Node) 数组 折半 / 二分查找 螺旋矩阵* 前缀和-区间求和 差分数组-区间增减 滑动窗口-子串 链表 双指针&#xff08;快慢指针&#xff09; 有序数组的平方 删除/覆盖数组元素 最小长度的子数组 三数之和abcta…

【RTT驱动框架分析05】-spi驱动框架分析

spi 1.应用层的spi操作 spi 消息结构 struct rt_spi_message {const void *send_buf;//发送数据的缓存void *recv_buf;//接收数据的缓存rt_size_t length;//数据长度struct rt_spi_message *next;//指向下一个消息结构unsigned cs_take : 1;//是否执行获取csunsigned cs_…

SQL-每日一题【1174. 即时食物配送 II】

题目 配送表: Delivery 如果顾客期望的配送日期和下单日期相同&#xff0c;则该订单称为 「即时订单」&#xff0c;否则称为「计划订单」。 「首次订单」是顾客最早创建的订单。我们保证一个顾客只会有一个「首次订单」。 写一条 SQL 查询语句获取即时订单在所有用户的首次订…

零碎小知识点汇总——记录工作中遇到的问题——基础积累

1.npm install安装包时&#xff0c;常用的-S -D有什么区别&#xff1f; 参考链接&#xff1a;https://blog.csdn.net/sunyctf/article/details/127667543 主要的区别就是依赖配置写入package.json文件的位置不同而已 npm install有一个别名&#xff1a;npm i -S:写入dependen…

抢夺本地生活万亿蛋糕:“抖音美团们”的攻防战

有人就有市场&#xff0c;有市场就有竞争。面对本地生活这一大“富矿”&#xff0c;互联网大厂们正在开启一场争夺战。 前有抖音、小红书在团购业务上火速推进&#xff0c;近日&#xff0c;随着拼多多正式上线本地生活入口&#xff0c;这个领域的玩家越来越多。 艾瑞咨询数据…

Linux中的特殊进程(孤儿进程、僵尸进程、守护进程)

一、孤儿进程 1&#xff09;父进程退出&#xff0c;子进程不退出&#xff0c;此时子进程被1号&#xff08;init&#xff09;进程收养&#xff0c;变成孤儿进程。 2&#xff09;孤儿进程会脱离终端控制&#xff0c;且运行在后端&#xff0c;不能用ctrlc杀死后端进程&#xff0c;…

Linux ALSA音频工具aplay、arecord、amixer的使用方法

ALSA 是Advanced Linux Sound Architecture的缩写&#xff0c;先进的Linux音频架构&#xff0c;为Linux操作系统提供音频和MIDI功能。 aplay命令 aplay是播放命令。 rootimx6ul7d:~# aplay -h Usage: aplay [OPTION]... [FILE]...-h, --help help--version …

MyBatis-Plus 和达梦数据库实现高效数据持久化

一、添加依赖 首先&#xff0c;我们需要在项目的 pom.xml 文件中添加 MyBatis-Plus 和达梦数据库的依赖&#xff1a; <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifac…

pyspark_自定义udf_解析json列【附代码】

pyspark_自定义udf_解析json列【附代码】 一、背景&#xff1a;二、调研方案&#xff1a;三、利用Pyspark udf自定义函数实现大数据并行计算整体流程案例代码运行结果&#xff1a;案例代码&#xff1a;代码地址&#xff1a;代码 一、背景&#xff1a; 车联网数据有很多车的时…