Zookeeper实现分布式锁、Zookeeper实现配置中心

news2024/11/27 20:45:12

一、Zookeeper实现分布式锁

分布式锁主要用于在分布式环境中保证数据的一致性。

包括跨进程、跨机器、跨网络导致共享资源不一致的问题。

1.Zookeeper分布式锁的代码实现 

 新建一个maven项目ZK-Demo,然后在pom.xml里面引入相关的依赖

 <dependency>
        <groupId>com.101tec</groupId>
         <artifactId>zkclient</artifactId>
         <version>0.10</version>
 </dependency>

2. Zookeeper分布式锁的核心代码实现

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 
* @Description: Zookeeper分布式锁的核心代码实现
* @author leeSmall
* @date 2018年9月4日
*
*/
public class DistributedLock implements Lock {
    private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);

    private static final String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
    private static final String LOCK_PATH = "/LOCK";

    private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());

    private CountDownLatch cdl;

    private String beforePath;// 当前请求的节点前一个节点
    private String currentPath;// 当前请求的节点

    // 判断有没有LOCK目录,没有则创建
    public DistributedLock() {
        if (!this.client.exists(LOCK_PATH)) {
            this.client.createPersistent(LOCK_PATH);
        }
    }

    public void lock() {
        //尝试去获取分布式锁失败
        if (!tryLock()) {
            //对次小节点进行监听
            waitForLock();
            lock();
        } 
        else {
            logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
        }
    }
    
    public boolean tryLock() {
        // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (currentPath == null || currentPath.length() <= 0) {
            // 创建一个临时顺序节点
            currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
            System.out.println("---------------------------->" + currentPath);
        }

        // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
        List<String> childrens = this.client.getChildren(LOCK_PATH);
        //由小到大排序所有子节点
        Collections.sort(childrens);
        //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
        if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
            return true;
        } 
        //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
        else {
            int wz = Collections.binarySearch(childrens, currentPath.substring(6));
            beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
        }

        return false;
    }

    //等待锁,对次小节点进行监听
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
                if (cdl != null) {
                    cdl.countDown();
                }
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };

        // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
        this.client.subscribeDataChanges(beforePath, listener);
        if (this.client.exists(beforePath)) {
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.client.unsubscribeDataChanges(beforePath, listener);
    }
    
    //完成业务逻辑以后释放锁
    public void unlock() {
        // 删除当前临时节点
        client.delete(currentPath);
    }

    // ==========================================
    public void lockInterruptibly() throws InterruptedException {

    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public Condition newCondition() {
        return null;
    }
}
 3.2 在业务里面使用分布式锁
package com.study.demo.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 
* @Description: 在业务里面使用分布式锁
* @author leeSmall
* @date 2018年9月4日
*
*/
public class OrderServiceImpl implements Runnable {
    private static OrderCodeGenerator ong = new OrderCodeGenerator();

    private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
    // 同时并发的线程数
    private static final int NUM = 10;
    // 按照线程数初始化倒计数器,倒计数器
    private static CountDownLatch cdl = new CountDownLatch(NUM);

    private Lock lock = new DistributedLock();

    // 创建订单接口
    public void createOrder() {
        String orderCode = null;

        //准备获取锁
        lock.lock();
        try {
            // 获取订单编号
            orderCode = ong.getOrderCode();
        } catch (Exception e) {
            // TODO: handle exception
        } finally {
            //完成业务逻辑以后释放锁
            lock.unlock();
        }

        // ……业务代码

        logger.info("insert into DB使用id:=======================>" + orderCode);
    }

    
    public void run() {
        try {
            // 等待其他线程初始化
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // 创建订单
        createOrder();
    }

    public static void main(String[] args) {
        for (int i = 1; i <= NUM; i++) {
            // 按照线程数迭代实例化线程
            new Thread(new OrderServiceImpl()).start();
            // 创建一个线程,倒计数器减1
            cdl.countDown();
        }
    }
}

 工具类:

import java.text.SimpleDateFormat;
import java.util.Date;

public class OrderCodeGenerator {
    // 自增长序列
    private static int i = 0;

    // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
    public String getOrderCode() {
        Date now = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        return sdf.format(now) + ++i;
    }

}
二.新建一个zookeeper配置中心类,从zookeeper动态获取数据库配置
import java.util.List;
import java.util.Properties;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;

import com.zaxxer.hikari.HikariDataSource;

/**
* 
* @Description: zookeeper配置中心类,从zookeeper动态获取数据库配置
* @author leeSmall
* @date 2018年9月10日
*
*/
public class ZookeeperConfigurerCentral {
    //curator客户端
    private CuratorFramework zkClient;
    
    //curator事件监听
    private TreeCache treeCache;

    //zookeeper的ip和端口
    private String zkServers;
    
    //zookeeper上的/Jdbc路径
    private String zkPath;
    
    //超时设置
    private int sessionTimeout;
    
    //读取zookeeper上的数据库配置文件放到这里
    private Properties props;

    public ZookeeperConfigurerCentral(String zkServers, String zkPath, int sessionTimeout) {
        this.zkServers = zkServers;
        this.zkPath = zkPath;
        this.sessionTimeout = sessionTimeout;
        this.props = new Properties();

        //初始化curator客户端
        initZkClient();
        //从zookeeper的Jdbc节点下获取数据库配置存入props
        getConfigData();
        //对zookeeper上的数据库配置文件所在节点进行监听,如果有改变就动态刷新props
        addZkListener();
    }

    //初始化curator客户端
    private void initZkClient() {
        zkClient = CuratorFrameworkFactory.builder().connectString(zkServers).sessionTimeoutMs(sessionTimeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        zkClient.start();
    }

    //从zookeeper的Jdbc节点下获取数据库配置存入props
    private void getConfigData() {
        try {
            List<String> list = zkClient.getChildren().forPath(zkPath);
            for (String key : list) {
                String value = new String(zkClient.getData().forPath(zkPath + "/" + key));
                if (value != null && value.length() > 0) {
                    props.put(key, value);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //对zookeeper上的数据库配置文件所在节点进行监听,如果有改变就动态刷新props
    private void addZkListener() {
        TreeCacheListener listener = new TreeCacheListener() {
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
                    getConfigData();
                    WebApplicationContext ctx = ContextLoader.getCurrentWebApplicationContext();
                    HikariDataSource dataSource = (HikariDataSource) ctx.getBean("dataSource");
                    System.out.println("================"+props.getProperty("url"));
                    dataSource.setJdbcUrl(props.getProperty("url"));
                    dataSource.setUsername(props.getProperty("uname"));
                    dataSource.setPassword(props.getProperty("password "));
                    dataSource.setDriverClassName(props.getProperty("driver "));
                }
            }
        };

        treeCache = new TreeCache(zkClient, zkPath);
        try {
            treeCache.start();
            treeCache.getListenable().addListener(listener);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public Properties getProps() {
        return props;
    }

    public void setZkServers(String zkServers) {
        this.zkServers = zkServers;
    }

    public void setZkPath(String zkPath) {
        this.zkPath = zkPath;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }
}

 新建一个加载props里面的数据库配置的类

import java.util.Properties;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;

/**
* 
* @Description: 加载props里面的数据库配置,这个类等价于以前在xml文件里面的配置:
* <context:property-placeholder location="classpath:config/jdbc_conf.properties"/>
* @author leeSmall
* @date 2018年9月10日
*
*/
public class ZookeeperPlaceholderConfigurer extends PropertyPlaceholderConfigurer {
    private ZookeeperConfigurerCentral zkConfigurerCentral;

    @Override
    protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)
            throws BeansException {
        System.out.println(zkConfigurerCentral.getProps());
        super.processProperties(beanFactoryToProcess, zkConfigurerCentral.getProps());
    }

    public void setzkConfigurerCentral(ZookeeperConfigurerCentral zkConfigurerCentral) {
        this.zkConfigurerCentral = zkConfigurerCentral;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2248627.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java写一个石头剪刀布小游戏

石头剪刀布是一款经典的手势游戏,通常由两人参与,玩法简单且充满趣味。玩家通过出示手势代表“石头”、“剪刀”或“布”,并根据规则比较手势决定胜负。它广泛用于休闲娱乐、决策或解压活动。 一、功能简介 用户与计算机对战。 用户输入选择:石头、剪刀或布。 计算机随机生…

MATLAB深度学习(六)——LSTM长短期神经网络原理与应用

LSTM的应用可以参见一个相当好的视频&#xff1a;小车倒立摆最优控制教程 - Part1 Simulink Simscape Multibody仿真建模_哔哩哔哩_bilibili 6.1 序列建模——循环神经网络 循环神经网络RNN是一类专门用于处理序列性数据x&#xff0c;&#xff0c;xn的神经网络结构&#xff0c…

音视频技术扫盲之预测编码的基本原理探究

预测编码是一种数据压缩技术&#xff0c;广泛应用于图像、视频和音频编码等领域。其基本原理是利用数据的相关性&#xff0c;通过对当前数据的预测和实际值与预测值之间的差值进行编码&#xff0c;从而实现数据压缩的目的。 一、预测编码的基本概念 预测编码主要包括预测器和…

第六届国际科技创新学术交流大会暨信息技术与计算机应用学术会议(ITCA 2024)

重要信息 会议官网&#xff1a;itca2024.iaecst.org 会议时间&#xff1a;2024年12月06-08日 会议地点&#xff1a;中国-广州&#xff08;越秀国际会议中心&#xff09; 会议简介 第六届信息技术与计算机应用学术会议(ITCA 2024) 依旧作为第六届国际科技创新学术交流大会…

Leetcode 将有序数组转换为二叉搜索树

算法思想及代码解析&#xff1a; 这段代码的目的是将一个有序数组转换为 高度平衡的二叉搜索树&#xff08;Balanced Binary Search Tree, BST&#xff09;。以下是算法的详细解释&#xff1a; 1. 什么是高度平衡的二叉搜索树&#xff1f; 二叉搜索树&#xff1a;对于树中的每…

15 go语言(golang) - 并发编程goroutine原理及数据安全

底层原理 Go 的 goroutine 是一种轻量级的线程实现&#xff0c;允许我们在程序中并发地执行函数。与传统的操作系统线程相比&#xff0c;goroutine 更加高效和易于使用。 轻量级调度 用户态调度&#xff1a;Go 运行时提供了自己的调度器&#xff0c;这意味着 goroutine 的创建…

ESP-KeyBoard:基于 ESP32-S3 的三模客制化机械键盘

概述 在这个充满挑战与机遇的数字化时代&#xff0c;键盘已经成为我们日常学习、工作、娱乐生活必不可少的设备。而在众多键盘中&#xff0c;机械键盘&#xff0c;以其独特的触感、清脆的敲击音和经久耐用的特性&#xff0c;已经成为众多游戏玩家和电子工程师的首选。本文将为…

PyTorch基础05_模型的保存和加载

目录 一、模型定义组件——重构线性回归 二、模型的加载和保存 2、序列化保存对象和加载 3、保存模型参数 一、模型定义组件——重构线性回归 回顾之前的手动构建线性回归案例&#xff1a; 1.构建数据集&#xff1b;2.加载数据集(数据集转换为迭代器)&#xff1b;3.参数初…

《Python基础》之函数的用法

一、简介 在 Python 中&#xff0c;函数是一段可重用的代码块&#xff0c;用于执行特定的任务。函数可以帮助你将代码模块化&#xff0c;提高代码的可读性和可维护性。 函数的用途 代码重用&#xff1a;通过函数&#xff0c;你可以将常用的代码块封装起来&#xff0c;避免重复…

java:aqs实现自定义锁

aqs采用模板方式设计模式&#xff0c;需要重写方法 package com.company.aqs;import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock;…

【小白学机器学习34】基础统计2种方法:用numpy的方法np().mean()等进行统计,pd.DataFrame.groupby() 分组统计

目录 1 用 numpy 快速求数组的各种统计量&#xff1a;mean, var, std 1.1 数据准备 1.2 直接用np的公式求解 1.3 注意问题 1.4 用print() 输出内容&#xff0c;显示效果 2 为了验证公式的背后的理解&#xff0c;下面是详细的展开公式的求法 2.1 均值mean的详细 2.2 方差…

vue2 中使用 Ag-grid-enterprise 企业版

文章目录 问题Vue2 引入企业版不生效npm run dev 时卡住了94% after seal 卡在这里了测试打包源 git 解决方案记录 问题 我想用企业版的树状表格 Vue2 引入企业版不生效 编译引入 // vue.config.js module.exports {transpileDependencies: ["ag-grid-enterprise"…

RESTful快速开发

&#xff08;3&#xff09;RESTful快速开发 &#xff08;2&#xff09;中的控制器仍然存在大量的冗余代码 问题1&#xff1a; 每个方法的RequestMapping注解中都定义了访问路径/users&#xff0c;重复性太高 问题2&#xff1a;每个方法的RequestMapping注解中都要使用method属…

万能门店小程序管理系统 doPageGetFormList SQL注入漏洞复现

0x01 产品简介 万能门店小程序管理系统是一款功能强大的工具,旨在为各行业商家提供线上线下融合的全方位解决方案。是一个集成了会员管理和会员营销两大核心功能的综合性平台。它支持多行业使用,通过后台一键切换版本,满足不同行业商家的个性化需求。该系统采用轻量后台,搭…

【作业九】RNN-SRN-Seq2Seq

点击查看作业内容 目录 1 实现SRN &#xff08;1&#xff09;使用numpy实现 &#xff08;2&#xff09;在&#xff08;1&#xff09;的基础上&#xff0c;增加激活函数tanh &#xff08;3&#xff09;使用nn.RNNCell实现 &#xff08;4&#xff09;使用nn.RNN实现 2 使用R…

Emgu (OpenCV)

Emgu Github Emgu 环境&#xff1a; Emgu CV 4.9.0 netframework 4.8 1、下载 libemgucv-windesktop-4.9.0.5494.exe 安装后&#xff0c;找到安装路径下的runtime文件夹复制到c#项目Debug目录下 安装目录 c# Debug目录

YOLOv8模型pytorch格式转为onnx格式

一、YOLOv8的Pytorch网络结构 model DetectionModel((model): Sequential((0): Conv((conv): Conv2d(3, 64, kernel_size(3, 3), stride(2, 2), padding(1, 1))(act): SiLU(inplaceTrue))(1): Conv((conv): Conv2d(64, 128, kernel_size(3, 3), stride(2, 2), padding(1, 1))(a…

澳洲房产市场数据清洗、聚类与可视化综合分析

本项目涉及数据清洗及分析时候的思路&#xff0c;如果仅在CSDN中看&#xff0c;可能会显得有些乱&#xff0c;建议去本人和鲸社区对应的项目中去查看&#xff0c;源代码和数据集都是免费下载的。 声明&#xff1a;本项目的成果可无偿分享&#xff0c;用于学习交流。但请勿用于…

IT服务团队建设与管理

在 IT 服务团队中&#xff0c;需要明确各种角色。例如系统管理员负责服务器和网络设备的维护与管理&#xff1b;软件工程师专注于软件的开发、测试和维护&#xff1b;运维工程师则保障系统的稳定运行&#xff0c;包括监控、故障排除等。通过清晰地定义每个角色的职责&#xff0…

go-zero(八) 中间件的使用

go-zero 中间件 一、中间件介绍 中间件&#xff08;Middleware&#xff09;是一个在请求和响应处理之间插入的程序或者函数&#xff0c;它可以用来处理、修改或者监控 HTTP 请求和响应的各个方面。 1.中间件的核心概念 请求拦截&#xff1a;中间件能够在请求到达目标处理器之…