ZooKeeper Source Code Analysis: Follower and Leader State Synchronization

2025/1/23 10:39:23

Follower and Leader State Synchronization

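Once the election finishes, every node updates its state according to its role: the elected leader switches to the Leader state, and all other nodes switch to the Follower state.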

Entry point for the leader's state update: leader.lead()
Entry point for the follower's state update: follower.followLeader()

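  1. The follower must let the leader know its state: epoch, zxid, and sid

     It must work out which node is the leader
     It opens a connection to the leader
     It sends its own information to the leader
     The leader receives this information and must reply to the follower with the corresponding information

  2. Once the leader knows the follower's state, it decides which kind of data synchronization is needed: DIFF, TRUNC, or SNAP

  3. The data synchronization is carried out

  4. Once the leader has received ACKs from a quorum (more than half of the servers, with the leader counting itself), it enters its normal working state and cluster startup is complete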
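
The exchange described in the steps above can be written down as an ordered list of packets. The sketch below is only an illustration: the class, enum, and method names are hypothetical, and only the packet names (FOLLOWERINFO, LEADERINFO, ACKEPOCH, DIFF/TRUNC/SNAP, NEWLEADER, ACK, UPTODATE) come from the source code walked through in this article.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration (not ZooKeeper code): the order in which the
// packets named in this article are exchanged during registration and sync.
final class HandshakeSketch {
    enum Sender { FOLLOWER, LEADER }

    static final class Step {
        final Sender sender;
        final String packet;

        Step(Sender sender, String packet) {
            this.sender = sender;
            this.packet = packet;
        }

        @Override
        public String toString() {
            return sender + " sends " + packet;
        }
    }

    static List<Step> steps() {
        return Arrays.asList(
            new Step(Sender.FOLLOWER, "FOLLOWERINFO"),     // follower announces sid and accepted epoch
            new Step(Sender.LEADER, "LEADERINFO"),         // leader proposes the new epoch
            new Step(Sender.FOLLOWER, "ACKEPOCH"),         // follower replies with its epoch and last zxid
            new Step(Sender.LEADER, "DIFF | TRUNC | SNAP"),// leader picks a sync mode
            new Step(Sender.LEADER, "NEWLEADER"),
            new Step(Sender.FOLLOWER, "ACK"),
            new Step(Sender.LEADER, "UPTODATE"));
    }

    public static void main(String[] args) {
        for (Step step : steps()) {
            System.out.println(step);
        }
    }
}
```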

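Summary of the synchronization modes:

  1. DIFF: the follower and the leader hold the same data, so nothing needs to be done
  2. TRUNC: the follower's zxid is greater than the leader's zxid, so the follower has to roll back
  3. COMMIT: the leader's zxid is greater than the follower's zxid, so the leader sends Proposals to the follower to be committed and applied
  4. SNAP: if the follower has no data at all, synchronization is done with a snapshot (the whole data set is serialized to the follower)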
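
The four cases in this summary can be expressed as a small decision function. This is a simplified sketch of the summary only, not the logic of LearnerHandler.syncFollower(), which takes more state into account. The class name, the enum, and the followerHasData flag are assumptions made for illustration.

```java
// Hypothetical sketch of the summary above; the real decision in ZooKeeper
// is made inside LearnerHandler.syncFollower() and is more involved.
final class SyncModeSketch {
    enum Mode { DIFF, TRUNC, COMMIT, SNAP }

    static Mode choose(long followerZxid, long leaderZxid, boolean followerHasData) {
        if (!followerHasData) {
            return Mode.SNAP;   // nothing to build on: ship the full snapshot
        }
        if (followerZxid == leaderZxid) {
            return Mode.DIFF;   // already identical
        }
        if (followerZxid > leaderZxid) {
            return Mode.TRUNC;  // follower is ahead and must roll back
        }
        return Mode.COMMIT;     // follower is behind: replay proposals and commit them
    }

    public static void main(String[] args) {
        System.out.println(choose(10L, 10L, true));
        System.out.println(choose(12L, 10L, true));
        System.out.println(choose(8L, 10L, true));
        System.out.println(choose(0L, 10L, false));
    }
}
```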

Follower and Leader State Synchronization: Source Code Walkthrough

Leader.lead(): waiting for state synchronization requests from followers

Find the lead() method in Leader.java

void lead() throws IOException, InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
                QuorumPeer.FLE_TIME_UNIT);
        self.start_fle = 0;
        self.end_fle = 0;

        zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

        try {
            self.tick.set(0);
            // Restore data into memory; it was in fact already loaded at startup
            zk.loadData();

            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // Start thread that waits for connection requests from
            // new followers.
            // Wait for the other follower nodes to send their state-sync requests to the leader
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();

            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

            ...
        } finally {
            zk.unregisterJMX(this);
        }
    }


class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
        private volatile boolean stop = false;

        public LearnerCnxAcceptor() {
            super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
                    .getZooKeeperServerListener());
        }

        @Override
        public void run() {
            try {
                while (!stop) {
                    Socket s = null;
                    boolean error = false;
                    try {
                        // Block until a follower connects with a state-sync request
                        s = ss.accept();

                        // start with the initLimit, once the ack is processed
                        // in LearnerHandler switch to the syncLimit
                        s.setSoTimeout(self.tickTime * self.initLimit);
                        s.setTcpNoDelay(nodelay);
						
                        BufferedInputStream is = new BufferedInputStream(
                                s.getInputStream());
                        // As soon as a follower's request arrives, create a LearnerHandler to handle it
                        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);

                        // Start the handler thread
                        fh.start();
                    } 
                    ...
        }

        public void halt() {
            stop = true;
        }
    }
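
The accepted socket starts with a read timeout of tickTime * initLimit and, according to the comment in the listing, is switched to tickTime * syncLimit once the ACK has been processed. The arithmetic can be checked with a few lines of Java. The class and method names are hypothetical, and the values tickTime=2000, initLimit=10, syncLimit=5 are assumed example settings, not values taken from this article.

```java
// Hypothetical helper: reproduces only the timeout arithmetic used by
// LearnerCnxAcceptor (handshake phase) and LearnerHandler (after the ACK).
final class TimeoutSketch {
    // Read timeout while the learner is still registering and syncing.
    static int handshakeTimeoutMs(int tickTime, int initLimit) {
        return tickTime * initLimit;
    }

    // Read timeout once the learner is in sync and only exchanges pings and requests.
    static int steadyStateTimeoutMs(int tickTime, int syncLimit) {
        return tickTime * syncLimit;
    }

    public static void main(String[] args) {
        int tickTime = 2000;  // assumed example value, in milliseconds
        int initLimit = 10;   // assumed example value, in ticks
        int syncLimit = 5;    // assumed example value, in ticks
        System.out.println("handshake timeout ms: " + handshakeTimeoutMs(tickTime, initLimit));
        System.out.println("steady-state timeout ms: " + steadyStateTimeoutMs(tickTime, syncLimit));
    }
}
```

With these assumed settings a follower gets 20 seconds of silence during the initial sync before the leader's read times out, and 10 seconds afterwards.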

Here ss is the server socket; it is created and bound when the Leader object is constructed:

private final ServerSocket ss;

    Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
        this.self = self;
        this.proposalStats = new BufferStats();
        try {
            if (self.shouldUsePortUnification() || self.isSslQuorum()) {
                boolean allowInsecureConnection = self.shouldUsePortUnification();
                if (self.getQuorumListenOnAllIPs()) {
                    ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort());
                } else {
                    ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
                }
            } else {
                if (self.getQuorumListenOnAllIPs()) {
                    ss = new ServerSocket(self.getQuorumAddress().getPort());
                } else {
                    ss = new ServerSocket();
                }
            }
            ss.setReuseAddress(true);
            if (!self.getQuorumListenOnAllIPs()) {
                ss.bind(self.getQuorumAddress());
            }
        } catch (BindException e) {
            if (self.getQuorumListenOnAllIPs()) {
                LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
            } else {
                LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
            }
            throw e;
        }
        this.zk = zk;
        this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
                maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
    }

Follower.followLeader(): finding and connecting to the leader

Find the followLeader() method in Follower.java

void followLeader() throws InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
                QuorumPeer.FLE_TIME_UNIT);
        self.start_fle = 0;
        self.end_fle = 0;
        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
        try {
            // Find the leader
            QuorumServer leaderServer = findLeader();            
            try {
                // Connect to the leader
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                // Register with the leader
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                if (self.isReconfigStateChange())
                   throw new Exception("learned about role change");
                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                syncWithLeader(newEpochZxid);                
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                try {
                    sock.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }


protected QuorumServer findLeader() {
        QuorumServer leaderServer = null;
        // Find the leader by id
    
        // The sid of the leader this node last voted for, recorded during the election
        Vote current = self.getCurrentVote();

        // Look for that sid among all servers in the current view
        for (QuorumServer s : self.getView().values()) {
            if (s.id == current.getId()) {
                // Ensure we have the leader's correct IP address before
                // attempting to connect.
                s.recreateSocketAddresses();
                leaderServer = s;
                break;
            }
        }
        if (leaderServer == null) {
            LOG.warn("Couldn't find the leader with id = "
                    + current.getId());
        }
        return leaderServer;
    }
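
The lookup in findLeader() is a linear scan of the configured servers for the id stored in the current vote. A minimal stand-alone sketch of the same idea follows. The Server class and the view map are hypothetical stand-ins for QuorumServer and self.getView().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the lookup performed by findLeader().
final class FindLeaderSketch {
    static final class Server {
        final long id;
        final String hostname;

        Server(long id, String hostname) {
            this.id = id;
            this.hostname = hostname;
        }
    }

    // Returns the server whose id matches the voted leader id, or null if none does.
    static Server findLeader(Map<Long, Server> view, long votedLeaderId) {
        for (Server s : view.values()) {
            if (s.id == votedLeaderId) {
                return s;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<Long, Server> view = new LinkedHashMap<>();
        view.put(1L, new Server(1L, "zk1"));
        view.put(2L, new Server(2L, "zk2"));
        view.put(3L, new Server(3L, "zk3"));
        Server leader = findLeader(view, 2L);
        System.out.println(leader == null ? "no leader found" : leader.hostname);
    }
}
```

In the listing above, a missing leader is only logged by findLeader(). The caller then dereferences the result inside its try block, so the failure surfaces through the generic exception handler in followLeader().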


protected void connectToLeader(InetSocketAddress addr, String hostname)
            throws IOException, InterruptedException, X509Exception {
        this.sock = createSocket();

        int initLimitTime = self.tickTime * self.initLimit;
        int remainingInitLimitTime = initLimitTime;
        long startNanoTime = nanoTime();

        for (int tries = 0; tries < 5; tries++) {
            try {
                // recalculate the init limit time because retries sleep for 1000 milliseconds
                remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
                if (remainingInitLimitTime <= 0) {
                    LOG.error("initLimit exceeded on retries.");
                    throw new IOException("initLimit exceeded on retries.");
                }

                sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
                if (self.isSslQuorum())  {
                    ((SSLSocket) sock).startHandshake();
                }
                sock.setTcpNoDelay(nodelay);
                break;
            } catch (IOException e) {
               ...
            Thread.sleep(1000);
        }

        self.authLearner.authenticate(sock, hostname);

        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
                sock.getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    }
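
The retry loop in connectToLeader() keeps a time budget: each attempt may use at most tickTime * syncLimit, but never more than what is left of tickTime * initLimit. The sketch below isolates that calculation. The class and method names are hypothetical, and it throws IllegalStateException where the original throws IOException, purely to keep the example short.

```java
// Hypothetical helper isolating the per-attempt connect timeout calculation.
final class ConnectBudgetSketch {
    static int connectTimeoutMs(int tickTime, int initLimit, int syncLimit, long elapsedMs) {
        int initLimitTime = tickTime * initLimit;
        int remaining = initLimitTime - (int) elapsedMs;
        if (remaining <= 0) {
            // The original code logs and throws an IOException at this point.
            throw new IllegalStateException("initLimit exceeded on retries.");
        }
        // Never wait longer than the sync limit, and never longer than the remaining budget.
        return Math.min(tickTime * syncLimit, remaining);
    }

    public static void main(String[] args) {
        // Assumed example settings: tickTime=2000 ms, initLimit=10, syncLimit=5.
        System.out.println(connectTimeoutMs(2000, 10, 5, 0L));
        System.out.println(connectTimeoutMs(2000, 10, 5, 15000L));
    }
}
```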

Leader.lead(): creating the LearnerHandler

LearnerHandler is declared as public class LearnerHandler extends ZooKeeperThread {}, which makes it a thread. The fh.start() call in LearnerCnxAcceptor.run() therefore ends up executing LearnerHandler.run():

public void run() {
        try {
            leader.addLearnerHandler(this);
            // Heartbeat handling: the tick by which the next ACK is due
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;

            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);
			
            
            // Read a message from the network and deserialize it into a packet
            QuorumPacket qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            
            // After the election, observers and followers must each send the leader an identifying packet: FOLLOWERINFO or OBSERVERINFO
            if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
                LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }

            byte learnerInfoData[] = qp.getData();
            if (learnerInfoData != null) {
                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                if (learnerInfoData.length >= 8) {
                    this.sid = bbsid.getLong();
                }
                if (learnerInfoData.length >= 12) {
                    this.version = bbsid.getInt(); // protocolVersion
                }
                if (learnerInfoData.length >= 20) {
                    long configVersion = bbsid.getLong();
                    if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
                        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                    }
                }
            } else {
                this.sid = leader.followerCounter.getAndDecrement();
            }

            if (leader.self.getView().containsKey(this.sid)) {
                LOG.info("Follower sid: " + this.sid + " : info : "
                        + leader.self.getView().get(this.sid).toString());
            } else {
                LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
            }
                        
            if (qp.getType() == Leader.OBSERVERINFO) {
                  learnerType = LearnerType.OBSERVER;
            }
            // Read the lastAcceptedEpoch sent by the follower.
            // The epoch used during the election is still the previous leader's epoch.
            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

            long peerLastZxid;
            StateSummary ss = null;
            
            // Read the zxid sent by the follower
            long zxid = qp.getZxid();
            // The leader derives the new epoch from the follower's sid and its last accepted epoch
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

            if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                leader.waitForEpochAck(this.getSid(), ss);
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                // The leader sends LEADERINFO to the follower (the zxid carries the new epoch)
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
                oa.writeRecord(newEpochPacket, "packet");
                bufferedOutput.flush();
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
				}
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                leader.waitForEpochAck(this.getSid(), ss);
            }
            peerLastZxid = ss.getLastZxid();
           
            // Take any necessary action if we need to send TRUNC or DIFF
            // startForwarding() will be called in all cases
            boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
            
            /* if we are not truncating or sending a diff just send a snapshot */
            if (needSnap) {
                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                LearnerSnapshot snapshot = 
                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                try {
                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    bufferedOutput.flush();

                    LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                            + "send zxid of db as 0x{}, {} concurrent snapshots, " 
                            + "snapshot was {} from throttle",
                            Long.toHexString(peerLastZxid), 
                            Long.toHexString(leaderLastZxid),
                            Long.toHexString(zxidToSend), 
                            snapshot.getConcurrentSnapshotNumber(),
                            snapshot.isEssential() ? "exempt" : "not exempt");
                    // Dump data to peer
                    leader.zk.getZKDatabase().serializeSnapshot(oa);
                    oa.writeString("BenWasHere", "signature");
                    bufferedOutput.flush();
                } finally {
                    snapshot.close();
                }
            }

            LOG.debug("Sending NEWLEADER message to " + sid);
            // the version of this quorumVerifier will be set by leader.lead() in case
            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
            // we got here, so the version was set
            if (getVersion() < 0x10000) {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, null, null);
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                                .toString().getBytes(), null);
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();

            // Start thread that blast packets in the queue to learner
            startSendingPackets();
            
            /*
             * Have to wait for the first ACK, wait until
             * the leader is ready, and only then we can
             * start processing messages.
             */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.ACK){
                LOG.error("Next packet was supposed to be an ACK,"
                    + " but received packet: {}", packetToString(qp));
                return;
            }

            if(LOG.isDebugEnabled()){
            	LOG.debug("Received NEWLEADER-ACK message from " + sid);   
            }
            leader.waitForNewLeaderAck(getSid(), qp.getZxid());

            syncLimitCheck.start();
            
            // now that the ack has been processed expect the syncLimit
            sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

            /*
             * Wait until leader starts up
             */
            synchronized(leader.zk){
                while(!leader.zk.isRunning() && !this.isInterrupted()){
                    leader.zk.wait(20);
                }
            }
            // Mutation packets will be queued during the serialize,
            // so we need to mark when the peer can actually start
            // using the data
            //
            LOG.debug("Sending UPTODATE message to " + sid);      
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

            while (true) {
                qp = new QuorumPacket();
                ia.readRecord(qp, "packet");

                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (qp.getType() == Leader.PING) {
                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                }
                tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;


                ByteBuffer bb;
                long sessionId;
                int cxid;
                int type;

                switch (qp.getType()) {
                case Leader.ACK:
                    if (this.learnerType == LearnerType.OBSERVER) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Received ACK from Observer  " + this.sid);
                        }
                    }
                    syncLimitCheck.updateAck(qp.getZxid());
                    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                    break;
                case Leader.PING:
                    // Process the touches
                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
                            .getData());
                    DataInputStream dis = new DataInputStream(bis);
                    while (dis.available() > 0) {
                        long sess = dis.readLong();
                        int to = dis.readInt();
                        leader.zk.touch(sess, to);
                    }
                    break;
                case Leader.REVALIDATE:
                    bis = new ByteArrayInputStream(qp.getData());
                    dis = new DataInputStream(bis);
                    long id = dis.readLong();
                    int to = dis.readInt();
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    DataOutputStream dos = new DataOutputStream(bos);
                    dos.writeLong(id);
                    boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
                    if (valid) {
                        try {
                            //set the session owner
                            // as the follower that
                            // owns the session
                            leader.zk.setOwner(id, this);
                        } catch (SessionExpiredException e) {
                            LOG.error("Somehow session " + Long.toHexString(id) +
                                    " expired right after being renewed! (impossible)", e);
                        }
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logTraceMessage(LOG,
                                                 ZooTrace.SESSION_TRACE_MASK,
                                                 "Session 0x" + Long.toHexString(id)
                                                 + " is valid: "+ valid);
                    }
                    dos.writeBoolean(valid);
                    qp.setData(bos.toByteArray());
                    queuedPackets.add(qp);
                    break;
                case Leader.REQUEST:
                    bb = ByteBuffer.wrap(qp.getData());
                    sessionId = bb.getLong();
                    cxid = bb.getInt();
                    type = bb.getInt();
                    bb = bb.slice();
                    Request si;
                    if(type == OpCode.sync){
                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                    } else {
                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                    }
                    si.setOwner(this);
                    leader.zk.submitLearnerRequest(si);
                    break;
                default:
                    LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                    break;
                }
            }
        } catch (IOException e) {
            ...
        }
    }
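
The first part of run() decodes the FOLLOWERINFO payload field by field, checking the array length before each read: 8 bytes for the sid, 4 for the protocol version, 8 for the config version. The following sketch reproduces that layout with ByteBuffer. The class name, the encode() helper, and the default values returned for missing fields are assumptions for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical round trip of the 20-byte learner info layout read in run():
// sid (long), protocolVersion (int), configVersion (long), all big-endian.
final class LearnerInfoSketch {
    static byte[] encode(long sid, int protocolVersion, long configVersion) {
        return ByteBuffer.allocate(20)
                .putLong(sid)
                .putInt(protocolVersion)
                .putLong(configVersion)
                .array();
    }

    // Returns {sid, protocolVersion, configVersion}; fields that are absent
    // from a shorter payload keep the placeholder value -1.
    static long[] decode(byte[] data) {
        long sid = -1L;
        long protocolVersion = -1L;
        long configVersion = -1L;
        ByteBuffer bb = ByteBuffer.wrap(data);
        if (data.length >= 8) {
            sid = bb.getLong();
        }
        if (data.length >= 12) {
            protocolVersion = bb.getInt();
        }
        if (data.length >= 20) {
            configVersion = bb.getLong();
        }
        return new long[] { sid, protocolVersion, configVersion };
    }

    public static void main(String[] args) {
        long[] fields = decode(encode(3L, 0x10000, 7L));
        System.out.println("sid=" + fields[0]
                + " protocolVersion=0x" + Long.toHexString(fields[1])
                + " configVersion=" + fields[2]);
    }
}
```

The length checks are what allow one handler to accept payloads from older learners that send fewer fields.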

Follower.followLeader(): calling registerWithLeader()

void followLeader() throws InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
                QuorumPeer.FLE_TIME_UNIT);
        self.start_fle = 0;
        self.end_fle = 0;
        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
        try {
            // Find the leader
            QuorumServer leaderServer = findLeader();            
            try {
                // Connect to the leader
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                
                // Register with the leader
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                if (self.isReconfigStateChange())
                   throw new Exception("learned about role change");
                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                syncWithLeader(newEpochZxid);                
                QuorumPacket qp = new QuorumPacket();
                // Loop, waiting for incoming messages
                while (this.isRunning()) {
                    // Read the next packet
                    readPacket(qp);
                    // Process the packet
                    processPacket(qp);
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                try {
                    sock.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }


protected long registerWithLeader(int pktType) throws IOException{
        /*
         * Send follower info, including last zxid and sid
         */
    	long lastLoggedZxid = self.getLastLoggedZxid();
        QuorumPacket qp = new QuorumPacket();                
        qp.setType(pktType);
        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
        
        /*
         * Add sid to payload
         */
        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
        boa.writeRecord(li, "LearnerInfo");
        qp.setData(bsid.toByteArray());
        
        // Send FOLLOWERINFO to the leader
        writePacket(qp, true);

        // Read the leader's reply: LEADERINFO
        readPacket(qp);        
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

        // If a LEADERINFO packet was received
		if (qp.getType() == Leader.LEADERINFO) {
        	// we are connected to a 1.0 server so accept the new epoch and read the next packet
        	leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        	byte epochBytes[] = new byte[4];
        	final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            
            // Accept the leader's epoch
            if (newEpoch > self.getAcceptedEpoch()) {

                // Put our own current epoch into wrappedEpochBytes
                wrappedEpochBytes.putInt((int)self.getCurrentEpoch());

                // Record the epoch sent by the leader as the accepted epoch
        		self.setAcceptedEpoch(newEpoch);
        	} else if (newEpoch == self.getAcceptedEpoch()) {
        		// since we have already acked an epoch equal to the leaders, we cannot ack
        		// again, but we still need to send our lastZxid to the leader so that we can
        		// sync with it if it does assume leadership of the epoch.
        		// the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
        	} else {
        		throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
        	}
            
            // Send ACKEPOCH to the leader (carrying our own epoch and last logged zxid)
        	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        	writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
        } else {
        	if (newEpoch > self.getAcceptedEpoch()) {
        		self.setAcceptedEpoch(newEpoch);
        	}
            if (qp.getType() != Leader.NEWLEADER) {
                LOG.error("First packet should have been NEWLEADER");
                throw new IOException("First packet should have been NEWLEADER");
            }
            return qp.getZxid();
        }
    } 
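
registerWithLeader() relies on two things: a zxid carries the epoch in its upper 32 bits, and the ACKEPOCH payload depends on how the proposed epoch compares with the accepted one. The sketch below restates both. The class and method names are hypothetical, the bit layout is assumed to match ZxidUtils, and IllegalStateException stands in for the IOException thrown by the original.

```java
// Hypothetical restatement of the epoch handling in registerWithLeader().
final class AckEpochSketch {
    // Assumed to mirror ZxidUtils.makeZxid: epoch in the high 32 bits, counter in the low 32.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    // Assumed to mirror ZxidUtils.getEpochFromZxid.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    // Value written into the 4-byte ACKEPOCH payload.
    static int ackEpochPayload(long newEpoch, long acceptedEpoch, long currentEpoch) {
        if (newEpoch > acceptedEpoch) {
            return (int) currentEpoch;  // a real ACK for the new epoch
        }
        if (newEpoch == acceptedEpoch) {
            return -1;                  // already acked: the reply must not count again
        }
        throw new IllegalStateException(
                "Leaders epoch, " + newEpoch + " is less than accepted epoch, " + acceptedEpoch);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(6L, 0L);
        System.out.println("zxid=0x" + Long.toHexString(zxid) + " epoch=" + epochOf(zxid));
        System.out.println(ackEpochPayload(6L, 5L, 5L));
        System.out.println(ackEpochPayload(5L, 5L, 5L));
    }
}
```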

Leader.lead(): receiving the follower's state and sending sync messages according to the sync mode

public void run() {
        try {
            leader.addLearnerHandler(this);
            // Heartbeat handling: the tick by which the next ACK is due
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;

            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);
			
            
            // Read a message from the network and deserialize it into a packet
            QuorumPacket qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            
            // After the election, observers and followers must each send the leader an identifying packet: FOLLOWERINFO or OBSERVERINFO
            if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
                LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }

            byte learnerInfoData[] = qp.getData();
            if (learnerInfoData != null) {
                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                if (learnerInfoData.length >= 8) {
                    this.sid = bbsid.getLong();
                }
                if (learnerInfoData.length >= 12) {
                    this.version = bbsid.getInt(); // protocolVersion
                }
                if (learnerInfoData.length >= 20) {
                    long configVersion = bbsid.getLong();
                    if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
                        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                    }
                }
            } else {
                this.sid = leader.followerCounter.getAndDecrement();
            }

            if (leader.self.getView().containsKey(this.sid)) {
                LOG.info("Follower sid: " + this.sid + " : info : "
                        + leader.self.getView().get(this.sid).toString());
            } else {
                LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
            }
                        
            if (qp.getType() == Leader.OBSERVERINFO) {
                  learnerType = LearnerType.OBSERVER;
            }
            // Read the lastAcceptedEpoch sent by the follower.
            // The epoch used during the election is still the previous leader's epoch.
            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

            long peerLastZxid;
            StateSummary ss = null;
            
            // Read the zxid sent by the follower
            long zxid = qp.getZxid();
            // The leader derives the new epoch from the follower's sid and its old epoch
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

            if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                leader.waitForEpochAck(this.getSid(), ss);
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                // The leader sends LEADERINFO to the follower (carries the zxid built from newEpoch)
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
                oa.writeRecord(newEpochPacket, "packet");
                bufferedOutput.flush();
                
                // Receive the ACKEPOCH reply from the follower
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
                }
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());

                // Record the peer's (follower or observer) state: epoch and zxid
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                leader.waitForEpochAck(this.getSid(), ss);
            }
            peerLastZxid = ss.getLastZxid();
           
            // Take any necessary action if we need to send TRUNC or DIFF
            // startForwarding() will be called in all cases
            // syncFollower() determines whether (and how) the leader and follower need to synchronize
            boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
            
            /* if we are not truncating or sending a diff just send a snapshot */
            if (needSnap) {
                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                LearnerSnapshot snapshot = 
                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                try {
                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    bufferedOutput.flush();

                    LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                            + "send zxid of db as 0x{}, {} concurrent snapshots, " 
                            + "snapshot was {} from throttle",
                            Long.toHexString(peerLastZxid), 
                            Long.toHexString(leaderLastZxid),
                            Long.toHexString(zxidToSend), 
                            snapshot.getConcurrentSnapshotNumber(),
                            snapshot.isEssential() ? "exempt" : "not exempt");
                    // Dump data to peer
                    leader.zk.getZKDatabase().serializeSnapshot(oa);
                    oa.writeString("BenWasHere", "signature");
                    bufferedOutput.flush();
                } finally {
                    snapshot.close();
                }
            }

            LOG.debug("Sending NEWLEADER message to " + sid);
            // the version of this quorumVerifier will be set by leader.lead() in case
            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
            // we got here, so the version was set
            if (getVersion() < 0x10000) {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, null, null);
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                                .toString().getBytes(), null);
                queuedPackets.add(newLeaderQP);
            }
           ...
            }
        } catch (IOException e) {
            ...
        }
    }
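
The length checks on learnerInfoData above imply a fixed byte layout for the FOLLOWERINFO / OBSERVERINFO payload: an 8-byte sid, then a 4-byte protocol version, then an 8-byte config version. The following is a minimal sketch of that decoding, not ZooKeeper code. The class LearnerInfoSketch, its encode() helper, the fallbackSid parameter (standing in for leader.followerCounter.getAndDecrement()) and the default values are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of ZooKeeper: mirrors the length checks that
// LearnerHandler.run() applies to the FOLLOWERINFO / OBSERVERINFO payload.
final class LearnerInfoSketch {
    final long sid;             // bytes 0..7, read when length >= 8
    final int protocolVersion;  // bytes 8..11, read when length >= 12
    final long configVersion;   // bytes 12..19, read when length >= 20

    private LearnerInfoSketch(long sid, int protocolVersion, long configVersion) {
        this.sid = sid;
        this.protocolVersion = protocolVersion;
        this.configVersion = configVersion;
    }

    // fallbackSid stands in for the counter-based sid used when no payload is sent.
    // The defaults (sid 0, version 1, configVersion -1) are assumptions of this sketch.
    static LearnerInfoSketch parse(byte[] data, long fallbackSid) {
        if (data == null) {
            return new LearnerInfoSketch(fallbackSid, 1, -1L);
        }
        long sid = 0L;
        int version = 1;
        long configVersion = -1L;
        ByteBuffer buf = ByteBuffer.wrap(data);
        if (data.length >= 8) {
            sid = buf.getLong();
        }
        if (data.length >= 12) {
            version = buf.getInt();
        }
        if (data.length >= 20) {
            configVersion = buf.getLong();
        }
        return new LearnerInfoSketch(sid, version, configVersion);
    }

    // Builds a full 20-byte payload in the same field order (illustration only).
    static byte[] encode(long sid, int protocolVersion, long configVersion) {
        return ByteBuffer.allocate(20)
                .putLong(sid)
                .putInt(protocolVersion)
                .putLong(configVersion)
                .array();
    }

    public static void main(String[] args) {
        LearnerInfoSketch info = parse(encode(3L, 0x10000, 7L), -1L);
        System.out.println("sid=" + info.sid
                + " version=0x" + Integer.toHexString(info.protocolVersion)
                + " configVersion=" + info.configVersion);
    }
}
```

A shorter payload simply leaves the later fields at their defaults, which is how the handler stays compatible with older learners that send only the sid.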



public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
        /*
         * When leader election is completed, the leader will set its
         * lastProcessedZxid to be (epoch << 32). There will be no txn associated
         * with this zxid.
         *
         * The learner will set its lastProcessedZxid to the same value if
         * it get DIFF or SNAP from the leader. If the same learner come
         * back to sync with leader using this zxid, we will never find this
         * zxid in our history. In this case, we will ignore TRUNC logic and
         * always send DIFF if we have old enough history
         */
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        // Keep track of the latest zxid which already queued
        long currentZxid = peerLastZxid;
        boolean needSnap = true;
        boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
        ReentrantReadWriteLock lock = db.getLogLock();
        ReadLock rl = lock.readLock();
        try {
            rl.lock();
            long maxCommittedLog = db.getmaxCommittedLog();
            long minCommittedLog = db.getminCommittedLog();
            long lastProcessedZxid = db.getDataTreeLastProcessedZxid();

            LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
                    + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
                    + " peerLastZxid=0x{}", getSid(),
                    Long.toHexString(maxCommittedLog),
                    Long.toHexString(minCommittedLog),
                    Long.toHexString(lastProcessedZxid),
                    Long.toHexString(peerLastZxid));

            if (db.getCommittedLog().isEmpty()) {
                /*
                 * It is possible that committedLog is empty. In that case
                 * setting these value to the latest txn in leader db
                 * will reduce the case that we need to handle
                 *
                 * Here is how each case handle by the if block below
                 * 1. lastProcessZxid == peerZxid -> Handle by (2)
                 * 2. lastProcessZxid < peerZxid -> Handle by (3)
                 * 3. lastProcessZxid > peerZxid -> Handle by (5)
                 */
                minCommittedLog = lastProcessedZxid;
                maxCommittedLog = lastProcessedZxid;
            }

            /*
             * Here are the cases that we want to handle
             *
             * 1. Force sending snapshot (for testing purpose)
             * 2. Peer and leader is already sync, send empty diff
             * 3. Follower has txn that we haven't seen. This may be old leader
             *    so we need to send TRUNC. However, if peer has newEpochZxid,
             *    we cannot send TRUNC since the follower has no txnlog
             * 4. Follower is within committedLog range or already in-sync.
             *    We may need to send DIFF or TRUNC depending on follower's zxid
             *    We always send empty DIFF if follower is already in-sync
             * 5. Follower missed the committedLog. We will try to use on-disk
             *    txnlog + committedLog to sync with follower. If that fail,
             *    we will send snapshot
             */

            if (forceSnapSync) {
                // Force leader to use snapshot to sync with follower
                LOG.warn("Forcing snapshot sync - should not see this in production");
            } else if (lastProcessedZxid == peerLastZxid) {
                // Follower is already sync with us, send empty diff
                LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
                         " for peer sid: " +  getSid());
                queueOpPacket(Leader.DIFF, peerLastZxid);
                needOpPacket = false;
                needSnap = false;
            } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
                // Newer than committedLog, send trunc and done
                LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
                          Long.toHexString(maxCommittedLog) +
                          " for peer sid:" +  getSid());
                queueOpPacket(Leader.TRUNC, maxCommittedLog);
                currentZxid = maxCommittedLog;
                needOpPacket = false;
                needSnap = false;
            } else if ((maxCommittedLog >= peerLastZxid)
                    && (minCommittedLog <= peerLastZxid)) {
                // Follower is within commitLog range
                LOG.info("Using committedLog for peer sid: " +  getSid());
                Iterator<Proposal> itr = db.getCommittedLog().iterator();
                currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                                     null, maxCommittedLog);
                needSnap = false;
            } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
                // Use txnlog and committedLog to sync

                // Calculate sizeLimit that we allow to retrieve txnlog from disk
                long sizeLimit = db.calculateTxnLogSizeLimit();
                // This method can return empty iterator if the requested zxid
                // is older than on-disk txnlog
                Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
                        peerLastZxid, sizeLimit);
                if (txnLogItr.hasNext()) {
                    LOG.info("Use txnlog and committedLog for peer sid: " +  getSid());
                    currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                                         minCommittedLog, maxCommittedLog);

                    LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
                    Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                    currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                                         null, maxCommittedLog);
                    needSnap = false;
                }
                // closing the resources
                if (txnLogItr instanceof TxnLogProposalIterator) {
                    TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
                    txnProposalItr.close();
                }
            } else {
                LOG.warn("Unhandled scenario for peer sid: " +  getSid());
            }
            LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
                      " for peer sid: " +  getSid());
            leaderLastZxid = leader.startForwarding(this, currentZxid);
        } finally {
            rl.unlock();
        }

        if (needOpPacket && !needSnap) {
            // This should never happen, but we should fall back to sending
            // snapshot just in case.
            LOG.error("Unhandled scenario for peer sid: " +  getSid() +
                     " fall back to use snapshot");
            needSnap = true;
        }

        return needSnap;
    }
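
The branch order in syncFollower() can be condensed into a pure function, which makes it easier to see which zxid relationships lead to DIFF, TRUNC or SNAP. This is a simplified sketch under stated assumptions, not the ZooKeeper implementation: SyncDecisionSketch, its Mode enum and the txnLogAvailable flag (standing in for "txnLogSyncEnabled and the on-disk iterator returned entries") are hypothetical names, and the zxid helpers are local equivalents of ZxidUtils (epoch in the high 32 bits, counter in the low 32 bits). The real method also queues packets, and inside the committedLog range it may still send TRUNC from queueCommittedProposals(); that detail is omitted here.

```java
// Hypothetical, simplified model of the decision order in syncFollower().
final class SyncDecisionSketch {
    enum Mode { EMPTY_DIFF, TRUNC, DIFF_FROM_COMMITTED_LOG, TXNLOG_THEN_COMMITTED_LOG, SNAP }

    // Local equivalents of ZxidUtils: high 32 bits = epoch, low 32 bits = counter.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    static Mode decide(long peerLastZxid, long lastProcessedZxid,
                       long minCommittedLog, long maxCommittedLog,
                       boolean committedLogEmpty, boolean txnLogAvailable,
                       boolean forceSnapSync) {
        // A zxid whose counter is 0 marks the start of a new epoch; no txn carries it.
        boolean isPeerNewEpochZxid = counterOf(peerLastZxid) == 0;
        if (committedLogEmpty) {
            // Collapse the range to the leader's latest txn, as the source does.
            minCommittedLog = lastProcessedZxid;
            maxCommittedLog = lastProcessedZxid;
        }
        if (forceSnapSync) {
            return Mode.SNAP;
        } else if (lastProcessedZxid == peerLastZxid) {
            return Mode.EMPTY_DIFF;                 // already in sync
        } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            return Mode.TRUNC;                      // follower is ahead, roll it back
        } else if (maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid) {
            return Mode.DIFF_FROM_COMMITTED_LOG;    // replay from the in-memory log
        } else if (peerLastZxid < minCommittedLog && txnLogAvailable) {
            return Mode.TXNLOG_THEN_COMMITTED_LOG;  // replay from disk, then memory
        }
        return Mode.SNAP;                           // nothing else applies
    }

    public static void main(String[] args) {
        long min = makeZxid(2, 1);
        long max = makeZxid(2, 5);
        System.out.println(decide(makeZxid(2, 8), max, min, max, false, true, false));
        System.out.println(decide(makeZxid(1, 9), max, min, max, false, false, false));
    }
}
```

Keeping the decision separate from the packet queueing is only a reading aid; in the source the two are interleaved because each branch also advances currentZxid for startForwarding().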

Follower.followLeader() replies to the Leader with the sync result

protected void processPacket(QuorumPacket qp) throws Exception{
        switch (qp.getType()) {
        case Leader.PING:            
            ping(qp);            
            break;
        case Leader.PROPOSAL:           
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            
            if (hdr.getType() == OpCode.reconfig){
               SetDataTxn setDataTxn = (SetDataTxn) txn;       
               QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
               self.setLastSeenQuorumVerifier(qv, true);                               
            }
            
            fzk.logRequest(hdr, txn);
            break;
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;
            
        case Leader.COMMITANDACTIVATE:
           // get the new configuration from the request
           Request request = fzk.pendingTxns.element();
           SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();                                                                                                      
           QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));                                
 
           // get new designated leader from (current) leader's message
           ByteBuffer buffer = ByteBuffer.wrap(qp.getData());    
           long suggestedLeaderId = buffer.getLong();
            boolean majorChange = 
                   self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
           // commit (writes the new config to ZK tree (/zookeeper/config)                     
           fzk.commit(qp.getZxid());
            if (majorChange) {
               throw new Exception("changes proposed in reconfig");
           }
           break;
        case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Follower started");
            break;
        case Leader.REVALIDATE:
            revalidate(qp);
            break;
        case Leader.SYNC:
            fzk.sync();
            break;
        default:
            LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
            break;
        }
    }


public void commit(long zxid) {
        if (pendingTxns.size() == 0) {
            LOG.warn("Committing " + Long.toHexString(zxid)
                    + " without seeing txn");
            return;
        }
        long firstElementZxid = pendingTxns.element().zxid;
        if (firstElementZxid != zxid) {
            LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x"
                    + Long.toHexString(firstElementZxid));
            System.exit(12);
        }
        Request request = pendingTxns.remove();
        commitProcessor.commit(request);
    }
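
The commit() method above relies on proposals being committed in exactly the order they were logged: a COMMIT must always match the head of the pending queue. The following is a minimal sketch of that invariant, assuming a plain FIFO of zxids. PendingTxnSketch is a hypothetical class; it tracks bare zxids instead of Request objects and throws an exception where the source calls System.exit(12).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the follower's pending-transaction queue.
final class PendingTxnSketch {
    private final Queue<Long> pendingTxns = new ArrayDeque<>();
    private final List<Long> committed = new ArrayList<>();

    // Called for each PROPOSAL: remember the zxid until its COMMIT arrives.
    void logRequest(long zxid) {
        pendingTxns.add(zxid);
    }

    // Called for each COMMIT. Returns false when nothing is pending
    // (the source logs a warning and returns in that case).
    boolean commit(long zxid) {
        if (pendingTxns.isEmpty()) {
            return false;
        }
        long firstElementZxid = pendingTxns.element();
        if (firstElementZxid != zxid) {
            // The source treats this as fatal; the sketch throws instead of exiting.
            throw new IllegalStateException("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x" + Long.toHexString(firstElementZxid));
        }
        committed.add(pendingTxns.remove());
        return true;
    }

    List<Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        PendingTxnSketch follower = new PendingTxnSketch();
        follower.logRequest(0x100000001L);
        follower.logRequest(0x100000002L);
        follower.commit(0x100000001L);
        follower.commit(0x100000002L);
        System.out.println(follower.committed());
    }
}
```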

Leader.lead() replies to the Follower

LearnerHandler is declared as public class LearnerHandler extends ZooKeeperThread {}, so it is a thread. Calling fh.start() therefore executes the run() method of LearnerHandler.
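
As a reminder of the underlying Java mechanism, here is a minimal sketch in which a Thread subclass overrides run() and start() causes run() to execute on the new thread. HandlerThreadSketch and startAndWait() are hypothetical names; the sketch extends java.lang.Thread directly rather than ZooKeeperThread.

```java
// Hypothetical stand-in for a handler thread; not ZooKeeper code.
final class HandlerThreadSketch extends Thread {
    private volatile String ranOn; // name of the thread that executed run()

    HandlerThreadSketch(String name) {
        super(name);
    }

    @Override
    public void run() {
        // Executed on the new thread once start() has been called.
        ranOn = Thread.currentThread().getName();
    }

    // Starts a handler, waits for it to finish, and reports where run() executed.
    static String startAndWait(String name) {
        HandlerThreadSketch fh = new HandlerThreadSketch(name);
        fh.start();
        try {
            fh.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return fh.ranOn;
    }

    public static void main(String[] args) {
        System.out.println("run() executed on: " + startAndWait("learner-handler-demo"));
    }
}
```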

public void run() {……
    //
    LOG.debug("Sending UPTODATE message to " + sid);
    queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
    while (true) {……}
} catch (IOException e) {
    ......
} finally {
    ......
}
}

