Table of Contents
- Zookeeper leader election
- Leader Latch
- Leader Selector
- Practical application
- Analysis of Zookeeper's implementation principles
- Data consistency models
- What is sequential consistency?
- ZAB
- Message broadcast
- Crash recovery
- zxid (transaction id)
- Guessing the ZooKeeper startup flow before reading the source
- Leader election source code and principle analysis
- Building the source
- Starting the Zookeeper Server
- Hypothesis
- Source code analysis
- startServerCnxnFactory: starting the listener
- startLeaderElection: starting leader election
- The actual execution
- makeLEStrategy().lookForLeader(): the leader election algorithm
- sendNotifications
We introduce middleware in order to solve specific problems in our application scenarios.
Zookeeper leader election
In distributed computing, leader election is an important capability. The election process looks like this: one process is designated as the organizer and distributes tasks to the other nodes. Before the task starts, no node knows who the leader (or coordinator) is. Once the election algorithm runs, every node ends up agreeing on a single, unique node as the task leader. Besides that, elections also happen when the current leader crashes unexpectedly and a new leader has to be chosen.
Next we will look at the usage and internals of LeaderLatch and LeaderSelector, the two leader-election utilities that Curator builds on top of Zookeeper.
Leader Latch
Based on ephemeral sequential nodes.
Leader Selector
Based on the lock implementation in curator-recipes.
Zookeeper itself has the notions of leader and follower, but it can also help other systems perform their own leader election, for example Kafka and Nacos.
So how do we implement election on top of Zookeeper? By creating ephemeral sequential nodes.
What concrete rules can then decide the winner? For example, the node with the smallest sequence number wins (this is the kind of rule Kafka-style elections use).
LeaderLatch is in fact implemented on top of ephemeral sequential nodes: every node that fails to grab leadership watches the node immediately before it, and once that predecessor is deleted it tries to acquire leadership again.
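Below is a minimal sketch of that underlying pattern written against the raw Curator API, so the "watch only your predecessor" idea is visible. The connection string, election path and node names are made up for illustration; this is not LeaderLatch's actual implementation.
import java.util.Collections;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
public class SimpleElection {
    private static final String ELECTION_PATH = "/simple-election";
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory
                .newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        // 1. Every participant creates an ephemeral sequential node.
        String myNode = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(ELECTION_PATH + "/candidate-");
        String myName = myNode.substring(myNode.lastIndexOf('/') + 1);
        // 2. The node with the smallest sequence number is the leader.
        List<String> children = client.getChildren().forPath(ELECTION_PATH);
        Collections.sort(children);
        int myIndex = children.indexOf(myName);
        if (myIndex == 0) {
            System.out.println("I am the leader: " + myName);
        } else {
            // 3. Everyone else watches only its predecessor; when that node is
            //    deleted (its owner died or released leadership), re-run the check.
            String previous = ELECTION_PATH + "/" + children.get(myIndex - 1);
            client.checkExists()
                  .usingWatcher((Watcher) event ->
                          System.out.println("Predecessor gone, re-checking leadership..."))
                  .forPath(previous);
            System.out.println("Not the leader, watching " + previous);
        }
        Thread.sleep(Long.MAX_VALUE); // keep the session (and the ephemeral node) alive
    }
}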
Practical application
public class QuartzJob extends QuartzJobBean{
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("[QuartzJob]-----:"+
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
@Configuration
public class QuartzConfig {
@Bean
public JobDetail jobDetail(){
return JobBuilder.newJob(QuartzJob.class).storeDurably().build();
}
@Bean
public Trigger trigger(){
SimpleScheduleBuilder simpleScheduleBuilder=
SimpleScheduleBuilder.simpleSchedule().
withIntervalInSeconds(1).repeatForever();
return TriggerBuilder.newTrigger().forJob(jobDetail()).
withSchedule(simpleScheduleBuilder).build();
}
}
In a single-node deployment, exactly one instance of the scheduled job runs. What about multiple nodes? For example, copy the project, change the port, and start both instances: you will find that both of them execute the job at the same time. How do we solve that? This is where leader election comes in: with two nodes, only the one that is the leader is allowed to run the job.
Quartz scheduling is driven by a SchedulerFactoryBean. We can extend SchedulerFactoryBean so that it does not start automatically, and instead start it only when we decide it should.
public class ZkSchedulerFactoryBean extends SchedulerFactoryBean{
private static CuratorFramework zkClient;
private static String ZOOKEEPER_CONNECTION_STRING="192.168.216.128:2181";
private LeaderLatch leaderLatch; // Curator's leader-election API
private static final String LEADER_PATH="/leader";
Logger LOG= LoggerFactory.getLogger(ZkSchedulerFactoryBean.class);
public ZkSchedulerFactoryBean() throws Exception {
this.setAutoStartup(false); // disable auto-start, so the default scheduler will not start on its own
leaderLatch=new LeaderLatch(getClient(),LEADER_PATH);
leaderLatch.addListener(new ZkJobLeaderLatchListener(getIp(),this));
leaderLatch.start(); // this node now takes part in the leader election
}
@Override
protected void startScheduler(Scheduler scheduler, int startupDelay) throws SchedulerException {
if(this.isAutoStartup()){ // true by default
super.startScheduler(scheduler,startupDelay);
}
}
@Override
public void destroy() throws SchedulerException {
CloseableUtils.closeQuietly(leaderLatch);
super.destroy();
}
// initialize the Curator client connection
private CuratorFramework getClient(){
// retry policy
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);
zkClient = CuratorFrameworkFactory.builder().
connectString(ZOOKEEPER_CONNECTION_STRING).retryPolicy(retryPolicy).build();
zkClient.start();
return zkClient;
}
private String getIp(){
String host=null;
try {
host= InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
return host;
}
class ZkJobLeaderLatchListener implements LeaderLatchListener{
private String ip;
private SchedulerFactoryBean schedulerFactoryBean;
public ZkJobLeaderLatchListener(String ip) {
this.ip = ip;
}
public ZkJobLeaderLatchListener(String ip, SchedulerFactoryBean schedulerFactoryBean) {
this.ip = ip;
this.schedulerFactoryBean = schedulerFactoryBean;
}
@Override
public void isLeader() {
LOG.info("ip:{} 成为leader,执行scheduler~",ip);
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.start(); //启动(抢占到leader的节点去执行任务)
}
@Override
public void notLeader() {
LOG.info("ip:{} 不是leader,停止scheduler~",ip);
schedulerFactoryBean.setAutoStartup(false);
schedulerFactoryBean.stop(); //启动(抢占到leader的节点去执行任务)
}
}
}
Now modify QuartzConfig again:
@Configuration
public class QuartzConfig {
@Bean
public ZkSchedulerFactoryBean schedulerFactoryBean() throws Exception {
ZkSchedulerFactoryBean schedulerFactoryBean=new ZkSchedulerFactoryBean();
schedulerFactoryBean.setJobDetails(jobDetail());
schedulerFactoryBean.setTriggers(trigger());
return schedulerFactoryBean;
}
@Bean
public JobDetail jobDetail(){
return JobBuilder.newJob(QuartzJob.class).storeDurably().build();
}
@Bean
public Trigger trigger(){
SimpleScheduleBuilder simpleScheduleBuilder=
SimpleScheduleBuilder.simpleSchedule().
withIntervalInSeconds(1).repeatForever();
return TriggerBuilder.newTrigger().forJob(jobDetail()).
withSchedule(simpleScheduleBuilder).build();
}
}
Now run two nodes: only the leader executes the scheduled job. When the leader node stops, one of the non-leader nodes takes over and starts executing the job.
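The same failover behaviour can be reproduced in a small standalone demo: two LeaderLatch participants on the same path, where closing the current leader hands leadership to the other. The connection string is illustrative.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class LatchFailoverDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory
                .newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        LeaderLatch a = new LeaderLatch(client, "/leader", "node-a");
        LeaderLatch b = new LeaderLatch(client, "/leader", "node-b");
        a.start();
        b.start();
        TimeUnit.SECONDS.sleep(2); // give the election a moment to settle
        System.out.println("a leader? " + a.hasLeadership() + ", b leader? " + b.hasLeadership());
        // Close whichever latch currently holds leadership; the other takes over.
        (a.hasLeadership() ? a : b).close();
        TimeUnit.SECONDS.sleep(2);
        System.out.println("after failover: a leader? " + a.hasLeadership()
                + ", b leader? " + b.hasLeadership());
        client.close();
    }
}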
LeaderLatch and LeaderSelector are the two leader-election mechanisms that Apache Curator provides for coordinating distributed systems.
LeaderLatch is the simpler one: a process acquires the latch and remains the leader until it explicitly releases it (or its session is lost). Once it releases the latch, the other processes compete for leadership again. It fits scenarios where exactly one process should act as the leader at any time.
LeaderSelector is the other mechanism: all participating processes take turns holding leadership. A process that acquires leadership does its work and then gives leadership up (or is replaced), after which another participant can take over. It fits scenarios where the leader role should rotate among several processes that each need to do a share of the work (e.g. MapReduce-style work sharing).
There are many practical applications, for example:
- dubbo + zookeeper as a registry (service registration)
- dubbo + zookeeper as a configuration center / metadata management
- distributed locks (Curator)
- leader election (mutually exclusive execution of scheduled jobs)
1. LeaderLatch (Kafka's ZooKeeper-based leader election follows this pattern)
Kafka's (pre-KRaft) leader election is essentially the LeaderLatch pattern. In a Kafka cluster every partition has one leader and several replicas: the leader handles read and write requests, while the replicas provide redundancy and failover.
When the current leader fails, or a network partition occurs, a new election is needed. In ZooKeeper-based Kafka this is coordinated through ZooKeeper: the competing brokers try to win leadership, but only one of them can succeed; the winner takes over the leader role and keeps it until it fails or is replaced.
The election relies on ZooKeeper: the competitors try to create the same ephemeral node, and only the node's owner wins; the others remain followers/replicas. When the current owner fails, its ephemeral node is deleted, which triggers a new round of election.
2. LeaderSelector
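A minimal LeaderSelector usage sketch (connection string and path are illustrative): takeLeadership() is invoked when this node becomes leader, and leadership is released as soon as the method returns.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class LeaderSelectorDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory
                .newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        LeaderSelector selector = new LeaderSelector(client, "/selector-demo",
                new LeaderSelectorListenerAdapter() {
                    @Override
                    public void takeLeadership(CuratorFramework curator) throws Exception {
                        // Do the leader-only work here; returning gives leadership up.
                        System.out.println("I am the leader now, working for 5 seconds...");
                        TimeUnit.SECONDS.sleep(5);
                        System.out.println("Releasing leadership");
                    }
                });
        selector.autoRequeue(); // re-join the election after giving up leadership
        selector.start();
        Thread.sleep(Long.MAX_VALUE); // keep the process alive for the demo
    }
}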
Analysis of Zookeeper's implementation principles
Data consistency models
- weak consistency model
- 2PC protocol
- majority (quorum) commit
Zookeeper is a sequentially consistent system. Since Zookeeper was designed to provide distributed lock/coordination services, it has to implement sequential consistency, which is the basic requirement for building a distributed lock. For example, when several programs compete for a lock and clientA acquires it, every program that competes afterwards must see the lock as held by clientA, not in some other state.
What is sequential consistency?
Before explaining sequential consistency, consider a question: what would happen if Zookeeper were only an eventually consistent system?
Assume clientA/B/C execute strictly one after another. clientA updates a value x on Zookeeper; clientB and clientC read from different replicas of the cluster and get different values of x. clientC's read happens after clientB's, yet it reads a stale value. Clearly this is a weak consistency model, and using it to implement a lock would be broken.
Sequential consistency gives a stronger guarantee. Looking at the timeline in the figure: B0 happens before A0 and reads 0; B2 happens after A0 and reads x = 1. The reads B1/C0/C1 overlap with the write A0 on the time axis, so they may read either the old value 0 or the new value 1. But under strong sequential consistency, if B1 reads x = 1, then C1 must also see 1.
Note that because of network latency and the nondeterminism of request execution, a request issued earlier by a client is not necessarily executed earlier by the server; what counts in the end is the order in which the server executes them.
Put simply, sequential consistency is defined per operation on a single data object and belongs to the C in CAP: once a value has been updated, subsequent reads can observe the new value.
However, Zookeeper's sequential consistency is a slightly weakened version: Zookeeper does not guarantee that at every instant two different clients have an identical view of the Zookeeper data. Because of network delays and similar factors, one client may perform an update before another client has been notified of the change.
Consider two clients A and B: A sets the value of znode /a from 0 to 1 and then tells client B to read /a. Depending on which server B is connected to, B may still read the old value 0. If A and B must read the same value, client B should call sync before its read: zooKeeper.sync();
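Here is a sketch of that sync-before-read pattern using the raw ZooKeeper client. The server address is illustrative, error handling is omitted, and the znode /a is assumed to exist.
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.ZooKeeper;
public class SyncBeforeRead {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        CountDownLatch synced = new CountDownLatch(1);
        // sync() is asynchronous: it asks the server this client is connected to
        // to catch up with the leader before we issue the read.
        zk.sync("/a", (rc, path, ctx) -> synced.countDown(), null);
        synced.await();
        byte[] data = zk.getData("/a", false, null);
        System.out.println("value of /a after sync: " + new String(data));
        zk.close();
    }
}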
For the detailed guarantees see: https://zookeeper.apache.org/doc/r3.6.1/zookeeperProgrammers.html#ch_zkGuarantees
ZAB
ZAB (Zookeeper Atomic Broadcast) is an atomic broadcast protocol with crash-recovery support, designed specifically for the distributed coordination service ZooKeeper. ZooKeeper relies on ZAB to achieve distributed data consistency; on top of this protocol it implements a primary-backup architecture that keeps the replicas in the cluster consistent with each other.
- crash recovery
- atomic broadcast
These two parts are the essence of the protocol.
Message broadcast
There is a globally unique 64-bit transaction id, the zxid. It keeps changing as data is updated, and a larger zxid means newer data.
The broadcast (shown in the figure above) works like this: the leader prepares a queue for every follower. ZooKeeper in fact uses producer-consumer patterns in many places; these are essentially FIFO queues, and all requests are synchronized to the followers through them. Every message carries a zxid, and the leader distributes the message together with its zxid to all follower nodes; this step is the proposal (propose) of the transaction.
When a follower receives the proposal it first writes it to its local disk, and only then returns an ACK. When the leader receives the ACKs it checks how many it has collected: once more than half of the nodes have acknowledged, it commits the transaction.
- Observer (does not take part in voting or ACKs; it only keeps its data in sync with the leader)
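To make the quorum rule concrete, here is a toy illustration of the propose / ack / commit flow described above. It is only a sketch of the idea (node ids and cluster size are made up), not ZooKeeper's actual implementation.
import java.util.HashSet;
import java.util.Set;
public class ToyZabBroadcast {
    private final int clusterSize;          // e.g. 3 or 5 voting members
    private final Set<Long> acks = new HashSet<>();
    public ToyZabBroadcast(int clusterSize) {
        this.clusterSize = clusterSize;
    }
    /** The leader "proposes": in real ZAB the proposal (with its zxid) goes onto
     *  a FIFO queue per follower; here we just record the leader's own ack. */
    public void propose(long zxid) {
        acks.clear();
        acks.add(0L); // the leader acks its own proposal after logging it
        System.out.println("proposed zxid=0x" + Long.toHexString(zxid));
    }
    /** A follower acks after writing the proposal to its local log. */
    public boolean ack(long followerId) {
        acks.add(followerId);
        // commit as soon as a strict majority (> half) has acknowledged
        return acks.size() > clusterSize / 2;
    }
    public static void main(String[] args) {
        ToyZabBroadcast leader = new ToyZabBroadcast(3);
        leader.propose(0x100000001L);
        System.out.println("commit after follower 1? " + leader.ack(1)); // true: 2 of 3
        System.out.println("commit after follower 2? " + leader.ack(2)); // true
    }
}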
Crash recovery
- electing a new leader (which node should become the leader?)
- data synchronization
Which node should be elected leader? What do we need to consider?
- messages that have already been committed must not be lost
When the leader has received a legal number (a majority) of ACKs, it sends a commit request to the followers, and the followers commit the transaction locally.
But what if the leader crashes before a follower receives the commit command? Say server3 has not committed yet; we still have to guarantee that a message which has already been committed is not lost.
So if server2 wants to become the new leader, it must first send a commit for that proposal to server3.
- messages that were discarded must not reappear
That is, proposals that were never committed must not take effect.
How is this achieved? (through the choice of leader)
If the elected leader has the largest zxid in the whole cluster, then its data is the newest; so the zxid is directly tied to leader election.
Every new round of leader election has an epoch, which keeps increasing. So the zxid consists of two parts (epoch + counter).
Together, these two mechanisms are what guarantees, by design, that the leader holds the newest data.
zxid (transaction id)
Every transactional operation generates a globally unique id. To guarantee the sequential consistency of transactions, a monotonically increasing transaction id is used to mark them; as mentioned earlier, every proposal carries a zxid.
The zxid is a 64-bit number: the high 32 bits are the epoch (which changes with every change of leader, like a logical clock period), and the low 32 bits are an incrementing transaction counter.
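The layout can be written out explicitly; the helpers below mirror what ZooKeeper's ZxidUtils does with the high and low 32 bits.
public class ZxidLayout {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }
    static long epochOf(long zxid)   { return zxid >> 32; }
    static long counterOf(long zxid) { return zxid & 0xffffffffL; }
    public static void main(String[] args) {
        long zxid = makeZxid(2, 10);
        System.out.println("zxid    = 0x" + Long.toHexString(zxid)); // 0x20000000a
        System.out.println("epoch   = " + epochOf(zxid));            // 2
        System.out.println("counter = " + counterOf(zxid));          // 10
    }
}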
Guessing the ZooKeeper startup flow before reading the source
- load the configuration (zoo.cfg)
- initialization
- start listening on port 2181 (an independent server component) for client requests (zkClient)
- start listening on ports 2888 and 3888
- initialize leader election (----)
- start leader election
- load the data from disk
// The code below walks through the startup flow of an earlier ZooKeeper release (the 3.6.1 flow is analyzed afterwards)
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
--------------------------------------------------------------------
protected void initializeAndRun(String[] args) throws ConfigException, IOException {
// holds the global configuration
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
// start a scheduled task that purges old logs and snapshots
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
// if a clustered (distributed) configuration is present, run in quorum mode
if (args.length == 1 && config.servers.size() > 0) {
this.runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
ZooKeeperServerMain.main(args); // otherwise start in standalone mode
}
}
--------------------------------------------------------------------
public void runFromConfig(QuorumPeerConfig config) throws IOException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException var4) {
LOG.warn("Unable to register log4j JMX control", var4);
}
LOG.info("Starting quorum peer");
try {
/*
ClientCnxn: the class the client uses to talk to the server over the network
ServerCnxn: the server-side network connection handler
*/
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
// quorumPeer represents one node in the cluster; the information loaded at startup is configured on it
this.quorumPeer = new QuorumPeer();
this.quorumPeer.setClientPortAddress(config.getClientPortAddress());
this.quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir())));
this.quorumPeer.setQuorumPeers(config.getServers());
this.quorumPeer.setElectionType(config.getElectionAlg());// election algorithm
this.quorumPeer.setMyid(config.getServerId());
this.quorumPeer.setTickTime(config.getTickTime()); // heartbeat (tick) interval
this.quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
this.quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
this.quorumPeer.setInitLimit(config.getInitLimit());// time allowed for initial data sync
this.quorumPeer.setSyncLimit(config.getSyncLimit());// time allowed for data synchronization
this.quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
this.quorumPeer.setCnxnFactory(cnxnFactory);
this.quorumPeer.setZKDatabase(new ZKDatabase(this.quorumPeer.getTxnFactory())); // in-memory database, backed by the on-disk persistence
this.quorumPeer.setLearnerType(config.getPeerType());
this.quorumPeer.setSyncEnabled(config.getSyncEnabled());
this.quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
this.quorumPeer.start(); // start the peer
this.quorumPeer.join();// quorumPeer is a thread; join() waits until that thread finishes
} catch (InterruptedException var3) {
LOG.warn("Quorum Peer interrupted", var3);
}
}
Leader election source code and principle analysis
Building the source
- Built with Maven
Starting the Zookeeper Server
- Cluster mode: QuorumPeerMain
- Standalone mode: ZookeeperServerMain
Hypothesis
- zoo.cfg: load the configuration
- listen on 2181, (NIO / Netty) -> NIO (covered in the TCP protocol course)
- initialize the 2888 / 3888 listeners used for election / data sync, implemented with BIO
- initialize the election algorithm and run the election (leader)
- load and recover the local files
- data synchronization.
Source code analysis
Based on ZooKeeper 3.6.1
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
---------------------------------------------------------------------
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
// holds the global configuration
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]); // i.e. the zoo.cfg file: parse the configuration file and store it into QuorumPeerConfig
}
// start a scheduled task to clean up old logs
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.isDistributed()) { // cluster mode
this.runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
ZooKeeperServerMain.main(args); // standalone node
}
}
---------------------------------------------------------------------
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException var17) {
LOG.warn("Unable to register log4j JMX control", var17);
}
LOG.info("Starting quorum peer");
MetricsProvider metricsProvider; // metrics data
// publishes ZooKeeper's internal metrics
try {
metricsProvider = MetricsProviderBootstrap.startMetricsProvider(config.getMetricsProviderClassName(), config.getMetricsProviderConfiguration());
} catch (MetricsProviderLifeCycleException var16) {
throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), var16);
}
try {
ServerMetrics.metricsProviderInitialized(metricsProvider);
/*
This is related to the client-port (2181) listener
ClientCnxn: the class the client uses to talk to the server over the network
ServerCnxn: the server-side network connection handler
*/
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
// the secure-connection variant
if (config.getClientPortAddress() != null) { // the default case
cnxnFactory = ServerCnxnFactory.createFactory();
// listen on the configured client port
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}
this.quorumPeer = this.getQuorumPeer();
this.quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
this.quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
this.quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
this.quorumPeer.setElectionType(config.getElectionAlg()); // which election algorithm to use
this.quorumPeer.setMyid(config.getServerId());// myid
this.quorumPeer.setTickTime(config.getTickTime());// heartbeat (tick) interval (2000 ms)
this.quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
this.quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
this.quorumPeer.setInitLimit(config.getInitLimit());// time allowed for initial data sync
this.quorumPeer.setSyncLimit(config.getSyncLimit());// time allowed for data synchronization
this.quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
this.quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
this.quorumPeer.setConfigFileName(config.getConfigFilename());
this.quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
this.quorumPeer.setZKDatabase(new ZKDatabase(this.quorumPeer.getTxnFactory())); // in-memory database, backed by the on-disk persistence
this.quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
this.quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
this.quorumPeer.initConfigInZKDatabase();
this.quorumPeer.setCnxnFactory(cnxnFactory);
this.quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
this.quorumPeer.setSslQuorum(config.isSslQuorum());
this.quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
this.quorumPeer.setLearnerType(config.getPeerType());
this.quorumPeer.setSyncEnabled(config.getSyncEnabled());
this.quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
this.quorumPeer.getX509Util().enableCertFileReloading();
}
this.quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
this.quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
this.quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
this.quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if (this.quorumPeer.isQuorumSaslAuthEnabled()) {
this.quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
this.quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
this.quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
this.quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
this.quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
this.quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
this.quorumPeer.initialize();
if (config.jvmPauseMonitorToRun) {
this.quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
}
this.quorumPeer.start(); // start the peer
ZKAuditProvider.addZKStartStopAuditLog();
this.quorumPeer.join(); // quorumPeer is a thread; join() waits until that thread finishes
} catch (InterruptedException var15) {
LOG.warn("Quorum Peer interrupted", var15);
} finally {
if (metricsProvider != null) {
try {
metricsProvider.stop();
} catch (Throwable var14) {
LOG.warn("Error while stopping metrics", var14);
}
}
}
}
Connecting a client to ZooKeeper, we can inspect its monitoring data.
Once connected, the metrics of its attributes are visible.
When we create a znode, it is stored in ZooKeeper's in-memory ZKDatabase, which contains a DataTree object; this is where the data of the znodes is kept.
// step into the start() method
public synchronized void start() { // overrides Thread.start()
if (!this.getView().containsKey(this.myid)) { // check that this node's myid is part of the configured cluster
throw new RuntimeException("My id " + this.myid + " not in the peer list");
} else {
this.loadDataBase(); // load the data from disk
this.startServerCnxnFactory();// start the listener on 2181
try {
this.adminServer.start();
} catch (AdminServerException var2) {
LOG.warn("Problem starting AdminServer", var2);
System.out.println(var2);
}
this.startLeaderElection(); // start leader election
this.startJvmPauseMonitor(); // monitoring related
super.start(); // start the thread (its run() method will execute)
}
}
startServerCnxnFactory: starting the listener
---------------------------------------------------------------
// initialization
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}
public static ServerCnxnFactory createFactory() throws IOException {
// read a system property (zookeeper.serverCnxnFactory)
String serverCnxnFactoryName = System.getProperty("zookeeper.serverCnxnFactory");
if (serverCnxnFactoryName == null) {
// default to NIO
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
// otherwise load the configured implementation by class name
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory)Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception var3) {
IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, var3);
throw ioe;
}
}
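As a side note, because the factory class is resolved from that system property, the server can be switched to the Netty-based implementation. One way this could be set (passing it with -D on the command line works as well):
// switch the connection factory from NIO to Netty (NettyServerCnxnFactory is the real ZooKeeper class)
System.setProperty("zookeeper.serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");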
---------------------------------------------------------------
// configure() has a different implementation in each Factory
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
if (secure) {
throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
} else {
// configure SASL (secure) login
this.configureSaslLogin();
this.maxClientCnxns = maxcc;
this.initMaxCnxns();
this.sessionlessCnxnTimeout = Integer.getInteger("zookeeper.nio.sessionlessCnxnTimeout", 10000);
this.cnxnExpiryQueue = new ExpiryQueue(this.sessionlessCnxnTimeout);
this.expirerThread = new NIOServerCnxnFactory.ConnectionExpirerThread();
int numCores = Runtime.getRuntime().availableProcessors(); // number of processor cores on this machine
this.numSelectorThreads = Integer.getInteger("zookeeper.nio.numSelectorThreads", Math.max((int)Math.sqrt((double)((float)numCores / 2.0F)), 1));
if (this.numSelectorThreads < 1) {
throw new IOException("numSelectorThreads must be at least 1");
} else {
this.numWorkerThreads = Integer.getInteger("zookeeper.nio.numWorkerThreads", 2 * numCores);
this.workerShutdownTimeoutMS = Long.getLong("zookeeper.nio.shutdownTimeout", 5000L);
String logMsg = "Configuring NIO connection handler with " + this.sessionlessCnxnTimeout / 1000 + "s sessionless connection timeout, " + this.numSelectorThreads + " selector thread(s), " + (this.numWorkerThreads > 0 ? this.numWorkerThreads : "no") + " worker threads, and " + (directBufferBytes == 0 ? "gathered writes." : "" + directBufferBytes / 1024 + " kB direct buffers.");
LOG.info(logMsg);
// create the selector threads, using the count computed above
for(int i = 0; i < this.numSelectorThreads; ++i) {
this.selectorThreads.add(new NIOServerCnxnFactory.SelectorThread(i));
}
this.listenBacklog = backlog;
// open a ServerSocketChannel instance
this.ss = ServerSocketChannel.open();
this.ss.socket().setReuseAddress(true);
LOG.info("binding to port {}", addr);
if (this.listenBacklog == -1) {
this.ss.socket().bind(addr); // bind the listening port
} else {
this.ss.socket().bind(addr, this.listenBacklog);
}
this.ss.configureBlocking(false);// configure as non-blocking
this.acceptThread = new NIOServerCnxnFactory.AcceptThread(this.ss, addr, this.selectorThreads);// the accept thread
/**
* AcceptThread handles incoming client connection requests
* selectorThreads handle the selector read/write events
*/
}
}
}
-----------------------------------------------------------------------
private void startServerCnxnFactory() {
if (this.cnxnFactory != null) {
this.cnxnFactory.start(); // assigned during the configuration step above
}
if (this.secureCnxnFactory != null) {
this.secureCnxnFactory.start(); // assigned during the configuration step above
}
}
public void start() {
this.stopped = false;
if (this.workerPool == null) {
this.workerPool = new WorkerService("NIOWorker", this.numWorkerThreads, false);
}
Iterator var1 = this.selectorThreads.iterator();
while(var1.hasNext()) {
NIOServerCnxnFactory.SelectorThread thread = (NIOServerCnxnFactory.SelectorThread)var1.next();
if (thread.getState() == State.NEW) {
thread.start(); // start() will end up invoking the thread's run() method
// this starts the selector threads that poll for IO events; if there are no ready connections, run() blocks inside select(), because selector.select() blocks when nothing is ready
}
}
if (this.acceptThread.getState() == State.NEW) {
this.acceptThread.start();// handles incoming client connections
}
// a pattern worth borrowing: connection acceptance and IO handling are split into separate threads, and several selector threads poll in parallel, which improves overall throughput (see the standalone sketch at the end of this subsection)
if (this.expirerThread.getState() == State.NEW) {
this.expirerThread.start();
}
}
public void run() {
try {
while(!NIOServerCnxnFactory.this.stopped && !this.acceptSocket.socket().isClosed()) {
try {
this.select();
} catch (RuntimeException var6) {
NIOServerCnxnFactory.LOG.warn("Ignoring unexpected runtime exception", var6);
} catch (Exception var7) {
NIOServerCnxnFactory.LOG.warn("Ignoring unexpected exception", var7);
}
}
} finally {
this.closeSelector();
if (!this.reconfiguring) {
NIOServerCnxnFactory.this.stop();
}
NIOServerCnxnFactory.LOG.info("accept thread exitted run method");
}
}
private void select() {
try {
this.selector.select(); // poll for connections that have become ready
Iterator selectedKeys = this.selector.selectedKeys().iterator();
while(!NIOServerCnxnFactory.this.stopped && selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey)selectedKeys.next();
selectedKeys.remove();
if (key.isValid()) {
if (key.isAcceptable()) {
if (!this.doAccept()) {
this.pauseAccept(10L);
}
} else {
NIOServerCnxnFactory.LOG.warn("Unexpected ops in accept select {}", key.readyOps());
}
}
}
} catch (IOException var3) {
NIOServerCnxnFactory.LOG.warn("Ignoring IOException while selecting", var3);
}
}
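To make that thread split easier to see outside of ZooKeeper's code, here is a stripped-down sketch of the same pattern: one accept thread hands new connections to a pool of selector threads, each polling IO events for its own channels. It is an assumed, simplified illustration (port number, echo handler and thread count are arbitrary), not ZooKeeper's implementation.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class MiniReactor {
    static class SelectorThread extends Thread {
        private final Selector selector;
        private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
        SelectorThread() throws IOException { this.selector = Selector.open(); }
        void addChannel(SocketChannel ch) {
            pending.add(ch);
            selector.wakeup(); // wake select() so the new channel gets registered
        }
        @Override public void run() {
            try {
                while (true) {
                    selector.select();
                    SocketChannel ch;
                    while ((ch = pending.poll()) != null) {      // register handed-over channels
                        ch.configureBlocking(false);
                        ch.register(selector, SelectionKey.OP_READ);
                    }
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isReadable()) {
                            SocketChannel c = (SocketChannel) key.channel();
                            ByteBuffer buf = ByteBuffer.allocate(1024);
                            if (c.read(buf) == -1) { key.cancel(); c.close(); continue; }
                            buf.flip();
                            c.write(buf);                         // trivial echo handler
                        }
                    }
                    selector.selectedKeys().clear();
                }
            } catch (IOException e) { e.printStackTrace(); }
        }
    }
    public static void main(String[] args) throws IOException {
        int n = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
        SelectorThread[] workers = new SelectorThread[n];
        for (int i = 0; i < n; i++) { workers[i] = new SelectorThread(); workers[i].start(); }
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9090));
        int next = 0;
        while (true) {                                            // the "accept thread"
            SocketChannel client = server.accept();               // blocking accept is fine here
            workers[next++ % n].addChannel(client);               // round-robin hand-off
        }
    }
}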
startLeaderElection: starting leader election
public synchronized void startLeaderElection() {
try {
// get the current node's state; if it is LOOKING
if (this.getPeerState() == QuorumPeer.ServerState.LOOKING) {
// build a Vote (myid, zxid, epoch)
this.currentVote = new Vote(this.myid, this.getLastLoggedZxid(), this.getCurrentEpoch());
}
} catch (IOException var3) {
RuntimeException re = new RuntimeException(var3.getMessage());
re.setStackTrace(var3.getStackTrace());
throw re;
}
// create the election algorithm according to electionType
this.electionAlg = this.createElectionAlgorithm(this.electionType);
}
------------------------------------------------------------------------------
// In 3.6, no matter which election algorithm you configure, only one is really supported: as the source shows, the first two cases simply throw exceptions.
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le = null;
switch(electionAlgorithm) {
case 1:
throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
case 2:
throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
case 3:
// cnxn (classes related to network communication: ServerCnxn, ClientCnxn)
// QuorumCnxManager manages the connections used for cluster election and voting
QuorumCnxManager qcm = this.createCnxnManager();
QuorumCnxManager oldQcm = (QuorumCnxManager)this.qcmRef.getAndSet(qcm);
if (oldQcm != null) { // check whether an election had already been started
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();// stop the previous election
}
// listener for votes from the rest of the cluster
Listener listener = qcm.listener;
if (listener != null) {
listener.start();
// initialize FastLeaderElection
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();// start leader election
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
---------------------------------------------------------------------------
// step into the start() method
public void start() {
this.messenger.start();
}
void start() {
// start two threads
/*
From here you can see that almost the whole ZooKeeper flow is asynchronous:
WorkerSender sends votes
WorkerReceiver receives votes
*/
this.wsThread.start();
this.wrThread.start();
}
protected class Messenger {
FastLeaderElection.Messenger.WorkerSender ws;
FastLeaderElection.Messenger.WorkerReceiver wr;
Thread wsThread = null;
Thread wrThread = null;
Messenger(QuorumCnxManager manager) {
// initialization of the two threads started above
this.ws = new FastLeaderElection.Messenger.WorkerSender(manager);
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + FastLeaderElection.this.self.getId() + "]");
this.wsThread.setDaemon(true);
this.wr = new FastLeaderElection.Messenger.WorkerReceiver(manager);
this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + FastLeaderElection.this.self.getId() + "]");
this.wrThread.setDaemon(true);
}
All of the logic above is summarized in the figure.
The actual execution
Everything we have read so far is really just initialization logic; the actual execution happens in super.start().
// step into the start() method again
public synchronized void start() { // overrides Thread.start()
if (!this.getView().containsKey(this.myid)) { // check that this node's myid is part of the configured cluster
throw new RuntimeException("My id " + this.myid + " not in the peer list");
} else {
this.loadDataBase(); // load the data from disk
this.startServerCnxnFactory();// start the listener on 2181
try {
this.adminServer.start();
} catch (AdminServerException var2) {
LOG.warn("Problem starting AdminServer", var2);
System.out.println(var2);
}
this.startLeaderElection(); // start leader election
this.startJvmPauseMonitor(); // monitoring related
super.start(); // start the thread (its run() method will execute)
}
}
--------------------------------------------------------------------------
public void run() {
this.updateThreadName();
LOG.debug("Starting quorum peer");
// this whole block registers JMX beans, i.e. publishes the metrics
try {
this.jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(this.jmxQuorumBean, (ZKMBeanInfo)null);
Iterator var1 = this.getView().values().iterator();
while(var1.hasNext()) {
QuorumPeer.QuorumServer s = (QuorumPeer.QuorumServer)var1.next();
if (this.getId() == s.id) {
LocalPeerBean p = this.jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, this.jmxQuorumBean);
} catch (Exception var88) {
LOG.warn("Failed to register with JMX", var88);
this.jmxLocalPeerBean = null;
}
} else {
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, this.jmxQuorumBean);
this.jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception var87) {
LOG.warn("Failed to register with JMX", var87);
}
}
}
} catch (Exception var92) {
LOG.warn("Failed to register with JMX", var92);
this.jmxQuorumBean = null;
}
while(true) {
boolean var27 = false;
// as long as the process is running, this loop runs forever
try {
var27 = true;
if (!this.running) {
var27 = false;
break;
}
switch(this.getPeerState()) { // get the current node's state (leader / follower / looking / observer)
case LOOKING:
LOG.info("LOOKING");
ServerMetrics.getMetrics().LOOKING_COUNT.add(1L);
// read-only mode
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(this.logFactory, this, this.zkDb);
Thread roZkMgr = new Thread() {
public void run() {
try {
sleep((long)Math.max(2000, QuorumPeer.this.tickTime));
if (QuorumPeer.ServerState.LOOKING.equals(QuorumPeer.this.getPeerState())) {
roZk.startup();
}
} catch (InterruptedException var2) {
QuorumPeer.LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception var3) {
QuorumPeer.LOG.error("FAILED to start ReadOnlyZooKeeperServer", var3);
}
}
};
try {
roZkMgr.start();
this.reconfigFlagClear();
if (this.shuttingDownLE) {
this.shuttingDownLE = false;
this.startLeaderElection();
}
this.setCurrentVote(this.makeLEStrategy().lookForLeader());
} catch (Exception var85) {
LOG.warn("Unexpected exception", var85);
this.setPeerState(QuorumPeer.ServerState.LOOKING);
} finally {
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
this.reconfigFlagClear();
if (this.shuttingDownLE) {
this.shuttingDownLE = false;
this.startLeaderElection();
}
// makeLEStrategy().lookForLeader() returns a vote: the vote that won agreement, i.e. the vote for the leader. Once the leader is known, the election algorithm also sets this node's new state.
this.setCurrentVote(this.makeLEStrategy().lookForLeader());
} catch (Exception var84) {
LOG.warn("Unexpected exception", var84);
this.setPeerState(QuorumPeer.ServerState.LOOKING);
}
}
break;
case LEADING:
LOG.info("LEADING");
try {
try {
// this node itself is the leader, so set itself up as leader
this.setLeader(this.makeLeader(this.logFactory));
this.leader.lead();
this.setLeader((Leader)null);
} catch (Exception var80) {
LOG.warn("Unexpected exception", var80);
}
break;
} finally {
if (this.leader != null) {
this.leader.shutdown("Forcing shutdown");
this.setLeader((Leader)null);
}
this.updateServerState();
}
case FOLLOWING:
try {
try {
LOG.info("FOLLOWING");
this.setFollower(this.makeFollower(this.logFactory));
this.follower.followLeader();// if the current node is a follower, connect to the leader
} catch (Exception var81) {
LOG.warn("Unexpected exception", var81);
}
break;
} finally {
this.follower.shutdown();
this.setFollower((Follower)null);
this.updateServerState();
}
case OBSERVING:
try {
LOG.info("OBSERVING");
this.setObserver(this.makeObserver(this.logFactory));
this.observer.observeLeader();
} catch (Exception var83) {
LOG.warn("Unexpected exception", var83);
} finally {
this.observer.shutdown();
this.setObserver((Observer)null);
this.updateServerState();
if (this.isRunning()) {
Observer.waitForObserverElectionDelay();
}
}
}
} finally {
if (var27) {
LOG.warn("QuorumPeer main thread exited");
MBeanRegistry instance = MBeanRegistry.getInstance();
instance.unregister(this.jmxQuorumBean);
instance.unregister(this.jmxLocalPeerBean);
Iterator var12 = this.jmxRemotePeerBean.values().iterator();
while(var12.hasNext()) {
RemotePeerBean remotePeerBean = (RemotePeerBean)var12.next();
instance.unregister(remotePeerBean);
}
this.jmxQuorumBean = null;
this.jmxLocalPeerBean = null;
this.jmxRemotePeerBean = null;
}
}
}
LOG.warn("QuorumPeer main thread exited");
MBeanRegistry instance = MBeanRegistry.getInstance();
instance.unregister(this.jmxQuorumBean);
instance.unregister(this.jmxLocalPeerBean);
Iterator var96 = this.jmxRemotePeerBean.values().iterator();
while(var96.hasNext()) {
RemotePeerBean remotePeerBean = (RemotePeerBean)var96.next();
instance.unregister(remotePeerBean);
}
this.jmxQuorumBean = null;
this.jmxLocalPeerBean = null;
this.jmxRemotePeerBean = null;
}
makeLEStrategy().lookForLeader(): the leader election algorithm
As analyzed earlier, only one election algorithm is left internally: FastLeaderElection.
public Vote lookForLeader() throws InterruptedException {
// the first part is, again, JMX-related monitoring
try {
this.self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(this.self.jmxLeaderElectionBean, this.self.jmxLocalPeerBean);
} catch (Exception var24) {
LOG.warn("Failed to register with JMX", var24);
this.self.jmxLeaderElectionBean = null;
}
this.self.start_fle = Time.currentElapsedTime();
try {
Map<Long, Vote> recvset = new HashMap();
Map<Long, Vote> outofelection = new HashMap();
int notTimeout = minNotificationInterval;
synchronized(this) {
// logicalclock is the logical clock, i.e. the election epoch
this.logicalclock.incrementAndGet();
// update this node's proposal (myid, zxid, epoch): initially we propose ourselves
this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
}
LOG.info("New election. My id = {}, proposed zxid=0x{}", this.self.getId(), Long.toHexString(this.proposedZxid));
this.sendNotifications(); // send our own vote to the other nodes in the cluster, so every node ends up receiving the votes of all the others
FastLeaderElection.Notification n;
// keep looping as long as this node's state is LOOKING
while(this.self.getPeerState() == ServerState.LOOKING && !this.stop) {
// take one Notification from the receive queue; it represents a vote received from any node in the cluster
n = (FastLeaderElection.Notification)this.recvqueue.poll((long)notTimeout, TimeUnit.MILLISECONDS);
if (n == null) { // did we receive anything?
if (this.manager.haveDelivered()) {
this.sendNotifications(); // resend once
} else {
this.manager.connectAll(); // reconnect
}
// back off and retry later
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
LOG.info("Notification time out: {}", notTimeout);
// first check whether the received Notification is valid
// sid and leader are both myids
} else if (this.validVoter(n.sid) && this.validVoter(n.leader)) {
SyncedLearnerTracker voteSet;
Vote endVote;
Vote var7;
switch(n.state) { // 收到通知的节点状态
case LOOKING: // during the leader-election phase, when votes are received, all nodes are in the LOOKING state
if (this.getInitLastLoggedZxid() == -1L) {
LOG.debug("Ignoring notification as our zxid is -1");
}
else if (n.zxid == -1L) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
} else {
// the received epoch is greater than our own epoch
// which means the sender's round is newer, so we should adopt its vote
if (n.electionEpoch > this.logicalclock.get()) {
this.logicalclock.set(n.electionEpoch); // first move our own epoch up to the sender's
recvset.clear(); // clear the ballot box and start counting again
// compare the votes
if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch())) {
// switch our proposal to the received vote (leader, zxid, epoch)
this.updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
}
// after the update, broadcast the vote again; whether it is our own or someone else's depends on epoch, zxid and myid
this.sendNotifications();
} else {
// if the received epoch is smaller than ours, it has no right to influence our vote
if (n.electionEpoch < this.logicalclock.get()) {
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}", Long.toHexString(n.electionEpoch), Long.toHexString(this.logicalclock.get()));
continue;
}
// the epochs are equal
if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
this.updateProposal(n.leader, n.zxid, n.peerEpoch);
this.sendNotifications();
}
}
LOG.debug("Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}", new Object[]{n.sid, n.leader, Long.toHexString(n.zxid), Long.toHexString(n.electionEpoch)});
// after the layered comparison above, record the vote in recvset
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// based on the number of votes, decide whether the election can be concluded
voteSet = this.getVoteTracker(recvset, new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch));
--------------------------------------------------------------------------------
// In short, this method builds a SyncedLearnerTracker from the given votes and adds to it every node whose vote matches the given one (i.e. that acked it).
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(this.self.getQuorumVerifier());
if (this.self.getLastSeenQuorumVerifier() != null && this.self.getLastSeenQuorumVerifier().getVersion() > this.self.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(this.self.getLastSeenQuorumVerifier());
}
Iterator var4 = votes.entrySet().iterator();
while(var4.hasNext()) {
Entry<Long, Vote> entry = (Entry)var4.next();
if (vote.equals(entry.getValue())) {
voteSet.addAck((Long)entry.getKey());
}
}
return voteSet;
}
-------------------------------------------------------------------------------- // this essentially checks whether the proposed myid has gathered more than half of the votes (e.g. in a 5-node ensemble half = 2, so at least 3 acks are needed)
if (!voteSet.hasAllQuorums()) {
continue;
}
--------------------------------------------------------------------------------
public boolean hasAllQuorums() {
Iterator var1 = this.qvAcksetPairs.iterator();
SyncedLearnerTracker.QuorumVerifierAcksetPair qvAckset;
do {
if (!var1.hasNext()) {
return true;
}
qvAckset = (SyncedLearnerTracker.QuorumVerifierAcksetPair)var1.next();
} while(qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()));
return false;
}
public boolean containsQuorum(Set<Long> ackSet) {
return ackSet.size() > this.half;
}
--------------------------------------------------------------------------------
// check whether anything in the queue would still change the outcome
while((n = (FastLeaderElection.Notification)this.recvqueue.poll(200L, TimeUnit.MILLISECONDS)) != null) {
if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
this.recvqueue.put(n);
break;
}
}
if (n == null) {
// voting is over; set this node's final state
this.setPeerState(this.proposedLeader, voteSet);
// record the finally elected vote
endVote = new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch);
this.leaveInstance(endVote);
var7 = endVote;
return var7;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
case FOLLOWING:
case LEADING:
if (n.electionEpoch == this.logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = this.getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && this.checkLeader(recvset, n.leader, n.electionEpoch)) {
this.setPeerState(n.leader, voteSet);
endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
this.leaveInstance(endVote);
var7 = endVote;
return var7;
}
}
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = this.getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && this.checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized(this) {
this.logicalclock.set(n.electionEpoch);
this.setPeerState(n.leader, voteSet);
}
endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
this.leaveInstance(endVote);
var7 = endVote;
return var7;
}
break;
default:
LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
}
} else {
if (!this.validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!this.validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
n = null;
return n;
} finally {
try {
if (this.self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(this.self.jmxLeaderElectionBean);
}
} catch (Exception var21) {
LOG.warn("Failed to unregister with JMX", var21);
}
this.self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", this.manager.getConnectionThreadCount());
}
}
The overall flow is shown in the figure.
sendNotifications
private void sendNotifications() {
Iterator var1 = this.self.getCurrentAndNextConfigVoters().iterator();
while(var1.hasNext()) {
long sid = (Long)var1.next();
QuorumVerifier qv = this.self.getQuorumVerifier();
FastLeaderElection.ToSend notmsg = new FastLeaderElection.ToSend(FastLeaderElection.ToSend.mType.notification, this.proposedLeader, this.proposedZxid, this.logicalclock.get(), ServerState.LOOKING, sid, this.proposedEpoch, qv.toString().getBytes());
LOG.debug("Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient), {} (myid), 0x{} (n.peerEpoch) ", new Object[]{this.proposedLeader, Long.toHexString(this.proposedZxid), Long.toHexString(this.logicalclock.get()), sid, this.self.getId(), Long.toHexString(this.proposedEpoch)});
this.sendqueue.offer(notmsg);
}
}
What happens when there are multiple nodes?
Suppose there are three nodes whose votes are vote(1), vote(2) and vote(3).
Suppose the node shown below receives the vote(1) notification and finds that vote(2) beats vote(1) (for example, a larger epoch); it then replaces vote(1) with vote(2), and vote(2) is also what it sends out in the next round.
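The comparison rule used in that decision can be written out explicitly. The sketch below mirrors the logic of FastLeaderElection.totalOrderPredicate in simplified form (it ignores the quorum-weight check): a vote wins if its epoch is larger, or the epochs are equal and its zxid is larger, or both are equal and its myid is larger.
public class VoteComparison {
    static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid
                        || (newZxid == curZxid && newId > curId)));
    }
    public static void main(String[] args) {
        // same epoch, same zxid: the larger myid wins (vote(2) beats vote(1))
        System.out.println(newVoteWins(2, 0x100000000L, 1, 1, 0x100000000L, 1)); // true
        // a larger zxid beats a larger myid
        System.out.println(newVoteWins(1, 0x100000005L, 1, 3, 0x100000001L, 1)); // true
    }
}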