zookeeper源码分析之事务请求处理

news2025/1/6 17:02:38

一.参考

zookeeper启动和选举的源码分析参考之前的帖子.

二.源码

1.职责链模式.

每次经过的processor都是异步处理,加入当前processor的队列,然后新的线程从队列里面取出数据处理.

PrepRequestProcessor 检查ACL权限,创建ChangeRecord.

SyncRequestProcessor 负责事务日志的持久化,写入snapshot和log文件.

FinalRequestProcessor 提交leader事务.向observer发送请求,提交事务.

1.1如果是leader启动的时候会倒序初始化下面6个processor,

LeaderRequestProcessor->PrepRequestProcessor->ProposalRequestProcessor(含有SyncRequestProcessor的成员变量,processRequest中先调用下一个,再调用SyncRequestProcessor)->CommitProcessor->ToBeAppliedRequestProcessor->FinalRequestProcessor.

 protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

        setupContainerManager();
}

1.2.如果是follower启动,会进入ZooKeeperServer#startupWithServerState倒序初始化下面三个processor,

FollowerRequestProcessor->CommitProcessor->FinalRequestProcessor.

另外两个,是在Follower的主循环中,接收到PROPOSAL请求时,会依次调用SyncRequestProcessor->SendAckRequestProcessor,在下面二.2中有详细分析.

  protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
        syncProcessor.start();
    }

2.处理网络请求

如果是follower,栈如下图所示,进入NIOServerCnxnFactory.SelectorThread#run,提交到RequestThrottler#submittedRequests队列里面,开始职责链处理.

3.ceate命令栈

发起creat /test2命令时,如果是leader,栈如下图所示:

如果是follower角色,栈如下所示:

4.主循环处理create

如果是follower,进入QuorumPeer#run方法,这个是follower角色的死循环主方法,进入Follower#followLeader方法,代码如下:

void followLeader() throws InterruptedException {
        //xxx

        try {
            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
            //启动过程中,读取的选票中获取leader.
            QuorumServer leaderServer = findLeader();
            try {
                //连接leader
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                connectionTime = System.currentTimeMillis();
                //向leader中注册自己follower的存在
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                if (self.isReconfigStateChange()) {
                    throw new Exception("learned about role change");
                }
                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                //leader 代数比自己小,抛异常,正常不会这样.
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch "
                              + ZxidUtils.zxidToString(newEpochZxid)
                              + " is less than our accepted epoch "
                              + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                long startTime = Time.currentElapsedTime();
                try {
                    self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
                    self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                    //向leader同步数据,主要是拉取leader比较新的事务日志
                    syncWithLeader(newEpochZxid);
                    self.setZabState(QuorumPeer.ZabState.BROADCAST);
                    completedSync = true;
                } finally {
                    long syncTime = Time.currentElapsedTime() - startTime;
                    ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
                }
                if (self.getObserverMasterPort() > 0) {
                    LOG.info("Starting ObserverMaster");

                    om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                    om.start();
                } else {
                    om = null;
                }
                // create a reusable packet to reduce gc impact
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    readPacket(qp);
                    //处理客户端请求,在后面2处分析
                    processPacket(qp);
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                closeSocket();

                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
           //xxx
        }
    }

5.进入职责链

依次进入Follower#processPacket(case Leader.PROPOSAL)->FollowerZooKeeperServer#logRequest(类中有SyncRequestProcessor成员引用)->SyncRequestProcessor#processRequest方法,加入到SyncRequestProcessor#queuedRequests队列里面.代码如下.

protected void processPacket(QuorumPacket qp) throws Exception {
        switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break;
        case Leader.PROPOSAL:
            //提案处理
           //xxx
            fzk.logRequest(hdr, txn, digest);
            //xxx
            //xxx
        }
}

 public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        request.setTxnDigest(digest);
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
        syncProcessor.processRequest(request);
}

6.SyncRequestProcessor队列处理

进入新线程的方法SyncRequestProcessor#run.代码如下所示:

public void run() {
        try {
            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            resetSnapshotStats();
            lastFlushTime = Time.currentElapsedTime();
            while (true) {
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());

                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                //从队列取出请求
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                if (si == null) {
                    /* We timed out looking for more writes to batch, go ahead and flush immediately */
                    flush();
                    si = queuedRequests.take();
                }

                if (si == REQUEST_OF_DEATH) {
                    break;
                }

                long startProcessTime = Time.currentElapsedTime();
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

                // track the number of records written to the log
                //写请求添加到DataTree数据库内,写日志文件,如下面4代码.
                if (zks.getZKDatabase().append(si)) {
                    //判断是否需要重新建立快照
                    if (shouldSnapshot()) {
                        resetSnapshotStats();
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (!snapThreadMutex.tryAcquire()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        //启动新线程,创建新的快照.见下面代码5
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    } finally {
                                        snapThreadMutex.release();
                                    }
                                }
                            }.start();
                        }
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (shouldFlush()) {
                    flush();
                }
                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        }
        LOG.info("SyncRequestProcessor exited!");
    }

7.写事务日志

进入FileTxnLog#append()写事务日志到文件,如下代码.

  public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {
        if (hdr == null) {
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn(
                "Current zxid {} is <= {} for {}",
                hdr.getZxid(),
                lastZxidSeen,
                hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream == null) {
            LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));

            //创建日志文件log-xxx
            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            logStream.flush();
            filePadding.setCurrentSize(fos.getChannel().position());
            streamsToFlush.add(fos);
        }
        filePadding.padFile(fos.getChannel());
       //事务写到buf内.
        byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " + "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();
      //buf写到crc内,写到文件
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        Util.writeTxnBytes(oa, buf);

        return true;
    }

8.写事务日志文件

建立事务日志的快照,从上面的3过来.依次进入ZooKeeperServer#takeSnapshot()->FileTxnSnapLog#save(创建snapshot文件)->FileSnap#serialize()->SerializeUtils#serializeSnapshot->DataTree#serialize->DataTree#serializeNode.主要是向文件流中写入各种数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2055663.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用Python更改Word文档文本的字体

更改文字字体是编辑和美化Word文档时的一项常见需求&#xff0c;使用合适的字体不仅可以提升文档的整体视觉效果&#xff0c;还能突显关键信息&#xff0c;使得内容更加突出。然而&#xff0c;手动更改每一个文字的字体既繁琐又费时。因此&#xff0c;掌握一种高效的方法来自动…

verilog当中仿真的时候赋值用=还是<=

【总结&#xff1a;<是所有数据同时变化&#xff0c;而是顺序执行。如果是左边数据的赋值都是不关联的&#xff0c;那么就可以用<&#xff0c;使所有赋值同时有效。如果有a和b都需要赋值&#xff0c;且b的值跟a有关的同时还需要一起变化&#xff0c;那么可以用】 一般情…

C++ UML 使用 doxygen 生成源码类图

一&#xff1a;安装 sudo apt install doxygensudo apt install doxygen-guisudo apt install graphviz graphviz-doc 二&#xff1a;使用 命令行输入 doxywizard&#xff0c;打开图形界面 参考&#xff1a; https://www.cnblogs.com/guohaoblog/p/15151353.html

探秘:哪些项目需要代理IP来助力?

网络爬虫&#xff1a;数据采集的秘密武器 网络爬虫是互联网世界中的“数据矿工”&#xff0c;它们在网络中不断爬行&#xff0c;采集各种有用的信息。然而&#xff0c;频繁的访问请求很容易引起目标网站的警觉&#xff0c;甚至被封禁。此时&#xff0c;代理IP就像是爬虫的“隐…

Springboot发邮件功能如何实现?详细步骤?

Springboot发邮件配置指南&#xff1f;如何集成Spring Mail发邮件&#xff1f; 无论是用户注册、密码重置还是重要通知的发送&#xff0c;邮件都是不可或缺的沟通方式。Springboot作为一个流行的Java开发框架&#xff0c;提供了简洁易用的方式来实现邮件功能。AokSend将详细探…

LLM的一些基础知识:参数和内存估计

介绍&#xff1a; 基于 Transformer 架构的大型语言模型 (LLM) 已变得越来越普遍。例如&#xff0c;Mistral AI 团队推出了Mistral 7B 模型。了解其推理、微调和训练的内存需求对于高效部署和利用至关重要。 总结&#xff1a; 推理内存估算&#xff1a;对于 70 亿参数模型 (…

安装docker 遇到异常Could not resolve host: mirrorlist.centos.org; 未知的错误

问题 安装docker 遇到异常 Could not retrieve mirrorlist http://mirrorlist.centos.org/?release7&archx86_64&repoos&infrastock error was 14: curl#6 - “Could not resolve host: mirrorlist.centos.org; 未知的错误” 1、安装Docker依赖包 yum install …

机器学习第十二章-计算学习理论

目录 12.1基础知识 12.2 PAC学习 12.3有限假设空间 12.3.1可分情形 12.3.2不可分情形 12.4VC维 12.5 Rademacher复杂度 12.1基础知识 计算学习理论研究的是关于通过"计算"来进行"学习"的理论&#xff0c;即关于机器学习的理论基础&#xff0c;其目的…

数据分析实操案例分享:制造企业如何对订单数据进行BI分析?

提到数据分析&#xff0c;大家可能首先想到的是它在零售行业的应用&#xff0c;它能够助力商家实现精准营销&#xff0c;从而带来盈利。 事实上&#xff0c;数据分析的重要性已经扩展到制造业&#xff0c;它在该行业中的作用日益凸显。它能够帮助生产管理者迅速识别异常&#…

零基础也能看懂的五大网络安全技术,学网络安全真的可以很简单

网络安全技术是保护网络不受未经授权访问、破坏或盗取信息的重要手段。以下是五种零基础也能看懂的网络安全技术&#xff1a; 1.防火墙技术&#xff1a;防火墙是一种网络安全设备&#xff0c;用于监控和控制进入或离开网络的流量。它可以识别不安全的数据包&#xff0c;并阻止…

使用vs配置opencv环境(属性表方法)

opencv官网&#xff1a;https://opencv.org/releases/ 老手回忆&#xff08;新建属性表&#xff09; Step1: 安装VS&#xff0c;安装openCV Step2: 新建项目&#xff0c;新建项目属性表&#xff0c;debug|x64新建属性&#xff0c;命好名字 Step3: VC目录-包含目录中添加: 安装…

How can i wait for the actual reply of an openai-assistant? (Python OpenAI API)

题意&#xff1a;如何等待 OpenAI 助手的实际回复&#xff1f;&#xff08;Python OpenAI API&#xff09; 问题背景&#xff1a; I am interacting with the openai assistant API (python). so far, it works well. but sometimes the api returns the message i sent to th…

算法的学习笔记—包含 min 函数的栈(牛客JZ30)

&#x1f600;前言 在日常编程中&#xff0c;栈是一种常见的数据结构&#xff0c;具有后进先出的特点。它支持基本的操作如 push&#xff08;入栈&#xff09;、pop&#xff08;出栈&#xff09;和 top&#xff08;获取栈顶元素&#xff09;。然而&#xff0c;当需要在栈中快速…

linux容器基础-namespace-1(mnt)

mnt namespace mount namespace可隔离出一个具有独立挂载点信息的运行环境&#xff0c;内核知道如何去维护每个namespace的挂载点列表。 即每个namespace之间的挂载点列表是独立的&#xff0c;各自挂载互不影响。(用户通常使用mount命令来挂载普通文件系统&#xff0c;但实际…

JAVA单个商户多个门店点餐系统小程序源码

&#x1f525;单个商户&#xff0c;多店管理新纪元&#xff01;高效点餐系统大揭秘&#x1f37d;️ &#x1f680;【一店多管&#xff0c;轻松驾驭】&#x1f680; 你是否还在为多个门店的点餐管理手忙脚乱&#xff1f;&#x1f92f; 想象一下&#xff0c;从繁华都市的中心商…

一站解决多域名安全:通配符SSL证书的全面指南

随着企业在线业务的不断扩展&#xff0c;拥有多个子域名变得越来越常见。为了确保这些子域名的安全&#xff0c;并简化管理流程&#xff0c;通配符SSL证书成为了一个高效且经济的选择。本文旨在提供一份全面的指南&#xff0c;帮助您了解通配符SSL证书的重要性和如何正确地选择…

用AI也能做短剧了?全球首款 AI 短剧平台 SkyReels重磅发布!

短剧的风头在2024年依然强劲。 几分钟到十几分钟的单集时长、强烈的娱乐性和快节奏剧情&#xff0c;频繁踩中用户「爽点」&#xff0c;仅在2023年&#xff0c;短剧市场规模就达到373.9亿元&#xff0c;预计2024年将超过500亿元&#xff0c;2027年则将超过1000亿元。 然而传统编…

ChatGPT完成论文润写的几点说明

学境思源&#xff0c;一键生成论文初稿&#xff1a; AcademicIdeas - 学境思源AI论文写作 论文润写是指对已完成的论文进行修改和优化&#xff0c;以提高其逻辑性、语言流畅度和整体质量。这一过程对提升论文的学术价值至关重要。ChatGPT是一种先进的人工智能语言模型&#xf…

[图解]片段16 ESS状态机图-SysMLEA建模住宅安全系统

1 00:00:00,220 --> 00:00:03,580 然后我们看初始这里 2 00:00:03,590 --> 00:00:09,500 有一个指向它的一个迁移的事件 3 00:00:09,710 --> 00:00:13,730 站点可用&#xff0c;这个实际上是错误的 4 00:00:14,020 --> 00:00:15,050 这不是事件 5 00:00:15,900…

绝密!头部数字人源码厂商在数字人直播软件开发技术有哪些?

随着数字人直播的逐渐兴起&#xff0c;数字人直播软件开发也成为了众多有意向进军数字人行业的创业者们重点关注的对象&#xff0c;以数字人直播软件开发技术有哪些为代表的问题更是屡次冲上相关社群话题榜的前列&#xff0c;并引发多轮热议。 不过&#xff0c;就市场现状来看&…