Curator分布式锁

news2024/9/19 10:34:00

系列文章目录


文章目录

  • 系列文章目录
  • 前言


前言

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。
在这里插入图片描述


分布式锁服务宕机,ZooKeeper一般是以集群部署,如果出现ZooKeeper宕机,那么只要当前正常的服务器超过集群的半数,依然可以正常提供服务持有锁资源服务器宕机,假如一台服务器获取锁之后就宕机了, 那么就会导致其他服务器无法再获取该锁. 就会造成死锁问题, 在Curator中, 锁的信息都是保存在临时节点上, 如果持有锁资源的服务器宕机, 那么ZooKeeper 就会移除它的信息, 这时其他服务器就能进行获取锁操作。
在这里插入图片描述
zookeeper安装单机模式

http://www.javacui.com/opensource/445.html

SpringBoot集成Curator实现Zookeeper基本操作

http://www.javacui.com/tool/615.html

SpringBoot集成Curator实现Watch事件监听

http://www.javacui.com/tool/616.html

Zookeeper实现分布式锁的机制

使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。

创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。

如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

比如当前线程获取到的节点序号为/lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。

如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。

比如/lock/001释放了,/lock/002监听到时间,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。

锁分类

InterProcessSemaphoreMutex:分布式不可重入排它锁

InterProcessMutex:分布式可重入排它锁

InterProcessReadWriteLock:分布式读写锁

InterProcessMultiLock:多重共享锁,将多个锁作为单个实体管理的容器

InterProcessSemaphoreV2:共享信号量

Shared Lock 分布式非可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-lock.html

InterProcessSemaphoreMutex是一种不可重入的互斥锁,也就意味着即使是同一个线程也无法在持有锁的情况下再次获得锁,所以需要注意,不可重入的锁很容易在一些情况导致死锁,比如你写了一个递归。

Shared Reentrant Lockf分布式可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html

此锁可以重入,但是重入几次需要释放几次。

InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid+递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。

Shared Reentrant Read Write Lock可重入读写锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html

读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。

读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。

Multi Shared Lock 多共享锁

官网地址:http://curator.apache.org/curator-recipes/multi-shared-lock.html

多个锁作为一个锁,可以同时在多个资源上加锁。一个维护多个锁对象的容器。当调用acquire()时,获取容器中所有的锁对象,请求失败时,释放所有锁对象。同样调用release()也会释放所有的锁。

Shared Semaphore共享信号量

官网地址:http://curator.apache.org/curator-recipes/shared-semaphore.html

一个计数的信号量类似JDK的Semaphore,所有使用相同锁定路径的jvm中所有进程都将实现进程间有限的租约。此外,这个信号量大多是“公平的” - 每个用户将按照要求的顺序获得租约。

有两种方式决定信号号的最大租约数。一种是由用户指定的路径来决定最大租约数,一种是通过SharedCountReader来决定。

如果未使用SharedCountReader,则不会进行内部检查比如A表现为有10个租约,进程B表现为有20个。因此,请确保所有进程中的所有实例都使用相同的numberOfLeases值。

acuquire()方法返回的是Lease对象,客户端在使用完后必须要关闭该lease对象(一般在finally中进行关闭),否则该对象会丢失。如果进程session丢失(如崩溃),该客户端拥有的所有lease会被自动关闭,此时其他端能够使用这些lease。

编码测试

package com.example.springboot;
 
import com.example.springboot.tool.ZkConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
 
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
 
/**
 * @Auther: Java小强
 * @Date: 2022/2/4 - 19:33
 * @Decsription: com.example.springboot
 * @Version: 1.0
 */
@SpringBootTest(classes = Application.class)
public class CuratorTest {
    @Autowired
    private ZkConfiguration zk;
 
    // 共享信号量,多个信号量
    @Test
    public void testInterProcessSemaphoreV22() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 创建一个信号量, Curator 以公平锁的方式进行实现
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 3);
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取2个许可
                    Collection<Lease> acquire = semaphore.acquire(2);
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnAll(acquire);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnLease(lease);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
    }
 
    // 共享信号量
    @Test
    public void testInterProcessSemaphoreV2() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 创建一个信号量, Curator 以公平锁的方式进行实现
        final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 1);
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnLease(lease);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取一个许可
                    Lease lease = semaphore.acquire();
                    System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    semaphore.returnLease(lease);
                    System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
    }
 
    // 多重共享锁
    @Test
    public void testInterProcessMultiLock() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 可重入锁
        final InterProcessLock interProcessLock1 = new InterProcessMutex(client, "/lock");
        // 不可重入锁
        final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client, "/lock");
        // 创建多重锁对象
        final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    // 获取参数集合中的所有锁
                    lock.acquire();
 
                    // 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入
                    System.out.println(threadName + "----->" + lock.acquire(2, TimeUnit.SECONDS));
                    // interProcessLock1 是可重入锁, 所以可以继续获取锁
                    System.out.println(threadName + "----->" + interProcessLock1.acquire(2, TimeUnit.SECONDS));
                    // interProcessLock2 是不可重入锁, 所以获取锁失败
                    System.out.println(threadName + "----->" + interProcessLock2.acquire(2, TimeUnit.SECONDS));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
    }
 
    // 分布式读写锁
    @Test
    public void testReadWriteLock() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 创建共享可重入读写锁
        final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, "/lock");
        final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock");
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    locl1.writeLock().acquire(); // 获取锁对象
                    System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    locl1.readLock().acquire(); // 获取读锁,锁降级
                    System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    locl1.readLock().release();
                    System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");
                    locl1.writeLock().release();
                    System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock2.writeLock().acquire(); // 获取锁对象
                    System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock2.readLock().acquire(); // 获取读锁,锁降级
                    System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock2.readLock().release();
                    System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");
                    lock2.writeLock().release();
                    System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
    }
 
    // 分布式可重入排它锁
    @Test
    public void testInterProcessMutex() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 分布式可重入排它锁
        final InterProcessLock lock = new InterProcessMutex(client, "/lock");
        final InterProcessLock lock2 = new InterProcessMutex(client, "/lock");
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock.acquire(); // 获取锁对象
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    lock.acquire(); // 测试锁重入
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock.acquire(); // 获取锁对象
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    lock.acquire(); // 测试锁重入
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(1 * 1000);
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
//        顺序不一定,但是同一个线程可以多次获取,获取几次就必须释放几次,其他线程才能获取到锁
    }
 
 
    // 分布式不可重入排它锁
    @Test
    void testInterProcessSemaphoreMutex() throws Exception {
        CuratorFramework client = zk.curatorFramework();
        // 分布式不可重入排它锁
        final InterProcessLock lock = new InterProcessSemaphoreMutex(client, "/lock");
        final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, "/lock");
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();
                    lock.acquire(); // 获取锁对象
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    // 测试锁重入
                    Thread.sleep(2 * 1000);
                    lock.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    String threadName = Thread.currentThread().getName();
                    lock2.acquire();
                    System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
                    Thread.sleep(2 * 1000);
                    lock2.release();
                    System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
 
        while (true) {
        }
//        顺序不一定,但是必须是获取后再释放其他线程才能获取到锁
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1666972.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实现二叉树的基本操作

博主主页: 码农派大星. 关注博主带你了解更多数据结构知识 1我们先来模拟创建一个二叉树 public class TestBinaryTreee {static class TreeNode{public char val;public TreeNode left;public TreeNode right;public TreeNode(char val) {this.val val;}}public TreeNode …

Web入门——三栏布局页面

前置知识 内外边距 内边距(padding)&#xff1a; padding是元素边框与其内容之间的空间。也就是说&#xff0c;如果你给一个元素设置了内边距&#xff0c;这个空间会作为元素内容与元素边框之间的缓冲区域。设置内边距会使元素本身变大。例如padding:10px就创建了10像素的空间…

每日OJ题_贪心算法四④_力扣397. 整数替换

目录 力扣397. 整数替换 解析代码 力扣397. 整数替换 397. 整数替换 难度 中等 给定一个正整数 n &#xff0c;你可以做如下操作&#xff1a; 如果 n 是偶数&#xff0c;则用 n / 2替换 n 。如果 n 是奇数&#xff0c;则可以用 n 1或n - 1替换 n 。 返回 n 变为 1 所需…

计算机网络复习-传输层

概念 传输层是进程与进程之间的通信使用端口(Port)来标记不同的网络进程端口(Port)使用16比特位表示(0~65535) UDP协议详解 UDP&#xff1a;用户数据报协议数据报&#xff1a;应用层传输过来的一个完整的数据不合并&#xff0c;不拆分 UDP的头部 UDP特点 UDP是无连接协…

picoCTF-Web Exploitation-Trickster

Description I found a web app that can help process images: PNG images only! 这应该是个上传漏洞了&#xff0c;十几年没用过了&#xff0c;不知道思路是不是一样的&#xff0c;以前的思路是通过上传漏洞想办法上传一个木马&#xff0c;拿到webshell&#xff0c;今天试试看…

【计算机网络】物理层 通信基础、奈氏准则、香农公式 习题2

下列说法中正确的是( )。 A. 信道与通信电路类似&#xff0c;一条可通信的电路往往包含一个信道 B.调制是指把模拟数据转换为数字信号的过程 C. 信息传输速率是指通信信道上每秒传输的码元数 D.在数值上&#xff0c;波特率等于比特率与每符号所含的比特数的比值 信息传输速率&a…

python随机显示四级词汇

python实现一个浮动窗口随机显示四级单词在桌面跑来跑去 实现一个浮动窗体随机显示四级单词在windows桌面置顶移动 tkinter库来创建窗口和显示单词&#xff0c;以及random库来随机选择单词。 使用after方法来定时更新窗口的位置&#xff0c;实现单词窗口的慢慢移动效果 使用…

【Apache Doris】周FAQ集锦:第 3 期

【Apache Doris】周FAQ集锦&#xff1a;第 3 期 SQL问题数据操作问题运维常见问题其它问题关于社区 欢迎查阅本周的 Apache Doris 社区 FAQ 栏目&#xff01; 在这个栏目中&#xff0c;每周将筛选社区反馈的热门问题和话题&#xff0c;重点回答并进行深入探讨。旨在为广大用户和…

SpringBoot+Vue实现图片滑块和文字点击验证码

一、背景 1.1 概述 传统字符型验证码展示-填写字符-比对答案的流程&#xff0c;目前已可被机器暴力破解&#xff0c;应用程序容易被自动化脚本和机器人攻击。 摒弃传统字符型验证码&#xff0c;采用行为验证码采用嵌入式集成方式&#xff0c;接入方便&#xff0c;安全&#…

速了解及使用布隆过滤器

布隆过滤器 介绍 概念&#xff1a;是一种高效查询的数据结构 作用&#xff1a;判断某个元素是否在一个集合中。&#xff08;但是会出现误判的情况&#xff09; 实现原理 加入元素&#xff1a; 当一个元素需要加入到布隆过滤器中时&#xff0c;会使用一组哈希函数对该元素进…

每周日发系统规划与管理师伴读脑图,今天是第4章

从第4章开始&#xff0c;系统规划与管理师的学习就正式步入了主题&#xff0c;考虑到我过去中断了2周&#xff0c;想必你的第4章教程已经看完了吧&#xff1f;

2024年天津市静海区教师招聘报名流程(建议电脑)

2024年天津市静海区教师招聘报名流程&#xff08;建议电脑&#xff09; #报名 #教师招聘 #教师招聘考试 #教招 #天津教师招聘 #天津教师招聘考试 #24年天津教师招聘 #24年天津市教师招聘考试 #天津市静海区教师招聘 #静海区教师招聘考试 #静海区教师编 #静海区#

【OceanBase诊断调优】—— 租户资源统计项及其查询方法

本文主要介绍 OceanBase 数据库中租户资源统计项及其查询方法。 适用版本 OceanBase 数据库 V4.1.x、V4.2.x 版本。 CPU 资源统计项 逻辑 CPU 使用率&#xff08;线程处理请求的时间占比&#xff09;。 通过虚拟表 __all_virtual_sysstat 在 SYS 系统租户下&#xff0c;查看…

Ubuntu意外断电vmdk损坏--打不开磁盘“***.vmdk”或它所依赖的某个快照磁盘。

背景&#xff1a;电脑资源管理器崩溃卡死&#xff0c;强行断电重启&#xff0c;结果虚拟机打不开了&#xff0c;提示打不开磁盘“***.vmdk”或它所依赖的某个快照磁盘。 删除lck文件&#xff1a;失败vmware-vdiskmanager修复 &#xff1a;提示无法修复最终用 VMFS Recovery挂载…

【机器学习】集成学习在信用评分领域实例

集成学习在信用评分领域的应用与实践 一、引言二、集成学习的概念与原理三、集成学习在信用评分中的应用实例四、总结与展望 一、引言 在当今金融数字化快速发展的时代&#xff0c;信用评分成为银行、金融机构等评估个人或企业信用风险的重要工具。然而&#xff0c;单一的信用评…

OFDM802.11a的FPGA实现(十二)使用FFT IP核添加循环前缀

原文链接&#xff08;相关文章合集&#xff09;&#xff1a;OFDM 802.11a的xilinx FPGA实现 目录 1.前言2.循环前缀3.硬件实现4.ModelSim仿真 1.前言 为了能够消除传输过程当中的符号间干扰&#xff0c;在IFFT处理完毕之后还要加上循环前缀。 2.循环前缀 实际通信信道中,由于接…

Linux常用软件安装(JDK、MySQL、Tomcat、Redis)

目录 一、上传与下载工具Filezilla1. filezilla官网 二、JDK安装1. 在opt中创建JDK目录2.上传JDK压缩文件到新建目录中3.卸载系统自代jdk4.安装JDK5.JDK环境变量配置6. 验证是否安装成功 三、安装MySQL1.创建mysql文件夹2.下载mysql安装压缩包3.上传到文件夹里面4. 卸载系统自带…

动态规划算法:⼦数组、⼦串系列(数组中连续的⼀段)

例题一 解法&#xff08;动态规划&#xff09;&#xff1a; 算法思路&#xff1a; 1. 状态表⽰&#xff1a; 对于线性 dp &#xff0c;我们可以⽤「经验 题⽬要求」来定义状态表⽰&#xff1a; i. 以某个位置为结尾&#xff0c;巴拉巴拉&#xff1b; ii. 以某个位置…

清除HP打印机内存的5种方法,总有一种适合你

序言 HP打印机通常具有2 MB到32 MB的内部内存容量。打印机使用此内存存储打印作业和信息,如文档中的页数、纸张类型、纸张大小和字体。但是,如果打印作业的大小超过打印机的内存大小,它将无法执行打印命令,并将拒绝打印文档。 此外,有时打印作业可能会卡在打印机的内存中…

Matlab/simulink永磁直驱风机的建模仿真

Matlab/simulink直驱永磁同步风机的建模仿真&#xff0c;跟随风速波动效果好&#xff0c;可以作为后期科研的基础模型