Zookeeper是什么?基于zookeeper实现分布式锁

news2025/4/24 13:04:40

zookeeper听的很多,但实际在应用开发中用的不错,主要是作为中间件配合使用的,例如:Kafka。

了解zk首先需要知道它的数据结构,可以想象为树、文件夹目录。每个节点有基本的信息,例如:创建时间、修改时间、版本,数据长度等。另外节点可以设置data,也就是数据,以字节的方式进行插入/获取,另外节点还拥有权限和状态。

状态很关键,有持久、临时(会话级别)、持久+顺序、临时+顺序、持久+TTL、临时+TTL。

顺序是给同一个节点增加一个编号,例如:path:/distributed_locks/lock

插入多个,在zk中是:/distributed_locks/lock0000000001和/distributed_locks/lock0000000002、、。

到这里数据结构已经大致清楚了,那么zk存在的意义是什么?

首先,zk的定义:是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供组服务。

关键点:集中、分布式。

在程序进行分布式、多节点部署时,传统程序内存中的变量或者锁机制等都不能在多节点中进行通用。此时,就需要一个集中式的一个中间件,在中间件上存储我们需要同时方案的变量或者其他定义。

那么,我们为什么不直接使用db数据库呢,可能是因为重?也可能是一些特殊的功能db中并不能实现?(临时会话、TTL?)。

作为目前很火热的一个中间件,存在它的意义肯定是有的。为什么说呢,zk是Java实现的,与 Hadoop、Kafka 等 Java 生态项目无缝集成。同理,可以想象,每个语言的特性不一致,都会有不同的中间件或者包。

上述,基本都是个人的一些理解,希望能给大家带来点启发。

zookeeper,咱们的扩展功能到分布式锁这里。通过节点的特性,我们采用会话级别、顺序性质的节点进行实现。

当我们的线程需要去尝试获取锁时,连接zk肯定是个会话,同时zk会根据顺序将不同的线程进行排序,线程内部只需要轮询、wait/notify等方式判断是否轮到自己得到锁了。获取到锁后,执行业务逻辑之后,随之可以将锁进行释放,以便让另外一个线程得到锁。

代码实现用2种方式实现:

原生zookeeper方法实现

package com.fahe.testdistrubutedlock.zk;
​
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
​
import java.util.List;
import java.util.concurrent.CountDownLatch;
​
/**
 * @program: test-distrubuted-lock
 * @description: client
 * @author: <linfahe-694204477@qq.com>
 * @create: 2025-04-23 14:05
 **/
@Slf4j
public class ZkClient implements Watcher {
​
    public static final String ZK_ADDR = "127.0.0.1:32181";
    public ZooKeeper zk;
    public CountDownLatch connectedSignal = new CountDownLatch(1);
​
    public ZkClient() {
        try {
            zk = new ZooKeeper(ZK_ADDR, 3000, this);
            connectedSignal.await(); // 等待连接成功
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
​
    @Override
    public void process(WatchedEvent watchedEvent) {
        log.info("process WatchedEvent : {}", watchedEvent);
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            connectedSignal.countDown();
        }
    }
​
​
    // 创建持久节点
    public void createNode() throws KeeperException, InterruptedException {
        Stat existsed = zk.exists("/my-node", false);
        if (existsed != null) {
//            zk.delete("/my-node", -1);
            return;
        }
        String path = zk.create("/my-node", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("创建节点:" + path);
    }
​
    // 获取节点数据
    public void getData() throws KeeperException, InterruptedException {
        byte[] data = zk.getData("/my-node", false, null);
        System.out.println("节点数据:" + new String(data));
    }
​
    public static void main(String[] args) throws InterruptedException, KeeperException {
        ZkClient zkClient = new ZkClient();
        List<String> children = zkClient.zk.getChildren("/", true);
        for (String child : children) {
            log.info("child : {}", child);
        }
        zkClient.createNode();
        zkClient.getData();
    }
​
    public void close() {
        try {
            if (zk != null) {
                zk.close();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
​
package com.fahe.testdistrubutedlock.zk;
​
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
​
import java.util.Collections;
import java.util.List;
​
public class DistributedLock {
    private static final String LOCK_ROOT = "/locks";
    private static final String LOCK_NODE = LOCK_ROOT + "/lock_";
    private ZooKeeper zooKeeper;
    private String lockPath;
​
    public DistributedLock(ZooKeeper zooKeeper) throws Exception {
        this.zooKeeper = zooKeeper;
        Stat stat = zooKeeper.exists(LOCK_ROOT, false);
        if (stat == null) {
            zooKeeper.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
​
    public void acquireLock() throws Exception {
        lockPath = zooKeeper.create(LOCK_NODE, "new byte[0]".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE
                , CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Lock path: " + lockPath);
​
        while (true) {
            List<String> children = zooKeeper.getChildren(LOCK_ROOT, false);
            Collections.sort(children);
            String smallestChild = LOCK_ROOT + "/" + children.get(0);
​
            if (lockPath.equals(smallestChild)) {
                System.out.println("Acquired lock: " + lockPath);
                return;
            }
            System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild);
            String watchNode = null;
            for (int i = children.size() - 1; i >= 0; i--) {
                String child = LOCK_ROOT + "/" + children.get(i);
                if (child.compareTo(lockPath) < 0) {
                    watchNode = child;
                    break;
                }
            }
            System.out.println("Waiting for lock: " + lockPath + "; smallestChild : " + smallestChild + " ; watchNode = " + watchNode);
​
            if (watchNode != null) {
                final Object lock = new Object();
                Watcher watcher = new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        synchronized (lock) {
                            lock.notifyAll();
                        }
                    }
                };
​
                Stat stat = zooKeeper.exists(watchNode, watcher);
                if (stat != null) {
                    synchronized (lock) {
                        lock.wait();
                    }
                }
            }
        }
    }
​
    public void releaseLock() throws Exception {
        if (lockPath != null) {
            zooKeeper.delete(lockPath, -1);
            System.out.println("Released lock: " + lockPath);
            lockPath = null;
        }
    }
​
    public static void main(String[] args) {
        ZkClient client = new ZkClient();
        // 模拟多线程。
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                try {
                    mainTest(client);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
        // 模拟多实例。
        ZkClient client2 = new ZkClient();
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                try {
                    mainTest(client2);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
​
    public static void mainTest(ZkClient client) {
//         = new ZkClient();
        try {
            ZooKeeper zooKeeper = client.zk;
​
            DistributedLock lock = new DistributedLock(zooKeeper);
            lock.acquireLock();
            System.out.println("Lock acquired");
            // 模拟业务逻辑
            int randomSleepTime = (int) (Math.random() * 100);
            System.out.println("randomSleepTime = " + randomSleepTime);
            Thread.sleep(randomSleepTime);
            System.out.println("Business logic completed");
            lock.releaseLock();
//            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
​

使用Curator三方包实现:

package com.fahe.testdistrubutedlock.zk;
​
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
​
​
/**
 * @program: test-distrubuted-lock
 * @description: curator 测试
 * @author: <linfahe-694204477@qq.com>
 * @create: 2025-04-23 15:04
 **/
public class CuratorMain {
    private final InterProcessMutex lock;
    private static final String LOCK_PATH = "/distributed_lock/my_lock";
    private static final String ZK_ADDR = "127.0.0.1:32181";
​
    public CuratorMain() {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDR,
                new ExponentialBackoffRetry(200, 2));
        client.start();
        this.lock = new InterProcessMutex(client, LOCK_PATH);
    }
​
    public boolean acquireLock() {
        try {
            lock.acquire();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
​
    public void releaseLock() {
        try {
            if (lock.isAcquiredInThisProcess()) {
                lock.release();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
​
    public static void main(String[] args) {
        CuratorMain curatorMain = new CuratorMain();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                boolean acquireLock = curatorMain.acquireLock();
                System.out.println("thread-" + Thread.currentThread().getName() + " is running");
                System.out.println("acquireLock = " + acquireLock);
                if (acquireLock) {
                    curatorMain.releaseLock();
                }
            }, "thread-" + i).start();
        }
        CuratorMain curatorMain2 = new CuratorMain();
        for (int i = 100; i < 200; i++) {
            new Thread(() -> {
                boolean acquireLock = curatorMain2.acquireLock();
                System.out.println("thread-" + Thread.currentThread().getName() + " is running");
                System.out.println("acquireLock = " + acquireLock);
                if (acquireLock) {
                    curatorMain2.releaseLock();
                }
            }, "thread-" + i).start();
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2341496.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL的日志--Redo Log【学习笔记】

MySQL的日志--Redo Log 知识来源&#xff1a; 《MySQL是怎样运行的》--- 小孩子4919 MySQL的事务四大特性之一就是持久性&#xff08;Durability&#xff09;。但是底层是如何实现的呢&#xff1f;这就需要我们的Redo Log&#xff08;重做日志&#xff09;闪亮登场了。它记录着…

【AI应用】免费代码仓构建定制版本的ComfyUI应用镜像

免费代码仓构建定制版本的ComfyUI应用镜像 1 创建代码仓1.1 注册登陆1.2 创建代码仓1.5 安装中文语言包1.4 拉取ComfyUI官方代码2 配置参数和预装插件2.1 保留插件和模型的版本控制2.2 克隆插件到代码仓2.2.1 下载插件2.2.2 把插件设置本仓库的子模块管理3 定制Docker镜像3.1 创…

​​Agentic AI——当AI学会主动思考与决策,世界将如何被重塑?

一、引言&#xff1a;2025&#xff0c;Agentic AI的元年 “如果ChatGPT是AI的‘聊天时代’&#xff0c;那么2025年将开启AI的‘行动时代’。”——Global X Insights[1] 随着Agentic AI&#xff08;自主决策型人工智能&#xff09;的崛起&#xff0c;AI系统正从被动应答的“工具…

Ollama API 应用指南

1. 基础信息 默认地址: http://localhost:11434/api数据格式: application/json支持方法: POST&#xff08;主要&#xff09;、GET&#xff08;部分接口&#xff09; 2. 模型管理 API (1) 列出本地模型 端点: GET /api/tags功能: 获取已下载的模型列表。示例:curl http://lo…

PNG透明免抠设计素材大全26000+

在当今的数字设计领域&#xff0c;寻找高质量且易于使用的素材是每个设计师的日常需求。今天&#xff0c;我们将为大家介绍一个超全面的PNG透明免抠设计素材大全&#xff0c;涵盖多种风格、主题和应用场景&#xff0c;无论是平面设计、网页设计还是多媒体制作&#xff0c;都能轻…

4.多表查询

SQL 多表查询&#xff1a;数据整合与分析的强大工具 文章目录 SQL 多表查询&#xff1a;数据整合与分析的强大工具一、 多表查询概述1.1 为什么需要多表查询1.2 多表查询的基本原理 二、 多表查询关系2.1 一对一关系&#xff08;One-to-One&#xff09;示例&#xff1a; 2.2 一…

美团2024年春招第一场笔试 C++

目录 1&#xff0c;小美的平衡矩阵 2&#xff0c;小美的数组询问 3&#xff0c;小美的MT 4&#xff0c;小美的朋友关系 1&#xff0c;小美的平衡矩阵 【题目描述】 给定一个n*n的矩阵&#xff0c;该矩阵只包含数字0和1。对于 每个i(1<i<n)&#xff0c;求在该矩阵中&am…

XHTMLConverter把docx转换html报java.lang.NullPointerException异常

一.报错 1.报错信息 org.apache.poi.xwpf.converter.core.XWPFConverterException: java.lang.NullPointerExceptionat org.apache.poi.xwpf.converter.xhtml.XHTMLConverter.convert(XHTMLConverter.java:77)at org.apache.poi.xwpf.converter.xhtml.XHTMLConverter.doConve…

OpenCV 图形API(52)颜色空间转换-----将 NV12 格式的图像数据转换为 RGB 格式的图像

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 将图像从 NV12 (YUV420p) 色彩空间转换为 RGB。该函数将输入图像从 NV12 色彩空间转换到 RGB。Y、U 和 V 通道值的常规范围是 0 到 255。 输出图…

COdeTop-206-反转链表

题目 206. 反转链表 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1]示例 2&#xff1a; 输入&#xff1a;head [1,2] 输出&#xff1a;[2,1]示例 …

线段树讲解(小进阶)

目录 前言 一、线段树知识回顾 线段树区间加减 区间修改维护&#xff1a; 区间修改的操作&#xff1a; 区间修改update&#xff1a; 线段树的区间查询 区间查询&#xff1a; 区间查询的操作&#xff1a; 递归查询过程&#xff1a; 区间查询query&#xff1a; 代码&…

openharmony5.0.0中C++公共基础类测试-线程相关(一)

C公共基础类测试及源码剖析 延续传统&#xff0c;show me the code&#xff0c;除了给出应用示例还重点分析了下openharmony中的实现。 简介 openharmony中提供了C公共基础类库&#xff0c;为标准系统提供了一些常用的C开发工具类&#xff0c;本文分析其实现&#xff0c;并给…

TDengine 数据订阅设计

简介 数据订阅作为 TDengine 的一个核心功能&#xff0c;为用户提供了灵活获取所需数据的能力。通过深入了解其内部原理&#xff0c;用户可以更加有效地利用这一功能&#xff0c;满足各种实时数据处理和监控需求。 基本概念 主题 与 Kafka 一样&#xff0c;使用 TDengine 数…

URP-UGUI交互功能实现

一、非代码层面实现交互&#xff08;SetActive&#xff09; Button &#xff1a;在OnClick&#xff08;&#xff09;中添加SetActive方法&#xff08;但是此时只首次有效&#xff09; Toggle &#xff1a;在OnClick&#xff08;&#xff09;中添加动态的SetActive方法 &#…

UniGoal 具身导航 | 通用零样本目标导航 CVPR 2025

UniGoal的提出了一个通用的零样本目标导航框架&#xff0c;能够统一处理多种类型的导航任务 &#xff08;如对象类别导航、实例图像目标导航和文本目标导航&#xff09;&#xff0c;而无需针对特定任务进行训练或微调。 它的特点是 图匹配与多阶段探索策略&#xff01;&#x…

通过Quartus II实现Nios II编程

目录 一、认识Nios II二、使用Quartus II 18.0Lite搭建Nios II硬件部分三、软件部分四、运行项目 一、认识Nios II Nios II软核处理器简介 Nios II是Altera公司推出的一款32位RISC嵌入式处理器&#xff0c;专门设计用于在FPGA上运行。作为软核处理器&#xff0c;Nios II可以通…

Linux/AndroidOS中进程间的通信线程间的同步 - IPC方式简介

前言 从来没有总结过Linux/Android系统中进程间的通信方式和线程间的同步方式&#xff0c;这个专栏就系统总结讨论一下。首先从标题可知&#xff0c;讨论问题的主体是进程和线程、通信和同步&#xff1b;在这里默认你理解进程和线程的区别。通信和同步有什么概念上的区别&…

Windows:注册表配置应用

0、简介 本篇博客记录一下&#xff0c;日常的系统注册表配置选项&#xff0c;以防再次遇到问题不知如何解决。 1、开机启动配置 HKEY_LOCAL_MACHINE\Software\Microsoft\Windows\CurrentVersion\Run :: 此位置存储了所有用户登录时需要启动的程序。 在该项下新建字符串值&#…

WebXR教学 05 项目3 太空飞船小游戏

准备工作 自动创建 package.json 文件 npm init -y 安装Three.js 3D 图形库&#xff0c;安装现代前端构建工具Vite&#xff08;用于开发/打包&#xff09; npm install three vite 启动 Vite 开发服务器&#xff08;推荐&#xff09;&#xff08;正式项目开发&#xff09; …

达梦统计信息收集情况检查

查询达梦某个对象上是否有统计信息 select id,T_TOTAL,N_SMAPLE,N_DISTINCT,N_NULL,BLEVEL,N_LEAF_PAGES,N_LEAF_USED_PAGES,LAST_GATHERED from sysstats where id IN (select id from sysobjects where upper(name)upper(&objname));可能有系统对象&#xff0c;可以增加…