你好:Zookeeper

news2025/1/13 6:17:26

Zookeeper 初识

  • 初识Zookeeper
  • Zookeeper 安装
  • Zookeeper 命令操作
    • Zookeeper数据模型
    • 服务端命令
    • 客户端命令
    • JavaAPI操作
      • Curator
        • 常用操作:
        • 前置环境
        • 连接Zookeeper集群
        • 创建节点
        • 查询节点
        • 修改节点
        • 删除节点
        • 事件监听机制 – Watch
        • 分布式锁
          • 实现分布式锁得API


初识Zookeeper

Zookeeper是Hadoop项目下的一个子项目,是一个树形目录服务,翻译过来就是动物管理员,负责管理Hadoop(大象)、Hive(小猪)的管理员,简称zk.Zookeeper是一个分布式的、开源的分布式应用程序的协调服务

ZooKeeper主要功能包括:

  1. 配置管理
  2. 分布式锁
  3. 集群管理

Zookeeper 安装

在这里我已经提前上传好了Zookeeper的安装包,和三台虚拟机,并设置好了SSH免密登录、Jdk以及Hosts,这是一个很重要的前提,主要是为了后续的配置方便

我的host设置如下
image-20230513212202975

我的Zookeeper安装包
image-20230513212615844

正常流程先解压Zookeeper

tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz  

image-20230513213338587

因为个人习惯,我在这里将其移动到了其他位置

mv apache-zookeeper-3.7.1-bin /opt/module/zookeeper

image-20230513213500376

欧克在这里已经复制成功了。我们进入到zookeeper下的conf目录下准备修改配置文件,修改配置文件前,我们先需要修改配置文件的名字,让内部存在一个zoo.cfg,因为zookeeper在启动时会检测这个文件,然后根据这个文件进行配置,但是不建议直接修改,还是复制来的保险,模板文件方便后续恢复什么的

cp zoo_sample.cfg zoo.cfg

复制完成:
image-20230513213949440

现在开始修改Zookeeper的配置文件,zookeeper 的配置文件不多我在这里直接修改

# 心跳检测时间 2s 2 * 1000
tickTime=2000
# 初始化时 连接到服务器端的间隔次数 总时间为 2 * 10 = 20 s
initLimit=10
# ZK Leader 和follower之间通讯的次数 总时间为 5 * 2 = 10s
syncLimit=5
# 这里表示zookeeper存储的位置,默认为/tmp/zookeeper下,这个目录下的数据有可能会在磁盘空间不足或服务器重启时自动被linux清理
# 我提前在/opt/data 下创建了zookeeper文件夹,将数据存放在这里
dataDir=/opt/data/zookeeper
# 指定日志文件存放位置
dataLogDir=/opt/logs/zookeeper
# the port at which the clients will connect
# 这里表示暴露给客户端进行连接的端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=hadoop132-father:2888:3888
server.2=hadoop133:2888:3888
server.3=hadoop134:2888:3888

server.1的说明:

集群模式中, 集群中的每台机器都需要感知其它机器, 在 zoo.cfg 配置文件中, 可以按照如下格式进行配置, 每一行代表一台服务器配置:
server.id=host:port:port

id 被称为 Server ID, 用来标识服务器在集群中的序号。同时每台 ZooKeeper 服务器上, 都需要在数据目录(即 dataDir 指定的目录) 下创建一个 myid 文件, 该文件只有一行内容, 即对应于每台服务器的Server ID。
ZooKeeper 集群中, 每台服务器上的 zoo.cfg 配置文件内容一致。
server.1 的 myid 文件内容就是 “1”。每个服务器的 myid 内容都不同, 且需要保证和自己的 zoo.cfg 配置文件中 “server.id=host:port:port” 的 id 值一致。
id 的范围是 1 ~ 255。

随后我们需要去创建myid 文件,myid文件的位置需要与dataDir指定的目录相同,此外其内容应该与zookeeper中配置的一样
image-20230513221938384

OK 完成,随后就是向相关配置复制过去即可,方式有很多种,我这里采用的scp

scp -r zookeeper root@hadoop133:$PWD
scp -r zookeeper root@hadoop134:$PWD

之后我直接偷懒,不想每次启动zookeeper都到这个文件夹下,我直接配置环境变量

# 编写环境变量
vim /etc/profile
# 在环境变量中追加上这几行
export ZOOKEEPER_HOME=/opt/module/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf
# 添加完刷新环境变量
source /etc/profile

启动Zookeeper!

zkServer.sh start

一套下来成了嘛?咱们查看一下运行状态
image-20230514080806661

这是没有启动起来嘛?也不是,启动起来了,但是由于咱们是集群模式,我们把另外两个全部启动在进行查看

image-20230514080905656
image-20230514080923891
image-20230514080943588

OK集群搭建结束,下班!


Zookeeper 命令操作

Zookeeper数据模型

Zookeeper是一个属性目录服务,其数据模型和Unix的文件系统目录树很相似,拥有一个层次化结构

这里面的每一个节点都被称为ZNode,每个节点上都会保存自己的数据和节点信息
节点可以拥有子节点,同时也允许少量数据(1M)存储在该节点下
节点可以分为4大类:

  • PERSISTENT 持久化节点
  • EPHEMERAL 临时节点 -e
  • PERSISTENT_SEQUENTIAL 持久化顺序节点 -s
  • EPHEMERAL_SEQUENTIAL 临时顺序节点 -es

image-20230514085901370

服务端命令

服务端命令比较简单 主要是对zookeeper服务进行操作[请注意我这里是配置了环境变量]

# 启动ZK服务
zkServer.sh start
# 查看Zookeeper 服务状态
zkServer.sh stauts
# 停止Zookeeper 服务
zkServer.sh stop
# 重启Zookeeper服务
zkServer.sh restart

客户端命令

连接服务端

zkCli.sh -server IP:PORT

我这里尝试连接hadoop132-father的数据

zkCli.sh -server hadoop132-father:2181

运行结果如下

image-20230514092323408

现在已经连接上了zookeeper

退出

退出就比较简单可以直接输入quit即可,这样子可以直接退出

image-20230514092431725

ok在这里已经退出成功了,当然也可以暴力进行,直接ctrl+c直接退出

查看

查看本质上Linux的命令行基本相似,直接进行即可

ls path

image-20230514093457186

但是区别是我们不能ls zookeeper进行访问,必须是绝对路径,如果我们强行输入:
image-20230514093602787

正确的输入方法:

image-20230514093642284

这里有自动补全机制,所以大家不用担心没有办法偷懒

有时候需要查看详细信息,那么需要进行操作,推荐使用 ls2 path直接进行,但是我在自己尝试的时候发现没有这个命令,搞了半天是被取代了

# 查看详细信息 老版本
ls2 
# 其他方法 我的版本是3.7.1 算是一个比较新的版本了 已经没有ls2命令了
ls -s path

image-20230514102901652

创建

创建就比较简单了直接create

create  path [value]

这里是可以进行赋值的,我们在设置值的时候在进行演示

创建一个节点,名字是wxk
image-20230514094230139
OK 创建成功,再再根节点下创建一个wxk2
image-20230514094316171
wxk2 节点也创建成功。当然我们也可以创建为某个节点创建一个子结点,例如我要在wxk下创建一个子节点 叫做wxk3
image-20230514094502473
在这里也创建成功了

在上文中我们提到了临时节点和顺序节点,那么这个该如何设置?

临时节点创建:

create -e path
# 临时顺序节点
create -es path

创建临时节点:
image-20230514101401592
在这里我们直接看看不出区别,有什么特性也不知道,其实当本次会话结束后,临时节点就会被删除,我们退出重进一下:
image-20230514101507139
现在已经没有了,而我们之前c创建的节点仍然存在,这些仍然存在的节点是持久化节点.现在创建几个顺序节点
image-20230514102049566
现在我们直接重启,看看hu会不会序号是否会发生变化
image-20230514102141462
序号生成时啥样就是啥样不受影响

赋值 与 取值

赋值一般想到的都是set方法,再zookeeper中也是如此,而获取一般都是get,这个也是:

# 赋值 / 更新值
set path value
# 获取值
get path

这里讲wxk2进行赋值,赋值为 this_is_wxk
image-20230514094753185
那么如何更新? 更新无非就是在设置一次嘛,这里将值修改为this_is_xiaoming
image-20230514094902499

删除
删除delete,老方法了,大家都熟悉了

delete path

在这里直接删除wxk这个节点
image-20230514095134279
啥?删除不掉,他说内部不为空,说明delete删除的时候不能够迭代,不能够把有子节点的节点给删除掉,那咱们就先把子节点给删除了试试
image-20230514095256712

作为一个懒蛋,确实不想进去一个一个删,我只想偷懒,OK这里还有一个删除命令:

deleteall path

deleteall可以删除带有子节点的节点,由于wxk下的删完了,咱们创建一个在进行删除:
image-20230514095840770
OK下机

JavaAPI操作

下机15s后重新连接了xdm

Curator

Curator是Zookeeper的Java客户端库,其目标是简化Zookeeper客户端的使用,现在是Apache的顶级项目 地址Apache Curator

常用操作:

建立连接

添加节点

删除节点

修改节点

查询节点

Watch事件监听

前置环境

maven

    <dependencies>
        <!--测试单元-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>test</scope>
        </dependency>
        <!--curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.3.0</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

日志文件配置

log4j.rootLogger=off,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n

连接Zookeeper集群

@Test
public void testConnect(){
    /*
     * @param connectString       连接的字符串,zk Server的地址和端口,如果是集群那就用逗号隔开
     * @param sessionTimeoutMs    会话超时时间 单位 ms
     * @param connectionTimeoutMs 连接超时时间 单位 ms
     * @param retryPolicy         重试策略
     */
    String hosts = "hadoop132-father:2181,hadoop133:2181,hadoop134:2181";
    int sessionTime = 10 * 1000;
    int connectTime = 10 * 1000;
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
    CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, sessionTime, connectTime, retryPolicy);
    // 启动客户端
    client.start();
    //打印连接的状态
    final CuratorFrameworkState state = client.getState();
    System.out.println(state);
}

运行结果如下:
image-20230514171314085
当然也可以这样子

    @Test
    public void testConnect(){
        /*
         * @param connectString       连接的字符串,zk Server的地址和端口,如果是集群那就用逗号隔开
         * @param sessionTimeoutMs    会话超时时间 单位 ms
         * @param connectionTimeoutMs 连接超时时间 单位 ms
         * @param retryPolicy         重试策略
         */
        String hosts = "hadoop132-father:2181,hadoop133:2181,hadoop134:2181";
        int sessionTime = 10 * 1000;
        int connectTime = 10 * 1000;
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(hosts)
                .sessionTimeoutMs(sessionTime)
                .connectionTimeoutMs(connectTime)
                .retryPolicy(retryPolicy)
                .namespace("wxk") // 设置工作空间 设置之后所有的操作都是以次为根节点
                .build();
        client.start();
        final CuratorFrameworkState state = client.getState();
        System.out.println(state);
    }

二者的效果是相同的

创建节点

关于节点就有很多种情况:临时节点、持久化节点、有序、无需、有值、未设置值多种情况

创建普通的节点

在这里我将展示这个类的方法,后续只展示关键方法

public class ZookeeperTest {
    private  CuratorFramework client;
    @Before
    public void testConnect(){
        /*
         * @param connectString       连接的字符串,zk Server的地址和端口,如果是集群那就用逗号隔开
         * @param sessionTimeoutMs    会话超时时间 单位 ms
         * @param connectionTimeoutMs 连接超时时间 单位 ms
         * @param retryPolicy         重试策略
         */
        String hosts = "hadoop132-father:2181,hadoop133:2181,hadoop134:2181";
        int sessionTime = 10 * 1000;
        int connectTime = 10 * 1000;
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
        // CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, sessionTime, connectTime, retryPolicy);
        client = CuratorFrameworkFactory.builder()
                .connectString(hosts)
                .sessionTimeoutMs(sessionTime)
                .connectionTimeoutMs(connectTime)
                .retryPolicy(retryPolicy)
                .namespace("wxk") // 设置工作空间 设置之后所有的操作都是以次为根节点
                .build();
        client.start();
    }

    @Test
    public void baseCreate() throws Exception {
        //注意 这个forPath还有一个参数可以设置值,在这里是选择的是仅有一个参数的方法
         String s = client.create().forPath("/test");
        System.out.println(s);

    }

    @After
    public void close(){
        if (client != null){
            client.close();
        }
    }
}

运行结果:
image-20230514173239889
在这里不仅创建好了wxk还在下面创建了test这个节点,我们查看这个节点是否有值【在Linux Client客户端上直接创建,如果不赋值,那么就是null】

get /wxk/test # 这里是在zookeeper 中进行的

image-20230514173522055
这里返回的不是null而是我们我们的IP。
如果创建节点,没有指定数据,那么默认将当前客户端的ip作为数据存储

创建节点并赋予数据

    @Test
    public void createNodeDate() throws Exception {
        String s = client.create().forPath("/have_data", "我有数据".getBytes(StandardCharsets.UTF_8));
        System.out.println(s);
    }

运行之后查看数据
image-20230514174231894

创建临时节点,并获取它的值

@Test
public void createNodeMode() throws Exception {
    //创建临时节点
    String s = client.create()
    .withMode(CreateMode.EPHEMERAL)
    .forPath("/have_mode", "我有数据".getBytes(StandardCharsets.UTF_8));
    System.out.println(s);
    // 但是临时节点有个问题就是会话结束就会管理,那么咱们在这里获取一波
    final byte[] bytes = client.getData().forPath("/have_mode");
    String c = new String(bytes);
    System.out.println(c);
}

查看结果:
image-20230516204353360

创建多级节点
我们知道,在Client中不能够创建多级节点,如果创建多级节点,就会直接报错:
image-20230516204536126
但是在Java API就可以实现

    @Test
    public void createNeed() throws Exception {
        final String s = client.create().creatingParentsIfNeeded().forPath("/qqq/ttt");
        System.out.println(s);
    }

查看运行结果:
image-20230516205024328
下载成功

查询节点

查询数据 – – – Get
查询数据使用Get方法,需要注意的是,这里返回的结果是byte数组类型,需要使用new String()进行转换成字符串

    @Test
    public void getData() throws Exception {
        final byte[] bytes = client.getData().forPath("/qqq/ttt");
        System.out.println(new String(bytes));
    }

运行结果如下:
image-20230516205949082

查询子节点
我们在client中可以使用ls / 来查看子节点,那么Java API 如何实现? Java API实现主要是通过getChildern()方法

    @Test
    public void getChildren() throws Exception {
        // 在上文中我们已经传见了/qqq/ttt节点,所以这个查询的结果是ttt
        client.getChildren().forPath("/qqq").forEach(System.out::println);
    }

image-20230516210546144

那么如果直接查询/呢?这个结果是什么?难道是这个?
image-20230516210624730
那么先测试一下:

    @Test
    public void getRootsCh() throws Exception {
        client.getChildren().forPath("/").forEach(System.out::println);
    }

运行结果如下:
image-20230516210725670为什么和我们在客户端查询的不一样? –如果你产生这个疑问,那么就是前面讲的时候没有仔细看,在前面我们已经指定了我们的工作空间是/wxk这个路径,所以我们我们在客户端上输入/其实就是查询/wxk下的路径
image-20230516210930370
你看一毛一样

查询节点的状态

查询节点状态相对来说比较麻烦一点,因为数据封装的比较难顶,这里直接贴方法

    @Test
    public void getState() throws Exception {
        Stat state = new Stat();
        // 先输出一次 便于后续对比
        System.out.println(state);
        client.getData().storingStatIn(state).forPath("/qqq");
        // 第二次进行输出
        System.out.println(state);
    }

运行结果:
image-20230516211440344我们查看查看Stat源码,发现就是对各种方法的一个封装,但是返回的太简陋了
image-20230516211701157

修改节点

修改节点使用的是SetData方法,在这里我修改的是一个通过API创建的值,这个值在默认情况下是IP,这里我将IP修改成其他值

    @Test
    public void changeData() throws Exception {
        client.setData().forPath("/qqq/ttt", "我不是IP".getBytes(StandardCharsets.UTF_8));
    }

image-20230516214004134

根据版本进行修改
我们在进行修改操作的时候,如果并发度比较低基本上都美有问题,但是如果并发度比较高,一个数据可能有多个线程需要进行修改,该怎么办?好比Java 中原子类的保证原子的方式:判断当前版本号与需要修改时的版本号是否一致,如果一致,那么直接进行修改,如果不一致,则直接抛出异常

    @Test
    public void changeByVersion() throws Exception {
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/qqq/ttt");
        System.out.println("version = " + stat.getVersion());
        // 100 是我随便输入的 还有就是这个的修改不到100次
        client.setData().withVersion(100).forPath("/qqq/ttt", "我就是IP".getBytes(StandardCharsets.UTF_8));
    }

运行结果:

image-20230516220417758
直接就给你说 :版本号不对

删除节点

普通的删除

    @Test
    public void deleteNode() throws Exception {
        client.delete().forPath("/node1");
    }

尝试是否能直接删除父节点(下面存在子节点)

    @Test
    public void deleteNode() throws Exception {
        client.delete().forPath("/qqq");
    }

运行结果:
image-20230516220915768
节点不为空,直接删除失败

迭代删除

这个用法与上文的创建多级节点相似,

    @Test
    public void deleteFor() throws Exception {
        client.delete().deletingChildrenIfNeeded().forPath("/qqq");
    }

这里运行成功,在Client中进行检查
image-20230516221135103
qqq节点已经被删除了

必须成功的删除

@Test
public void deleteMustSuc() throws Exception {
    client.delete().guaranteed().forPath("/");
}

回调删除

    @Test
    public void deleteCall() throws Exception {
        client.delete().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("被删除了:-(");
                System.out.println(event);
            }
        }).forPath("/test");
        
    }

运行结果
image-20230516222016644
相关的数据都被封装在了event中

事件监听机制 – Watch

Zookeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将时间通知到感兴趣的客户端上,该机制是Zookeeper实现分布式协调服务的重要特性。Zookeeper中引入了Watcher机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态发生变化,会通知所有的订阅者

Zookeeper引入了三种watcher:

  • NodeCache:只是监听某一特定的节点
  • PathChildrenCache:监听一个ZNode的节点的子节点
  • TreeCache:可以监听某个节点及其子节点,相当于是NodeCache 和PathChildrenCache的组合

但是很可惜,我采用的是高版本得curator,我的是5.0,这三个类已经被弃用了,这里直接采用CuratorCache来实现:

    @Test
    public void NodeCacheTest() {
        CuratorCache cache = CuratorCache.build(client, "/");
        cache.listenable().addListener(new CuratorCacheListener() {
            @Override
            public void event(Type type, ChildData oldData, ChildData data) {
                System.out.println(type.name());
                switch (type.name()) {
                    case "NODE_CREATED": //新节点被创建
                        if (data != null) {
                            System.out.println("创建了节点:" + data.getPath());
                        }
                        break;
                    case "NODE_CHANGED": // 节点被修改
                        if (oldData != null) {
                            System.out.println("修改前得数据: " + new String(oldData.getData()) + " , 修改后得数据为" + new String(data.getData()));
                        } else {
                            System.out.println("这是第一次修改数据,修改后得数据为" + new String(data.getData()));
                        }
                        break;
                    case "NODE_DELETED": // 节点被删除
                        System.out.println("节点{path='" + oldData.getPath() + "'} 已被删除");
                        break;
                    default:
                        break;
                }
            }
        });
        cache.start();
        try {
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

直接全部管用:
image-20230517111604150

分布式锁

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点

  1. 客户端获取锁的时候,在lock节点下创建临时顺序节点
  2. 然后获取lock下面得所有子节点,客户端获取到所有得子节点,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁,使用完后就将这个锁给删除
  3. 如果发现自己创建的节点并非是所有子节点中最小得,说明自己还没有获得到锁,此时客户端需要找到比自己小得那个节点,同时对其注册事件监听器,监听删除时间
  4. 如果发现比自己小的那个节点被删除,则客户端watcher会受到相应的通知,此时在判断自己创建的节点是否是lock子结点中最小得,如果是,则获取到了锁,如果不是则重复上述步骤继续获取到比自己小的节点并进行监听

image-20230517113337366

实现分布式锁得API

一共有5中方案:

  1. InterProcessSemaphoreMutex: 分布式排他锁(非可重入锁)
  2. InterProcessMutex:分布式可重入排他锁
  3. InterProcessReadWriteLock:分布式读写锁
  4. InterProcessMultiLock:将多个锁作为单个实体管理的容器
  5. InterProcessSemaphoreV2:共享信号量

在这里进行一个简单的举例:这是一个与票相关得类

package wxk.test;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;

/**
 * @author wxk
 * @date 2023/05/18/15:15
 */
public class Trick12306 implements Runnable{
    private int tickets=10;
    private InterProcessMutex lock;
    private CuratorFramework client;
    public Trick12306(){
        String hosts = "hadoop132-father:2181,hadoop133:2181,hadoop134:2181";
        int sessionTime = 10 * 1000;
        int connectTime = 10 * 1000;
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorFrameworkFactory.builder()
                .connectString(hosts)
                .sessionTimeoutMs(sessionTime)
                .connectionTimeoutMs(connectTime)
                .retryPolicy(retryPolicy)
                .build();
        client.start();


        lock = new InterProcessMutex(client,"/lock");
    }

    @Override
    public void run() {
        while (true){
            //获取锁
            try {
                lock.acquire(5,TimeUnit.SECONDS);
                if (tickets > 0){
                    System.out.println(Thread.currentThread()+":"+tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(e);
            }finally{
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

下面是售票得方法:

package wxk.test;

import org.junit.Test;

/**
 * @author wxk
 * @date 2023/05/18/15:17
 */
public class LockTest {
    public static void main(String[] args) {
        Trick12306 ticket = new Trick12306();
        Thread t1 =new Thread(ticket,"携程");
        Thread t2 =new Thread(ticket,"飞猪");
        t1.start();
        t2.start();
    }
}

运行结果如下:
image-20230518154325244
虽然说有报错,但是不影响正常使用,这里的报错是因为没有拿到锁但是进行了所释放操作。我们查看我们的数据正常输出,没有发生超卖显现,如果想避免报错,可以将时间设置的稍微长一些,下面是我将时间修改成10s得输出结果
image-20230518154606471

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/541158.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

排序篇:外排序(排序文件中的数据)

目录 前言&#xff1a; 一&#xff1a;大体思路 二&#xff1a;分割成有序的小文件 (1)先给代码 (2)解析 三&#xff1a;进行文件归并 (1)主逻辑 (2)归并两个有序文件 四&#xff1a;全部代码 前言&#xff1a; 如果要排序的数据量非常大&#xff0c;内存无法容纳&…

零基础自学软件测试我只用了105天,从月薪3000到15K, 我整理的超全学习指南!

我21年毕业于管理类专业&#xff0c;干了大半年行政打杂&#xff0c;工作平淡无快感。性格较内向&#xff0c;思维严谨独立。喜欢软件测试工作内容的系统性&#xff0c;技术性&#xff0c;丰富性。以上便是转行的最初心理&#xff0c;大家觉得可妥&#xff1f;说干就干去年6月底…

(前期准备工作2)白嫖Replit的免费云服务器搭建属于自己的应用

Replit介绍 Replit(原来是https://repl.it)是一个基于浏览器的云端协同开发平台,可用于构建开发环境、实时协作、托管网络应用等。Replit提供可创建动态或者静态网站的容器,并会自动生成免费https域名(格式为:项目名.用户名.http://repl.co)。这代表着任何人都可以试用…

Kafka的工作原理

一、Kafka是什么&#xff1f; Kafka是一个分布式的基于发布/订阅模式的消息队列。分布式消息队列可以看成是将这种先进先出的数据结构独立部署在服务器上&#xff0c;应用程序可以通过远程访问接口使用它。 二、Kafka的工作机制是什么&#xff1f; 1.基本概念 2.消息模型 发…

WiFi基础学习到实战(六:Beacon帧字段解析)

欢迎大家一起学习探讨通信之WLAN。上节我们基于Android设备分析了WiFi扫描的代码实现&#xff0c;具体执行WiFi网络扫描由WiFi模块实现。WLAN协议定义扫描方式有“被动扫描”和“主动扫描”。本节继续分析“被动扫描”依赖Beacon帧中的字段。 好。我们先来看Android11 WiFi扫描…

会计转行数据分析,可行性多高?

看到这样的问题&#xff0c;第一个想法是想劝退&#xff0c;毕竟通过不明真相的网友身上找自己的未来&#xff0c;这件事听着就不靠谱。转行难&#xff0c;转行做好更好&#xff0c;虽然会计也与数据有关&#xff0c;但是数据分析涉及的技术内容明显有很大的差别。所以&#xf…

重塑职业未来:在竞争激烈的职场上脱颖而出的关键策略

在竞争激烈的职场上&#xff0c;各种职场难题时常出现&#xff0c;如何进行有效沟通、如何应对工作压力、如何提升职业能力等&#xff0c;这都是需要去克服的问题。下面分享一下职场老人的经验&#xff01; 一、你遇到过哪些职场问题&#xff1f;分享一下你是怎么解决的呢&…

Type-C边充电边OTG转接器方案

随着生活水平的提高&#xff0c;大家的电子设备也多了起来&#xff0c;更有甚者会凑齐“全家桶”&#xff0c;手机&#xff0c;平板&#xff0c;笔记本电脑&#xff0c;智能手表&#xff0c;无线耳机&#xff0c;Switch&#xff0c;PS5&#xff0c;一样不落。那么多的电子设备&…

hibernate入门项目(一)

本节我们将演示如何搭建一个 Hibernate 工程。 搭建 Hibernate 工程需要以下 7 步&#xff1a; 下载 Hibernate 开发包 新建工程 创建数据库表 创建实体类 创建映射文件 创建 Hibernate 核心配置文件 测试 1. 下载 Hibernate 开发包 浏览器访问 Hibernate 官网 下载 Hibern…

520告白日!小红书关键词热度查询,今年的心动密码是什么?

520&#xff0c;又是一个有爱的日子&#xff0c;人们借机表达爱意的日子&#xff0c;品牌不会错过的好时机。今年520什么东西比较热呢&#xff1f;消费者比较关注什么&#xff1f;品牌有什么样动作&#xff1f;下面&#xff0c;借助小红书关键词热度查询、热词排行榜&#xff0…

基于C3D卷积神经网路的动作识别

对于基于视频分析的问题&#xff0c;2D卷积&#xff08;卷积核为二维&#xff09;不能很好得捕获时序上的信息&#xff0c;因此《3D convolutional neural networks for human action recognition》 这片论文提出了3D卷积并用于行为识别的&#xff0c;论文中将其用于行为识别&a…

商家中心之java商城 开源java电子商务Spring Cloud+Spring Boot+mybatis+MQ+VR全景+b2b2c

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot、Mybatis、Redis 3. 前端框架…

cpp test

1. 以下程序在linux 64位系统的输出结果&#xff08; &#xff09; #include <stdio.h> int main(void) {int buf[100] { 0 };printf("%d,%d,%d,%d,%d",sizeof(int), sizeof(long long), sizeof(buf),sizeof(buf)/sizeof(buf[0]), sizeof(&buf));retur…

【Linux Network】网络层协议——IP

目录 网络层 IP协议 基本概念 协议头格式 网段划分 特殊的IP地址 IP地址的数量限制 私有IP地址和公网IP地址 路由 路由表生成算法 Linux网络编程&#x1f337; 网络层 在复杂的网络环境中确定一个合适的路径&#xff1b; IP协议 基本概念 主机 : 配有 IP 地址 , 但是不进行路…

前端必学,crud,magic-Api

CRUD还要后端写&#xff1f; 前端自己搞定&#xff0c;只需要会写sql就行。 文档写的比我写的好太多&#xff0c;直接看文档 创建springBoot&#xff08;springBoot版本要小于3.0&#xff09; 引入pom <?xml version"1.0" encoding"UTF-8"?> &…

接口自动化——har 生成用例

这里写目录标题 一、目标二、应用场景三、Har 简介四、实现思路五、模板技术六、模版技术-环境安装&#xff08;Python&#xff09;七、har 生成用例实现思路1、python模板模板文件生成的测试文件 2、java模板模板文件生成的测试文件 3、httprunner模板模板文件生成的测试文件 …

数据库界的科技与狠活: 创邻科技Galaxybase X英特尔SGX数据加密解决方案正式发布

引言 近日&#xff0c;创邻科技入选与英特尔合作&#xff0c;在基于第四代英特尔至强处理器的支持下&#xff0c;利用软件防护扩展&#xff08;Software Guard Extension,SGX&#xff09; 技术&#xff0c;打造出了具备可信执行环境的图数据库产品&#xff0c;保护企业释放关联…

STM32单片机多功能电子秤点数秤食物热量卡路里称重

实践制作DIY- GC0132-多功能电子秤 一、功能说明&#xff1a; 基于STM32单片机设计-多功能电子秤 二、功能介绍&#xff1a; STM32F103C系列最小系统lcd1602HX7115Kg电子秤去皮键模式选择按键重量设置键上键下键 有3种模式普通模式、点数模式、卡路里模式。通过模式选…

Requests-get方法的使用

Requests-get方法使用 打开网页使用代码获取页面内容查看结果页面格式修改 爬取书名完整代码以及注释代码注释 翻页查询所有 以https://books.toscrape.com/网站为例&#xff1a; 打开网页 先把网页打开&#xff0c;然后右键检查&#xff0c;找到网络一栏&#xff0c;这个时候…

【Android】【Java】【每日练手1】字符串的二维码生成器

文章目录 一、需求二、创建android stdio工程三、设置好JDK四、创建activity五、编写activity_main.xml布局六、build.gradle增加zxing依赖七、MainActivity 一、需求 一个界面&#xff0c;界面上一个文本输入框、一个按钮、一个二维码显示view。可在文本输入框输入字符串&…