zookeeper应用场景(二)

news2025/1/19 17:02:43

单机环境下可以利用jvm级别的锁,比如synchronized、Lock等来实现锁,如果是多机部署就需要一个共享数据存储区域来实现分布式锁

一、分布式锁实现方式

1、基于数据库实现分布式锁

可以用数据库唯一索引来实现

2、基于redis实现分布式锁

redis实现的分布式锁始终会有一些问题,即便使用多数写入,主节点挂了,数据丢失还是会存在加锁问题,就是主节点宕机,客户端无法感知

3、基于zookeeper实现分布式锁

1)实现方式一

使用临时节点创建成功获取锁,否则监听临时节点,有个问题,比如1000个线程只有一个会加锁成功,当删除临时节点时999个线程都会去竞争

2)实现方式二

公平锁的实现

4、Curator可重入分布式锁工作流程

从InterProcessMutex类找到acquire()加锁方法

public void acquire() throws Exception {
        if (!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }
1)加锁 
private boolean internalLock(long time, TimeUnit unit) throws Exception {
        // 获取当前线程
        Thread currentThread = Thread.currentThread();
        // threadData类型是ConcurrentMap,从threadData中去拿LockData加锁对象
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        // 如果拿到了 证明之前加锁了,lockCount重入次数+1
        if (lockData != null) {
            lockData.lockCount.incrementAndGet();
            return true;
        } else {
            // 没有拿到开始从zookeeper上创建lock节点
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
            // 创建成功 加锁成功把对象放到threadData中
            if (lockPath != null) {
                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                // 加锁失败
                return false;
            }
        }
    }

private static class LockData {
        // 持有锁的线程
        final Thread owningThread;
        // 在zookeeper的锁路径
        final String lockPath;
        // 线程加锁次数
        final AtomicInteger lockCount;

        private LockData(Thread owningThread, String lockPath) {
            this.lockCount = new AtomicInteger(1);
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }
 2)创建节点返回路径
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        long startMillis = System.currentTimeMillis();
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        // 重试次数
        int retryCount = 0;
        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;

        while(!isDone) {
            isDone = true;

            try {
                // 创建临时有序节点
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
                // 创建的节点是否为最小节点
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                // 加锁失败 重试设置重试策略
                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }

        return hasTheLock ? ourPath : null;
    }
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        // 是否要给节点设置属性 创建的都是临时有序节点
        if (lockNodeBytes != null) {
            ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);
        } else {
            ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);
        }

        return ourPath;
    }
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }

            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
                // 将子节点进行排序
                List<String> children = this.getSortedChildren();
                // 截取创建的节点的序号
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                // 判断是否为最小序号
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
                if (predicateResults.getsTheLock()) {
                    // 加锁成功
                    haveTheLock = true;
                } else {
                    // 拿到上一个节点的路径
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this) {
                        try {
                            // 监听上一个节点
                            ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                            if (millisToWait == null) {
                                // 等待
                                this.wait();
                            } else {
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait > 0L) {
                                    // 超时等待
                                    this.wait(millisToWait);
                                } else {
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                this.deleteOurPath(ourPath);
            }

        }

        return haveTheLock;
    }
 3)解锁
public void release() throws Exception {
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        if (lockData == null) {
            // 分布式场景下什么情况都可能有 所以判断一下
            throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
        } else {
            // 重入次数减1
            int newLockCount = lockData.lockCount.decrementAndGet();
            if (newLockCount <= 0) {
                if (newLockCount < 0) {
                    throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
                } else {
                    try {
                        // 这里之前应该还有个大于0的判断 在curator5.1.0&zookeeper 3.9.0版本去掉了
                        this.internals.releaseLock(lockData.lockPath);
                    } finally {
                        this.threadData.remove(currentThread);
                    }

                }
            }
        }
    }

 

5、总结

优点:Zookeeper分布式锁(如InterProcessMutex),具备高可用、可重入、阻塞锁特性,可解决失效死锁问题

缺点:因为需要频繁的创建和删除节点,性能上不如redis

在高性能、高并发场景下,不建议用Zookeeper的分布式锁。而由于Zookeeper的高可靠性,因此在并发量不是太高的应用场景下,还是推荐使用Zookeeper的分布式锁

二、服务注册与发现

1、设计思路

2、实现注册中心的优缺点

优点:

  • 高可用性:ZooKeeper是一个高可用的分布式系统,可以通过配置多个服务器实例来提供容错能力。如果其中一个实例出现故障,其他实例仍然可以继续提供服务。
  • 强一致性:ZooKeeper保证了数据的强一致性。当一个更新操作完成时,所有的服务器都将具有相同的数据视图。这使得ZooKeeper非常适合作为服务注册中心,因为可以确保所有客户端看到的服务状态是一致的。
  • 实时性:ZooKeeper的监视器(Watcher)机制允许客户端监听节点的变化。当服务提供者的状态发生变化时(例如,上线或下线),客户端会实时收到通知。这使得服务消费者能够快速响应服务的变化,从而实现动态服务发现。

缺点:

  • 性能限制:ZooKeeper的性能可能不如一些专为服务注册中心设计的解决方案,如nacos或Consul。尤其是在大量的读写操作或大规模集群的情况下,ZooKeeper可能会遇到性能瓶颈。

3、整合Spring Cloud Zookeeper实现微服务注册中心 

Spring Cloud Zookeeper

第一步:在父pom文件中指定Spring Cloud版本

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

注意: springboot和springcloud的版本兼容问题

第二步:微服务pom文件中引入Spring Cloud Zookeeper注册中心依赖

<!-- zookeeper服务注册与发现 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

注意: zookeeper客户端依赖和zookeeper sever的版本兼容问题

Spring Cloud整合Zookeeper注册中心核心源码入口: ZookeeperDiscoveryClientConfiguration

第三步: 微服务配置文件application.yml中配置zookeeper注册中心地址

spring:
  cloud:
    zookeeper:    
      connect-string: localhost:2181
      discovery:
        instance-host: 127.0.0.1

注册到zookeeper的服务实例元数据信息如下:

注意:如果address有问题,会出现找不到服务的情况,可以通过instance-host配置指定

第四步:整合feign进行服务调用

@RequestMapping(value = "/findOrderByUserId/{id}")
public R  findOrderByUserId(@PathVariable("id") Integer id) {
    log.info("根据userId:"+id+"查询订单信息");
    //feign调用   
    R result = orderFeignService.findOrderByUserId(id);
    return result;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1095671.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

测试除了点点点,还有哪些内容呢?

今天和一个网友讨论了一下关于互联网行业中测试的情况&#xff0c;希望能够了解现在的互联网行业主要的测试工作内容。小编根据以往的工作经历和经验情况&#xff0c;来做一个总结和整理。 1、岗位分类 现在的岗位划分主要是分为两大类&#xff1a;测试工程师 和 测试开发工程…

1、验证1101序列(Moore)

题目要求&#xff1a; 用Moore型状态机验证1101序列。 题目描述&#xff1a; 用使用状态机验证1101序列&#xff0c;注意&#xff1a;允许重复子序列。如图 端口描述&#xff1a; module moore_1101(input clk,//时钟信号input clr,//reset复位信号&#xff0c;高电平有效in…

Netty 入门 — 亘古不变的Hello World

这篇文章我们正式开始学习 Netty&#xff0c;在入门之前我们还是需要了解什么是 Netty。 什么是 Netty 为什么很多人都推崇 Java boy 去研究 Netty&#xff1f;Netty 这么高大上&#xff0c;它到底是何方神圣&#xff1f; 用官方的话说&#xff1a;Netty 是一款异步的、基于事…

【网络协议】聊聊ifconfig

我们知道在linux是ifconfig查看ip地址&#xff0c;但是ip addr也可以查看 IP 地址是一个网卡在网络世界的通讯地址&#xff0c;相当于我们现实世界的门牌号码。 从IP地址的划分来看&#xff0c;C类地址只可以容纳254个&#xff0c;而B类6W多&#xff0c;那么又没有一种折中的…

系统文件IO、文件描述符fd、重定向、文件系统、动态库和静态库

目录 C文件接口系统文件I/O系统调用和库函数文件描述符0 & 1 & 2FILE和fd的关系文件描述符的分配规则 重定向重定向的本质输出重定向输入重定向追加重定向 dup2函数 FILE理解文件系统了解磁盘的物理结构逻辑抽象文件系统文件系统的图解和解析通过文件系统来理解ls -al通…

74.C++ STL stack容器

目录 1.什么是stack 2.stack的构造函数 3.赋值操作 4.数据存取操作 5.大小操作 1.什么是stack stack 是 C 标准库中的容器适配器&#xff0c;它提供了一个堆栈&#xff08;栈&#xff09;数据结构的封装&#xff0c;用于管理元素的插入和移除。栈是一种后进先出的数据结构…

【RWKV】如何新增一个自定义的Tokenizer和模型到HuggingFace

0x0. 前言 RWKV社区在Huggingface上放了rwkv-4-world和rwkv-5-world相关的一系列模型&#xff0c;见&#xff1a;https://huggingface.co/BlinkDL/rwkv-4-world & https://huggingface.co/BlinkDL/rwkv-5-world &#xff0c;然而这些模型的格式是以PyTorch的格式进行保存的…

$ vue -Vbash: vue: command not found

$ vue -V bash: vue: command not found报这个错&#xff0c;我们需要找到vue安装路径&#xff0c;添加在环境变量的用户变量中&#xff1a; 1、vue安装路径 2、编辑环境变量 然后重新打开命令框&#xff0c;就可以了

嵌入式数据库sqlite3【基础篇】基本命令操作,小白一看就懂(C/C++)

目录 前言 一、sqlite概念和特性 二、sqlite安装 三、sqlite3数据类型 四、sqlite数据库约束 五、sqlite常用命令 六、SQL语句&#xff08;增删改查&#xff09; 七、sqlite使用实例&#xff08;教学管理数据库&#xff09; 总结 前言 数据在实际工作中应用非常广泛…

Linux网络编程系列之服务器编程——阻塞IO模型

Linux网络编程系列 &#xff08;够吃&#xff0c;管饱&#xff09; 1、Linux网络编程系列之网络编程基础 2、Linux网络编程系列之TCP协议编程 3、Linux网络编程系列之UDP协议编程 4、Linux网络编程系列之UDP广播 5、Linux网络编程系列之UDP组播 6、Linux网络编程系列之服务器编…

Stream流中的 max()和 sorted()方法

需求&#xff1a;某个公司的开发部门&#xff0c;分为开发 一部 和 二部 &#xff0c;现在需要进行年中数据结算。分析&#xff1a; 员工信息至少包含了&#xff08;名称、性别、工资、奖金、处罚记录&#xff09;开发一部有 4 个员工、开发二部有 5 名员工分别筛选出 2 个部门…

1.2 向量的长度与点积

一、向量的点积 两个向量 v ( v 1 , v 2 ) \boldsymbol v(v_1,v_2) v(v1​,v2​) 与 w ( w 1 , w 2 ) \boldsymbol w(w_1,w_2) w(w1​,w2​)的点积或内积是数字 v ⋅ w \boldsymbol v\cdot\boldsymbol w v⋅w&#xff1a; v ⋅ w v 1 w 1 v 2 w 2 ( 1.2.1 ) \boldsymbo…

【Agora UID 踩坑记录 Java 数据类型】

目录 负数二进制表示Java中32位无符号数的取法项目踩坑记录Java 0xffffffff隐式类型转换的坑 负数二进制表示 由于计算机中数据都以二进制表示&#xff0c;而负数的二级制是根据正数二进制取补码&#xff08;补码就是先取反码&#xff0c;然后加1&#xff09;得到&#xff0c;…

关于Gym变成Gymnasium

根据网页搜索的gym官网&#xff0c;发现如下网站https://www.gymlibrary.dev/ 刚进页面时 翻译一下&#xff0c;意思就是 Gym 的所有开发都已迁移到 Gymnasium,这是 Farama 基金会中的一个新软件包,由过去 18 个月来维护 Gym 的同一团队开发人员维护。如果您已经在使用最新版…

Java基础(运算符篇)

一、算术运算符 正号-负号加法-减法*乘法/除法%模运算&#xff08;取余&#xff09;自增--自减 算术运算符的使用比较简单&#xff0c;只需要注意一些细节。 tips&#xff1a; 加号&#xff08; &#xff09;除了可以作为正号&#xff0c;还可以用于字符串拼接。 public c…

解读下SWD协议以及其应用

SWD协议原理 SWD&#xff08;Serial Wire Debug&#xff09;协议是一种用于ARM Cortex微控制器的调试接口协议。它定义了主机计算机与目标设备之间通过SWD线进行通信的格式和规范。 SWD协议使用两根线进行通信&#xff1a;SWDIO&#xff08;Serial Wire Debug I/O&#xff09…

跳石板(牛客)

目录 一、题目 二、代码 &#xff08;一&#xff09;解法一&#xff1a;超时了 &#xff08;二&#xff09;优化 一、题目 跳石板_牛客题霸_牛客网 二、代码 &#xff08;一&#xff09;解法一&#xff1a;超时了 #include <climits> #include <iostream> …

python的pyecharts第三方模块绘制高端统计图表

pyecharts库 python的pyecharts库是一个用于生成 Echarts 图表的python第三方类库&#xff0c;可以绘制很高端的统计图表以便展示数据。 安装方法 pip安装 pip install pyecharts或者github拉取下载安装 git clone https://github.com/pyecharts/pyecharts.git cd pyechar…

Android 内容提供者和内容观察者:数据共享和实时更新的完美组合

任务要求 一个作为ContentProvider提供联系人数据另一个作为Observer监听联系人数据的变化&#xff1a; 1、创建ContactProvider项目&#xff1b; 2、在ContactProvider项目中用Sqlite数据库实现联系人的读写功能&#xff1b; 3、在ContactProvider项目中通过ContentProvid…

1.安装 docker 容器并配置镜像加速器

1.2.1 实验环境准备 实验环境&#xff1a; rockylinux8.8 可以去官网下载 下载 Rocky | Rocky Linux 主机名&#xff1a; xuegod63 主机 ip: 192.168.1.63&#xff08;这个 ip 大家可以根据自己所在环境去配置&#xff0c;配置成静态 IP&#xff09; 2g 内存、2vCPU、50G 硬…