文章目录
- 一、Zookeeper算法一致性
- 1、Paxos 算法
- 1.1 概述
- 1.2 算法流程
- 1.3 算法缺陷
- 2、ZAB 协议
- 2.1 概述
- 2.2 Zab 协议内容
- 3、CAP理论
- 二、源码详解
- 1、辅助源码
- 1.1 持久化源码(了解)
- 1.2 序列化源码
- 2、ZK 服务端初始化源码解析
- 2.1 启用脚本分析
- 2.2 ZK 服务端启动入口
- 2.3 解析参数 zoo.cfg 和 myid
- 2.4 过期快照删除
- 2.5 初始化通信组件
- 3、ZK 服务端加载数据源码解析
- 3.1 冷启动数据恢复快照数据
- 3.2 冷启动数据恢复编辑日志
- 4、ZK 选举源码解析
- 4.1 选举准备
- 4.2 选举执行
- 5、Follower 和 Leader 状态同步源码
- 6、服务端启动
- 6.1 Leader 启动
- 6.2 Follower 启动
- 7、客户端启动
- 7.1 创建 ZookeeperMain
- 7.2 初始化监听器
- 7.3 解析连接地址
- 7.4 创建通信
- 7.5 执行 run()
一、Zookeeper算法一致性
1、Paxos 算法
1.1 概述
Paxos算法:一种基于消息传递且具有高度容错特性的一致性算法。Paxos算法解决的问题:就是如何快速正确的在一个分布式系统中对某个数据值达成一致,并且保证不论发生任何异常,都不会破坏整个系统的一致性。
在一个Paxos系统中,首先将所有节点划分为Proposer(提议者),Acceptor(接受者),和Learner(学习者)。(注意:每个节点都可以身兼数职),一个完整的Paxos算法流程分为三个阶段:
Prepare准备阶段
- Proposer向多个Acceptor发出Propose请求Promise(承诺)
- Acceptor针对收到的Propose请求进行Promise(承诺)
Accept接受阶段
- Proposer收到多数Acceptor承诺的Promise后,向Acceptor发出Propose请求
- Acceptor针对收到的Propose请求进行Accept处理
Learn学习阶段:Proposer将形成的决议发送给所有Learners
1.2 算法流程
1.3 算法缺陷
在网络复杂的情况下,一个应用 Paxos 算法的分布式系统,可能很久无法收敛,甚至陷入活锁的情况。造成这种情况的原因是系统中有一个以上的 Proposer,多个Proposers 相互争夺Acceptor,造成迟迟无法达成一致的情况。针对这种情况,一种改进的 Paxos 算法被提出:从系统中选出一个节点作为 Leader,只有 Leader 能够发起提案。这样,一次 Paxos 流程中只有一个Proposer,不会出现活锁的情况
2、ZAB 协议
2.1 概述
Zab 借鉴了 Paxos 算法,是特别为 Zookeeper 设计的支持崩溃恢复的原子广播协议。基于该协议,Zookeeper 设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后Leader 客户端将数据同步到其他 Follower 节点。即 Zookeeper 只有一个 Leader 可以发起提案
2.2 Zab 协议内容
Zab 协议包括两种基本的模式:消息广播、崩溃恢复
3、CAP理论
CAP理论告诉我们,一个分布式系统不可能同时满足以下三种
-
一致性(C:Consistency)
在分布式环境中,一致性是指数据在多个副本之间是否能够保持数据一致的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。
-
可用性(A:Available)
可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果
-
分区容错性(P:Partition Tolerance)
分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障
这三个基本需求,最多只能同时满足其中的两项,因为P是必须的,因此往往选择就在CP或者AP中。ZooKeeper保证的是CP
- ZooKeeper不能保证每次服务请求的可用性。(注:在极端环境下,ZooKeeper可能会丢弃一些请求,消费者程序需要重新请求才能获得结果)。所以说,ZooKeeper不能保证服务可用性。
- 进行Leader选举时集群都是不可用
二、源码详解
Zookeeper源码下载地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/
1、辅助源码
1.1 持久化源码(了解)
Leader 和 Follower 中的数据会在内存和磁盘中各保存一份。所以需要将内存中的数据持久化到磁盘中,在 org.apache.zookeeper.server.persistence 包下的相关类都是序列化相关的代码
public interface SnapShot {
// 反序列化方法
long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException;
// 序列化方法
void serialize(DataTree dt, Map<Long, Integer> sessions,
File name) throws IOException;
/**
*find the most recent snapshot file
*查找最近的快照文件
*/
File findMostRecentSnapshot() throws IOException;
// 释放资源
void close() throws IOException;
}
public interface TxnLog {
// 设置服务状态
void setServerStats(ServerStats serverStats);
// 滚动日志
void rollLog() throws IOException;
// 追 加
boolean append(TxnHeader hdr, Record r) throws IOException;
// 读取数据
TxnIterator read(long zxid) throws IOException;
// 获取最后一个 zxid
long getLastLoggedZxid() throws IOException;
// 删除日志
boolean truncate(long zxid) throws IOException;
// 获 取 DbId
long getDbId() throws IOException;
// 提 交
void commit() throws IOException;
// 日志同步时间
long getTxnLogSyncElapsedTime();
// 关闭日志
void close() throws IOException;
// 读取日志的接口
public interface TxnIterator {
// 获取头信息
TxnHeader getHeader();
// 获取传输的内容
Record getTxn();
// 下一条记录
boolean next() throws IOException;
// 关闭资源
void close() throws IOException;
// 获取存储的大小
long getStorageSize() throws IOException;
}
}
1.2 序列化源码
zookeeper-jute 代码是关于Zookeeper 序列化相关源码
2、ZK 服务端初始化源码解析
2.1 启用脚本分析
zkServer.sh start 底层的实际执行内容,所以程序的入口是 QuorumPeerMain.java 类
nohup "$JAVA"
+ 一堆提交参数
+ $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain)
+ "$ZOOCFG"(zkEnv.sh 文件中 ZOOCFG="zoo.cfg")
2.2 ZK 服务端启动入口
源码里查找QuorumPeerMain类
public static void main(String[] args) {
// 创建了一个 zk 节点
QuorumPeerMain main = new QuorumPeerMain();
try {
// 初始化节点并运行,args 相当于提交参数中的 zoo.cfg
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
... ...
}
LOG.info("Exiting normally"); System.exit(0);
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException{
// 管理 zk 的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 1 解析参数,zoo.cfg 和 myid
config.parse(args[0]);
}
// 2 启动定时任务,对过期的快照,执行删除(默认该功能关闭)
// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start();
if (args.length == 1 && config.isDistributed()) {
// 3 启动集群
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args);
}
}
2.3 解析参数 zoo.cfg 和 myid
public void parse(String path) throws ConfigException {
LOG.info("Reading configuration from: " + path);
try {
// 校验文件路径及是否存在
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(path);
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
// 加载配置文件
cfg.load(in);
configFileStr = path;
} finally {
in.close();
}
// 解析配置文件
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
... ...
}
// parseProperties(cfg)方法拉到最下面setupQuorumPeerConfig
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
throws IOException, ConfigException {
quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
setupMyId();
setupClientPort();
setupPeerType();
checkValidity();
}
private void setupMyId() throws IOException {
File myIdFile = new File(dataDir, "myid");
// standalone server doesn't need myid file.
if (!myIdFile.isFile()) {
return;
}
BufferedReader br = new BufferedReader(new FileReader(myIdFile));
String myIdString;
try {
myIdString = br.readLine();
} finally {
br.close();
}
try {
// 将解析 myid 文件中的 id 赋值给 serverId
serverId = Long.parseLong(myIdString);
MDC.put("myid", myIdString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("serverid " + myIdString
+ " is not a number");
}
}
2.4 过期快照删除
可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的
// 2 启动定时任务,对过期的快照,执行删除(默认是关闭)
// config.getSnapRetainCount() = 3 最少保留的快照个数
// config.getPurgeInterval() = 0 默认 0 表示关闭
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
public void start() {
if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
LOG.warn("Purge task is already running."); return;
}
// 默认情况 purgeInterval=0,该任务关闭,直接返回
// Don't schedule the purge task with zero or negative purge interval.
if (purgeInterval <= 0) {
LOG.info("Purge task is not scheduled.");
return;
}
// 创建一个定时器
timer = new Timer("PurgeTask", true);
// 创建一个清理快照任务
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
// 如果 purgeInterval 设置的值是 1,表示 1 小时检查一次,判断是否有过期快照, 有则删除
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
purgeTaskStatus = PurgeTaskStatus.STARTED;
}
static class PurgeTask extends TimerTask {
private File logsDir;
private File snapsDir; private int snapRetainCount;
public PurgeTask(File dataDir, File snapDir, int count) {
logsDir = dataDir;
snapsDir = snapDir; snapRetainCount = count;
}
@Override
public void run() {
LOG.info("Purge task started.");
try {
// 清理过期的数据
PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
} catch (Exception e) {
LOG.error("Error occurred while purging.", e);
}
LOG.info("Purge task completed.");
}
}
public static void purge(File dataDir, File snapDir, int num) throws IOException {
if (num < 3) {
throw new IllegalArgumentException(COUNT_ERR_MSG);
}
FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
List<File> snaps = txnLog.findNRecentSnapshots(num);
int numSnaps = snaps.size();
if (numSnaps > 0) {
purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
}
}
2.5 初始化通信组件
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
// 通信协议默认 NIO
public void runFromConfig(QuorumPeerConfig config)throws IOException, AdminServerException{
......
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
// 通信组件初始化,默认是 NIO 通信
if (config.getClientPortAddress() != null) {
// zookeeperAdmin.md 文件中
//Default is `NIOServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(), false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(), true);
}
// 把解析的参数赋值给该 zookeeper 节点
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
// 管理 zk 数据的存储
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(),false);
}
quorumPeer.initConfigInZKDatabase();
// 管理 zk 的通信
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
......
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
// 启动 zk
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
初始化 NIO 服务端 Socket(并未启动),ctrl + alt +B 查找 configure 实现类,NIOServerCnxnFactory.java
@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
......
// 初始化 NIO 服务端 socket,绑定 2181 端口,可以接收客户端请求
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
// 绑定 2181 端口
ss.socket().bind(addr);
ss.configureBlocking(false);
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
3、ZK 服务端加载数据源码解析
3.1 冷启动数据恢复快照数据
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
// 冷启动数据恢复
loadDataBase();
startServerCnxnFactory();
try {
// 启动通信工厂实例对象
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e); System.out.println(e);
}
// 准备选举环境
startLeaderElection();
// 执行选举
super.start();
}
private void loadDataBase() {
try {
// 加载磁盘数据到内存,恢复 DataTree
// zk 的操作分两种:事务操作和非事务操作
// 事务操作:zk.cteate();都会被分配一个全局唯一的 zxid,zxid 组成:64 位:(前 32 位:epoch 每个 leader 任期的代号;后 32 位:txid 为事务 id)
// 非事务操作:zk.getData()
zkDb.loadDataBase();
// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
......
}
public long restore(DataTree dt, Map<Long, Integer> sessions,PlayBackListener listener) throws IOException {
// 恢复快照文件数据到 DataTree
long deserializeResult = snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
RestoreFinalizer finalizer = () -> {
// 恢复编辑日志数据到 DataTree
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
return highestZxid;
......
}
//ctrl + alt +B 查找 deserialize 实现类 FileSnap.java
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
......
// 依次遍历每一个快照的数据
for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
snap = snapList.get(i);
LOG.info("Reading snapshot " + snap);
// 反序列化环境准备
try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
// 反序列化,恢复数据到 DataTree
deserialize(dt, sessions, ia);
......
}
public void deserialize(DataTree dt, Map<Long, Integer> sessions,
InputArchive ia) throws IOException {
FileHeader header = new FileHeader();
header.deserialize(ia, "fileheader");
if (header.getMagic() != SNAP_MAGIC) {
throw new IOException("mismatching magic headers "
+ header.getMagic() +
" != " + FileSnap.SNAP_MAGIC);
}
// 恢复快照数据到 DataTree
SerializeUtils.deserializeSnapshot(dt,ia,sessions);
}
public static void deserializeSnapshot(DataTree dt,InputArchive ia,
Map<Long, Integer> sessions) throws IOException {
int count = ia.readInt("count");
while (count > 0) {
long id = ia.readLong("id");
int to = ia.readInt("timeout");
sessions.put(id, to);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"loadData --- session in archive: " + id
+ " with timeout: " + to);
}
count--;
}
// 恢复快照数据到 DataTree
dt.deserialize(ia, "tree");
}
public void deserialize(InputArchive ia, String tag) throws IOException {
aclCache.deserialize(ia);
nodes.clear();
pTrie.clear();
String path = ia.readString("path");
// 从快照中恢复每一个 datanode 节点数据到 DataTree
while (!"/".equals(path)) {
// 每次循环创建一个节点对象
DataNode node = new DataNode();
ia.readRecord(node, "node");
// 将 DataNode 恢复到 DataTree
nodes.put(path, node);
synchronized (node) {
aclCache.addUsage(node.acl);
}
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1) {
root = node;
} else {
// 处理父节点
String parentPath = path.substring(0, lastSlash);
DataNode parent = nodes.get(parentPath);
if (parent == null) {
throw new IOException("Invalid Datatree, unable to find " +
"parent " + parentPath + " of path " + path);
}
// 处理子节点
parent.addChild(path.substring(lastSlash + 1));
// 处理临时节点和永久节点
long eowner = node.stat.getEphemeralOwner();
EphemeralType ephemeralType = EphemeralType.get(eowner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (eowner != 0) {
HashSet<String> list = ephemerals.get(eowner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(eowner, list);
}
list.add(path);
}
}
path = ia.readString("path");
}
nodes.put("/", root);
// we are done with deserializing the
// the datatree
// update the quotas - create path trie
// and also update the stat nodes
setupQuota();
aclCache.purgeUnused();
}
3.2 冷启动数据恢复编辑日志
回到 FileTxnSnapLog.java 类中的 restore 方法
public long restore(DataTree dt, Map<Long, Integer> sessions,PlayBackListener listener) throws IOException {
// 恢复快照文件数据到 DataTree
long deserializeResult = snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
RestoreFinalizer finalizer = () -> {
// 恢复编辑日志数据到 DataTree
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
return highestZxid;
};
......
}
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
// 在此之前,已经从快照文件中恢复了大部分数据,接下来只需从快照的 zxid + 1位置开始恢复
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
// 快照中最大的 zxid,在执行编辑日志时,这个值会不断更新,直到所有操作执行完
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
try {
// 从 lastProcessedZxid 事务编号器开始,不断的从编辑日志中恢复剩下的还没有恢复的数据
while (true) {
// 获取事务头信息(有 zxid)
hdr = itr.getHeader();
if (hdr == null) {
//empty logs
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error("{}(highestZxid) > {}(next log) for type {}",
highestZxid, hdr.getZxid(), hdr.getType());
} else {
highestZxid = hdr.getZxid();
}
try {
// 根据编辑日志恢复数据到 DataTree,每执行一次,对应的事务 id,highestZxid + 1
processTransaction(hdr,dt,sessions, itr.getTxn());
} catch(KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: " +
hdr.getType() + " error: " + e.getMessage(), e);
}
listener.onTxnLoaded(hdr, itr.getTxn());
if (!itr.next())
break;
}
} finally {
if (itr != null) {
itr.close();
}
}
return highestZxid;
}
4、ZK 选举源码解析
选举流程可以参考之前的zookeeper基础学习
4.1 选举准备
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// 选举准备
startLeaderElection();
super.start();
}
synchronized public void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
// 创建选票
// (1)选票组件:epoch(leader 的任期代号)、zxid(某个 leader 当选期间执行的事务编号)、myid(serverid)
// (2)开始选票时,都是先投自己
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
......
// 创建选举算法实例
this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
......
case 3:
// 1 创建 QuorumCnxnManager,负责选举过程中的所有网络通信
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
// 2 启动监听线程
listener.start();
// 3 准备开始选举
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
......
}
// 网络通信组件初始化
public QuorumCnxManager createCnxnManager() {
return new QuorumCnxManager(this,
this.getId(),
this.getView(),
this.authServer,
this.authLearner,
this.tickTime * this.syncLimit,
this.getQuorumListenOnAllIPs(),
this.quorumCnxnThreadsSize,
this.isQuorumSaslAuthEnabled());
}
public QuorumCnxManager(QuorumPeer self,
final long mySid,
Map<Long,QuorumPeer.QuorumServer> view,
QuorumAuthServer authServer,
QuorumAuthLearner authLearner,
int socketTimeout,
boolean listenOnAllIPs,
int quorumCnxnThreadsSize,
boolean quorumSaslAuthEnabled) {
// 创建各种队列
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}
this.self = self;
this.mySid = mySid;
this.socketTimeout = socketTimeout;
this.view = view;
this.listenOnAllIPs = listenOnAllIPs;
initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
quorumSaslAuthEnabled);
// Starts listener thread that waits for connection requests
listener = new Listener();
listener.setName("QuorumPeerListener");
}
监听线程初始化,点击 QuorumCnxManager.Listener,找到对应的 run 方法
@Override
public void run() {
......
LOG.info("My election bind port: " + addr.toString());
setName(addr.toString());
// 绑定服务器地址
ss.bind(addr);
// 死循环
while (!shutdown) {
try {
// 阻塞,等待处理请求
client = ss.accept();
......
}
选举准备,点击 FastLeaderElection
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){ this.stop = false;
this.manager = manager;
starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self;
proposedLeader = -1;
proposedZxid = -1;
// 初始化队列和信息
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>(); this.messenger = new Messenger(manager);
}
4.2 选举执行
QuorumPeer.java中执行 super.start();
就相当于执行 QuorumPeer.java 类中的 run()方法。当 Zookeeper 启动后,首先都是 Looking 状态,通过选举,让其中一台服务器成为 Leader,其他的服务器成为 Follower。
@Override
public void run() {
......
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
......
// 进行选举,选举结束,返回最终成为 Leader 胜选的那张选票
setCurrentVote(makeLEStrategy().lookForLeader());
......
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
} finally {
......
}
ctrl+alt+b 点击 lookForLeader()的实现类 FastLeaderElection.java
public Vote lookForLeader() throws InterruptedException {
......
try {
// 正常启动中,所有其他服务器,都会给我发送一个投票
// 保存每一个服务器的最新合法有效的投票
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
// 存储合法选举之外的投票结果
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
// 一次选举的最大等待时间,默认值是 0.2s
int notTimeout = finalizeWait;
// 每发起一轮选举,logicalclock++
// 在没有合法的 epoch 数据之前,都使用逻辑时钟代替
// 选举 leader 的规则:依次比较 epoch(任期) zxid(事务 id) serverid(myid) 谁大谁当选 leader
synchronized(this){
// 更新逻辑时钟,每进行一次选举,都需要更新逻辑时钟
logicalclock.incrementAndGet();
// 更新选票(serverid, zxid, epoch)
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// 广播选票,把自己的选票发给其他服务器
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
// 一轮一轮的选举直到选举成功
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
......
}
点击 sendNotifications,广播选票,把自己的选票发给其他服务器
private void sendNotifications() {
// 遍历投票参与者,给每台服务器发送选票
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
// 创建发送选票
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
// 把发送选票放入发送队列
sendqueue.offer(notmsg);
}
}
在 FastLeaderElection.java 类中查找 WorkerSender 线程
class WorkerSender extends ZooKeeperThread {
......
public void run() {
while (!stop) {
try {
// 队列阻塞,时刻准备接收要发送的选票
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
// 处理要发送的选票
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
/**
* Called by run() once there is a new message to send.
*
* @param m message to send
*/
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);
// 发送选票
manager.toSend(m.sid, requestBuffer);
}
}
public void toSend(Long sid, ByteBuffer b) {
// 判断如果是发给自己的消息,直接进入自己的 RecvQueue
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
} else {
// 如果是发给其他服务器,创建对应的发送队列或者获取已经存在的发送队列
// ,并把要发送的消息放入该队列
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
// 将选票发送出去
connectOne(sid);
}
}
与要发送的服务器节点建立通信连接,创建并启动发送器线程和接收器线程
//connectOne->connectOne->initiateConnection->startConnection
private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// 通过输出流,向服务器发送数据
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// Sending id and challenge
// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
String addr = formatInetAddr(self.getElectionAddress());
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
// 通过输入流读取对方发送过来的选票
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
authLearner.authenticate(sock, qps.hostname);
}
// 如果对方的 id 比我的大,我是没有资格给对方发送连接请求的,直接关闭自己的客户端
if (sid > self.getId()) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
// 初始化,发送器 和 接收器
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
// 启动发送器线程和接收器线程
sw.start();
rw.start();
return true;
}
return false;
}
点击 SendWorker,并查找该类下的 run 方法;点击 RecvWorker,并查找该类下的 run 方法(这里不举例了),在 FastLeaderElection.java 类中查找 WorkerReceiver 线程
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
// 从 RecvQueue 中取出选举投票消息(其他服务器发送过来的)
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
......
} catch (InterruptedException e) {
......
}
5、Follower 和 Leader 状态同步源码
当选举结束后,每个节点都需要根据自己的角色更新自己的状态。选举出的 Leader 更新自己状态为Leader,其他节点更新自己状态为 Follower。Leader 更新状态入口:leader.lead()
;Follower 更新状态入口:follower.followerLeader()
注意:
- follower 必须要让 leader 知道自己的状态:epoch、zxid、sid 必须要找出谁是leader;发起请求连接 leader;发送自己的信息给leader;leader 接收到信息,必须要返回对应的信息给 follower
- 当leader 得知follower 的状态了,就确定需要做何种方式的数据同步DIFF、TRUNC、SNAP
- 执行数据同步
- 当 leader 接收到超过半数 follower 的 ack 之后,进入正常工作状态,集群启动完成了
最终总结同步的方式:
- DIFF 咱两一样,不需要做什么
- TRUNC follower 的 zxid 比 leader 的 zxid 大,所以 Follower 要回滚
- COMMIT leader 的zxid 比 follower 的 zxid 大,发送 Proposal 给 foloower 提交执行
- 如果 follower 并没有任何数据,直接使用 SNAP 的方式来执行数据同步(直接把数据全部序列到follower)
6、服务端启动
6.1 Leader 启动
ZooKeeperServer全局查找 Leader,然后 ctrl + f 查找 lead()
void lead() throws IOException, InterruptedException {
... ...
// 启动 zookeeper 服务
startZkServer();
... ...
}
private synchronized void startZkServer() {
......
// 启动 zookeeper 服务
zk.startup();
......
}
//LeaderZooKeeperServer.java->super.startup();
//ZookeeperServer.java
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
// 接受请求相关处理
setupRequestProcessors();
registerJMX();
setState(State.RUNNING);
notifyAll();
}
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
//点击 PrepRequestProcessor,并查找它的 run 方法
6.2 Follower 启动
FollowerZooKeeperServer全局查找 Follower,然后 ctrl + f 查找 followLeader()
void followLeader() throws InterruptedException {
......
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
......
}
void readPacket(QuorumPacket pp) throws IOException {
synchronized (leaderIs) {
leaderIs.readRecord(pp, "packet");
}
if (LOG.isTraceEnabled()) {
final long traceMask =
(pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
: ZooTrace.SERVER_PACKET_TRACE_MASK;
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
}
}
protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
if (hdr.getType() == OpCode.reconfig){
SetDataTxn setDataTxn = (SetDataTxn) txn;
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}
fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
fzk.commit(qp.getZxid());
break;
case Leader.COMMITANDACTIVATE:
// get the new configuration from the request
Request request = fzk.pendingTxns.element();
SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
// get new designated leader from (current) leader's message
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
boolean majorChange =
self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
// commit (writes the new config to ZK tree (/zookeeper/config)
fzk.commit(qp.getZxid());
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Follower started");
break;
case Leader.REVALIDATE:
revalidate(qp);
break;
case Leader.SYNC:
fzk.sync();
break;
default:
LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
break;
}
}
7、客户端启动
在 ZkCli.sh 启动 Zookeeper 时,会调用 ZooKeeperMain.java,查找 ZooKeeperMain,找到程序的入口 main()方法
public static void main(String args[]) throws CliException, IOException, InterruptedException{
ZooKeeperMain main = new ZooKeeperMain(args);
main.run();
}
7.1 创建 ZookeeperMain
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
System.out.println("Connecting to " + cl.getOption("server"));
connectToZK(cl.getOption("server"));
}
protected void connectToZK(String newHost) throws InterruptedException, IOException {
if (zk != null && zk.getState().isAlive()) {
zk.close();
}
host = newHost;
boolean readOnly = cl.getOption("readonly") != null;
if (cl.getOption("secure") != null) {
System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
System.out.println("Secure connection is enabled");
}
zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
}
7.2 初始化监听器
ZooKeeperAdmin一直点进去
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
// 赋值 watcher 给默认的 defaultWatcher
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;
// 客户端与服务器端通信的终端
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
7.3 解析连接地址
public ConnectStringParser(String connectString) {
// parse out chroot, if any
int off = connectString.indexOf('/');
if (off >= 0) {
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
// "hadoop102:2181,hadoop103:2181,hadoop104:2181"用逗号切割
List<String> hostsList = split(connectString,",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
// otherwise : is at the end of the string, ignore
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
public class InetSocketAddress extends SocketAddress{
// Private implementation class pointed to by all public methods.
private static class InetSocketAddressHolder {
// The hostname of the Socket Address 主机名称
private String hostname;
// The IP address of the Socket Address 通信地址
private InetAddress addr;
// The port number of the Socket Address 端口号
private int port;
... ...
}
... ...
}
7.4 创建通信
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
......
// 客户端与服务器端通信的终端
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
protected ClientCnxn createConnection(String chrootPath,
HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
watchManager, clientCnxnSocket, canBeReadOnly);
}
// 一直点下去,直到这里
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig=zooKeeper.getClientConfig();
initRequestTimeout();
}
点击SendThread,查找run方法
// ZooKeeperThread 是一个线程,执行它的 run()方法
@Override
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
// 在循环里面,循环发送,循环接收
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
// 启动连接服务端
startConnect(serverAddress);
......
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
// 接收服务端响应,并处理
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
......
}
private void startConnect(InetSocketAddress addr) throws IOException {
......
logStartConnect(addr);
// 建立连接
clientCnxnSocket.connect(addr);
}
ctrl + alt +B 查找 connect 实现类,ClientCnxnSocketNIO.java
@Override
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;
/*
* Reset incomingBuffer
*/
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
void primeConnection() throws IOException {
LOG.info("Socket connection established, initiating session, client: {}, server: {}",
clientCnxnSocket.getLocalSocketAddress(),
clientCnxnSocket.getRemoteSocketAddress());
// 标记不是第一次连接
isFirstConnect = false;
... ...
}
ctrl + alt +B 查找 doTransport 实现类,ClientCnxnSocketNIO.java
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 读取服务端应答
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
selected.clear();
}
7.5 执行 run()
public static void main(String args[]) throws CliException, IOException, InterruptedException{
ZooKeeperMain main = new ZooKeeperMain(args);
main.run();
}
void run() throws CliException, IOException, InterruptedException {
......
if (jlinemissing) {
System.out.println("JLine support is disabled");
BufferedReader br =
new BufferedReader(new InputStreamReader(System.in));
String line;
while ((line = br.readLine()) != null) {
// 一行一行读取命令
executeLine(line);
}
}
} else {
// Command line args non-null. Run what was passed.
processCmd(cl);
}
System.exit(exitCode);
}
public void executeLine(String line) throws CliException, InterruptedException, IOException {
if (!line.equals("")) {
cl.parseCommand(line);
addToHistory(commandCount,line);
// 处理客户端命令
processCmd(cl);
commandCount++;
}
}
protected boolean processCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
boolean watch = false;
try {
// 解析命令
watch = processZKCmd(co);
exitCode = 0;
} catch (CliException ex) {
exitCode = ex.getExitCode();
System.err.println(ex.getMessage());
}
return watch;
}
protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
String[] args = co.getArgArray();
String cmd = co.getCommand();
if (args.length < 1) {
usage();
throw new MalformedCommandException("No command entered");
}
if (!commandMap.containsKey(cmd)) {
usage();
throw new CommandNotFoundException("Command not found " + cmd);
}
boolean watch = false;
LOG.debug("Processing " + cmd);
if (cmd.equals("quit")) {
zk.close();
System.exit(exitCode);
} else if (cmd.equals("redo") && args.length >= 2) {
Integer i = Integer.decode(args[1]);
if (commandCount <= i || i < 0) { // don't allow redoing this redo
throw new MalformedCommandException("Command index out of range");
}
cl.parseCommand(history.get(i));
if (cl.getCommand().equals("redo")) {
throw new MalformedCommandException("No redoing redos");
}
history.put(commandCount, history.get(i));
processCmd(cl);
} else if (cmd.equals("history")) {
for (int i = commandCount - 10; i <= commandCount; ++i) {
if (i < 0) continue;
System.out.println(i + " - " + history.get(i));
}
} else if (cmd.equals("printwatches")) {
if (args.length == 1) {
System.out.println("printwatches is " + (printWatches ? "on" : "off"));
} else {
printWatches = args[1].equals("on");
}
} else if (cmd.equals("connect")) {
if (args.length >= 2) {
connectToZK(args[1]);
} else {
connectToZK(host);
}
}
// Below commands all need a live connection
if (zk == null || !zk.getState().isAlive()) {
System.out.println("Not connected");
return false;
}
// execute from commandMap
CliCommand cliCmd = commandMapCli.get(cmd);
if(cliCmd != null) {
cliCmd.setZk(zk);
watch = cliCmd.parse(args).exec();
} else if (!commandMap.containsKey(cmd)) {
usage();
}
return watch;
}