Zookeeper 官方示例2-SyncPrimitive 代码解读(二)

news2024/12/28 13:17:48

测试命令
java jar .\ZookeeperDemo-0.0.1-SNAPSHOT.jar bTest 192.168.206.100:2181 2

1. Barrier(阻塞原语)

1.1 概念

[!quote] A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node "/b1". Each process "p" then creates a node "/b1/p". Once enough processes have created their corresponding nodes, joined processes can start the computation.

  • 阻塞是一个原语,它使一组进程能够同时开始计算。此实现的总体思想是拥有一个屏障节点,用于作为单个流程节点的父节点。
  • 假设我们将障碍节点称为“/b1”。然后,每个进程“ p”创建一个节点“/b1/p”。一旦有足够多的进程创建了相应的节点,联合进程就可以开始计算了。
  • 场景:当有些操作需要所有参与者全部准备好之后才能开始执行,并且对每个参与者来说必须等待所有参与者全部执行完毕,才算执行完毕。于是就需要一个屏障,来控制所有参与者同时开始,并等待所有参与者全部结束。

1.2 设计

  • 创建一个/b1的znode的持久化节点。
  • enter() 模拟往阻塞里增加执行进程(Join barrier)。往znode下增加子节点,并判断子节点数是否满足指定的个数n。若未满足条件则继续等待;反之则返回true。
  • leave() 模拟进程执行完毕后的离开(Wait until all reach barrier)。删除znode的子节点,并判断子节点是否大于0,若大于0则表示还有子进程没有执行完。

源码:

package com.agileluo.zookeeperdemo.barriers;  
import java.io.IOException;  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.nio.ByteBuffer;  
import java.util.List;  
import java.util.Random;  
import java.lang.Integer;  
import org.apache.commons.lang3.RandomStringUtils;  
import org.apache.zookeeper.CreateMode;  
import org.apache.zookeeper.KeeperException;  
import org.apache.zookeeper.WatchedEvent;  
import org.apache.zookeeper.Watcher;  
import org.apache.zookeeper.ZooKeeper;  
import org.apache.zookeeper.ZooDefs.Ids;  
import org.apache.zookeeper.data.Stat;  
  
/**  
 * 1. Queue test * 1.1 Start a producer to create 100 elements *    java SyncPrimitive qTest localhost 100 p * 1.2 Start a consumer to consume 100 elements *    java SyncPrimitive qTest localhost 100 c * * 2.Barrier test * Start a barrier with 2 participants (start as many times as many participants you'd like to enter) *    java SyncPrimitive bTest localhost 2 */public class SyncPrimitive implements Watcher {  
  
    static ZooKeeper zk = null;  
    static Integer mutex;  
    String root;  
  
    static{  
        System.setProperty("zookeeper.sasl.client", "false");  
    }  
  
    SyncPrimitive(String address) {  
        if(zk == null){  
            try {  
                System.out.println("Starting ZK:");  
                zk = new ZooKeeper(address, 3000, this);  
                mutex = Integer.parseInt("-1");  
                System.out.println("Finished starting ZK: " + zk);  
            } catch (IOException e) {  
                System.out.println(e.toString());  
                zk = null;  
            }  
        }  
        //else mutex = new Integer(-1);  
    }  
  
    synchronized public void process(WatchedEvent event) {  
        synchronized (mutex) {  
            //System.out.println("Process: " + event.getType());  
            mutex.notify();  
        }  
    }  
  
    /**  
     * Barrier(阻塞原语)  
     *  A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to  
     *  have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node "/b1". Each process  
     *  "p" then creates a node "/b1/p". Once enough processes have created their corresponding nodes, joined processes can start the computation.  
     *  阻塞是一个原语,它使一组进程能够同时开始计算。此实现的总体思想是拥有一个屏障节点,用于作为单个流程节点的父节点。  
     *  假设我们将障碍节点称为“/b1”。然后,每个进程“ p”创建一个节点“/b1/p”。一旦有足够多的进程创建了相应的节点,联合进程就可以开始计算了。  
     *  场景:当有些操作需要所有参与者全部准备好之后才能开始执行,并且对每个参与者来说必须等待所有参与者全部执行完毕,才算执行完毕。于是就需要一个屏障,来控制所有参与者同时开始,并等待所有参与者全部结束。  
     */  
    static public class Barrier extends SyncPrimitive {  
  
        //需要并行等待的子进程个数  
        int size;  
        /**  
         *  本参与者对应的子节点path  
         */        String name;  
  
        /**  
         * Barrier constructor         *         * @param address  
         * @param root  
         * @param size  
         */  
        Barrier(String address, String root, int size) {  
            super(address);  
            this.root = root;  
            this.size = size;  
  
            // Create barrier node(障碍节点必须是持久节点 CreateMode.PERSISTENT)  
            if (zk != null) {  
                try {  
                    Stat s = zk.exists(root, false);  
                    if (s == null) { // 如果根节点不存在,则创建  
                        /**  
                         *  zk.create(String path, byte[] data, List<ACL> acl, CreateMode createMode)                         *  第1个参数: barrier节点的path  
                         *  第2个参数: barrier节点的data  
                         *  第3个参数: barrier节点的权限  
                         *  第4个参数: barrier 节点的类型,持久节点 CreateMode.PERSISTENT,子节点必须是临时节点。  
                         */  
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,  
                                CreateMode.PERSISTENT);  
                    }  
                } catch (KeeperException e) {  
                    System.out  
                            .println("Keeper exception when instantiating queue: "  
                                    + e.toString());  
                } catch (InterruptedException e) {  
                    System.out.println("Interrupted exception");  
                }  
            }  
  
            // My node name  
            try {  
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()+ ":"+ RandomStringUtils.randomAlphabetic(4));  
            } catch (UnknownHostException e) {  
                System.out.println(e.toString());  
            }  
  
        }  
  
        /**  
         * Join barrier         *         * @return         * @throws KeeperException  
         * @throws InterruptedException  
         */  
  
        boolean enter() throws KeeperException, InterruptedException{  
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,  
                    CreateMode.EPHEMERAL); // EPHEMERAL 临时节点  
            while (true) {  
                synchronized (mutex) {  
                    List<String> list = zk.getChildren(root, true);  
  
                    if (list.size() < size) { //判断当前根下子节点的数量,若数量小于设定的进程数,则等待。  
                        mutex.wait();  
                    } else {  
                        return true;  
                    }  
                }  
            }  
        }  
  
        /**  
         * Wait until all reach barrier         *         * @return         * @throws KeeperException  
         * @throws InterruptedException  
         */  
        boolean leave() throws KeeperException, InterruptedException{  
            zk.delete(root + "/" + name, 0); //模拟进程完成任务,删除子节点。  
            while (true) {  
                synchronized (mutex) {  
                    List<String> list = zk.getChildren(root, true);  
                    if (list.size() > 0) { //只要还存在子节点,就说明还有任务没有完成。  
                        mutex.wait();  
                    } else {  
                        return true;  
                    }  
                }  
            }  
        }  
    }  
  
    /**  
     * Producer-Consumer queue     */    static public class Queue extends SyncPrimitive {  
  
        /**  
         * Constructor of producer-consumer queue         *         * @param address  
         * @param name  
         */  
        Queue(String address, String name) {  
            super(address);  
            this.root = name;  
            // Create ZK node name  
            if (zk != null) {  
                try {  
                    Stat s = zk.exists(root, false);  
                    if (s == null) {  
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,  
                                CreateMode.PERSISTENT);  
                    }  
                } catch (KeeperException e) {  
                    System.out  
                            .println("Keeper exception when instantiating queue: "  
                                    + e.toString());  
                } catch (InterruptedException e) {  
                    System.out.println("Interrupted exception");  
                }  
            }  
        }  
  
        /**  
         * Add element to the queue.         *         * @param i  
         * @return  
         */  
        boolean produce(int i) throws KeeperException, InterruptedException{  
            ByteBuffer b = ByteBuffer.allocate(4);  
            byte[] value;  
  
            // Add child with value i  
            b.putInt(i);  
            value = b.array();  
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,  
                    CreateMode.PERSISTENT_SEQUENTIAL);  
  
            return true;  
        }  
  
        /**  
         * Remove first element from the queue.         *         * @return         * @throws KeeperException  
         * @throws InterruptedException  
         */  
        int consume() throws KeeperException, InterruptedException{  
            int retvalue = -1;  
            Stat stat = null;  
  
            // Get the first element available  
            while (true) {  
                synchronized (mutex) {  
                    List<String> list = zk.getChildren(root, true);  
                    if (list.size() == 0) {  
                        System.out.println("Going to wait");  
                        mutex.wait();  
                    } else {  
                        Integer min = Integer.parseInt((list.get(0).substring(7)));  
                        String minNode = list.get(0);  
                        for(String s : list){  
                            Integer tempValue = Integer.parseInt(s.substring(7));  
                            //System.out.println("Temporary value: " + tempValue);  
                            if(tempValue < min) {  
                                min = tempValue;  
                                minNode = s;  
                            }  
                        }  
                        System.out.println("Temporary value: " + root + "/" + minNode);  
                        byte[] b = zk.getData(root + "/" + minNode,  
                                false, stat);  
                        zk.delete(root + "/" + minNode, 0);  
                        ByteBuffer buffer = ByteBuffer.wrap(b);  
                        retvalue = buffer.getInt();  
  
                        return retvalue;  
                    }  
                }  
            }  
        }  
    }  
  
    public static void main(String args[]) {  
        if (args[0].equals("qTest"))  
            queueTest(args);  
        else  
            barrierTest(args);  
    }  
  
    public static void queueTest(String args[]) {  
        Queue q = new Queue(args[1], "/app1");  
  
        System.out.println("Input: " + args[1]);  
        int i;  
        Integer max = Integer.parseInt(args[2]+"");  
  
        if (args[3].equals("p")) {  
            System.out.println("Producer");  
            for (i = 0; i < max; i++)  
                try{  
                    q.produce(10 + i);  
                } catch (KeeperException e){  
  
                } catch (InterruptedException e){  
  
                }  
        } else {  
            System.out.println("Consumer");  
  
            for (i = 0; i < max; i++) {  
                try{  
                    int r = q.consume();  
                    System.out.println("Item: " + r);  
                } catch (KeeperException e){  
                    i--;  
                } catch (InterruptedException e){  
                }  
            }  
        }  
    }  
  
    public static void barrierTest(String args[]) {  
        Barrier b = new Barrier(args[1], "/b1", Integer.parseInt(args[2]+""));  
        try{  
            boolean flag = b.enter();  
            System.out.println("Entered barrier: " + args[2]);  
            if(!flag) System.out.println("Error when entering the barrier");  
        } catch (KeeperException e){  
        } catch (InterruptedException e){  
        }  
  
        // Generate random integer  
        Random rand = new Random();  
        int r = rand.nextInt(100);  
        // Loop for rand iterations  
        for (int i = 0; i < r; i++) {  
            try {  
                Thread.sleep(100);  
            } catch (InterruptedException e) {  
            }  
        }  
        try{  
            b.leave();  
        } catch (KeeperException e){  
  
        } catch (InterruptedException e){  
  
        }  
        System.out.println("Left barrier");  
    }  
}

1.3 测试步骤

  • 第1步,打包 ZookeeperDemo-0.0.1-SNAPSHOT.jar
<build>  
    <plugins>  
       <plugin>  
          <groupId>org.apache.maven.plugins</groupId>  
          <artifactId>maven-jar-plugin</artifactId>  
          <configuration>  
             <archive>  
                <manifest>  
                   <addClasspath>true</addClasspath>  
                <mainClass>com.xx.zookeeperdemo.barriers.SyncPrimitive</mainClass> 
                </manifest>  
             </archive>  
          </configuration>  
       </plugin>  
    </plugins>  
</build>
  • 第2步,jar包目录下打开命令窗口,并执行 java -jar .\ZookeeperDemo-0.0.1-SNAPSHOT.jar bTest 192.168.206.100:2181 3
    控制台输出:

执行后,查看zookeeper的znode情况:

  • 第3步,复制第2步操作,模拟启动第2个进程
    执行后,查看zookeeper的znode情况:

  • 第4步,复制第2步操作,模拟启动第3个进程
    执行后,第1个控制台输出:

第2个控制台输出:

第3个控制台输出:

然后所有进程在随机的整数时间后输出 Left barrier

查看zookeeper的znode情况: 所有子进程创建的临时子节点都已delete

1.4 结果

能实现多个进程之间的并行协同。

1.5 注意事项

  • 为了方便在同一台IP上模拟不同的进程,在官方提供的代码基础上增加了4位长度的随机字符串。

// 官方示例:
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());

// 新增后的示例
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()+ ":"+ RandomStringUtils.randomAlphabetic(4));

  • 关闭SASL安全验证
static{  
    System.setProperty("zookeeper.sasl.client", "false");  
}

2. 队列

2.1 概念

模拟向同一队列生产/消费消息。

2.2 设计

生产消息: 往znode新增子节点。
消费消息: 往znode中取first子节点,然后删除子节点。

2.3 源码

/**  
 * Producer-Consumer queue */static public class Queue extends SyncPrimitive {  
  
    /**  
     * Constructor of producer-consumer queue     *     * @param address  
     * @param name  
     */  
    Queue(String address, String name) {  
        super(address);  
        this.root = name;  
        // Create ZK node name  
        if (zk != null) {  
            try {  
                Stat s = zk.exists(root, false);  
                if (s == null) {  
                    zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,  
                            CreateMode.PERSISTENT);  
                }  
            } catch (KeeperException e) {  
                System.out  
                        .println("Keeper exception when instantiating queue: "  
                                + e.toString());  
            } catch (InterruptedException e) {  
                System.out.println("Interrupted exception");  
            }  
        }  
    }  
  
    /**  
     * Add element to the queue.     *     * @param i  
     * @return  
     */  
    boolean produce(int i) throws KeeperException, InterruptedException{  
        ByteBuffer b = ByteBuffer.allocate(4);  
        byte[] value;  
  
        // Add child with value i  
        b.putInt(i);  
        value = b.array();  
        zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,  
                CreateMode.PERSISTENT_SEQUENTIAL);  
  
        return true;  
    }  
  
    /**  
     * Remove first element from the queue.     *     * @return     * @throws KeeperException  
     * @throws InterruptedException  
     */  
    int consume() throws KeeperException, InterruptedException{  
        int retvalue = -1;  
        Stat stat = null;  
  
        // Get the first element available  
        while (true) {  
            synchronized (mutex) {  
                List<String> list = zk.getChildren(root, true);  
                if (list.size() == 0) {  
                    System.out.println("Going to wait");  
                    mutex.wait();  
                } else {  
                    Integer min = Integer.parseInt((list.get(0).substring(7)));  
                    String minNode = list.get(0);  
                    for(String s : list){  
                        Integer tempValue = Integer.parseInt(s.substring(7));  
                        //System.out.println("Temporary value: " + tempValue);  
                        if(tempValue < min) {  
                            min = tempValue;  
                            minNode = s;  
                        }  
                    }  
                    System.out.println("Temporary value: " + root + "/" + minNode);  
                    byte[] b = zk.getData(root + "/" + minNode,  
                            false, stat);  
                    zk.delete(root + "/" + minNode, 0);  
                    ByteBuffer buffer = ByteBuffer.wrap(b);  
                    retvalue = buffer.getInt();  
  
                    return retvalue;  
                }  
            }  
        }  
    }  
}

2.4 测试

生产消息: java SyncPrimitive qTest 192.168.206.100:2181 100 p
消费消息: java SyncPrimitive qTest 192.168.206.100:2181 100 c

2.5 结论

借助zookeeper实现消息队列的模拟。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2086340.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

猫头虎 分享:Python库 NumPy 的简介、安装、用法详解入门教程

猫头虎 分享&#xff1a;Python库 NumPy 的简介、安装、用法详解入门教程 &#x1f431;&#x1f42f; 摘要 在Python编程领域&#xff0c;特别是人工智能和数据科学方向&#xff0c;NumPy库的重要性不言而喻。 作为一个强大且广泛使用的库&#xff0c;NumPy为我们提供了处理…

WLAN原理实验简述——AP上线

一、需求&#xff1a; AP通过AC上线。 AC通过控制VLAN管理AP,创建VLAN100和放行。 AP同AC建立CAPWAP关系。 二、实验拓扑图&#xff1a; 三、实验步骤&#xff1a; LSW1: sys Enter system view, return user view with CtrlZ. [Huawei]Sysname lsw1 [lsw1]undo info enable I…

vue-ueditor-wrap设置autoHeightEnabled:true无效问题

问题描述 今天小伙伴遇到一个问题&#xff0c;使用vue-ueditor-wrap富文本编辑器&#xff0c;发现设置autoHeightEnabled为true后&#xff0c;对于某些文章&#xff0c;编辑器的高度依然没有按照实际的文章内容高度进行变化&#xff1a; 问题排查 通过调试代码发现是文章html…

C语言实现经典排序算法

1.排序的概念及其运用 1.1排序的概念 排序&#xff1a;所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操作。 稳定性&#xff1a;假定在待排序的记录序列中&#xff0c;存在多个具有相同的关键字的记…

SAP BW/BPC:实现自动执行BPC跑包程序

作者 idan lian 如需转载备注出处 如果对你有帮助&#xff0c;请点赞收藏~~~ 用途&#xff1a;创建程序&#xff0c;跑BPC包&#xff0c;把数据从BW应用层跑到BPC,程序可放到处理链或自动作业中&#xff0c;实现定时跑包。 1.步骤 首先需要BPC顾问创建一个他们手动执行的包…

数据挖掘之分类算法

分类算法是数据挖掘中常用的一类算法&#xff0c;其主要任务是根据已知的训练数据&#xff08;即带有标签的数据&#xff09;构建模型&#xff0c;然后利用该模型对新的数据进行分类。分类算法广泛应用于金融、医疗、市场营销等领域&#xff0c;用于预测、决策支持等任务。以下…

并查集【算法 12】

并查集 (Union-Find) 的基础概念与实现 并查集&#xff08;Union-Find&#xff09;是一种用于处理不相交集合&#xff08;disjoint sets&#xff09;的数据结构&#xff0c;常用于解决连通性问题。典型的应用场景包括动态连通性问题&#xff08;如网络节点连通性检测&#xff0…

数据库sqlite3

数据库 数组、链表、变量 ----->内存&#xff1a;程序运行结束&#xff0c;掉电数据丢失 文件 ----------------------->硬盘&#xff1a;程序运行结束&#xff0c;掉电数据不丢失 数据库&#xff1a;专业存储数据、大量数据 ----->硬盘 常用数据库&#xff1a; …

linux 如何查看cpu核心数量

在Linux系统中&#xff0c;有多种方法可以查看CPU的核心数量。 一、lscpu lscpu命令是最直接的方法之一&#xff0c;它可以显示CPU架构信息&#xff0c;包括CPU数量、每个CPU的核心数、每个核心的线程数等。要查看CPU核心数量&#xff0c;可以直接查看lscpu命令输出的Core(s) …

力扣面试150 删除排序链表中的重复元素 II 哑兵 双指针

Problem: 82. 删除排序链表中的重复元素 II &#x1f468;‍&#x1f3eb; 灵神题解 Code /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* List…

企业车辆|基于SprinBoot+vue的企业车辆管理系统(源码+数据库+文档)

企业车辆管理系统 基于SprinBootvue的企业车辆管理系统 一、前言 二、系统设计 三、系统功能设计 系统功能实现 后台模块实现 管理员模块实现 驾驶员模块实现 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主…

悬浮翻译软件有哪些?试试这些利器

在观看外国电影或电视剧的奇幻旅程中&#xff0c;面对字幕如流星般划过屏幕&#xff0c;是否渴望能即时捕捉每一个细微的情感涟漪与幽默火花&#xff0c;让体验更加完整无憾&#xff1f; 此刻&#xff0c;无需再为语言障碍而烦恼&#xff01;悬浮翻译器电脑版作为你贴心的跨文…

新买的笔记本只有一个C盘,进行磁盘分区的操作

开始是这样的: 快捷键 window x 找到磁盘管理 102,400M 100GB 然后右键重命名磁盘名字 最终得到结果如下:

SpringBoot+Vue的AI智能图书馆系统来袭!!

SpringBootVue的AI智能图书馆系统来袭&#xff01;&#xff01; 一、项目介绍用户&#xff08;借阅人&#xff09;图书管理员系统管理员 二、相关技术栈三、项目演示管理员登录用户登录 四、相关地址总结 大家好&#xff0c;这里是程序猿代码之路。在数字化时代的浪潮中&#x…

Python办公自动化 获取文本数据 支持多种类型文件

学好办公自动化,走遍天下都不怕&#xff01;&#xff01; 前面我们已经学习了&#xff0c;如何用python的下载安装以及入门基础知识&#xff0c;并且也知道如何使用python自动处理Excel文件数据、如何批量生成Word文件、如何对数据分析后生成洞察报告、如何用python实现自动发送…

【自由能系列(初级)】自由能原理——神经科学的“能量守恒”方程

【通俗理解】自由能原理——神经科学的“能量守恒”方程 关键词提炼 #自由能原理 #KL散度 #生成模型 #识别密度 #观测数据 #神经科学 第一节&#xff1a;自由能原理的类比与核心概念 1.1 自由能原理的类比 自由能原理在神经科学中的应用&#xff0c;可以类比为一个“大脑的…

Java 面试题:HTTP版本演变--xunznux

文章目录 HTTP版本演变HTTP/0.9HTTP/1.0HTTP/1.1新引入&#xff1a;问题&#xff1a;长连接是什么&#xff1a;管道网络传输&#xff1a;队头阻塞是什么&#xff1f;解决http队头阻塞的方法&#xff1a;HTTP1.1常见性能问题为解决HTTP1.1性能问题而提出的常见优化手段 HTTP/21、…

数据库(专业存储数据)

数组、链表、变量----->内存&#xff1a;程序运行结束&#xff0c;数据丢失 文件-------------->硬盘 数据库&#xff1a;专业存储数据&#xff0c;大量数据----------->硬盘 一、数据库文件与普通文件区别: 1.普通文件对数据管理(增刪改查)效率低 2.数据库对数据…

UNI-APP 打包构建 APK

UNI-APP 打包构建 APK 前言一、WINDOWS&#xff08;在线 - 纯命令版&#xff09;依赖其他前置准备实现原理操作步骤 二、WINDOWS&#xff08;离线 - Android Studio 版&#xff09;依赖&#xff08;首次构建需要联网安装依赖&#xff09;其他前置准备实现原理操作步骤 三、WIND…

【QT】学习笔记:处理数据库 SQLite

在 Qt 中使用 SQLite 数据库非常简单&#xff0c;Qt 提供了 QSqlDatabase 和 QSqlQuery 类来处理数据库的连接、查询、插入、更新和删除等操作。下面是一个示例程序&#xff0c;展示如何在 Qt 中使用 SQLite 数据库。 示例代码 1. 项目配置 首先&#xff0c;确保在项目的 .p…