Zookeeper源码分析——ZK选举源码解析

news2024/11/16 20:44:49

ZK选举源码解析

Zookeeper选举机制——第一次启动

在这里插入图片描述

Zookeeper选举机制——非第一次启动

在这里插入图片描述

ZK选举源码解析

在这里插入图片描述

ZK选举准备源码解析

在这里插入图片描述

public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
         }
        loadDataBase();
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        startLeaderElection();
        super.start();
    }


synchronized public void startLeaderElection() {
       try {
           if (getPeerState() == ServerState.LOOKING) {
               // 创建选票
		// 1)选票组件 epoch leader的任期代号)、 zxid(某个 leader当选期间执行的事务编号)、 myid serverid
		// 2)开始选票时,都是先投自己
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       } catch(IOException e) {
           RuntimeException re = new RuntimeException(e.getMessage());
           re.setStackTrace(e.getStackTrace());
           throw re;
       }

       // if (!getView().containsKey(myid)) {
      //      throw new RuntimeException("My id " + myid + " not in the peer list");
        //}
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(getQuorumAddress().getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
    	// 创建 选举算法实例
        this.electionAlg = createElectionAlgorithm(electionType);
    }


protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            // 1创建 QuorumCnxnManager,负责选举过程中的所有 网络通信
            QuorumCnxManager qcm = createCnxnManager();
            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
            if (oldQcm != null) {
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt();
            }
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                // 2启动监听线程
                listener.start();
                // 3准备开始选举
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

网络通信组件初始化

    public QuorumCnxManager createCnxnManager() {
        return new QuorumCnxManager(this,
                this.getId(),
                this.getView(),
                this.authServer,
                this.authLearner,
                this.tickTime * this.syncLimit,
                this.getQuorumListenOnAllIPs(),
                this.quorumCnxnThreadsSize,
                this.isQuorumSaslAuthEnabled());
    }


public QuorumCnxManager(QuorumPeer self,
                            final long mySid,
                            Map<Long,QuorumPeer.QuorumServer> view,
                            QuorumAuthServer authServer,
                            QuorumAuthLearner authLearner,
                            int socketTimeout,
                            boolean listenOnAllIPs,
                            int quorumCnxnThreadsSize,
                            boolean quorumSaslAuthEnabled) {
    	// 创建各种队列
        this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
        this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
        this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
        this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

        String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
        if(cnxToValue != null){
            this.cnxTO = Integer.parseInt(cnxToValue);
        }

        this.self = self;

        this.mySid = mySid;
        this.socketTimeout = socketTimeout;
        this.view = view;
        this.listenOnAllIPs = listenOnAllIPs;

        initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
                quorumSaslAuthEnabled);

        // Starts listener thread that waits for connection requests
        listener = new Listener();
        listener.setName("QuorumPeerListener");
    }

监听线程初始化

点击QuorumCnxManager.Listener,找到对应的 run方法

public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            Socket client = null;
            Exception exitException = null;
            while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
                try {
                    if (self.shouldUsePortUnification()) {
                        LOG.info("Creating TLS-enabled quorum server socket");
                        ss = new UnifiedServerSocket(self.getX509Util(), true);
                    } else if (self.isSslQuorum()) {
                        LOG.info("Creating TLS-only quorum server socket");
                        ss = new UnifiedServerSocket(self.getX509Util(), false);
                    } else {
                        ss = new ServerSocket();
                    }

                    ss.setReuseAddress(true);

                    if (self.getQuorumListenOnAllIPs()) {
                        int port = self.getElectionAddress().getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        // Resolve hostname for this server in case the
                        // underlying ip address has changed.
                        self.recreateSocketAddresses(self.getId());
                        addr = self.getElectionAddress();
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(addr.toString());
                    // 绑定服务器地址
                    ss.bind(addr);
                    // 死循环
                    while (!shutdown) {
                        try {
                            client = ss.accept();
                            setSockOpts(client);
                            LOG.info("Received connection request "
                                    + formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));
                            // Receive and handle the connection request
                            // asynchronously if the quorum sasl authentication is
                            // enabled. This is required because sasl server
                            // authentication process may take few seconds to finish,
                            // this may delay next peer connection requests.
                            if (quorumSaslAuthEnabled) {
                                receiveConnectionAsync(client);
                            } else {
                                receiveConnection(client);
                            }
                            numRetries = 0;
                        } catch (SocketTimeoutException e) {
                            LOG.warn("The socket is listening for the election accepted "
                                     + "and it timed out unexpectedly, but will retry."
                                     + "see ZOOKEEPER-2836");
                        }
                    }
				... }
            }
        }

选举准备

点击FastLeaderElection

 public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        this.stop = false;
        this.manager = manager;
        starter(self, manager);
    }
    
    
private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;
		// 初始化队列和信息
        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager);
    }    

选举执行源码分析

在这里插入图片描述

QuorumPeer.java

public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
         }
    // 冷启动数据恢复
        loadDataBase();
        startServerCnxnFactory();
        try {
            // 启动通信工厂实例对象
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
    // 准备选举环境
        startLeaderElection();
    // 执行选举
        super.start();
    }

执行super.start();就相当于执行QuorumPeer.java 类中的run()方法
当Zookeeper 启动后,首先都是Looking 状态,通过选举,让其中一台服务器成为Leader,
其他的服务器成为Follower。

    public void run() {
        updateThreadName();

        LOG.debug("Starting quorum peer");
        try {
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for(QuorumServer s: getView().values()){
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    RemotePeerBean rBean = new RemotePeerBean(this, s);
                    try {
                        MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                        jmxRemotePeerBean.put(s.id, rBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }

        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk =
                            new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
    
                        // Instead of starting roZk immediately, wait some grace
                        // period before we decide we're partitioned.
                        //
                        // Thread is used here because otherwise it would require
                        // changes in each of election strategy classes which is
                        // unnecessary code coupling.
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            
                            // 进行选举,选举结束,返回最终成为 Leader胜选的那张选票
                            
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                           reconfigFlagClear();
                            if (shuttingDownLE) {
                               shuttingDownLE = false;
                               startLeaderElection();
                               }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }                        
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );
                    } finally {
                        observer.shutdown();
                        setObserver(null);  
                       updateServerState();
                    }
                    break;
                case FOLLOWING:
                    try {
                       LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                       LOG.warn("Unexpected exception",e);
                    } finally {
                       follower.shutdown();
                       setFollower(null);
                       updateServerState();
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        updateServerState();
                    }
                    break;
                }
                start_fle = Time.currentElapsedTime();
            }
        } finally {
			...
        }
    }

点击 lookForLeader()的实现类 FastLeaderElection.java

public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
           self.start_fle = Time.currentElapsedTime();
        }
        try {
            // 正常启动中,所有其他服务器,都会给我发送一个投票
			// 保存每一个服务器的最新合法有效的投票		
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
			
            // 存储合法选举之外的投票结果
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
			
            // 一次选举的最大等待时间,默认值是 0.2s
            int notTimeout = finalizeWait;
			
            // 每发起一轮选举, logicalclock++ // 在没有合法的 epoch数据之前,都使用逻辑时钟代替
			// 选举 leader的规则:依次比较 epoch(任期 zxid(事务 id serveridmyid 谁大谁当选 leader
            synchronized(this){
                // 更新逻辑时钟,每进行一次选举,都需要更新逻辑时钟
                logicalclock.incrementAndGet();
                
                // 更新选票( serverid zxid, epoch)
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            
            // 广播选票,把自己的选票发给其他服务器
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            // 一轮一轮的选举 直到选举成功
            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
               .....
            return null;
        } finally {
           ...
            }
    }

点击 sendNotifications 广播选票,把自己的选票发给其他服务器

/**
     * Send notifications to all peers upon a change in our vote
     */
    private void sendNotifications() {
        // 遍历投票参与者,给每台服务器发送选票
        for (long sid : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = self.getQuorumVerifier();
            ToSend notmsg = new ToSend(ToSend.mType.notification,
                    proposedLeader,
                    proposedZxid,
                    logicalclock.get(),
                    QuorumPeer.ServerState.LOOKING,
                    sid,
                    proposedEpoch, qv.toString().getBytes());
            if(LOG.isDebugEnabled()){
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                      " (n.round), " + sid + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
            // 把发送选票放入发送队列
            sendqueue.offer(notmsg);
        }
    }

在 FastLeaderElection.java类中查找 WorkerSender线程。

class WorkerSender extends ZooKeeperThread {
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerSender(QuorumCnxManager manager){
                super("WorkerSender");
                this.stop = false;
                this.manager = manager;
            }

            public void run() {
                while (!stop) {
                    try {
                        // 队列阻塞,时刻准备接收要发送的选票
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                        if(m == null) continue;
						// 处理要发送的选票
                        process(m);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
                LOG.info("WorkerSender is down");
            }

            /**
             * Called by run() once there is a new message to send.
             *
             * @param m     message to send
             */
            void process(ToSend m) {
                ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                                    m.leader,
                                                    m.zxid,
                                                    m.electionEpoch,
                                                    m.peerEpoch,
                                                    m.configData);
				// 发送选票
                manager.toSend(m.sid, requestBuffer);

            }
        }


public void toSend(Long sid, ByteBuffer b) {
        /*
         * If sending message to myself, then simply enqueue it (loopback).
         */
    // 判断如果是发给自己的消息,直接进入自己的 RecvQueue
        if (this.mySid == sid) {
             b.position(0);
             addToRecvQueue(new Message(b.duplicate(), sid));
            /*
             * Otherwise send to the corresponding thread to send.
             */
        } else {
             /*
              * Start a new connection if doesn't have one already.
              */
            // 如果是发给其他服务器,创建对应的发送队列或者获取已经存在的发送队列
			// 并把要发送的消息放入该队列
             ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY);
             ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
             if (oldq != null) {
                 addToSendQueue(oldq, b);
             } else {
                 addToSendQueue(bq, b);
             }
            // 将选票发送出去
             connectOne(sid);

        }
    }

如果数据是发送给自己的,添加到自己的接收队列

public void addToRecvQueue(Message msg) {
        synchronized(recvQLock) {
            if (recvQueue.remainingCapacity() == 0) {
                try {
                    recvQueue.remove();
                } catch (NoSuchElementException ne) {
                    // element could be removed by poll()
                     LOG.debug("Trying to remove from an empty " +
                         "recvQueue. Ignoring exception " + ne);
                }
            }
            try {
                // 将发送给自己的选票添加到 recvQueue队列
                recvQueue.add(msg);
            } catch (IllegalStateException ie) {
                // This should never happen
                LOG.error("Unable to insert element in the recvQueue " + ie);
            }
        }
    }

数据添加到发送队列

private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
          ByteBuffer buffer) {
        if (queue.remainingCapacity() == 0) {
            try {
                queue.remove();
            } catch (NoSuchElementException ne) {
                // element could be removed by poll()
                LOG.debug("Trying to remove from an empty " +
                        "Queue. Ignoring exception " + ne);
            }
        }
        try {
            // 将要发送的消息添加到发送队列
            queue.add(buffer);
        } catch (IllegalStateException ie) {
            // This should never happen
            LOG.error("Unable to insert an element in the queue " + ie);
        }
    }

与要发送的服务器节点建立通信连接

synchronized void connectOne(long sid){
        if (senderWorkerMap.get(sid) != null) {
            LOG.debug("There is a connection already for server " + sid);
            return;
        }
        synchronized (self.QV_LOCK) {
            boolean knownId = false;
            // Resolve hostname for the remote server before attempting to
            // connect in case the underlying ip address has changed.
            self.recreateSocketAddresses(sid);
            Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
            QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
            Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
            if (lastCommittedView.containsKey(sid)) {
                knownId = true;
                if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
                    return;
            }
            if (lastSeenQV != null && lastProposedView.containsKey(sid)
                    && (!knownId || (lastProposedView.get(sid).electionAddr !=
                    lastCommittedView.get(sid).electionAddr))) {
                knownId = true;
                if (connectOne(sid, lastProposedView.get(sid).electionAddr))
                    return;
            }
            if (!knownId) {
                LOG.warn("Invalid server id: " + sid);
                return;
            }
        }
    }


synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
        if (senderWorkerMap.get(sid) != null) {
            LOG.debug("There is a connection already for server " + sid);
            return true;
        }

        Socket sock = null;
        try {
            LOG.debug("Opening channel to server " + sid);
            if (self.isSslQuorum()) {
                 SSLSocket sslSock = self.getX509Util().createSSLSocket();
                 setSockOpts(sslSock);
                 sslSock.connect(electionAddr, cnxTO);
                 sslSock.startHandshake();
                 sock = sslSock;
                 LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite());
             } else {
                 sock = new Socket();
                 setSockOpts(sock);
                 sock.connect(electionAddr, cnxTO);

             }
             LOG.debug("Connected to server " + sid);
            // Sends connection request asynchronously if the quorum
            // sasl authentication is enabled. This is required because
            // sasl server authentication process may take few seconds to
            // finish, this may delay next peer connection requests.
            if (quorumSaslAuthEnabled) {
                initiateConnectionAsync(sock, sid);
            } else {
                // 处理连接
                initiateConnection(sock, sid);
            }
            return true;
        } catch (UnresolvedAddressException e) {
           
        }
    }

public void initiateConnection(final Socket sock, final Long sid) {
        try {
            startConnection(sock, sid);
        } catch (IOException e) {
            LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
                    new Object[] { sid, sock.getRemoteSocketAddress() }, e);
            closeSocket(sock);
            return;
        }
    }

创建并启动发送器线程和接收器线程

private boolean startConnection(Socket sock, Long sid)
            throws IOException {
        DataOutputStream dout = null;
        DataInputStream din = null;
        try {
            // Use BufferedOutputStream to reduce the number of IP packets. This is
            // important for x-DC scenarios.
            // 通过输出流,向服务器发送数据
            BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
            dout = new DataOutputStream(buf);

            // Sending id and challenge
            // represents protocol version (in other words - message type)
            dout.writeLong(PROTOCOL_VERSION);
            dout.writeLong(self.getId());
            String addr = formatInetAddr(self.getElectionAddress());
            byte[] addr_bytes = addr.getBytes();
            dout.writeInt(addr_bytes.length);
            dout.write(addr_bytes);
            dout.flush();
			// 通过输入流读取对方发送过来的选票
            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }

        // authenticate learner
        QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
        if (qps != null) {
            // TODO - investigate why reconfig makes qps null.
            authLearner.authenticate(sock, qps.hostname);
        }

        // If lost the challenge, then drop the new connection
    	// 如果对方的 id比我的大,我是没有资格给对方发送连接请求的,直接关闭自己的客户端
        if (sid > self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the " +
                    "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
        } else {
            // 初始化,发送器 和 接收器
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);

            if(vsw != null)
                vsw.finish();

            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                    SEND_CAPACITY));
			// 启动发送器线程和接收器线程
            sw.start();
            rw.start();

            return true;

        }
        return false;
    }

点击 SendWorker,并查找 该类下的 run方法

QuorumCnxManager.java

public void run() {
            threadCnt.incrementAndGet();
            try {
                /**
                 * If there is nothing in the queue to send, then we
                 * send the lastMessage to ensure that the last message
                 * was received by the peer. The message could be dropped
                 * in case self or the peer shutdown their connection
                 * (and exit the thread) prior to reading/processing
                 * the last message. Duplicate messages are handled correctly
                 * by the peer.
                 *
                 * If the send queue is non-empty, then we have a recent
                 * message than that stored in lastMessage. To avoid sending
                 * stale message, we should send the message in the send queue.
                 */
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if (bq == null || isSendQueueEmpty(bq)) {
                   ByteBuffer b = lastMessageSent.get(sid);
                   if (b != null) {
                       LOG.debug("Attempting to send lastMessage to sid=" + sid);
                       send(b);
                   }
                }
            } catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", e);
                this.finish();
            }
			
			// 只要连接没有断开
            try {
                while (running && !shutdown && sock != null) {

                    ByteBuffer b = null;
                    try {
                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                                .get(sid);
                        if (bq != null) {
                         // 不断从发送队列 SendQueue中,获取发送消息,并执行发送
                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                        } else {
                            LOG.error("No queue of incoming messages for " +
                                      "server " + sid);
                            break;
                        }	

                        if(b != null){
                            // 更新对于 sid这台服务器的最近一条消息
                            lastMessageSent.put(sid, b);
                            // 执行发送
                            send(b);
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for message on queue",
                                e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Exception when using channel: for id " + sid
                         + " my id = " + QuorumCnxManager.this.mySid
                         + " error = " + e);
            }
            this.finish();
            LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
        }
    }



synchronized void send(ByteBuffer b) throws IOException {
            byte[] msgBytes = new byte[b.capacity()];
            try {
                b.position(0);
                b.get(msgBytes);
            } catch (BufferUnderflowException be) {
                LOG.error("BufferUnderflowException ", be);
                return;
            }
    		// 输出流向外发送
            dout.writeInt(b.capacity());
            dout.write(b.array());
            dout.flush();
        }

点击 RecvWorker,并查找 该类下的 run方法

@Override
        public void run() {
            threadCnt.incrementAndGet();
            try {
                // 只要连接没有断开
                while (running && !shutdown && sock != null) {
                    /**
                     * Reads the first int to determine the length of the
                     * message
                     */
                    int length = din.readInt();
                    if (length <= 0 || length > PACKETMAXSIZE) {
                        throw new IOException(
                                "Received packet with invalid packet: "
                                        + length);
                    }
                    /**
                     * Allocates a new ByteBuffer to receive the message
                     */
                    byte[] msgArray = new byte[length];
                    // 输入流接收消息
                    din.readFully(msgArray, 0, length);
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    // 接收对方发送过来的选票
                    addToRecvQueue(new Message(message.duplicate(), sid));
                }
            } catch (Exception e) {
                LOG.warn("Connection broken for id " + sid + ", my id = "
                         + QuorumCnxManager.this.mySid + ", error = " , e);
            } finally {
                LOG.warn("Interrupting SendWorker");
                sw.finish();
                closeSocket(sock);
            }
        }
    }



    public void addToRecvQueue(Message msg) {
        synchronized(recvQLock) {
            if (recvQueue.remainingCapacity() == 0) {
                try {
                    recvQueue.remove();
                } catch (NoSuchElementException ne) {
                    // element could be removed by poll()
                     LOG.debug("Trying to remove from an empty " +
                         "recvQueue. Ignoring exception " + ne);
                }
            }
            try {
                // 将接收到的消息,放入接收消息队列
                recvQueue.add(msg);
            } catch (IllegalStateException ie) {
                // This should never happen
                LOG.error("Unable to insert element in the recvQueue " + ie);
            }
        }
    }

在 FastLeaderElection.java类中查找 WorkerReceiver线程。

        class WorkerReceiver extends ZooKeeperThread  {
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerReceiver(QuorumCnxManager manager) {
                super("WorkerReceiver");
                this.stop = false;
                this.manager = manager;
            }

            public void run() {

                Message response;
                while (!stop) {
                    // Sleeps on receive
                    try {
                    // 从 RecvQueue中取出选举投票消息(其他服务器发送过来的)
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        ....
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted Exception while waiting for new message" +
                                e.toString());
                    }
                }
                LOG.info("WorkerReceiver is down");
            }
        }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/438661.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Camunda流程引擎 Modeler (二)

Camunda Modeler是Camunda官方提供的建模器&#xff1a; Modeler - 独立安装的建模器&#xff08;[windows、linux、mac] 一、下载camunda-modeler Download The Camunda BPMN / DMN Process Modeler | Camunda Release v5.10.0 camunda/camunda-modeler GitHubAn integrate…

深度强化学习【1】-强化学习入门必备基础(含Python迷宫游戏求解实例)

强化学习入门必备基础 文章目录 强化学习入门必备基础1. 强化学习与机器学习1.1 有监督学习1.2 半监督学习1.3 无监督学习1.4 强化学习1.5 深度学习 2. 强化学习中的一些概念2.1 智能体、动作、状态2.2 策略函数、奖励2.3 状态转移2.4 智能体与环境的交互过程2.5 折扣奖励2.6 动…

【Leetcode -21.合并两个有序链表 -83.删除排序链表中的重复元素】

Leetcode Leetcode-21.合并两个有序链表Leetcode-83.删除排序链表中的重复元素 Leetcode-21.合并两个有序链表 题目&#xff1a;将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1&#xff1a; 输入&#xff1a;l1 […

CUDA编程一天入门

目录 0 环境准备 1 套路 2 并行执行内核设置 3 示例代码simpleTexture3D 4 参考链接 0 环境准备 1 套路 CUDA 编程模型是一个异构模型&#xff0c;其中同时使用 CPU 和 GPU。在 CUDA 中&#xff0c;主机是指 CPU 及其内存&#xff0c;而设备是指 GPU 及其内存。在主机…

Prophet学习(三)饱和的预测与饱和最低

目录 饱和的预测&#xff08;Forecasting Growth&#xff09; 饱和最低&#xff08;Saturating Minimum&#xff09; 饱和的预测&#xff08;Forecasting Growth&#xff09; 默认情况下&#xff0c;Prophet使用线性模型进行预测。在预测增长时&#xff0c;通常有一个最大可达…

Ubuntu22.04lts NVIDIA驱动安装

1. NVIDIA驱动安装 1.1 手动安装 驱动下载地址&#xff1a;NVIDIA Driver Downloads 注意要在文本模式安装&#xff0c;进入文本模式的方法&#xff1a;sudo init 3。 返回图形化界面的方法sudo init 5 安装时报错&#xff1a; 安装NVIDIA驱动&#xff08;手动安装NVIDIA-L…

两种方式,轻松实现ChatGPT联网

两种方式效果&#xff1a; 方式一&#xff1a;浏览器搜索内嵌插件 方式二&#xff1a;官方聊天页内嵌插件 首先&#xff0c;要有一个谷歌浏览器&#xff0c;然后再安装一个叫ChatGPT for Google&#xff0c;直接在谷歌里搜一下就能找&#xff0c;也可以Chrome应用商店里搜索&a…

SAP批次主数据增强屏幕增强<转载>

原文链接&#xff1a;https://blog.csdn.net/hustliangchen/article/details/111163361 msc1n/msc2n/msc3n 批次增强 这几个事务码的主程序为SAPLCHRG&#xff0c;在如下图界面上有两块地方是可以做增强所预留的子屏幕&#xff0c;其中2是一个tab页签的子屏幕。注意版本较老的…

AI企划-《大明镇抚司到此一游》商业计划书

结合中国文化和当地资源打造的独特旅游产品 一、项目概述 本项目旨在利用中国文化特色&#xff0c;结合新西兰当地资源和市场需求&#xff0c;打造一款独特的旅游产品——“大明镇抚司到此一游”网红打卡点。该打卡点以大明镇抚司为主题&#xff0c;以大明小龙虾美食为特色&am…

千亿市场!低代码平台极大提升企业应用开发效率,颠覆传统模式!

背景 低代码平台的5个核心亮点&#xff1a; 可视化的表单设计&#xff1b; 2.可视化的审批流&#xff0c;逻辑流设计&#xff1b; 3.可视化的BI图标设计&#xff1b; 4.可视化的以应用为中心的开放和集成接口设计&#xff1b; 5.以应用为中心&#xff0c;依托于云原生技术的快…

线性表,栈和队列

什么是线性表&#xff1f; 线性表是由n个相同类型的数据元素组成的有限序列&#xff0c;它是最基本的一种线性结构。顾名思义&#xff0c;线性表就像是一条线&#xff0c;不会分叉。线性表有唯一的开始和结束&#xff0c;除了第一个元素外&#xff0c;每个元素都有唯一的直接前…

docker容器:docker资源管理

目录 一、docker资源控制 1、资源控制工具 2、设置CPU使用率上限 3、CPU压力测试 4、Cgroups限制cpu使用率 5、查看容器运行状态 6、 配置容器指定CPU 7、限制容器内存使用 8、对磁盘IO配额控制&#xff08;blkio&#xff09;的限制 二、docker数据管理 1、数据管理…

量子退火算法入门(7):如何QUBO中的三次多项式怎么转换?

文章目录 前言一、三次多项式的例题二、Python实现1.引入库 总结 前言 本文还是大部分截图来自于&#xff1a;《最適化問題とWildqatを用いた量子アニーリング計算入門》 https://booth.pm/ja/items/1415833 终于有人问到怎么将QUBO中的三次多项式转换为二次多项式了。直接以…

购物车--购物车模块完善

很明显&#xff0c;之前的购物车模块功能的实现不够完善&#xff0c;缺少一个删除购物车中产品的操作&#xff0c;针对此我们加一个删除功能。 1、写负责删除的servelt类 2、然后我们给购物车页面的jsp添加超链接传递目标id 搞定。

王道计组(23版)1_计算机系统概述

1.计算机发展历程 硬件的发展&#xff1a; 电子管、晶体管、中小规模集成电路、超大规模集成电路 摩尔定律&#xff1a; 价格不变时&#xff0c;每18个月&#xff0c;集成电路可容纳的晶体管数量和性能增加一倍 微型计算机的发展以微处理器技术为标志 计算机体系结构&#xf…

u盘里的文件被自动删除了怎么办?五种数据恢复方案

u盘是我们日常生活中常常用到的一种便携式存储设备&#xff0c;可以帮助我们存储和携带大量的文件信息。但是&#xff0c;使用过程中难免会遇到一些问题&#xff0c;例如u盘会自己删除文件的情况&#xff0c;如果你遇到了这种情况&#xff0c;该怎样找回u盘自己删除的文件呢&am…

程序猿眼中的协议:TCP / IP 五层网络模型

哈喽&#xff0c;大家好~我是你们的老朋友&#xff1a; 保护小周ღ&#xff0c;本期为大家带来的是 网络基础原理中的 TCP / IP 五层网络模型&#xff0c;主要从协议的概念&#xff0c;网络模型&#xff0c;数据分层传输的流程&#xff0c;几个方面讲解&#xff0c;看完之后可以…

jdk-FuteureForkJoin框架源码学习

背景 日常的计算任务大部分都是串行来执行&#xff0c;但是如果一个复杂的任务需要进行拆分为多个小任务&#xff0c;那么以往是自行写一个递归或者循环计算等算法来实现&#xff0c;随着这类需求的提升&#xff0c;java7中引入了ForkJoin框架来支持这类计算&#xff0c;可以比…

计算机组成原理——第六章总线

误逐世间乐&#xff0c;颇穷理乱情 文章目录 前言6.1.1 总线概述6.1.2 总线的性能指标6.2 总线仲裁(408不考)6.3 总线操作和定时 前言 本章在概述部分我们会首先介绍一下总线的基本概念&#xff0c;介绍一下总线的分类以及经典结构&#xff0c;介绍一些性能指标来评价总线的性…

部分面试题记录

Spring相关&#xff1a; 1. spring框架中的单例bean是线程安全的嘛&#xff1f; 1.1 bean单例多例配置&#xff1a; bean可以手动设置单例或者多例&#xff1a; Service Scope("singleton") public class UserServicelmpl implements UserService { }singleton:b…