Zookeeper(六)Zokeeper 使用场景案例

news2024/12/23 8:17:28

目录

  • 一 数据发布/订阅
    • 1.1 配置变更
    • 1.2 代码实现
    • 1.3 启动测试
  • 二 负载均衡
    • 2.1 实现
    • 2.2 代码
    • 2.3 启动测试
  • 三 分布式ID
    • 3.1 代码实现
    • 3.2 效果
  • 四 服务器集群监控
  • 五 分布式锁
    • 2.1 排他锁
    • 2.2 共享锁

  • 官网:Apache ZooKeeper

一 数据发布/订阅

数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到ZooKeeper的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。

1.1 配置变更

  • 假设我们现在有三个服务节点,一个配置中心,两个服务器中心

image.png

  • 依赖
<dependencies>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>4.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>4.0.0</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.8.0-beta4</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.8.0-beta4</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
</dependencies>
  • 大体过程:集群中每台机器在启动初始化阶段,首先会从上面提到的ZooKeeper配置节点上读取数据库信息,同时,客户端还需要在该配置节点上注册一个数据变更的Watcher监听,一旦发生节点数据变更,所有订阅的客户端都能够获取到数据变更通知。

1.2 代码实现

  • 配置客户端
package com.shu.registrationcenter;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.InputStream;
import java.util.Properties;
import java.util.Scanner;

/**
 * @author 31380
 * @description
 * @create 2024/3/17 13:52
 */
public class ConfigCenter {
    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String ROOT_PATH_DB = "/app1/configCenter/DbConfig";
    private static final String ROOT_PATH_APPSERVER1 = "/app1/configCenter/Appserver1";
    private static final String ROOT_PATH_APPERVER2 = "/app1/configCenter/Appserver2";
    private static final String DB_CONFIG_PATH = "dbConfig.properties";
    private static final String DB_CONFIG_PATH1 = "db1Config.properties";
    private static final String APPSERVER1_CONFIG_PATH = "appserver1Config.properties";
    private static final String APPSERVER1_CONFIG_PATH1 = "appserver3Config.properties";
    private static final String APPSERVER2_CONFIG_PATH = "appserver2Config.properties";
    private static final String APPSERVER2_CONFIG_PATH2 = "appserver4Config.properties";


    /**
     *  创建数据库配置中心
     * @param path 路径
     * @param configName 配置文件名
     */
    public void createConfigCenter(String path, String configName) {
        // 创建连接
        CuratorFramework curator = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
        .sessionTimeoutMs(SESSION_TIMEOUT)
        .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
        curator.start();
        try {
            // 创建根节点,如果不存在则创建
            if (curator.checkExists().forPath(path) == null) {
                curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
            }
            // 使用 ClassLoader 获取资源文件的输入流
            InputStream inputStream = Appserver1.class.getClassLoader().getResourceAsStream(configName);
            if (inputStream != null) {
                // 加载属性文件
                Properties properties = new Properties();
                properties.load(inputStream);
                // 创建 ObjectMapper 实例
                ObjectMapper objectMapper = new ObjectMapper();
                // 将配置属性转换为 JSON 数据
                String jsonData = objectMapper.writeValueAsString(properties);
                // 写入 JSON 数据到 ZooKeeper 节点
                curator.setData().forPath(path, jsonData.getBytes());
                System.out.println("节点 " + path + " 写入成功!");
                // 关闭输入流
                inputStream.close();
            } else {
                System.out.println("资源文件未找到!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭连接
            curator.close();
        }
    }




    /**
     * 更新数据库配置中心
     */
    public void updateConfigCenter(String path, String configName) {
        // 创建连接
        CuratorFramework curator = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
        curator.start();
        try {
            // 使用 ClassLoader 获取资源文件的输入流
            InputStream inputStream = Appserver1.class.getClassLoader().getResourceAsStream(configName);
            if (inputStream != null) {
                // 加载属性文件
                Properties properties = new Properties();
                properties.load(inputStream);
                // 创建 ObjectMapper 实例
                ObjectMapper objectMapper = new ObjectMapper();
                // 将配置属性转换为 JSON 数据
                String jsonData = objectMapper.writeValueAsString(properties);
                // 写入 JSON 数据到 ZooKeeper 节点
                curator.setData().forPath(path, jsonData.getBytes());
                System.out.println("节点 " + path + " 更新成功!");
                // 关闭输入流
                inputStream.close();
            } else {
                System.out.println("资源文件未找到!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭连接
            curator.close();
        }
    }


    /**
     * 修改数据
     *
     * @param args
     */


    public static void main(String[] args) {
        ConfigCenter configCenter = new ConfigCenter();
        configCenter.createConfigCenter(ROOT_PATH_DB, DB_CONFIG_PATH);
        configCenter.createConfigCenter(ROOT_PATH_APPSERVER1, APPSERVER1_CONFIG_PATH);
        configCenter.createConfigCenter(ROOT_PATH_APPERVER2, APPSERVER2_CONFIG_PATH);
        // 等待键盘输入,输入任意字符后继续,1.更新数据库配置中心,2.更新 Appserver1 配置中心,3.更新 Appserver2 配置中心
        Scanner scanner = new Scanner(System.in);
        System.out.println("请选择操作:1.更新数据库配置中心,2.更新 Appserver1 配置中心,3.更新 Appserver2 配置中心, 4 退出" );
        int choice = scanner.nextInt();
        switch (choice) {
            case 1:
                configCenter.updateConfigCenter(ROOT_PATH_DB, DB_CONFIG_PATH1);
                break;
            case 2:
                configCenter.updateConfigCenter(ROOT_PATH_APPSERVER1, APPSERVER1_CONFIG_PATH1);
                break;
            case 3:
                configCenter.updateConfigCenter(ROOT_PATH_APPERVER2, APPSERVER2_CONFIG_PATH2);
                break;
            case 4:
                // 关闭 Scanner 对象
                scanner.close();
                break;
            default:
                break;
        }
    }

}

  • AppServer1
package com.shu.registrationcenter;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @author 31380
 * @description Appserver1,NodeCache 监听节点数据变化,但是不监听节点的子节点变化,如果需要监听子节点变化,可以使用 PathChildrenCache
 * @create 2024/3/17 14:03
 */
public class Appserver1 {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String ROOT_PATH_APPERVER1 = "/app1/configCenter/Appserver1";

    public static void main(String[] args) {
        // 创建连接
        CuratorFramework curator = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
        curator.start();
        try {
            // 创建 NodeCache 监听指定节点
            final NodeCache nodeCache = new NodeCache(curator, ROOT_PATH_APPERVER1);
            // 设置为 true,表示缓存数据
            nodeCache.start(true);
            // 监听节点数据变化
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    if (nodeCache.getCurrentData() != null) {
                        // 获取节点的当前数据
                        String data = new String(nodeCache.getCurrentData().getData());
                        System.out.println("节点数据变化:" + data);
                    } else {
                        System.out.println("节点已删除!");
                    }
                }
            });
            // 读取节点数据
            System.out.println("节点数据:" + new String(nodeCache.getCurrentData().getData()));

            // 为了保持监听线程活动,让程序不退出
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭连接
            curator.close();
        }
    }

}

  • Appserver2
package com.shu.registrationcenter;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @author 31380
 * @description
 * @create 2024/3/17 14:03
 */
public class Appserver2 {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String ROOT_PATH_APPERVER2 = "/app1/configCenter/Appserver2";

    public static void main(String[] args) {
        // 创建连接
        CuratorFramework curator = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
        curator.start();
        try {
            // 创建 NodeCache 监听指定节点
            final NodeCache nodeCache = new NodeCache(curator, ROOT_PATH_APPERVER2);
            // 设置为 true,表示缓存数据
            nodeCache.start(true);
            // 监听节点数据变化
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    if (nodeCache.getCurrentData() != null) {
                        // 获取节点的当前数据
                        String data = new String(nodeCache.getCurrentData().getData());
                        System.out.println("节点数据变化:" + data);
                    } else {
                        System.out.println("节点已删除!");
                    }
                }
            });
            // 读取节点数据
            System.out.println("节点数据:" + new String(nodeCache.getCurrentData().getData()));

            // 为了保持监听线程活动,让程序不退出
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭连接
            curator.close();
        }
    }

}

1.3 启动测试

image.png
image.png
image.png
image.png
image.png
image.png

  • 修改数据中心数据

image.png
image.png
image.png
注意:这个这里代码监听中NodeCache只能监听子节点的变化,以上就简单的实现了发布与订阅的功能,但是设计到后面的思考,服务如何平滑过渡?

二 负载均衡

负载均衡(Load Balance)是一种相当常见的计算机网络技术,用来对多个计算机(计算机集群)、网络连接、CPU、磁盘驱动器或其他资源进行分配负载,以达到优化资源使用、最大化吞吐率、最小化响应时间和避免过载的目的。

2.1 实现

假设现在要我们拥有10台Web服务,1~10,当发生请求时依次提供服务
image.png
image.png

  • 简单介绍一下,首先启动的时候注册10个服务器节点
  • 然后分别监听10个节点的上线情况,当请求来到的时候分别选择在线的节点信息来处理服务

2.2 代码

  • 服务节点
package com.shu.loadbalancing;

import java.io.Serializable;
import java.util.Objects;

/**
 * @author 31380
 * @description  WebNodeData
 * @create 2024/3/17 19:31
 */
public class WebNodeData implements Serializable {
    private static final long serialVersionUID = 4848971284812662834L;

    private String ip;
    private Integer port;
    private Integer weight;
    private String path;


    public WebNodeData() {
    }

    public WebNodeData(String ip, Integer port, Integer weight, String path) {
        this.ip = ip;
        this.port = port;
        this.weight = weight;
        this.path = path;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WebNodeData nodeData = (WebNodeData) o;
        return ip.equals(nodeData.ip) &&
                port.equals(nodeData.port);
    }

    @Override
    public int hashCode() {
        return Objects.hash(ip, port);
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public Integer getWeight() {
        return weight;
    }

    public void setWeight(Integer weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        return "NodeData{" +
                "ip='" + ip + '\'' +
                ", port=" + port +
                ", weight=" + weight +
                ", path='" + path + '\'' +
                '}';
    }
}

  • 服务者
package com.shu.loadbalancing;

import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;

/**
 * @author 31380
 * @description
 * @create 2024/3/17 19:33
 */
public class Service {
    protected CuratorFramework client;
    String connectString = "127.0.0.1:2181";
    int sessionTimeout = 5000;
    int connectionTimeout = 5000;

    /**
     * 启动客户端
     */
    public void start() {
        client = CuratorUtils.createCuratorFramework(connectString, sessionTimeout, connectionTimeout);
        client.start();
        System.out.println("客户端启动成功");
    }

    /**
     * 关闭客户端
     */
    public void close() {
        client.close();
        System.out.println("客户端关闭成功");
    }
}

  • Web服务
package com.shu.loadbalancing;

import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * @author 31380
 * @description Service
 * @create 2024/3/17 19:36
 */
public class WebService extends Service{
    public final static String SERVICE_PATH = "/web";
    private WebNodeData nodeData;


    public WebService(CuratorFramework client, WebNodeData nodeData) {
        super.client = client;
        this.nodeData = nodeData;
    }


    //节点初始化
    public void init() throws Exception {
        String path = SERVICE_PATH + "/" + nodeData.getPath();
        Stat stat = client.checkExists().forPath(path);
        if (stat == null) {
            String parentpath = CuratorUtils.getparrentpath(path);
            CuratorUtils.createParentPath(parentpath, client);
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path, DataUtil.getBytesFromObject(this.nodeData));
        } else {
            //如果已经存在,则更新数据
            client.setData().forPath(path, DataUtil.getBytesFromObject(this.nodeData));
        }
    }

}

  • 负载均衡服务
package com.shu.loadbalancing;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @author 31380
 * @description
 * @create 2024/3/17 19:40
 */
public class LoadBalanceService extends Service {

    private List<WebNodeData> nodeDatas = new ArrayList<>(16);

    public LoadBalanceService(CuratorFramework client) {
        super.client = client;
    }

    //拿去已经注册上来的所有节点
    public void init() throws Exception {
        List<String> children = client.getChildren().forPath(WebService.SERVICE_PATH);
        for (String path : children) {
            path = WebService.SERVICE_PATH + "/" + path;
            try {
                byte[] data = client.getData().forPath(path);
                WebNodeData nodeData = (WebNodeData) DataUtil.getObjectFromBytes(data);
                nodeDatas.add(nodeData);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void register() throws Exception {
        PathChildrenCache watcher = new PathChildrenCache(client, WebService.SERVICE_PATH, true/*,false, service*/);
        watcher.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    System.out.println(pathChildrenCacheEvent.getData().getPath() + "上线");
                    //新服务注册
                    WebNodeData data = (WebNodeData) DataUtil.getObjectFromBytes(pathChildrenCacheEvent.getData().getData());
                    nodeDatas.add(data);
                }
                if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    //服务下架或宕机
                    System.out.println(pathChildrenCacheEvent.getData().getPath() + "下线");
                    WebNodeData data = (WebNodeData) DataUtil.getObjectFromBytes(pathChildrenCacheEvent.getData().getData());
                    nodeDatas.remove(data);
                }
            }
        });
        watcher.start(PathChildrenCache.StartMode.NORMAL);
    }


    //负载算法,随机选择当前在线的一台服务
    public WebNodeData loadBalance() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        WebNodeData result = null;
        if (nodeDatas.isEmpty()) {
            return null;
        }
        synchronized (nodeDatas) {
            if (nodeDatas.isEmpty()) {
                return null;
            }
            int all = 0;
            for (WebNodeData nodeData : nodeDatas) {
                all += nodeData.getWeight();
            }
            int index = random.nextInt(all);
            for (WebNodeData nodeData : nodeDatas) {
                if (index <= nodeData.getWeight()) {
                    result = nodeData;
                    break;
                }
                index -= nodeData.getWeight();
            }
        }
        return result;
    }

}

  • 工具类
package com.shu.loadbalancing;

import java.io.*;

/**
 * @author 31380
 * @description
 * @create 2024/3/17 19:32
 */
public class DataUtil {

    //将object转换为bytes
    public static byte[] getBytesFromObject(Object object) throws IOException {
        ObjectOutputStream out = null;
        ByteArrayOutputStream bos = null;
        try {
//            System.out.println(object);
            bos = new ByteArrayOutputStream();
            out = new ObjectOutputStream(bos);
            out.writeObject(object);
            out.flush();
            byte[] yourBytes = bos.toByteArray();
            return yourBytes;
        } finally {
            if (bos != null) {
                bos.close();
            }
            if (out != null) {
                out.close();
            }
        }
    }

    //
    public static Object getObjectFromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        ObjectInputStream in = null;
        try {
            in = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return in.readObject();
        } finally {
            if (in != null) {
                in.close();
            }
        }
    }

}

  • 测试
package com.shu.loadbalancing;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;

import java.util.Scanner;

/**
 * @author 31380
 * @description
 * @create 2024/3/17 19:43
 */
public class LoadBalancingTest {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new RetryNTimes(3, 100);
        for(int i = 0; i < 10; i ++){
            CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
                    30*60*1000, 5*1000, retryPolicy);
            WebNodeData nodeData = new WebNodeData("127.0.0.1:2181"+i, 8080, i, "web"+i);
            WebService webService = new WebService(client, nodeData);
            webService.start();
            webService.init();
        }
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
                30*60*1000, 5*1000, retryPolicy);
        LoadBalanceService loadBalanceService = new LoadBalanceService(client);
        loadBalanceService.start();
        loadBalanceService.init();
        loadBalanceService.register();

        while (true){
            Scanner sc = new Scanner( System.in);
            String nextCommand = sc.nextLine();
            WebNodeData nodeData = loadBalanceService.loadBalance();
            System.out.println("本次请求由:" + nodeData.getIp() + ":" + nodeData.getPort() + " 执行");
        }

    }
}

2.3 启动测试

image.png

三 分布式ID

  • 在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条数据库记录生成一个唯一的ID,数据库会保证生成的这个ID在全局唯一。
  • 但是随着数据库数据规模的不断增大,分库分表随之出现,而auto_increment属性仅能针对单一表中的记录自动生成ID,因此在这种情况下,就无法再依靠数据库的auto_increment属性来唯一标识一条记录了。于是,我们必须寻求一种能够在分布式环境下生成全局唯一ID的方法。
  • UUID是一个非常不错的全局唯一ID生成方式,能够非常简便地保证分布式环境中的唯一性。一个标准的UUID是一个包含32位字符和4个短线的字符串,例如“e70f1357-f260-46ff-a32d-53a086c57ade”,但是于生成的字符串过长,浪费空间,含义不明

image.png

3.1 代码实现

package com.shu.id;

import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

/**
 * @author 31380
 * @description
 * @create 2024/3/20 14:10
 */
public class ZookeeperUniqueIDGenerator {

    private static final String CONNECT_STRING = "localhost:2181"; // ZooKeeper连接字符串
    private static final int SESSION_TIMEOUT = 5000; // 会话超时时间
    private static final String ROOT_PATH = "/jdbs"; // 根节点路径

    CuratorFramework client;


    public ZookeeperUniqueIDGenerator() {
        // 创建连接
        client = CuratorUtils.createCuratorFramework(CONNECT_STRING, SESSION_TIMEOUT, 5000);
        // 启动连接
        client.start();
    }


    /**
     * 生成唯一ID
     * @param path   节点路径
     * @param prefix 节点前缀
     * @return
     */
    public String generate(String path, String prefix) {
        // 指定类型的任务下面通过调用create()接口来创建一个顺序节点,例如创建“job-”节点。
        String fullPath = ROOT_PATH + "/" + path + "/" + prefix + "-";
        try {
            // 创建一个顺序节点
            String createdPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(fullPath);
            // 返回节点名称
            return path+"-"+createdPath.substring(createdPath.lastIndexOf("/") + 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }


    /**
     * 关闭
     */
    public void close() {
        client.close();
    }



    public static void main(String[] args) {
        ZookeeperUniqueIDGenerator zookeeperUniqueIDGenerator = new ZookeeperUniqueIDGenerator();
        // 循环生成10个唯一ID
        for (int i = 0; i < 10; i++) {
            System.out.println(zookeeperUniqueIDGenerator.generate("web", "job"));
        }
        zookeeperUniqueIDGenerator.close();

    }


}

3.2 效果

image.png
image.png

四 服务器集群监控

参考第一个数据发布与订阅

五 分布式锁

  • 分布式锁是控制分布式系统之间同步访问共享资源的一种方式。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要通过一些互斥手段来防止彼此之间的干扰,以保证一致性,在这种情况下,就需要使用分布式锁了。
  • 在平时的实际项目开发中,我们往往很少会去在意分布式锁,而是依赖于关系型数据库固有的排他性来实现不同进程之间的互斥。这确实是一种非常简便且被广泛使用的分布式锁实现方式。然而有一个不争的事实是,目前绝大多数大型分布式系统的性能瓶颈都集中在数据库操作上。
  • 因此,如果上层业务再给数据库添加一些额外的锁,例如行锁、表锁甚至是繁重的事务处理,那么是不是会让数据库更加不堪重负呢?下面我们来看看使用ZooKeeper如何实现分布式锁,这里主要讲解排他锁和共享锁两类分布式锁。

2.1 排他锁

排他锁(Exclusive Locks,简称X锁),又称为写锁或独占锁,是一种基本的锁类型。如果事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,只允许事务T1对O1进行读取和更新操作,其他任何事务都不能再对这个数据对象进行任何类型的操作——直到T1释放了排他锁
image.png

package com.shu.lock;

import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

/**
 * @author 31380
 * @description
 * @create 2024/3/20 14:59
 */
public class ExclusiveLocksTest {

    private static final String ZK_ADDRESS = "127.0.0.1:2181";
    private static final String LOCK_PATH = "/exclusive_lock";
    // 客户端数量
    private static final int CLIENT_COUNT = 10;

    public static void main(String[] args) {
        for (int i = 0; i < CLIENT_COUNT; i++) {
            new Thread(() -> {
                CuratorFramework client = CuratorUtils.createCuratorFramework(ZK_ADDRESS, 5000, 5000);
                client.start();
                InterProcessMutex locks = new InterProcessMutex(client, LOCK_PATH);
                try {
                    locks.acquire();
                    System.out.println(Thread.currentThread().getName() + "获取到锁");
                    // 模拟业务处理
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + "释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    try {
                        locks.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    client.close();
                }
            }).start();
        }
    }
}

2.2 共享锁

共享锁(Shared Locks,简称S锁),又称为读锁,同样是一种基本的锁类型。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁——直到该数据对象上的所有共享锁都被释放。共享锁和排他锁最根本的区别在于,加上排他锁后,数据对象只对一个事务可见,而加上共享锁后,数据对所有事务都可见。下面我们就来看看如何借助ZooKeeper来实现共享锁。
image.png

package com.shu.lock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.RetryNTimes;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author 31380
 * @description
 * @create 2024/3/20 16:53
 */
public class InterProcessReadWriteLockTest {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    private static String zkLockPath = "/Aaron/Lock2";

    private static CuratorFramework zkClient;

    public static void init() {
        // 创建客户端
        zkClient = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")    // ZK Server地址信息
                .connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
                .sessionTimeoutMs( 60 * 1000 )  // 会话超时时间: 60s
                // 重试策略: 重试3次, 每次间隔1s
                .retryPolicy(new RetryNTimes(3, 1000))
                .build();
        // 启动客户端
        zkClient.start();
        System.out.println("---------------------- 系统上线 ----------------------");
    }

    /**
     * 测试: 读锁为共享锁
     */
    public void test1Read() {
        System.out.println("\n---------------------- Test 1 : Read ----------------------");
        for(int i=1; i<=3; i++) {
            String taskName = "读任务#"+i;
            Runnable task = new ReadTask(taskName, zkClient, zkLockPath);
            threadPool.execute( task );
        }
        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
    }

    /**
     * 测试: 写锁为互斥锁
     */
    public void test2Write() {
        System.out.println("\n---------------------- Test 2 : Write ----------------------");
        for(int i=1; i<=3; i++) {
            String taskName = "写任务#"+i;
            Runnable task = new WriteTask(taskName, zkClient, zkLockPath);
            threadPool.execute( task );
        }
        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 30*1000 ); } catch (Exception e) {}
    }

    /**
     * 测试: 读写互斥
     */
    public void test2ReadWrite() {
        System.out.println("\n---------------------- Test 3 : Read Write ----------------------");
        for(int i=1; i<=8; i++) {
            Runnable task = null;
            Boolean isReadTask = i%2 == 0;
            if( isReadTask ) {
                task = new ReadTask( "读任务#"+i, zkClient, zkLockPath );
            } else {
                task = new WriteTask( "写任务#"+i, zkClient, zkLockPath );
            }
            threadPool.execute( task );
        }
        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
    }

    public static void close() {
        // 关闭客户端
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */
    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "+ " <"+ thread +"> " + msg;
        System.out.println(log);
    }

    /**
     * 读任务
     */
    private static class ReadTask implements Runnable {
        private String taskName;

        private InterProcessMutex readLock;

        public ReadTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
            this.readLock = interProcessReadWriteLock.readLock();
        }

        @Override
        public void run() {
            try{
                readLock.acquire();
                info(taskName + ": 成功获取读锁 #1");
                // 模拟业务耗时
                Thread.sleep( RandomUtils.nextLong(100, 500) );
            } catch (Exception e) {
                System.out.println( taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放读锁 #1");
                try {
                    readLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 写任务
     */
    private static class WriteTask implements Runnable {
        private String taskName;

        private InterProcessMutex writeLock;

        public WriteTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
            this.writeLock = interProcessReadWriteLock.writeLock();
        }

        @Override
        public void run() {
            try{
                writeLock.acquire();
                info(taskName + ": 成功获取写锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100, 500));
            } catch (Exception e) {
                System.out.println( taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放写锁 #1\n");
                try {
                    writeLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }



    // 随机数工具类
     static class RandomUtils {
        public static long nextLong(int min, int max) {
            return min + (long)(Math.random() * ((max - min) + 1));
        }
    }


    public static void main(String[] args) {
        InterProcessReadWriteLockTest test = new InterProcessReadWriteLockTest();
        init();
        test.test1Read();
        test.test2Write();
        test.test2ReadWrite();
        close();

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1532521.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎样修改grafana的Loading picture和加载的文本

登录装了grafana的linux机器 command “sudo vi /usr/share/grafana/public/views/index.html”&#xff0c;编辑配置文件。 找到.preloader__logo更改background-image. 这里可以是个url也可以是个路径。 如果想要更改加载的文字.可以更改 的内容 改完:wq保存以后退出&…

从键盘到屏幕:C语言中输入输出探秘

在编程中&#xff0c;输入和输出是我们与计算机交流的关键。无论是键盘输入还是屏幕输出&#xff0c;它们贯穿了我们每一行代码的编写。本文将带你深入探索C语言中输入输出的精彩世界&#xff0c;解锁其中的奥秘&#xff0c;助你轻松驾驭键盘和屏幕&#xff01;&#xff08;最后…

C++ List底层实现

文章目录 前言成员变量成员函数迭代器self& operator()前置self operator(int)后置self operator--()前置--self operator--(int)后置--bool operator!(const self & tmp)判断是否相等T* operator*() 解引用操作 list()初始化iterator begin()iterator end()const_iter…

年度告警分类统计

1、打开前端Vue项目kongguan_web&#xff0c;完成前端src/components/echart/YearWarningChart.vue页面设计 在YearWarningChart.vue页面添加div设计 <template><div class"home"><div style"margin: 0px auto;height: 100%"><div …

seleniumUI自动化实例(CSDN发布文章)

1.CSDN登陆成功后&#xff0c;点击发布 源码&#xff1a; #点击首页中的发布按钮 CSDNconf.driver.find_element(By.LINK_TEXT,"发布").click() time.sleep(15) 2.输入标题 #输入文章标题&#xff0c;标题格式“selenium UI自动化测试实例今天的日期” CSDNconf.d…

「数据分析」之零基础入门数据挖掘

摘要&#xff1a;对于数据挖掘项目&#xff0c;本文将学习应该从哪些角度分析数据&#xff1f;如何对数据进行整体把握&#xff0c;如何处理异常值与缺失值&#xff0c;从哪些维度进行特征及预测值分析&#xff1f; 探索性数据分析&#xff08;Exploratory Data Analysis&#…

期刊如何反击一波可疑图像

出版商正在部署基于人工智能的工具来检测可疑图像&#xff0c;但生成式人工智能威胁着他们的努力。 期刊正在努力检测用于分析蛋白质和DNA的凝胶的操纵图像。图片来源&#xff1a;Shutterstock 似乎每个月都会有一系列针对研究人员的新高调指控&#xff0c;这些研究人员的论文…

正则表达式具体用法大全

# 正则表达式&#xff1a; ## 单字符匹配&#xff1a; python # 匹配某个字符串&#xff1a; # text "abc" # ret re.match(b,text) # print(ret.group()) # 点&#xff08;.&#xff09;&#xff1a;匹配任意的字符(除了\n)&#xff1a; # text "\nabc&quo…

《论文阅读》带边界调整的联合约束学习用于情感原因对提取 ACL 2023

《论文阅读》带边界调整的联合约束学习用于情感原因对提取 前言简介Clause EncoderJoint Constrained LearningBoundary Adjusting损失函数前言 亲身阅读感受分享,细节画图解释,再也不用担心看不懂论文啦~ 无抄袭,无复制,纯手工敲击键盘~ 今天为大家带来的是《Joint Cons…

Photoshop 2024让图像处理更智能、更高效@

Photoshop 2024是一款功能强大的图像处理软件&#xff0c;广泛应用于创意设计和图像处理领域。它提供了丰富的绘画和编辑工具&#xff0c;包括画笔、铅笔、颜色替换、混合器画笔等&#xff0c;使用户能够轻松进行图片编辑、合成、校色、抠图等操作&#xff0c;实现各种视觉效果…

如何处理WordPress网站域名循环重定向

我在 HostEase 搭建了一个 WordPress 网站。在访问网站时出现了循环重定向的问题。经检查&#xff0c;发现是我在 .htaccess 文件中设置的重定向规则导致的。 重定向循环通常指的是一个网页或者URL地址在不断地进行重定向&#xff0c;最终形成一个循环&#xff0c;导致网页无法…

Monaco Editor系列(一)启动项目与入门示例解析

前言&#xff1a;作为一名程序员&#xff0c;我们工作中的每一天都在与代码编辑器打交道&#xff0c;相信各位前端程序员对 VS Code 一定都不陌生&#xff0c;VS Code 可以为我们提供代码高亮、代码对比等等功能&#xff0c;让我们在开发的时候&#xff0c;不需要对着暗淡无光的…

FreeCAD傻瓜教程之创建参数化几何图形-螺旋体、平面、球体、椭球体、圆柱体、圆锥体、棱柱、椭圆

目的&#xff1a;学会用FreeCAD绘制参数化的几何图形。 一、使用的工作台和工具 1.1选择Part 工作台 1.2单击创建图元...工具 也就是上图黄色工具区域的倒数第2个 1.3 打开几何图元 下方的下拉列表 二、绘制螺旋体、弹簧、螺丝杆 2.1 选择几何图元列表中的 “螺旋体” 设…

opengl日记10-opengl使用多个纹理示例

文章目录 环境代码CMakeLists.txt文件内容不变。fragmentShaderSource.fsvertexShaderSource.vsmain.cpp 总结 环境 系统&#xff1a;ubuntu20.04opengl版本&#xff1a;4.6glfw版本&#xff1a;3.3glad版本&#xff1a;4.6cmake版本&#xff1a;3.16.3gcc版本&#xff1a;10.…

常见分布式ID解决方案

简介&#xff1a; 分布式ID解决方案是用于在分布式系统中生成唯一标识符的方案。常见的分布式ID解决方案可总结为3点&#xff1a;数据库方案、算法方案、开源组件方案。 分布式ID 分布式 ID&#xff08;Distributed ID&#xff09;是指在分布式系统中生成全局唯一的标识符&…

10000字!一文学会SQL数据分析

文章来源于山有木兮 原文链接&#xff1a;https://edu.cda.cn/goods/show/3412?targetId5695&preview0 第1节 SQL简介与基础知识 做数据分析的&#xff0c;为什么要写SQL&#xff1f; 没有数据的情况下&#xff0c;我们分析数据就像是巧妇难为无米之炊。因此&#xff0c…

【prometheus-operator】k8s监控redis

1、准备exporter https://github.com/oliver006/redis_exporter oliver006-redis_exporter-amd64.tar # 安装镜像 docker load -i oliver006-redis_exporter-amd64.tar # 上传镜像 docker tag oliver006/redis_exporter ip/monitor/redis_exporter:latest docker push ip/mo…

零基础入门数据挖掘系列之「建模调参」

摘要&#xff1a;对于数据挖掘项目&#xff0c;本文将学习如何建模调参&#xff1f;从简单的模型开始&#xff0c;如何去建立一个模型&#xff1b;如何进行交叉验证&#xff1b;如何调节参数优化等。 建模调参&#xff1a;特征工程也好&#xff0c;数据清洗也罢&#xff0c;都是…

强大的文本编辑器:Sublime Text for Mac注册激活版

Sublime Text for Mac是一款功能强大的文本编辑器&#xff0c;特别适合程序员和开发者使用。它提供了丰富的功能&#xff0c;如智能代码补全、语法高亮、自定义快捷键、项目管理、多行选择、自动保存等&#xff0c;以提高代码编写效率和舒适度。此外&#xff0c;Sublime Text还…

【鸿蒙HarmonyOS开发笔记】通知模块之发布基础类型通知,内含如何将图片变成PixelMap对象

通知简介 应用可以通过通知接口发送通知消息&#xff0c;终端用户可以通过通知栏查看通知内容&#xff0c;也可以点击通知来打开应用。 通知常见的使用场景&#xff1a; 显示接收到的短消息、即时消息等。 显示应用的推送消息&#xff0c;如广告、版本更新等。 显示当前正…