Zookeeper 3.5.7 Source Code Analysis


Table of Contents

  • I. Zookeeper Consistency Algorithms
    • 1. The Paxos Algorithm
      • 1.1 Overview
      • 1.2 Algorithm Flow
      • 1.3 Drawbacks
    • 2. The ZAB Protocol
      • 2.1 Overview
      • 2.2 Contents of the Zab Protocol
    • 3. CAP Theorem
  • II. Source Code Walkthrough
    • 1. Supporting Source Code
      • 1.1 Persistence Source Code (for reference)
      • 1.2 Serialization Source Code
    • 2. ZK Server Initialization
      • 2.1 Startup Script Analysis
      • 2.2 ZK Server Entry Point
      • 2.3 Parsing zoo.cfg and myid
      • 2.4 Purging Expired Snapshots
      • 2.5 Initializing the Communication Components
    • 3. ZK Server Data Loading
      • 3.1 Cold-Start Recovery from Snapshots
      • 3.2 Cold-Start Recovery from the Edit Log
    • 4. ZK Leader Election
      • 4.1 Election Preparation
      • 4.2 Election Execution
    • 5. Follower and Leader State Synchronization
    • 6. Server Startup
      • 6.1 Leader Startup
      • 6.2 Follower Startup
    • 7. Client Startup
      • 7.1 Creating ZooKeeperMain
      • 7.2 Initializing the Watcher
      • 7.3 Parsing the Connect String
      • 7.4 Creating the Connection
      • 7.5 Executing run()

I. Zookeeper Consistency Algorithms

1. The Paxos Algorithm

1.1 Overview

The Paxos algorithm is a consensus algorithm based on message passing with a high degree of fault tolerance. The problem it solves is how a distributed system can quickly and correctly agree on a value, while guaranteeing that no failure can break the consistency of the system as a whole.

In a Paxos system, the nodes are first divided into Proposers, Acceptors, and Learners (note: a single node may play several of these roles at once). A complete Paxos round consists of three phases:

Prepare phase

  • The Proposer sends a Prepare request to multiple Acceptors
  • Each Acceptor responds to the Prepare request it receives with a Promise

Accept phase

  • After the Proposer has received Promises from a majority of Acceptors, it sends them an Accept request
  • Each Acceptor processes the Accept request it receives and accepts the proposal

Learn phase: the Proposer sends the agreed-upon decision to all Learners
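
To make the three phases concrete, below is a minimal in-memory sketch of single-decree Paxos. It is purely illustrative and is not Zookeeper code; the names (PaxosSketch, Acceptor, Promise, propose) are invented for this example.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PaxosSketch {

    // what an Acceptor returns for a Prepare request
    static class Promise {
        final boolean ok;
        final long acceptedN;        // number of a previously accepted proposal, -1 if none
        final String acceptedValue;  // previously accepted value, null if none
        Promise(boolean ok, long acceptedN, String acceptedValue) {
            this.ok = ok; this.acceptedN = acceptedN; this.acceptedValue = acceptedValue;
        }
    }

    static class Acceptor {
        private long promisedN = -1;   // highest proposal number promised so far
        private long acceptedN = -1;   // number of the accepted proposal
        private String acceptedValue;

        // Prepare phase: promise only if n is higher than anything promised before
        synchronized Promise prepare(long n) {
            if (n > promisedN) {
                promisedN = n;
                return new Promise(true, acceptedN, acceptedValue);
            }
            return new Promise(false, -1, null);
        }

        // Accept phase: accept unless a higher-numbered Prepare has been promised
        synchronized boolean accept(long n, String value) {
            if (n >= promisedN) {
                promisedN = n;
                acceptedN = n;
                acceptedValue = value;
                return true;
            }
            return false;
        }
    }

    // one proposer round: returns the chosen value, or null if no majority was reached
    static String propose(List<Acceptor> acceptors, long n, String myValue) {
        int quorum = acceptors.size() / 2 + 1;

        // Prepare: collect promises from a majority
        List<Promise> promises = new ArrayList<>();
        for (Acceptor a : acceptors) {
            Promise p = a.prepare(n);
            if (p.ok) promises.add(p);
        }
        if (promises.size() < quorum) return null;

        // if some acceptor already accepted a value, adopt the one with the highest number
        String value = myValue;
        long highest = -1;
        for (Promise p : promises) {
            if (p.acceptedValue != null && p.acceptedN > highest) {
                highest = p.acceptedN;
                value = p.acceptedValue;
            }
        }

        // Accept: ask the acceptors to accept the chosen value
        int accepted = 0;
        for (Acceptor a : acceptors) {
            if (a.accept(n, value)) accepted++;
        }
        // the Learn phase would now broadcast the decision to the Learners
        return accepted >= quorum ? value : null;
    }

    public static void main(String[] args) {
        List<Acceptor> acceptors = Arrays.asList(new Acceptor(), new Acceptor(), new Acceptor());
        System.out.println(propose(acceptors, 1, "v1"));   // v1
        System.out.println(propose(acceptors, 2, "v2"));   // still v1: a value, once chosen, stays chosen
    }
}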

1.2 Algorithm Flow

1.3 Drawbacks

In a complex network, a distributed system running Paxos may take a very long time to converge and can even fall into livelock. The cause is having more than one Proposer: multiple Proposers keep competing for the Acceptors, so agreement is never reached. To address this, an improved form of Paxos elects one node as the Leader, and only the Leader may issue proposals. With a single Proposer per Paxos round, livelock cannot occur.

2. The ZAB Protocol

2.1 Overview

Zab borrows from the Paxos algorithm and is an atomic broadcast protocol with crash recovery designed specifically for Zookeeper. Under this protocol, only one server (the Leader) handles external write transaction requests, and the Leader then synchronizes the data to the other Follower nodes. In other words, only the Zookeeper Leader may issue proposals.

2.2 Contents of the Zab Protocol

The Zab protocol has two basic modes: message broadcast and crash recovery.

3. CAP Theorem

The CAP theorem states that a distributed system cannot satisfy all three of the following properties at the same time:

  • Consistency (C)

    In a distributed environment, consistency means that the data kept on multiple replicas stays identical. Under this requirement, after the system performs an update starting from a consistent state, the data must still be in a consistent state.

  • Availability (A)

    Availability means the service must remain usable at all times: every user request gets a result within a bounded amount of time.

  • Partition tolerance (P)

    When any network partition occurs, the distributed system must still be able to offer a service that satisfies consistency and availability, unless the entire network fails.

At most two of these three requirements can be met simultaneously. Since partition tolerance is unavoidable, the real choice is between CP and AP, and ZooKeeper guarantees CP:

  • ZooKeeper does not guarantee the availability of every request (note: under extreme conditions ZooKeeper may drop some requests, and the client has to retry to get a result). In that sense ZooKeeper does not guarantee service availability.
  • While a Leader election is in progress, the cluster is unavailable.

II. Source Code Walkthrough

Zookeeper source code download: https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/

1. Supporting Source Code

1.1 Persistence Source Code (for reference)

Both the Leader and the Followers keep a copy of the data in memory and another on disk, so the in-memory data has to be persisted to disk. The classes that do this live in the org.apache.zookeeper.server.persistence package.

public interface SnapShot {
    // deserialize: restore a snapshot into the DataTree and session map
    long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException;
    // serialize: write the DataTree and session map out as a snapshot file
    void serialize(DataTree dt, Map<Long, Integer> sessions,
        File name) throws IOException;

    /**
     * find the most recent snapshot file
     */
    File findMostRecentSnapshot() throws IOException;
    // release resources
    void close() throws IOException;
}

public interface TxnLog {
    // set the server stats
    void setServerStats(ServerStats serverStats);
    // roll the log
    void rollLog() throws IOException;
    // append a transaction
    boolean append(TxnHeader hdr, Record r) throws IOException;
    // read transactions starting from the given zxid
    TxnIterator read(long zxid) throws IOException;
    // get the last logged zxid
    long getLastLoggedZxid() throws IOException;
    // truncate the log
    boolean truncate(long zxid) throws IOException;
    // get the DbId
    long getDbId() throws IOException;
    // commit
    void commit() throws IOException;
    // log sync elapsed time
    long getTxnLogSyncElapsedTime();
    // close the log
    void close() throws IOException;
    // interface for reading the log
    public interface TxnIterator {
        // get the transaction header
        TxnHeader getHeader();
        // get the transaction payload
        Record getTxn();
        // move to the next record
        boolean next() throws IOException;
        // close resources
        void close() throws IOException;
        // get the storage size
        long getStorageSize() throws IOException;
    }
}
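
During recovery the two interfaces are used together: the newest snapshot is deserialized first, and the transaction log is then replayed from the snapshot's zxid onward. The sketch below is written only against the interfaces shown above; the class RestoreSketch is illustrative, and in Zookeeper the real logic lives in FileTxnSnapLog.restore (shown later).

import java.io.IOException;
import java.util.Map;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.persistence.SnapShot;
import org.apache.zookeeper.server.persistence.TxnLog;

public class RestoreSketch {
    public static long restore(SnapShot snap, TxnLog txnLog,
                               DataTree dt, Map<Long, Integer> sessions) throws IOException {
        // 1. load the newest snapshot into the in-memory DataTree
        long snapZxid = snap.deserialize(dt, sessions);

        // 2. replay every transaction recorded after the snapshot's zxid
        long highestZxid = snapZxid;
        TxnLog.TxnIterator it = txnLog.read(snapZxid + 1);
        try {
            while (it.getHeader() != null) {
                highestZxid = it.getHeader().getZxid();
                // the real code applies it.getTxn() to the DataTree here (DataTree.processTxn)
                if (!it.next()) {
                    break;
                }
            }
        } finally {
            it.close();
        }
        return highestZxid;
    }
}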

1.2 Serialization Source Code

The zookeeper-jute module contains Zookeeper's serialization-related source code.
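
A small round trip through jute's binary archives shows the serialization pattern. This is a sketch: TxnHeader is one of the jute-generated Record classes, and the byte-array buffering here is only for illustration.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.txn.TxnHeader;

public class JuteRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize a generated Record into bytes
        TxnHeader hdr = new TxnHeader(1L, 2, 0x100000001L, System.currentTimeMillis(), 1);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        hdr.serialize(BinaryOutputArchive.getArchive(bos), "hdr");

        // deserialize the bytes back into a fresh object
        TxnHeader copy = new TxnHeader();
        copy.deserialize(BinaryInputArchive.getArchive(
                new ByteArrayInputStream(bos.toByteArray())), "hdr");
        System.out.println(Long.toHexString(copy.getZxid())); // 100000001
    }
}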

2. ZK Server Initialization

2.1 Startup Script Analysis

When zkServer.sh start runs, the command it ultimately executes is the one below, so the program's entry point is the QuorumPeerMain class:

nohup "$JAVA"
+ a series of JVM and launch arguments
+ $ZOOMAIN (org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG" (in zkEnv.sh, ZOOCFG="zoo.cfg")

2.2 ZK Server Entry Point

Search the source tree for the QuorumPeerMain class:

public static void main(String[] args) {
    // create a zk node (a QuorumPeerMain instance)
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // initialize the node and run; args carries the zoo.cfg path from the launch command
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        ... ...
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    // manages the zk configuration information
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        // 1 parse the arguments: zoo.cfg and myid
        config.parse(args[0]);
    }
    // 2 start the scheduled task that deletes expired snapshots (disabled by default)
    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 3 start in cluster mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

2.3 Parsing zoo.cfg and myid

public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);
    try {
        // verify the file path and that the file exists
        File configFile = (new VerifyingFileFactory.Builder(LOG)
                .warnForRelativePath()
                .failForNonExistingPath()
                .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            // load the configuration file
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }
        // parse the loaded properties
        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }
    ... ...
}

// scroll to the bottom of parseProperties(cfg): it ends by calling setupQuorumPeerConfig

void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
            throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    
    setupMyId();
    setupClientPort();
    setupPeerType();
    checkValidity();
}


private void setupMyId() throws IOException {
    File myIdFile = new File(dataDir, "myid");
    // standalone server doesn't need myid file.
    if (!myIdFile.isFile()) {
        return;
    }
    BufferedReader br = new BufferedReader(new FileReader(myIdFile));
    String myIdString;
    try {
        myIdString = br.readLine();
    } finally {
        br.close();
    }
    try {
        // assign the id parsed from the myid file to serverId
        serverId = Long.parseLong(myIdString);
        MDC.put("myid", myIdString);
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("serverid " + myIdString
                + " is not a number");
    }
}
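
For reference, the files parsed in this section typically look like the minimal clustered configuration below (hostnames and paths are placeholders):

# zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper-3.5.7/zkData
clientPort=2181
server.1=hadoop102:2888:3888
server.2=hadoop103:2888:3888
server.3=hadoop104:2888:3888

The myid file under dataDir contains only the server id (for example the single line 1 on hadoop102); setupMyId() above reads that line into serverId.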

2.4 Purging Expired Snapshots

A scheduled task can be started that deletes expired snapshots. This feature is disabled by default.

// 2 start the scheduled task that deletes expired snapshots (disabled by default)
// config.getSnapRetainCount() = 3  minimum number of snapshots to retain
// config.getPurgeInterval()  = 0   default 0 means the feature is off
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
        .getDataDir(), config.getDataLogDir(), config
        .getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // by default purgeInterval = 0, so the task is disabled and we return immediately
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }
    // create a timer
    timer = new Timer("PurgeTask", true);
    // create the snapshot-purging task
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    // if purgeInterval is 1, the task runs every hour and deletes any expired snapshots it finds
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            // purge the expired data
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}

public static void purge(File dataDir, File snapDir, int num) throws IOException { 
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }
    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir); 
    List<File> snaps = txnLog.findNRecentSnapshots(num);
    int numSnaps = snaps.size(); 
    if (numSnaps > 0) {
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}
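
To turn the purge task on, the two related zoo.cfg settings are:

autopurge.snapRetainCount=3
autopurge.purgeInterval=1

With purgeInterval set to 1, the PurgeTask above runs every hour and keeps only the three most recent snapshots together with the transaction logs they need.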

2.5 Initializing the Communication Components

if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
} else {
    LOG.warn("Either no config or no quorum defined in config, running "
            + " in standalone mode");
    // there is only server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);
}

// the communication layer defaults to NIO
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    ......
    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;
        // initialize the communication component, NIO by default
        if (config.getClientPortAddress() != null) {
            // per zookeeperAdmin.md:
            // Default is `NIOServerCnxnFactory`
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(), false);
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(), true);
        }
        // copy the parsed configuration onto this zookeeper node
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
                config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        // manages the storage of zk data
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        // manages zk communication
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        ......
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();
        // start zk
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}

This initializes the NIO server socket but does not start it yet. Use Ctrl+Alt+B to find the configure implementation in NIOServerCnxnFactory.java:

@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    ......
    // initialize the NIO server socket and bind port 2181 so client requests can be accepted
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    // bind port 2181
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
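
The AcceptThread created above relies on the standard java.nio non-blocking accept pattern. Below is a stripped-down, standalone sketch of that pattern, not the Zookeeper class itself (the port and the println are placeholders):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class AcceptLoopSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        ss.socket().bind(new InetSocketAddress(2181));
        ss.configureBlocking(false);
        ss.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();                       // block until a connection is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                    client.configureBlocking(false); // the real factory hands the channel to a selector/IO thread
                    System.out.println("accepted " + client.getRemoteAddress());
                }
            }
        }
    }
}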

3. ZK Server Data Loading

3.1 Cold-Start Recovery from Snapshots

public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // cold-start data recovery
    loadDataBase();
    // start the communication factory instance
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // prepare the election environment
    startLeaderElection();
    // run the election
    super.start();
}


private void loadDataBase() {
    try {
        // load data from disk into memory and rebuild the DataTree
        // zk operations come in two kinds: transactional and non-transactional
        // transactional, e.g. zk.create(): each is assigned a globally unique zxid; the zxid has 64 bits
        //   (high 32 bits: epoch, the term number of each leader; low 32 bits: txid, the transaction id)
        // non-transactional, e.g. zk.getData()
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        ......
}
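
The epoch/counter split described in the comments can be written out directly; the sketch below mirrors what ZxidUtils does, but is not the actual class:

public class ZxidSketch {
    // high 32 bits: leader epoch; low 32 bits: per-epoch transaction counter
    static long getEpoch(long zxid)   { return zxid >> 32; }
    static long getCounter(long zxid) { return zxid & 0xffffffffL; }
    static long makeZxid(long epoch, long counter) { return (epoch << 32) | (counter & 0xffffffffL); }

    public static void main(String[] args) {
        long zxid = makeZxid(2, 5);                                   // epoch 2, 5th transaction of that epoch
        System.out.println(Long.toHexString(zxid));                   // 200000005
        System.out.println(getEpoch(zxid) + " " + getCounter(zxid));  // 2 5
    }
}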

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    // restore the snapshot file data into the DataTree
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    RestoreFinalizer finalizer = () -> {
        // restore the edit-log data into the DataTree
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        return highestZxid;
    ......
}

// Ctrl+Alt+B to find the deserialize implementation: FileSnap.java
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException {
    ......
    // iterate over each snapshot file in turn
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        LOG.info("Reading snapshot " + snap);
        // prepare the deserialization environment
        try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
             CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            // deserialize, restoring the data into the DataTree
            deserialize(dt, sessions, ia);
     ......
}

public void deserialize(DataTree dt, Map<Long, Integer> sessions,
        InputArchive ia) throws IOException {
    FileHeader header = new FileHeader();
    header.deserialize(ia, "fileheader");
    if (header.getMagic() != SNAP_MAGIC) {
        throw new IOException("mismatching magic headers "
                + header.getMagic() +
                " !=  " + FileSnap.SNAP_MAGIC);
    }
    // restore the snapshot data into the DataTree
    SerializeUtils.deserializeSnapshot(dt,ia,sessions);
}

public static void deserializeSnapshot(DataTree dt,InputArchive ia,
        Map<Long, Integer> sessions) throws IOException {
    int count = ia.readInt("count");
    while (count > 0) {
        long id = ia.readLong("id");
        int to = ia.readInt("timeout");
        sessions.put(id, to);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "loadData --- session in archive: " + id
                    + " with timeout: " + to);
        }
        count--;
    }
    // restore the snapshot data into the DataTree
    dt.deserialize(ia, "tree");
}


public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    String path = ia.readString("path");
    // restore every DataNode from the snapshot into the DataTree
    while (!"/".equals(path)) {
        // create a node object on each iteration
        DataNode node = new DataNode();
        ia.readRecord(node, "node");
        // put the DataNode into the DataTree
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            root = node;
        } else {
            // handle the parent node
            String parentPath = path.substring(0, lastSlash);
            DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException("Invalid Datatree, unable to find " +
                        "parent " + parentPath + " of path " + path);
            }
            // register this node as a child of its parent
            parent.addChild(path.substring(lastSlash + 1));
            // handle ephemeral vs. persistent nodes
            long eowner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(eowner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (eowner != 0) {
                HashSet<String> list = ephemerals.get(eowner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(eowner, list);
                }
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    nodes.put("/", root);
    // we are done with deserializing the
    // the datatree
    // update the quotas - create path trie
    // and also update the stat nodes
    setupQuota();

    aclCache.purgeUnused();
}

3.2 Cold-Start Recovery from the Edit Log

Return to the restore method in FileTxnSnapLog.java:

public long restore(DataTree dt, Map<Long, Integer> sessions,PlayBackListener listener) throws IOException {
    // restore the snapshot file data into the DataTree
    long deserializeResult = snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);

    RestoreFinalizer finalizer = () -> {
        // restore the edit-log data into the DataTree
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        return highestZxid;
    };
    ......
}

public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                 PlayBackListener listener) throws IOException {
    // most data has already been restored from the snapshot; replay starts at the snapshot's zxid + 1
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    // the highest zxid from the snapshot; it keeps advancing as edit-log entries are applied
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        // starting from lastProcessedZxid, keep restoring the remaining data from the edit log
        while (true) {
            // get the transaction header (which carries the zxid)
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(highestZxid) > {}(next log) for type {}",
                        highestZxid, hdr.getZxid(), hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // apply the edit-log record to the DataTree; highestZxid advances with each applied transaction
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next())
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}

4. ZK Leader Election

The overall election flow was covered in the earlier Zookeeper fundamentals notes.

4.1 Election Preparation

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
     }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // election preparation
    startLeaderElection();
    super.start();
}



synchronized public void startLeaderElection() {
   try {
       if (getPeerState() == ServerState.LOOKING) {
           // create the initial vote
           // (1) a vote consists of: epoch (the leader's term number), zxid (the transaction id
           //     executed during that leader's term) and myid (the server id)
           // (2) when voting starts, every server first votes for itself
           currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
       }
    ......
    // create the election algorithm instance
    this.electionAlg = createElectionAlgorithm(electionType);
}


protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    ......
    case 3:
        // 1 create the QuorumCnxManager, which handles all network communication during the election
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            // 2 start the listener thread
            listener.start();
            // 3 prepare to run the election
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        ......
}

// initialize the network communication component
public QuorumCnxManager createCnxnManager() {
    return new QuorumCnxManager(this,
            this.getId(),
            this.getView(),
            this.authServer,
            this.authLearner,
            this.tickTime * this.syncLimit,
            this.getQuorumListenOnAllIPs(),
            this.quorumCnxnThreadsSize,
            this.isQuorumSaslAuthEnabled());
}


public QuorumCnxManager(QuorumPeer self,
                        final long mySid,
                        Map<Long,QuorumPeer.QuorumServer> view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled) {
    // create the various queues
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.self = self;

    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;

    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);

    // Starts listener thread that waits for connection requests
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}

The listener thread is initialized here. Open QuorumCnxManager.Listener and find its run method:

@Override
public void run() {
    ......
  LOG.info("My election bind port: " + addr.toString());
  setName(addr.toString());
  // bind the election address
  ss.bind(addr);
  // loop forever
  while (!shutdown) {
      try {
      // block, waiting for connection requests
          client = ss.accept();
        ......
}

Election preparation: open FastLeaderElection:

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    // initialize the queues and the messenger
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

4.2 Election Execution

Calling super.start() in QuorumPeer.java runs the run() method of QuorumPeer.java. When Zookeeper starts, every server begins in the LOOKING state; the election then makes one server the Leader and the rest Followers.

@Override
public void run() {
    ......
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");
                ......
                // run the election; when it finishes, set the winning vote of the new Leader
                 setCurrentVote(makeLEStrategy().lookForLeader());
                ......
            case FOLLOWING:
                try {
                   LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                   LOG.warn("Unexpected exception",e);
                } finally {
                   follower.shutdown();
                   setFollower(null);
                   updateServerState();
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
            start_fle = Time.currentElapsedTime();
        }
    } finally {
    ......
}


Ctrl+Alt+B on lookForLeader() to open its implementation in FastLeaderElection.java:

public Vote lookForLeader() throws InterruptedException {
    ......
    try {
        // during a normal start, every other server sends me a vote
        // recvset stores the latest valid vote from each server
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        // stores votes that arrive from outside the current election
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        // maximum wait time for one round of the election, default 0.2 s
        int notTimeout = finalizeWait;
        // logicalclock++ for every new election round
        // before a valid epoch is available, the logical clock is used in its place
        // leader election rule: compare epoch (term), then zxid (transaction id), then serverid (myid);
        // the larger one wins
        synchronized(this){
            // update the logical clock; it is bumped once per election round
            logicalclock.incrementAndGet();
            // update this server's proposal (serverid, zxid, epoch)
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        // broadcast the vote: send this server's vote to all other servers
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        // keep running election rounds until a leader is chosen
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
     ......
}
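
The comparison rule noted in the comments (epoch first, then zxid, then server id) is what FastLeaderElection.totalOrderPredicate implements. A simplified sketch of that predicate follows; the real method additionally consults the quorum verifier's server weights:

public class VoteCompareSketch {
    // returns true if the proposed vote (newId, newZxid, newEpoch) should replace the current one
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
            || (newEpoch == curEpoch
                && (newZxid > curZxid
                    || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        // same epoch and zxid: the larger myid wins
        System.out.println(totalOrderPredicate(3, 0x100000005L, 1, 2, 0x100000005L, 1)); // true
        // a higher epoch wins regardless of zxid
        System.out.println(totalOrderPredicate(1, 0x100000001L, 2, 3, 0x1FFFFFFFFL, 1)); // true
    }
}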

Open sendNotifications, which broadcasts this server's vote to the other servers:

private void sendNotifications() {
    // iterate over the voting members and send a vote to each server
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        // build the outgoing vote
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // put the outgoing vote into the send queue
        sendqueue.offer(notmsg);
    }
}

Find the WorkerSender thread in FastLeaderElection.java:

class WorkerSender extends ZooKeeperThread {
    ......
    public void run() {
        while (!stop) {
            try {
                // poll the queue, ready to pick up votes to send
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if(m == null) continue;
                // process the vote to send
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }

    /**
     * Called by run() once there is a new message to send.
     *
     * @param m     message to send
     */
    void process(ToSend m) {
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                            m.leader,
                                            m.zxid,
                                            m.electionEpoch,
                                            m.peerEpoch,
                                            m.configData);
        // send the vote
        manager.toSend(m.sid, requestBuffer);

    }
}


public void toSend(Long sid, ByteBuffer b) { 
    // if the message is addressed to myself, put it straight into my own RecvQueue
    if (this.mySid == sid) {
         b.position(0);
         addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
         // if it is addressed to another server, create (or reuse) that server's send queue
         // and put the message into it
         ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
         ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
         if (oldq != null) {
             addToSendQueue(oldq, b);
         } else {
             addToSendQueue(bq, b);
         }
         // send the vote over the connection
         connectOne(sid);
    }
}

Establish a connection to the target server, then create and start the sender and receiver threads:

//connectOne->connectOne->initiateConnection->startConnection
private boolean startConnection(Socket sock, Long sid)
        throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // send data to the peer through the output stream
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        // Sending id and challenge
        // represents protocol version (in other words - message type)
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        String addr = formatInetAddr(self.getElectionAddress());
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();
        // read the votes the peer sends back through the input stream
        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }

    // if the peer's id is larger than mine, I am not allowed to initiate the connection, so close my socket
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        // initialize the sender and the receiver
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if(vsw != null)
            vsw.finish();

        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY));
        // start the sender and receiver threads
        sw.start();
        rw.start();

        return true;

    }
    return false;
}

Open SendWorker and find its run method; do the same for RecvWorker (not shown here). Then find the WorkerReceiver thread in FastLeaderElection.java:

public void run() {
     Message response;
     while (!stop) {
         // Sleeps on receive
         try {
            // take an election vote message (sent by another server) out of the RecvQueue
             response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
             ......
         } catch (InterruptedException e) {
         ......
}

5. Follower and Leader State Synchronization

When the election is over, each node updates its state according to its role: the elected Leader switches to the Leader state and the other nodes switch to the Follower state. The Leader's entry point is leader.lead(); the Follower's entry point is follower.followLeader(). Note:

  • A follower must let the leader know its state (epoch, zxid, sid): it has to figure out who the leader is, open a connection to the leader, and send its own information; the leader receives it and must reply with the corresponding information.
  • Once the leader knows the follower's state, it decides which kind of data synchronization to use: DIFF, TRUNC, or SNAP.
  • The data synchronization is then carried out.
  • Once the leader has received acks from more than half of the followers, it enters its normal working state and the cluster start-up is complete.

To summarize the synchronization modes (a sketch of the decision follows this list):

  • DIFF: the two are already identical, nothing needs to be done
  • TRUNC: the follower's zxid is larger than the leader's, so the follower must roll back
  • COMMIT: the leader's zxid is larger than the follower's, so the leader sends Proposals for the follower to commit and apply
  • If the follower has no data at all, SNAP is used: the full data set is serialized straight to the follower
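
A rough sketch of how the leader picks a sync mode for each follower. This is illustrative, not the actual code: the real decision is made in LearnerHandler against the leader's committed-log window, and the "COMMIT" case in the list above corresponds to the DIFF branch here, in which proposals and commits are replayed.

// sketch only: choose a sync mode from the follower's last zxid and the leader's committed-log window
static String chooseSyncMode(long peerLastZxid, long minCommittedLog,
                             long maxCommittedLog, boolean followerIsEmpty) {
    if (followerIsEmpty) {
        return "SNAP";                 // no data at all: ship the whole snapshot
    }
    if (peerLastZxid == maxCommittedLog) {
        return "DIFF";                 // already in sync: an empty diff
    }
    if (peerLastZxid > maxCommittedLog) {
        return "TRUNC";                // follower is ahead of the leader: roll it back
    }
    if (peerLastZxid >= minCommittedLog) {
        return "DIFF";                 // replay the proposals/commits in (peerLastZxid, maxCommittedLog]
    }
    return "SNAP";                     // too far behind the in-memory log: send a full snapshot
}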

6. Server Startup

6.1 Leader Startup

Search the source globally for the Leader class, then Ctrl+F for lead():

void lead() throws IOException, InterruptedException {
    ... ...
    // start the zookeeper server
     startZkServer();
    ... ...
}

private synchronized void startZkServer() {
    ......
    // start the zookeeper server
    zk.startup();
    ......
}

// LeaderZooKeeperServer.java -> super.startup();

// ZooKeeperServer.java
public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    // set up the request processor chain
    setupRequestProcessors();

    registerJMX();

    setState(State.RUNNING);
    notifyAll();
}

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

// open PrepRequestProcessor and find its run method

6.2 Follower Startup

Search the source globally for FollowerZooKeeperServer / Follower, then Ctrl+F for followLeader():

void followLeader() throws InterruptedException {
    ......
    while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
    }
    ......
}

void readPacket(QuorumPacket pp) throws IOException {
    synchronized (leaderIs) {
        leaderIs.readRecord(pp, "packet");
    }
    if (LOG.isTraceEnabled()) {
        final long traceMask =
            (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
                : ZooTrace.SERVER_PACKET_TRACE_MASK;

        ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
    }
}


protected void processPacket(QuorumPacket qp) throws Exception{
    switch (qp.getType()) {
    case Leader.PING:            
        ping(qp);            
        break;
    case Leader.PROPOSAL:           
        TxnHeader hdr = new TxnHeader();
        Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x"
                    + Long.toHexString(hdr.getZxid())
                    + " expected 0x"
                    + Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();
        
        if (hdr.getType() == OpCode.reconfig){
           SetDataTxn setDataTxn = (SetDataTxn) txn;       
           QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
           self.setLastSeenQuorumVerifier(qv, true);                               
        }
        
        fzk.logRequest(hdr, txn);
        break;
    case Leader.COMMIT:
        fzk.commit(qp.getZxid());
        break;
        
    case Leader.COMMITANDACTIVATE:
       // get the new configuration from the request
       Request request = fzk.pendingTxns.element();
       SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();                                                                                                      
       QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));                                
       // get new designated leader from (current) leader's message
       ByteBuffer buffer = ByteBuffer.wrap(qp.getData());    
       long suggestedLeaderId = buffer.getLong();
        boolean majorChange = 
               self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
       // commit (writes the new config to ZK tree (/zookeeper/config)                     
       fzk.commit(qp.getZxid());
        if (majorChange) {
           throw new Exception("changes proposed in reconfig");
       }
       break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Follower started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}

7. Client Startup

When ZkCli.sh starts the Zookeeper client it invokes ZooKeeperMain.java. Find ZooKeeperMain and locate its main() entry point:

public static void main(String args[]) throws CliException, IOException, InterruptedException{
    ZooKeeperMain main = new ZooKeeperMain(args);
    main.run();
}

7.1 Creating ZooKeeperMain

public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
    cl.parseOptions(args);
    System.out.println("Connecting to " + cl.getOption("server"));
    connectToZK(cl.getOption("server"));
}

protected void connectToZK(String newHost) throws InterruptedException, IOException {
    if (zk != null && zk.getState().isAlive()) {
        zk.close();
    }

    host = newHost;
    boolean readOnly = cl.getOption("readonly") != null;
    if (cl.getOption("secure") != null) {
        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
        System.out.println("Secure connection is enabled");
    }
    zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
}

7.2 Initializing the Watcher

Keep drilling into ZooKeeperAdmin until you reach the ZooKeeper constructor:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly, HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    watchManager = defaultWatchManager();
    // assign the given watcher as the default watcher (defaultWatcher)
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    hostProvider = aHostProvider;
    // the client's communication endpoint with the server
    cnxn = createConnection(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
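
For contrast with the constructor internals, this is roughly what user code exercising this path looks like (a minimal sketch; the host names, the /demo path and the timeout are placeholders):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ClientSketch {
    public static void main(String[] args) throws Exception {
        // the watcher passed here becomes watchManager.defaultWatcher, as seen above
        ZooKeeper zk = new ZooKeeper("hadoop102:2181,hadoop103:2181,hadoop104:2181", 30000,
                (WatchedEvent event) -> System.out.println("event: " + event));

        if (zk.exists("/demo", false) == null) {
            zk.create("/demo", "hello".getBytes(),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // true = register the default watcher for changes on this node
        byte[] data = zk.getData("/demo", true, null);
        System.out.println(new String(data));
        zk.close();
    }
}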

7.3 Parsing the Connect String

public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    int off = connectString.indexOf('/');
    if (off >= 0) {
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }
    // "hadoop102:2181,hadoop103:2181,hadoop104:2181"用逗号切割
    List<String> hostsList = split(connectString,",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        int pidx = host.lastIndexOf(':');
        if (pidx >= 0) {
            // otherwise : is at the end of the string, ignore
            if (pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            host = host.substring(0, pidx);
        }
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}
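
For example, a connect string with a chroot suffix is split into the chroot path plus the individual (unresolved) addresses, and a host without an explicit port falls back to DEFAULT_PORT (2181). Assuming the parser shown above and its getChrootPath()/getServerAddresses() accessors:

ConnectStringParser p =
        new ConnectStringParser("hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka");
// p.getChrootPath()      -> "/kafka"
// p.getServerAddresses() -> [hadoop102:2181, hadoop103:2181, hadoop104:2181] (unresolved)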

public class InetSocketAddress extends SocketAddress{
    // Private implementation class pointed to by all public methods.
    private static class InetSocketAddressHolder {
        // The hostname of the Socket Address
        private String hostname;
        // The IP address of the Socket Address
        private InetAddress addr;
        // The port number of the Socket Address
        private int port;
        ... ...
    }
    ... ...
}

7.4 Creating the Connection

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly, HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
    ......
    // the client's communication endpoint with the server
    cnxn = createConnection(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}

protected ClientCnxn createConnection(String chrootPath,
        HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        boolean canBeReadOnly) throws IOException {
    return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
            watchManager, clientCnxnSocket, canBeReadOnly);
}

// keep drilling down until you reach this constructor
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;

    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();
    this.clientConfig=zooKeeper.getClientConfig();
    initRequestTimeout();
}

Open SendThread and find its run method:

// SendThread is a ZooKeeperThread, i.e. a thread, so this run() method executes on its own thread
@Override
public void run() {
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    // inside this loop the client keeps sending and receiving
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) {
                // don't re-establish connection if we are closing
                if (closing) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    serverAddress = hostProvider.next(1000);
                }
                // start connecting to the server
                startConnect(serverAddress);
             ......

            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                        Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }
            // receive the server's responses and process them
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        ......
}


private void startConnect(InetSocketAddress addr) throws IOException {
    ......
    logStartConnect(addr);
    // establish the connection
    clientCnxnSocket.connect(addr);
}

Ctrl+Alt+B to find the connect implementation: ClientCnxnSocketNIO.java

@Override
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
       registerAndConnect(sock, addr);
  } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    initialized = false;

    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        sendThread.primeConnection();
    }
}

void primeConnection() throws IOException {
    LOG.info("Socket connection established, initiating session, client: {}, server: {}",
    clientCnxnSocket.getLocalSocketAddress(),
    clientCnxnSocket.getRemoteSocketAddress());
    // mark that this is no longer the first connection
    isFirstConnect = false;
    ... ...
}

Ctrl+Alt+B to find the doTransport implementation: ClientCnxnSocketNIO.java

@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // read the server's response
            doIO(pendingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        if (findSendablePacket(outgoingQueue,
                sendThread.tunnelAuthInProgress()) != null) {
            enableWrite();
        }
    }
    selected.clear();
}

7.5 Executing run()

public static void main(String args[]) throws CliException, IOException, InterruptedException{
    ZooKeeperMain main = new ZooKeeperMain(args);
    main.run();
}

void run() throws CliException, IOException, InterruptedException {
    ......
        if (jlinemissing) {
            System.out.println("JLine support is disabled");
            BufferedReader br =
                new BufferedReader(new InputStreamReader(System.in));

            String line;
            while ((line = br.readLine()) != null) {
                // read commands line by line
                executeLine(line);
            }
        }
    } else {
        // Command line args non-null.  Run what was passed.
        processCmd(cl);
    }
    System.exit(exitCode);
}

public void executeLine(String line) throws CliException, InterruptedException, IOException {
  if (!line.equals("")) {
    cl.parseCommand(line);
    addToHistory(commandCount,line);
    // process the client command
    processCmd(cl);
    commandCount++;
  }
}

protected boolean processCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    boolean watch = false;
    try {
        // parse and execute the command
        watch = processZKCmd(co);
        exitCode = 0;
    } catch (CliException ex) {
        exitCode = ex.getExitCode();
        System.err.println(ex.getMessage());
    }
    return watch;
}

protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    String[] args = co.getArgArray();
    String cmd = co.getCommand();
    if (args.length < 1) {
        usage();
        throw new MalformedCommandException("No command entered");
    }

    if (!commandMap.containsKey(cmd)) {
        usage();
        throw new CommandNotFoundException("Command not found " + cmd);
    }
    
    boolean watch = false;
    LOG.debug("Processing " + cmd);


    if (cmd.equals("quit")) {
        zk.close();
        System.exit(exitCode);
    } else if (cmd.equals("redo") && args.length >= 2) {
        Integer i = Integer.decode(args[1]);
        if (commandCount <= i || i < 0) { // don't allow redoing this redo
            throw new MalformedCommandException("Command index out of range");
        }
        cl.parseCommand(history.get(i));
        if (cl.getCommand().equals("redo")) {
            throw new MalformedCommandException("No redoing redos");
        }
        history.put(commandCount, history.get(i));
        processCmd(cl);
    } else if (cmd.equals("history")) {
        for (int i = commandCount - 10; i <= commandCount; ++i) {
            if (i < 0) continue;
            System.out.println(i + " - " + history.get(i));
        }
    } else if (cmd.equals("printwatches")) {
        if (args.length == 1) {
            System.out.println("printwatches is " + (printWatches ? "on" : "off"));
        } else {
            printWatches = args[1].equals("on");
        }
    } else if (cmd.equals("connect")) {
        if (args.length >= 2) {
            connectToZK(args[1]);
        } else {
            connectToZK(host);
        }
    }
    
    // Below commands all need a live connection
    if (zk == null || !zk.getState().isAlive()) {
        System.out.println("Not connected");
        return false;
    }
    
    // execute from commandMap
    CliCommand cliCmd = commandMapCli.get(cmd);
    if(cliCmd != null) {
        cliCmd.setZk(zk);
        watch = cliCmd.parse(args).exec();
    } else if (!commandMap.containsKey(cmd)) {
         usage();
    }
    return watch;
}
