持久化源码
快照
public interface SnapShot {
/**
* deserialize a data tree from the last valid snapshot and
* return the last zxid that was deserialized
* 反序列化方法
*/
long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException;
/**
* persist the datatree and the sessions into a persistence storage
* 序列化代码
*/
void serialize(DataTree dt, Map<Long, Integer> sessions,
File name)
throws IOException;
/**
* find the most recent snapshot file
* 查找最近的快照文件
*/
File findMostRecentSnapshot() throws IOException;
/**
* free resources from this snapshot immediately
* 释放资源
*/
void close() throws IOException;
}
操作日志
public interface TxnLog {
/**
* Setter for ServerStats to monitor fsync threshold exceed
* 设置服务状态
*/
void setServerStats(ServerStats serverStats);
/**
* roll the current
* log being appended to
* 滚动日志
*/
void rollLog() throws IOException;
/**
* Append a request to the transaction log
* returns true iff something appended, otw false
* 追加
*/
boolean append(TxnHeader hdr, Record r) throws IOException;
/**
* Start reading the transaction logs
* 读取数据
*/
TxnIterator read(long zxid) throws IOException;
/**
* the last zxid of the logged transactions.
* 获取最后一个 zxid
*/
long getLastLoggedZxid() throws IOException;
/**
* truncate the log to get in sync with the
* leader.
* 删除日志
*/
boolean truncate(long zxid) throws IOException;
/**
* the dbid for this transaction log.
* @return the dbid for this transaction log.
* 获取 DbId
*/
long getDbId() throws IOException;
/**
* commit the transaction and make sure
* they are persisted
* 提交
*/
void commit() throws IOException;
/**
*
* @return transaction log's elapsed sync time in milliseconds
* 日志同步时间
*/
long getTxnLogSyncElapsedTime();
/**
* close the transactions logs
* 关闭日志
*/
void close() throws IOException;
/**
* an iterating interface for reading
* transaction logs.
// 读取日志的接口
*/
public interface TxnIterator {
/**
* return the transaction header.
* 获取头信息
*/
TxnHeader getHeader();
/**
* return the transaction record.
* 获取传输的内容
*/
Record getTxn();
/**
* go to the next transaction record.
* 下一条 记录
*/
boolean next() throws IOException;
/**
* close files and release the
* resources
* 关闭资源
*/
void close() throws IOException;
/**
* Get an estimated storage space used to store transaction records
* that will return by this iterator
* 获取存储的大小
*/
long getStorageSize() throws IOException;
}
}
处理持久化的核心类
序列化源码
zookeeper-jute代码是关于 Zookeeper序列化相关源码
序列化和反序列化代码
public interface Record {
public void serialize(OutputArchive archive, String tag) throws IOException;
public void deserialize(InputArchive archive, String tag) throws IOException;
}
迭代代码
public interface Index {
public boolean done();
public void incr();
}
支持序列化的数据类型
public interface OutputArchive {
public void writeByte(byte b, String tag) throws IOException;
public void writeBool(boolean b, String tag) throws IOException;
public void writeInt(int i, String tag) throws IOException;
public void writeLong(long l, String tag) throws IOException;
public void writeFloat(float f, String tag) throws IOException;
public void writeDouble(double d, String tag) throws IOException;
public void writeString(String s, String tag) throws IOException;
public void writeBuffer(byte buf[], String tag) throws IOException;
public void writeRecord(Record r, String tag) throws IOException;
public void startRecord(Record r, String tag) throws IOException;
public void endRecord(Record r, String tag) throws IOException;
public void startVector(List<?> v, String tag) throws IOException;
public void endVector(List<?> v, String tag) throws IOException;
public void startMap(TreeMap<?,?> v, String tag) throws IOException;
public void endMap(TreeMap<?,?> v, String tag) throws IOException;
}
支持反序列化的数据类型
public interface InputArchive {
public byte readByte(String tag) throws IOException;
public boolean readBool(String tag) throws IOException;
public int readInt(String tag) throws IOException;
public long readLong(String tag) throws IOException;
public float readFloat(String tag) throws IOException;
public double readDouble(String tag) throws IOException;
public String readString(String tag) throws IOException;
public byte[] readBuffer(String tag) throws IOException;
public void readRecord(Record r, String tag) throws IOException;
public void startRecord(String tag) throws IOException;
public void endRecord(String tag) throws IOException;
public Index startVector(String tag) throws IOException;
public void endVector(String tag) throws IOException;
public Index startMap(String tag) throws IOException;
public void endMap(String tag) throws IOException;
}
ZK 服务端初始化源码解析
ZK 服务端启动脚本分析
Zookeeper 服务的启动命令是zkServer.sh start
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
. "$ZOOBINDIR"/../libexec/zkEnv.sh
else
. "$ZOOBINDIR"/zkEnv.sh //相当于获取zkEnv.sh 中的环境变量(ZOOCFG="zoo.cfg")
fi
...
zkServer.sh start底层的实际执行内容
nohup "$JAVA" + 一堆提交参数
+ $ZOOMAIN org.apache.zookeeper.server.quorum.QuorumPeerMain
+ "$ZOOCFG" zkEnv.sh文件中 ZOOCFG="zoo.
所以程序的入口是 QuorumPeerMain.java类
ZK服务端启动入口
initializeAndRun
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
// 管理 zk的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 解析参数 zoo.cfg和 myid
config.parse(args[0]);
}
// Start and schedule the the purge task
// 启动 定时 任务, 对过期的快照,执行删除 (默认该功能关闭
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.isDistributed()) {
// 启动集群
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
解析参数 zoo.cfg和 myid
QuorumPeerConfig.java
QuorumPeerConfig.java
/**
* Parse config from a Properties.
* @param zkProp Properties to parse from.
* @throws IOException
* @throws ConfigException
*/
public void parseProperties(Properties zkProp)
throws IOException, ConfigException {
int clientPort = 0;
int secureClientPort = 0;
String clientPortAddress = null;
String secureClientPortAddress = null;
// 读取 zoo.cfg文件中的属性值,并赋值给 QuorumPeerConfig的类对象
VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
for (Entry<Object, Object> entry : zkProp.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (key.equals("dataDir")) {
dataDir = vff.create(value);
} else if (key.equals("dataLogDir")) {
dataLogDir = vff.create(value);
} else if (key.equals("clientPort")) {
clientPort = Integer.parseInt(value);
} else if (key.equals("localSessionsEnabled")) {
localSessionsEnabled = Boolean.parseBoolean(value);
} else if (key.equals("localSessionsUpgradingEnabled")) {
localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
} else if (key.equals("clientPortAddress")) {
clientPortAddress = value.trim();
}
...
}
QuorumPeerConfig.java
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
throws IOException, ConfigException {
quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
setupMyId();
setupClientPort();
setupPeerType();
checkValidity();
}
private void setupMyId() throws IOException {
File myIdFile = new File(dataDir, "myid");
// standalone server doesn't need myid file.
if (!myIdFile.isFile()) {
return;
}
BufferedReader br = new BufferedReader(new FileReader(myIdFile));
String myIdString;
try {
myIdString = br.readLine();
} finally {
br.close();
}
try {
// 将解析 myid文件中的 id赋值给 serverId
serverId = Long.parseLong(myIdString);
MDC.put("myid", myIdString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("serverid " + myIdString
+ " is not a number");
}
}
过期快照删除
可以启动定时任务,对过期的快照,执行删除 。默认该功能时关闭的
QuorumPeerMain.java
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
// 管理 zk的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 1解析参数, zoo.cfg和 myid
config.parse(args[0]);
}
// Start and schedule the the purge task
// 2启动 定时 任务, 对过期的快照,执行删除 (默认是关闭
// config.getSnapRetainCount() = 3 最少保留的快照个数
// config.getPurgeInterval() = 0 默认 0表示关闭
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.isDistributed()) {
// 3 启动集群
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
// 本地模式
ZooKeeperServerMain.main(args);
}
}
DatadirCleanupManager.java
跟进定时任务的start方法
定义一个静态内部类实现定时删除快照的任务
public void start() {
if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
LOG.warn("Purge task is already running.");
return;
}
// 默认情况 purgeInterval=0,该任务关闭,直接返回
// Don't schedule the purge task with zero or negative purge interval.
if (purgeInterval <= 0) {
LOG.info("Purge task is not scheduled.");
return;
}
// 创建一个定时器
timer = new Timer("PurgeTask", true);
// 创建一个清理快照任务
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
// 如果 purgeInterval设置的值是 1,表示 1小时检查一次,判断是否有过期快照,有则删除
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
purgeTaskStatus = PurgeTaskStatus.STARTED;
}
/**
* Shutdown the purge task.
*/
public void shutdown() {
if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
LOG.info("Shutting down purge task.");
timer.cancel();
purgeTaskStatus = PurgeTaskStatus.COMPLETED;
} else {
LOG.warn("Purge task not started. Ignoring shutdown!");
}
}
static class PurgeTask extends TimerTask {
private File logsDir;
private File snapsDir;
private int snapRetainCount;
public PurgeTask(File dataDir, File snapDir, int count) {
logsDir = dataDir;
snapsDir = snapDir;
snapRetainCount = count;
}
@Override
public void run() {
LOG.info("Purge task started.");
try {
// 清理过期的数据
PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
} catch (Exception e) {
LOG.error("Error occurred while purging.", e);
}
LOG.info("Purge task completed.");
}
}
查看具体清理过期数据的方法
PurgeTxnLog.java
/**
* Purges the snapshot and logs keeping the last num snapshots and the
* corresponding logs. If logs are rolling or a new snapshot is created
* during this process, these newest N snapshots or any data logs will be
* excluded from current purging cycle.
*/
public static void purge(File dataDir, File snapDir, int num) throws IOException {
if (num < 3) {
throw new IllegalArgumentException(COUNT_ERR_MSG);
}
FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
List<File> snaps = txnLog.findNRecentSnapshots(num);
int numSnaps = snaps.size();
if (numSnaps > 0) {
purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
}
}
初始化通信组件
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
// 管理 zk的配置信息
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// 1解析参数, zoo.cfg和 myid
config.parse(args[0]);
}
// Start and schedule the the purge task
// 2启动 定时 任务, 对过期的快照,执行删除 (默认是关闭
// config.getSnapRetainCount() = 3 最少保留的快照个数
// config.getPurgeInterval() = 0 默认 0表示关闭
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.isDistributed()) {
// 3 启动集群
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
// 本地模式
ZooKeeperServerMain.main(args);
}
}
通信协议默认 NIO(可以支持 Netty
进入启动集群的方法
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
// 通信组件初始化,默认是 NIO通信
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}
// 把解析的参数赋值给该 zookeeper节点
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
// 管理 zk数据的存储
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
// 管理 zk的通信
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
// 启动 zk
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
通信初始化具体实现代码
ServerCnxnFactory.java
static public ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName =
System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}
zookeeperAdmin.md 文件中
初始化 NIO服务端 Socket(并未启动)
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
if (secure) {
throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
}
configureSaslLogin();
maxClientCnxns = maxcc;
sessionlessCnxnTimeout = Integer.getInteger(
ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
// We also use the sessionlessCnxnTimeout as expiring interval for
// cnxnExpiryQueue. These don't need to be the same, but the expiring
// interval passed into the ExpiryQueue() constructor below should be
// less than or equal to the timeout.
cnxnExpiryQueue =
new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
expirerThread = new ConnectionExpirerThread();
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
numSelectorThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
Math.max((int) Math.sqrt((float) numCores/2), 1));
if (numSelectorThreads < 1) {
throw new IOException("numSelectorThreads must be at least 1");
}
numWorkerThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
workerShutdownTimeoutMS = Long.getLong(
ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
LOG.info("Configuring NIO connection handler with "
+ (sessionlessCnxnTimeout/1000) + "s sessionless connection"
+ " timeout, " + numSelectorThreads + " selector thread(s), "
+ (numWorkerThreads > 0 ? numWorkerThreads : "no")
+ " worker threads, and "
+ (directBufferBytes == 0 ? "gathered writes." :
("" + (directBufferBytes/1024) + " kB direct buffers.")));
for(int i=0; i<numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
}
// 初始化 NIO服务端 socket,绑定 2181端口,可以接收客户端请求
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
// 绑定 2181端口
ss.socket().bind(addr);
ss.configureBlocking(false);
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}