Zookeeper分布式队列实战

news2025/1/8 4:39:08

目录

Zookeeper分布式队列

普通方式实现

设计思路

具体实现

使用Curator实现

具体实现

注意事项


Zookeeper分布式队列

       常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中是比较好用的。

普通方式实现

设计思路

     

1.创建队列根节点
       在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。
2.实现入队操作
       当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。
3.实现出队操作
       当需要从队列中取出一个元素时,先获取根节点下的所有子节点。再找到具有最小序号的子节点,获取该节点的数据,删除该节点,然后返回节点的数据。

具体实现
/**
 * 入队
 * @param data
 * @throws Exception
 */
public void enqueue(String data) throws Exception {
    // 创建临时有序子节点
    zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}

/**
 * 出队
 * @return
 * @throws Exception
 */
public String dequeue() throws Exception {
    while (true) {
        List<String> children = zk.getChildren(QUEUE_ROOT, false);
        if (children.isEmpty()) {
            return null;
        }

        Collections.sort(children);

        for (String child : children) {
            String childPath = QUEUE_ROOT + "/" + child;
            try {
                byte[] data = zk.getData(childPath, false, null);
                zk.delete(childPath, -1);
                return new String(data, StandardCharsets.UTF_8);
            } catch (KeeperException.NoNodeException e) {
                // 节点已被其他消费者删除,尝试下一个节点
            }
        }
    }
}

使用Curator实现

Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。

具体实现
public class CuratorDistributedQueueDemo {
    private static final String QUEUE_ROOT = "/curator_distributed_queue";

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",
                new ExponentialBackoffRetry(1000, 3));
        client.start();

        // 定义队列序列化和反序列化
        QueueSerializer<String> serializer = new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };

        // 定义队列消费者
        QueueConsumer<String> consumer = new QueueConsumer<String>() {
            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息: " + message);
            }

            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {

            }
        };

        // 创建分布式队列
        DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT)
                .buildQueue();
        queue.start();

        // 生产消息
        for (int i = 0; i < 5; i++) {
            String message = "Task-" + i;
            System.out.println("生产消息: " + message);
            queue.put(message);
            Thread.sleep(1000);
        }

        Thread.sleep(10000);
        queue.close();
        client.close();
    }
}
注意事项

       使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题的影响。

       在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。分布式环境中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱。如果应用场景允许消息被多次处理,或者处理顺序不是关键问题,那么可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁。

// 创建分布式队列
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order");
//指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
queue = builder.lockPath("/orderlock").buildQueue();
//启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
queue.start();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1424303.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Markdown写作的魔力

今年的年终总结报告,我是用Markdown写的,只花了大概2.5天的时间,包括统计任务数据,时效,总结成果,挖掘不足,提出改善措施和计划。 将全部文字内容的.md文档,导出为word,然后用了做PPT的AI,设计了PPT,再修改了半天,就完成了。 上周为两个代码工程,用Markdown写了r…

Kubernetes operator(五)api 和 apimachinery 篇【更新中】

云原生学习路线导航页&#xff08;持续更新中&#xff09; 本文是 Kubernetes operator学习 系列第五篇&#xff0c;主要对 k8s.io/api 和 k8s.io/apimachinery 两个项目 进行学习Kubernetes operator学习系列 快捷链接 Kubernetes operator&#xff08;一&#xff09;client-g…

PyCharm / DataSpell 导入WSL2 解析器,实现GPU加速

PyCharm / DataSpell 导入WSL2 解析器的实现 Windows的解析器不好么&#xff1f;设置WSL2和实现GPU加速为PyCharm / DataSpell 设置WSL解析器设置Interpreter Windows的解析器不好么&#xff1f; Windows上的解析器的确很方便&#xff0c;也省去了我们很多的麻烦。但是WSL2的解…

Ubuntu-22.04上ToDest设置开机不弹出图形界面

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、开始操作1.设置图形端 总结 前言 有时候远程成为开发必不可少的工具&#xff0c;目前国内有很多相关的软件&#xff0c;比较有名的是向日葵、ToDesk、Rust…

Docker基础(持续更新中)

# 第1步&#xff0c;去DockerHub查看nginx镜像仓库及相关信息# 第2步&#xff0c;拉取Nginx镜像 docker pull nginx# 第3步&#xff0c;查看镜像 docker images # 结果如下&#xff1a; REPOSITORY TAG IMAGE ID CREATED SIZE nginx latest 60…

黑马程序员前端web入门:新浪新闻

黑马程序员前端web入门&#xff1a;新浪新闻 几点学习到的&#xff1a; 设置li无圆点: list-style: none;设置a无下划线&#xff1a;text-decoration: none;a属于行内元素&#xff0c;高度hegiht不起作用&#xff0c;可以设置 display: block; 把它变成块元素。此时&#xff0c…

20240131在WIN10下配置whisper

20240131在WIN10下配置whisper 2024/1/31 18:25 首先你要有一张NVIDIA的显卡&#xff0c;比如我用的PDD拼多多的二手GTX1080显卡。【并且极其可能是矿卡&#xff01;】800&#xffe5; 2、请正确安装好NVIDIA最新的545版本的驱动程序和CUDA。 2、安装Torch 3、配置whisper http…

【简便方法和积累】pytest 单元测试框架中便捷安装插件和执行问题

又来进步一点点~~~ 背景&#xff1a;之前写了两篇关于pytest单元测试框架的文章&#xff0c;本篇内容对之前的做一个补充 一、pytest插件&#xff1a; pytest 有非常多的插件&#xff0c;很方便&#xff0c;以下为插件举例&#xff1a; pytest&#xff0c;pytest-html&#x…

leetcode刷题(剑指offer) 19.删除链表的倒数第N个节点

19.删除链表的倒数第N个节点 给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5]示例 2&#xff1a; 输入&#xff1a;head [1], n 1 输出&am…

双异步系列完结撒花,如何解決异步事务问题?

目录 一、前情提要原始需求&#xff1a;读取一个10万行的Excel优化1&#xff1a;[**使用双异步后&#xff0c;从 191s 优化到 2s**](https://blog.csdn.net/guorui_java/article/details/135143234)优化2&#xff1a;[**使用双异步后&#xff0c;如何保证数据一致性&#xff1f…

ClickHouse为什么这么快(二)SSE指令优化

上一篇 ClickHouse为什么这么快&#xff08;一&#xff09;减少数据扫描范围 我们说到了ClickHouse中使用列存储&#xff0c;每个列都单独存储为一个文件&#xff0c;每个文件都是由一个或多个数据块组成&#xff0c;也就是说&#xff1a;每个文件由一个或多个数组组成&#xf…

Python限定符

在正则表达式中&#xff0c;限定符用于指定模式的匹配次数或匹配范围。在Python中&#xff0c;正则表达式模块re提供了多种不同的限定符&#xff0c;以实现更灵活和精确的匹配操作。熟悉并理解Python中的限定符对于处理文本和字符串数据非常重要。本文将详细介绍Python中常见的…

DNS配置文件讲解

1. 概述 BIND&#xff1a;Berkeley Internet Name Domain &#xff0c;伯克利因特网域名解析服务是一种全球使用最广泛的、 最高效的、最安全的域名解析服务程序 2. 安装软件 [rootserver ~]# yum install bind -y 3. bind服务中三个关键文件 /etc/named.conf : 主配置文件…

Histone H3K27ac Antibody, SNAP-ChIP® Certified

EpiCypher是一家为表观遗传学和染色质生物学研究提供高质量试剂和工具的专业制造商。EpiCypher&#xff08;国内代理商欣博盛生物&#xff09;推出的ChIP级别的Histone H3K27ac Antibody符合EpiCypher的“SNAP-ChIP Certified”标准&#xff0c;用于ChIP实验中的特异性和有效靶…

Orion-14B-Chat-RAG本地部署的解决方案

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

刷存在感,Excel转Protobuf/Json通用配置文件

使用场景 最近工作流中有将Excel转Protobuf作为配置文件的技术方案。具体实现是先定一个proto文件&#xff0c;再在一个对应excel表中定义对应字段&#xff0c;由策划在excel进行更改。proto文件可以生成对应语言的脚本&#xff0c;然后将excel转成对应protobuf的binary。 我…

【Tomcat与网络9】提高Tomcat启动速度的八大措施

本文我们来看一下如何对Tomcat进行调优&#xff0c;我们对于Tomcat的调优主要集中在三个方面&#xff1a;提高启动速度、提高系统稳定性和提高并发能力&#xff0c;后两者很多时候是相辅相成的&#xff0c;我们放在一起看。 Tomcat现在一般都嵌入在SpringBoot里&#xff0c;因…

又涨又跌 近期现货黄金价格波动怎么看?

踏入2024年一月的下旬&#xff0c;现货黄金价格可以说没了之前火热的状态&#xff0c;盘面上是又涨又跌。面对这样的行情&#xff0c;很多投资者不知道如何看了。下面我们就来讨论一下怎么把握近期的行情。 先区分走势类型。在现货黄金市场中有两种主要的走势类型&#xff0c;一…

千兆电口模块和万兆电口模块:网络速度的演变

随着信息技术的迅猛发展&#xff0c;网络通信技术也在不断进步。在过去的几十年中&#xff0c;以太网的速度发生了巨大的变化&#xff0c;从最初的百兆以太网&#xff0c;到如今的千兆以太网和万兆以太网甚至40G、100G以太网满足了大数据、云计算、人工智能等新兴应用的需求。在…

Golang语言异常机制解析:错误策略与优雅处理

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站https://www.captainbed.cn/kitie。 前言 作为开发者来说&#xff0c;我们没办法保证程序在运行过程中永远不会出现异常&#xff0c;对于异常…