Zookeeper基础入门-2【ZooKeeper 分布式锁案例】

news2025/4/22 12:01:53

Zookeeper基础入门-2【ZooKeeper 分布式锁案例】

  • 四、ZooKeeper-IDEA环境搭建
    • 4.1.环境搭建
      • 4.1.1.创建maven工程:zookeeper
      • 4.1.2.在pom文件添加依赖
      • 4.1.3.在项目的src/main/resources 目录下,新建文件为“log4j.properties”
      • 4.1.4.创建包名com.orange.zk
    • 4.2.ZooKeeper 客户端API操作
      • 4.2.1.初始化ZooKeeper对象
      • 4.2.2.创建节点
      • 4.2.3.获取子节点并监听节点变化
      • 4.2.4.判断节点Node是否存在
    • 4.3.客户端向服务端写数据流程
      • 4.3.1.写流程之写入请求直接发送给Leader节点
      • 4.3.2.写流程之写入请求发送给follower节点
  • 五、服务器动态上下线监听案例
    • 5.1.需求
    • 5.2.需求分析--服务器动态上下线
    • 5.3.具体实现
      • 5.3.1.先在集群上创建/servers 节点
      • 5.3.2.创建包名:com.orange.zkcase1
      • 5.3.3.服务器端向Zookeeper注册代码
      • 5.3.4.客户端代码
    • 5.4.测试
      • 5.4.1.在Linux 命令行上操作增加减少服务器
        • 5.4.1.1.启动DistributeClient 客户端
        • 5.4.1.2.在host128 上zk 的客户端/servers 目录上创建临时带序号节点
        • 5.4.1.3.观察Idea 控制台变化
        • 5.4.1.4.执行删除操作
        • 5.4.1.5.观察Idea 控制台变化
      • 5.4.2.在Idea 上操作增加减少服务器
        • 5.4.2.1.启动DistributeClient 客户端(如果已经启动过,不需要重启)
        • 5.4.2.2.启动DistributeServer 服务
          • 5.4.2.2.1.点击Edit Configurations…
          • 5.4.2.2.2.在弹出的窗口中(Program arguments)输入想启动的主机,例如,host130
          • 5.4.2.2.3.回到DistributeServer的main方法,右 键,在弹出的窗口中点击 Run “DistributeServer.main()”
          • 5.4.2.2.4.观察DistributeServer 控制台
          • 5.4.2.2.5.观察DistributeClient 控制台
  • 六、ZooKeeper 分布式锁案例
    • 6.1.分布式锁
    • 6.2.ZooKeeper分布式锁原理
    • 6.3.分布式锁案例分析
    • 6.3.原生Zookeeper 实现分布式锁案例
      • 6.3.1.分布式锁实现
      • 6.3.2.分布式锁测试
        • 6.3.2.1.创建两个线程
        • 6.3.2.2.观察控制台变化
    • 6.4.Curator框架实现分布式锁案例
      • 6.4.1.Curator有五种锁方案:
      • 6.4.1.原生的Java API 开发存在的问题
      • 6.4.2.Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题
      • 6.4.3.Curator 案例实操
        • 6.4.3.1.添加依赖
        • 6.4.3.2.代码实现
        • 6.4.3.3.控制台变化
  • 七、模拟12306售票案例
    • 7.1.代码实现
    • 7.2.测试
    • 7.3.控制台变化
  • 八、企业面试真题(面试重点)
    • 8.1.选举机制
    • 8.2.生产集群安装多少zk 合适
    • 8.3.常用命令
  • endl

四、ZooKeeper-IDEA环境搭建

保证三台Zookeeper 集群服务端启动

[root@host128 ~]# jpsall
显示集群的所有java进程状态
=============== host128 ===============
66496 Jps
2445 QuorumPeerMain
=============== host129 ===============
66162 Jps
2413 QuorumPeerMain
=============== host130 ===============
65947 Jps
2383 QuorumPeerMain
执行结束

4.1.环境搭建

4.1.1.创建maven工程:zookeeper

4.1.2.在pom文件添加依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>zookeeper-test01</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <maven.complier.source>8</maven.complier.source>
    <maven.complier.target>8</maven.complier.target>
  </properties>

  <dependencies>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.8.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.5.7</version>
    </dependency>

  </dependencies>
</project>

4.1.3.在项目的src/main/resources 目录下,新建文件为“log4j.properties”

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4.1.4.创建包名com.orange.zk

在这里插入图片描述

4.2.ZooKeeper 客户端API操作

4.2.1.初始化ZooKeeper对象

/**
 * Description: zookeeper客户端
 */
public class Client {

    //注意:connectString逗号左右不能有空格,否则连接不上
    private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private int sessionTimeout = 200000;
    private ZooKeeper zkClient;

    /**
     * 初始化ZooKeeper对象,必须要先创建,再进行创建节点,否则报空指针异常
     *
     * @throws IOException
     */
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                
            }
        });
    }

}

4.2.2.创建节点

    @Test
    public void create() throws InterruptedException, KeeperException {
        //"/tang":创建的节点的路径;
        //"t.avi".getBytes():节点里面的值,需要转化为字节传输;
        //ZooDefs.Ids.OPEN_ACL_UNSAFE:权限控制,设置访问权限;
        //CreateMode.PERSISTENT:创建的节点类型,如这个是永久不带序号节点。
        String nodeCreated = zkClient.create("/tang", "t.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
# 指定启动host128的客户端,而不是localhost的
/opt/module/zookeeper-3.5.7/bin/zkCli.sh -server host128:2181
[zk: host128:2181(CONNECTED) 0] ls /
[tang, zookeeper]
[zk: host128:2181(CONNECTED) 1] get -s /tang
t.avi
cZxid = 0x100000002
ctime = Wed Feb 28 12:35:13 CST 2024
mZxid = 0x100000002
mtime = Wed Feb 28 12:35:13 CST 2024
pZxid = 0x100000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

4.2.3.获取子节点并监听节点变化

/**
 * Description: zookeeper客户端
 */
public class Client {

    //注意:connectString逗号左右不能有空格,否则连接不上
    private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private int sessionTimeout = 200000;
    private ZooKeeper zkClient;

    /**
     * 初始化ZooKeeper对象,必须要先创建,再进行创建节点,否则报空指针异常
     *
     * @throws IOException
     */
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //收到时间通知后的回调函数(用户的业务逻辑)
                System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
                //再次启动监听
                try {
                    System.out.println("===============================");
                    List<String> children = zkClient.getChildren("/", true);

                    for (String child : children) {
                        System.out.println(child);
                    }
                    System.out.println("===============================");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Test
    public void create() throws InterruptedException, KeeperException {
        //"/tang":创建的节点的路径;
        //"t.avi".getBytes():节点里面的值,需要转化为字节传输;
        //ZooDefs.Ids.OPEN_ACL_UNSAFE:权限控制,设置访问权限;
        //CreateMode.PERSISTENT:创建的节点类型,如这个是永久不带序号节点。
        String nodeCreated = zkClient.create("/tang", "t.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * 监听节点变化信息
     */
    @Test
    public void getChildren() throws KeeperException, InterruptedException {
        System.out.println("-----------------------------");
        List<String> children = zkClient.getChildren("/", true);

        for (String child : children){
            System.out.println(child);
        }
        System.out.println("-----------------------------");
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

}
None--null
===============================
tang
zookeeper
-----------------------------
tang
zookeeper
===============================

监听器只能监听一次,如果再发生变化需要重新注册监听器,要想每次节点发生变化都能检测到并且在控制台打印,就在初始化监听器里面再注册一个监听器,每次监听完又马上注册一个新的监听器。

# 指定启动host128的客户端,而不是localhost的
/opt/module/zookeeper-3.5.7/bin/zkCli.sh -server host128:2181
[zk: host128:2181(CONNECTED) 8] create /test01 "test01"
Created /test01
[zk: host128:2181(CONNECTED) 9] create /test02 "test02"
Created /test02
NodeChildrenChanged--/
===============================
tang
zookeeper
test01
===============================
NodeChildrenChanged--/
===============================
tang
test02
zookeeper
test01
===============================

4.2.4.判断节点Node是否存在

    /**
     * 判断节点是否存在
     */
    @Test
    public void exist() throws InterruptedException, KeeperException {
        Stat stat = zkClient.exists("/tang", false);
        System.out.println(stat == null ? "not data" : "exist");
    }

4.3.客户端向服务端写数据流程

4.3.1.写流程之写入请求直接发送给Leader节点

在这里插入图片描述
1.当client向zookeeper的leader上写数据,发送一个写请求

2.这个Leader会把写请求广播给各个server,当各个server写成功后就会通知Leader.

3.当Leader收到半数以上server写成功应答,此时认为写成功,Client会收到Leader写成功应答。

4.3.2.写流程之写入请求发送给follower节点

在这里插入图片描述
1.当client向zookeeper集群的某个server上写数据,发送一个写请求

2.如果接收到请求的不是Leader,那么server会把请求转发给Leader,因为zookeeper的集群中只有一个是Leader,这个Leader会把写请求广播给各个server,当各个server写成功后就会通知Leader.

3.当Leader收到半数以上server写成功应答,此时认为写成功,Leader会告知向他提交申请的server

4.Server会进一步将通知Client写成功, 此时就认为写成功了。

五、服务器动态上下线监听案例

5.1.需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

5.2.需求分析–服务器动态上下线

在这里插入图片描述

5.3.具体实现

5.3.1.先在集群上创建/servers 节点

[zk: host128:2181(CONNECTED) 10] create /servers "servers"
Created /servers

5.3.2.创建包名:com.orange.zkcase1

在这里插入图片描述

5.3.3.服务器端向Zookeeper注册代码

/**
 * Description: 服务端和zookeeper集群创建连接
 */
public class DistributeServer {

    //注意:connectString逗号左右不能有空格,否则连接不上
    private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private int sessionTimeout = 200000;
    private ZooKeeper zkClient;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        DistributeServer server = new DistributeServer();
        //1.连接zookeeper集群,获取zk连接,创建zk
        server.getConnect();

        //2.注册服务器到zk集群
        server.regist(args[0]);

        //3.启动业务逻辑
        server.business();
    }

    private void business() throws InterruptedException {
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 注册服务器,创建节点
     */
    private void regist(String hostname) throws InterruptedException, KeeperException {
        String create = zkClient.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is online");
    }

    /**
     * 连接上zookeeper集群
     */
    private void getConnect() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

5.3.4.客户端代码

/**
 * Description: 客户端监听集群节点的动态变化
 */
public class DistributeClient {

    //注意:connectString逗号左右不能有空格,否则连接不上
    private String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private int sessionTimeout = 200000;
    private ZooKeeper zkClient;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeClient client = new DistributeClient();
        //1.获取zk连接
        client.getConnect();

        //2.监听服务器 /servers 下面子节点的增加和删除
        client.getServerList(); //获取servers上的所有节点的上线和下线

        //3.业务逻辑
        client.business();
    }

    private void business() throws InterruptedException {
        //延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getServerList() throws InterruptedException, KeeperException {
        //获取servers下的所有节点信息
        List<String> children = zkClient.getChildren("/servers", true);//对父节点监听

        ArrayList<String> servers = new ArrayList<String>(); //集合用来存所有的服务器节点
        //遍历所有节点  获取节点中的主机名称信息
        for (String child : children) {
            byte[] data = zkClient.getData("/servers/" + child, false, null);
            servers.add(new String(data));
        }
        //打印服务器列表信息
        System.out.println(servers);
    }

    // 创建zookeeper客户端
    private void getConnect() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //收到事件通知后的回调函数(用户的业务逻辑)
                try {
                    //再次启动监听,避免只监听一次
                    getServerList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

5.4.测试

5.4.1.在Linux 命令行上操作增加减少服务器

5.4.1.1.启动DistributeClient 客户端
5.4.1.2.在host128 上zk 的客户端/servers 目录上创建临时带序号节点
[zk: host128:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: host128:2181(CONNECTED) 1] create /servers "servers"
Created /servers
[zk: host128:2181(CONNECTED) 2] create -e -s /servers/host128 "hsot128"
Created /servers/host1280000000000
[zk: host128:2181(CONNECTED) 3] create -e -s /servers/host129 "hsot129"
Created /servers/host1290000000001
[zk: host128:2181(CONNECTED) 4] create -e -s /servers/host130 "hsot130"
Created /servers/host1300000000002
5.4.1.3.观察Idea 控制台变化
[]
[]
[hsot128]
[hsot129, hsot128]
[hsot130, hsot129, hsot128]
5.4.1.4.执行删除操作
[zk: host128:2181(CONNECTED) 6] ls /servers
[host1280000000000, host1290000000001, host1300000000002]
[zk: host128:2181(CONNECTED) 7] delete /servers/host1280000000000
[zk: host128:2181(CONNECTED) 8] delete /servers/host1290000000001
5.4.1.5.观察Idea 控制台变化
[hsot130, hsot129]
[hsot130]

5.4.2.在Idea 上操作增加减少服务器

5.4.2.1.启动DistributeClient 客户端(如果已经启动过,不需要重启)
5.4.2.2.启动DistributeServer 服务
5.4.2.2.1.点击Edit Configurations…

在这里插入图片描述

5.4.2.2.2.在弹出的窗口中(Program arguments)输入想启动的主机,例如,host130

在这里插入图片描述

5.4.2.2.3.回到DistributeServer的main方法,右 键,在弹出的窗口中点击 Run “DistributeServer.main()”

在这里插入图片描述

5.4.2.2.4.观察DistributeServer 控制台
host130 is online
5.4.2.2.5.观察DistributeClient 控制台
#host130 已经上线
[hsot130]

六、ZooKeeper 分布式锁案例

6.1.分布式锁

比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁

在这里插入图片描述

6.2.ZooKeeper分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
    1. 客户端获取锁时,在lock节点下创建临时顺序节点
    1. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除
    1. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器监听删除事件
    1. 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

在这里插入图片描述

6.3.分布式锁案例分析

在这里插入图片描述

1)接收到请求后,在/locks节点下创建一个临时顺序节点

2)判断自己是不是当前节点下最小的节点:是,获取到锁;不是,对前一个节点进行监听

3)获取到锁,处理完业务后,delete节点释放锁,然后下面的节点将收到通知,重复第二步判断

6.3.原生Zookeeper 实现分布式锁案例

6.3.1.分布式锁实现

package com.orange.zkcase2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Description: zookeeper分布式锁案例
 */
public class DistributedLock {
    //注意:connectString逗号左右不能有空格,否则连接不上
    private final String connectString = "192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181";
    //tickTime为2000,initLimit为10
    //LF初次连接时的超时时间应起码大于延迟时间tickTime*initLimit的值否则会因为超时而连接失败。
    private final int sessionTimeout = 200000;
    private final ZooKeeper zkClient;

    //增加代码健壮性
    //zookeeper连接
    private CountDownLatch connectLatch = new CountDownLatch(1);
    //zookeeper等待
    private CountDownLatch waitLatch = new CountDownLatch(1);

    //当前client等待的子节点的路径
    private String waitPath;
    //当前client创建的子节点
    private String currentNode;

    /**
     * 和zk创建连接,并创建根节点
     */
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        //1.获取连接 建立服务端与客户端连接
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("-----process-------");
                // connectLatch 如果连接上zk  可以释放
                // 连接建立时, 打开latch, 唤醒wait在该latch上的线程
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // 发生了waitPath的删除事件 需要释放
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        //等待zookeeper正常连接后,代码才往下继续执行
        connectLatch.await();

        //2.判断根节点 /locks 是否存在
        Stat stat = zkClient.exists("/locks", false);
        //如果根节点不存在,则创建根节点,根节点类型为永久节点
        if (stat == null) {
            System.out.println("根节点不存在");
            // 创建根节点,根节点必须是永久节点
            zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    //对zk 加锁
    public void zkLock() {
        try {
            //创建对应的临时带序号临时节点,返回值为创建的节点路径
            currentNode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName() + "当前节点为:" + currentNode);

            //注意, 没有必要监听"/locks"的子节点的变化情况
            //判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听它序号前一个节点
            List<String> children = zkClient.getChildren("/locks", false);
            //如果children只要一个子节点,那就直接获取锁; 如果有多个节点,需要判断,谁最小
            if (children.size() == 1) {
                System.out.println(Thread.currentThread().getName() + "对zk 加锁, 当前节点:" + currentNode);
                return;
            } else {
                ///对根节点下的所有临时顺序节点进行从小到大排序,有序递增
                Collections.sort(children);
                //获取当前节点名称 seq-00000000
                String thisNode = currentNode.substring("/locks/".length());
                System.out.println(Thread.currentThread().getName() + "当前节点名称为:" + thisNode);
                // 通过seq-00000000 获取该节点在children集合的位置
                int index = children.indexOf(thisNode);
                System.out.println(Thread.currentThread().getName() + "当前节点在集合的位置为:" + index);

                //判断
                if (index == -1) {
                    System.out.println(Thread.currentThread().getName() + "数据异常");
                } else if (index == 0) {
                    //只有一个节点,就可以获取锁了
                    System.out.println(Thread.currentThread().getName() + "对zk 加锁, 当前节点:" + currentNode);
                    return;
                } else {
                    //获得排名比 currentNode 前 1 位的节点
                    waitPath = "/locks/" + children.get(index - 1);
                    System.out.println(Thread.currentThread().getName() + "前一个节点为:" + waitPath);
                    //在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper会回调监听器的 process方法
                    //需要监听 它前一个节点变化
                    zkClient.getData(waitPath, true, null);

                    //入等待锁状态,等待监听
                    waitLatch.await();

                    return;
                }
            }
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    //对zk 解锁
    public void unZkLock() {
        try {
            System.out.println(Thread.currentThread().getName() + "解锁,删除当前节点:" + currentNode);
            //删除节点
            zkClient.delete(currentNode, -1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}

6.3.2.分布式锁测试

6.3.2.1.创建两个线程
package com.orange.zkcase2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

/**
 * Description: 测试分布式锁
 */
public class DistributedLockTest {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        // 创建分布式锁
        // final修饰的对象必须被初始化,不能被修改。
        // 非final的对象可以被重新赋值,锁对象就不受管控了。
        // 当一个锁被其他对象占有时,当前线程可以对锁对象重新赋值(相当于从新创建了一个锁对象),从而也拿到了运行的权利。

        //创建分布式锁 1
        final DistributedLock lock1 = new DistributedLock();
        //创建分布式锁 2
        final DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                //获取锁对象
                try {
                    lock1.zkLock();
                    System.out.println("线程0 启动,获取到锁");
                    Thread.sleep(5*1000);//延迟5秒

                    lock1.unZkLock();
                    System.out.println("线程0 释放锁");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                //获取锁对象
                try {
                    lock2.zkLock();
                    System.out.println("线程1 启动,获取到锁");
                    Thread.sleep(5 * 1000);//延迟5秒

                    lock2.unZkLock();
                    System.out.println("线程1 释放锁");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }
}

6.3.2.2.观察控制台变化
-----process-------
根节点不存在

-----process-------
Thread-1当前节点为:/locks/seq-0000000000
Thread-0当前节点为:/locks/seq-0000000001
Thread-1当前节点名称为:seq-0000000000
Thread-1当前节点在集合的位置为:0
Thread-1对zk 加锁, 当前节点:/locks/seq-0000000000
线程1 启动,获取到锁
Thread-0当前节点名称为:seq-0000000001
Thread-0当前节点在集合的位置为:1
Thread-0前一个节点为:/locks/seq-0000000000
Thread-1解锁,删除当前节点:/locks/seq-0000000000
-----process-------
线程0 启动,获取到锁
线程1 释放锁
Thread-0解锁,删除当前节点:/locks/seq-0000000001
线程0 释放锁

6.4.Curator框架实现分布式锁案例

6.4.1.Curator有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

6.4.1.原生的Java API 开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

(2)Watch 需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

6.4.2.Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题

官方文档:https://curator.apache.org/index.html

6.4.3.Curator 案例实操

6.4.3.1.添加依赖
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.3.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.3.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>4.3.0</version>
    </dependency>
6.4.3.2.代码实现
package com.orange.zkcase3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Description: Curator 框架实现分布式锁案例
 */
public class CuratorLockTest {
    public static void main(String[] args) {
        //创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        //创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //获取到锁
                    lock1.acquire();
                    System.out.println("线程1 获取到锁");
                    //测试锁重入
                    lock1.acquire();
                    System.out.println("线程1 再次获取到锁");

                    Thread.sleep(3 * 1000);

                    //释放锁
                    lock1.release();
                    System.out.println("线程1 释放锁");

                    lock1.release();
                    System.out.println("线程1 再次释放锁");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //获取到锁
                    lock2.acquire();
                    System.out.println("线程2 获取到锁");
                    //测试锁重入
                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(3 * 1000);

                    //释放锁
                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2 再次释放锁");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }

    /**
     * 分布式锁初始化
     */
    private static CuratorFramework getCuratorFramework() {
        //重试策略,初试时间 3秒,重试3次
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
        //通过工厂创建Curator
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                //zookeeper server列表
                .connectString("192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181")
                //connection超时时间
                .connectionTimeoutMs(20000)
                //session超时时间
                .sessionTimeoutMs(20000)
                .retryPolicy(policy).build();
        //启动客户端
        client.start();

        System.out.println("zookeeper 初始化完成...");
        return client;
    }
}
6.4.3.3.控制台变化
zookeeper 初始化完成...

zookeeper 初始化完成...

线程1 获取到锁
线程1 再次获取到锁
线程1 释放锁
线程1 再次释放锁
线程2 获取到锁
线程2 再次获取到锁
线程2 释放锁
线程2 再次释放锁

七、模拟12306售票案例

7.1.代码实现

/**
 * Description: 模拟12306售票案例
 */
public class LockTicket implements Runnable {

    private int tickets = 20;//数据库的票数

    private InterProcessMutex lock;

    public LockTicket() {
        //重试策略,初试时间 3秒,重试3次
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
        //通过工厂创建client客户端对象
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                //zookeeper server列表
                .connectString("192.168.147.128:2181,192.168.147.129:2181,192.168.147.130:2181")
                //connection超时时间
                .connectionTimeoutMs(20000)
                //session超时时间
                .sessionTimeoutMs(20000)
                .retryPolicy(policy)
                .build();
        //启动客户端
        client.start();

        lock = new InterProcessMutex(client, "/locks");
    }

    @Override
    public void run() {
        while (true) {
            try {
                //获取锁
                lock.acquire(3, TimeUnit.SECONDS);
                if (tickets > 0) {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + ":" + tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

7.2.测试

/**
 * Description: 模拟12306售票案例
 */
public class LockTicketTest {
    public static void main(String[] args) {
        LockTicket lockTicket=new LockTicket();
        //创建客户端
        Thread t1 = new Thread(lockTicket, "携程");
        Thread t2 = new Thread(lockTicket, "飞猪");

        t1.start();
        t2.start();
    }
}

7.3.控制台变化

飞猪:20
携程:19
飞猪:18
携程:17
飞猪:16
携程:15
飞猪:14
携程:13
飞猪:12
携程:11
飞猪:10
携程:9
飞猪:8
携程:7
飞猪:6
携程:5
飞猪:4
携程:3
飞猪:2
携程:1

八、企业面试真题(面试重点)

8.1.选举机制

半数机制,超过半数的投票通过,即通过。

(1)第一次启动选举规则: 投票过半数时,服务器 id 大的胜出
(2)第二次启动选举规则:

  • EPOCH 大的直接胜出
  • EPOCH 相同,事务 id 大的胜出
  • 事务 id 相同,服务器 id 大的胜出

8.2.生产集群安装多少zk 合适

安装奇数台
生产经验

  • 10 台服务器:3 台zk
  • 20 台服务器:5 台zk
  • 100 台服务器:11 台zk
  • 200 台服务器:11 台zk

服务器台数多:好处,提高可靠性;坏处:提高通信延时

8.3.常用命令

ls、get、create、delete 

endl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1477559.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网站数据加密之Hook通用方案

文章目录 1. 写在前面2. 请求分析3. 编写Hook4. 其他案例 【作者主页】&#xff1a;吴秋霖 【作者介绍】&#xff1a;Python领域优质创作者、阿里云博客专家、华为云享专家。长期致力于Python与爬虫领域研究与开发工作&#xff01; 【作者推荐】&#xff1a;对JS逆向感兴趣的朋…

Python进阶学习:Pickle模块--dump()和load()的用法

Python进阶学习&#xff1a;Pickle模块–dump()和load()的用法 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到您…

【c语言】探索联合和枚举---解锁更多选择

前言 上一篇 讲解的是结构体相关知识&#xff0c;接着本篇主要讲解的是 联合和枚举 相关知识 结构体、联合体和枚举都属于 自定义类型。 那么接下来就跟上我的节奏&#xff0c;准备发车~ 欢迎关注个人主页&#xff1a;逸狼 创造不易&#xff0c;可以点点赞吗~ 如有错误&#xf…

网络协议:DHCP协议工作原理,DHCP分配方式,DHCP租约,Wireshark抓包分析DHCP报文

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「专栏简介」&#xff1a;此文章已录入专栏《计算机网络零基础快速入门》 DHCP协议 一、简介二、分配方式1&#xff09;自动分配2&#xff0…

C++笔记(六)--- 静态成员变量/函数(static)

目录 C语言中静态变量 C静态成员变量 C静态成员函数 C语言中静态变量 1.函数内部用static修饰的变量&#xff0c;只能在此函数使用&#xff0c;但所修饰的变量不随函数释放而释放&#xff0c;下次调用时的值仍为上次结束时的值2.static修饰的全局变量只能在其定义的文件使用…

DAY9-防病毒AV概述

DNS过滤 URL过滤和DNS过滤对比

clickhouse 大表数据归档处理解决办法

本文作者为 360 奇舞团前端开发工程师 李彬 一、需求场景 基于目前项目在各个应用上的数据采集&#xff0c;尤其是性能监控上的采集&#xff0c;数据量越来越大&#xff0c;例如pv数据的采集表三个月的数据量已经在3亿&#xff0c;数据量小的也有几百万&#xff0c;但是目前的常…

浅谈 Linux fork 函数

文章目录 前言fork 基本概念代码演示示例1&#xff1a;体会 fork 函数返回值的作用示例2&#xff1a;创建多进程&#xff0c;加深对 fork 函数的理解 前言 本篇介绍 fork 函数。 fork 基本概念 pid_t fork(void) fork 的英文含义是"分叉"&#xff0c;在这里就是 …

TypeScript+React Web应用开发实战

&#x1f482; 个人网站:【 海拥】【神级代码资源网站】【办公神器】&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交流的小伙伴&#xff0c;请点击【全栈技术交流群】 在现代Web开发中&#xff0c;React和TypeScrip…

嵌入式学习day25 Linux

进程基本概念: 1.进程: 程序&#xff1a;存放在外存中的一段数据组成的文件 进程&#xff1a;是一个程序动态执行的过程,包括进程的创建、进程的调度、进程的消亡 2.进程相关命令: 1.top 动态查看当前系统中的所有进程信息&#xff08;根据CPU占用率排序&a…

54.仿简道云公式函数实战-文本函数-LOWER

1. LOWER函数 将一个文本字符串中的所有大写字母转换为小写字母 2. 函数用法 LOWER(text) 3. 函数示例 将一个文本字符串中的所有大写字母转换为小写字母。 4. 代码实战 首先我们在function包下创建text包&#xff0c;在text包下创建LowerFunction类&#xff0c;代码如下…

Unity中URP实现水体(水的焦散)

文章目录 前言一、原理1、 通过深度图&#xff0c;得到 对应像素 在 世界空间下的Z值2、得到模型顶点在 观察空间 下的坐标3、由以上两点得到 深度图像素 对应的 xyz 值4、最后&#xff0c;转化到 模型本地空间下&#xff0c;用其对焦散纹理采样 二、实现1、获取深度图2、在顶点…

Kali Linux下载与安装

目录 1 kali官网下载镜像文件 2 VMware打开kali linux文件 3 启动kali-linux-2023.4操作系统 1 kali官网下载镜像文件 kali官网&#xff1a;https://www.kali.org/get-kali/#kali-platforms 进入kali官网主页后看到如图所示界面&#xff0c;左边“Installer Images”界面是…

C++:模版初阶 | STL简介

创作不易&#xff0c;感谢支持&#xff01;&#xff01; 一、泛型编程思想 如何实现一个通用的交换函数呢&#xff1f; 注&#xff1a;其实swap函数在C的标准库提供了&#xff0c;不需要自己写&#xff0c;这边只是举个例子 void Swap(int& left, int& right) { in…

Linux和Windows集群中部署HTCondor

目录 1、集群架构 2、HTCondor版本 3、Linux系统安装 3.1、HTCondor安装 3.2、中央管理节点配置 3.3、其他节点配置 4、Windwos系统安装 5、安全配置 6、参考 1、集群架构 操作系统IP地址1*Ubuntu22.04192.168.1.742Ubuntu22.04192.168.1.603Ubuntu22.04192.168.1.6…

Ansys Lumerical | 自发参量下变频 (SPDC) 光子源

附件下载 联系工作人员获取附件 此示例演示了如何对真实的光子源进行建模&#xff0c;并将其用作 qINTERCONNECT 的输入。用直波导中一个长度为L且具有χ ( 2 ) 非线性的局部区域计算I型SPDC过程的光子产生速率和波函数&#xff0c;其中泵浦光子被转换为信号和闲置光子对&…

10.广域网技术

1. PPP实验点这里&#xff08;拓扑代码&#xff09; 2. PPPoE配置实验点这里&#xff08;拓扑代码&#xff09; 目录 一、广域网二、PPP协议三、PPP链路建立过程1-LCP&#xff08;链路协商&#xff09;四、PPP链路建立过程2-PAP/CHAP&#xff08;认证协商&#xff0c;可选&…

基于x86架构的OpenHarmony应用生态挑战赛等你来战!

为了更快速推进OpenHarmony在PC领域的进一步落地&#xff0c;加快x86架构下基于OpenHarmony的应用生态的繁荣&#xff0c;为北向应用开发者提供一个更加便捷的开发环境&#xff0c;推动OpenHarmony北向应用开发者的增加&#xff0c;助力OpenHarmony在PC领域实现新的突破&#x…

工业智能网关的实际应用及其带来的变革-天拓四方

工业智能网关是一种集数据采集、传输、处理和分析于一体的智能化设备。它能够实现对工业现场各种传感器、执行器等设备的数据进行实时采集&#xff0c;并通过网络传输到云端或本地数据中心进行分析处理。同时&#xff0c;工业智能网关还具备边缘计算能力&#xff0c;能够在本地…

六、OpenAI之嵌入式(Embedding)

嵌入模式 学习怎么将文本转换成数字&#xff0c;解锁搜索等案例。 新的嵌入模型 text-embedding-3-small 和 text-embedding-3-large&#xff0c;是目前最新的并且性能最好的嵌入模型&#xff0c;成本低&#xff0c;支持多语言&#xff0c;拥有控制所有大小的新参数 1. 什么是…