master选举的基本概念
(1)假设有一个系统A,它向外提供了一个服务,叫做服务B。并且这个服务需要24小时持续不断的向外提供。也就是提供服务的机器不能够有单点故障。于是我们考虑使用集群。
(2)我们采用的是master-slave的方式。集群中有一台主机,多台备机。由主机向外提供服务,备机负责监听主机的状态。一旦出现了宕机,备机必须要很迅速的接管主机。继续向外服务。在这个过程中,由备机选出主机的过程就是master选举。
master选举的基本流程
(1)下图中左边代表的是zk集群,右边的三个立方体代表的是三台工作服务器。
(2)这三台机器在启动过程中会去zk的servers节点下创建三个临时节点。并且把自己的临时信息写入到临时节点中。这个过程我们 叫做服务注册。系统中的其它服务可以通过servers节点下的子节点列表来了解当前系统哪些服务器可用。这个过程叫做服务发现。
(4)紧接着这三台服务器就尝试去创建master节点,谁可以创建master节点,谁就是master.假设2号服务器可以创建master,那么2号服务器就称为master。其它的俩台服务器就是slave。所有的工作服务器都必须关注master节点的删除事件。
(5)一个临时节点在创建了会话失效以后,会自动的被删除掉。而创建这次会话的服务器宕机会直接导致会话失效。换句话说,我们可以监听master节点的删除来判断。
(6)master服务器是否宕机。一旦master服务器宕机,那么必须开始新一轮的选举。
master选举代码实现流程图
集群中每个工作服务器启动时的执行流程:
(1)每一台工作服务器都要关注master节点的删除工作。
(2)如果监测到master节点被删除了,那么每台工作服务器都尝试竞争成为master节点。
(3)如果某一工作服务器创建了master节点并且写入了自己的信息,那么表明此工作服务器已经成为了master节点。
如果存在网络抖动:
(1)在网络发生抖动的时候,可能会发生master 服务器会话的中断。那么这样的话,就会再进行master的选举,选举的结果有俩个,一个是原来的master服务器作为master。
(2)另外的一种情况是不是原来的master服务器作为master.但是如果选举了另外的服务器作为master,那么就会引起资源的迁移,会有消耗。所以,为了避免这种情况的发生,需要使原来的master服务器称为master的概率增大。最近一次的master服务器判断是否是自己。增强master。
master选举代码实现
(1)workServer:就是工作服务器
package com.jike.mastersel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;
/**工作服务器**/
public class WorkServer {
private volatile boolean running = false;
private ZkClient zkClient;
private static final String MASTER_PATH = "/master";
private IZkDataListener dataListener;
private RunningData serverData;
private RunningData masterData;
private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
private int delayTime = 5;
public WorkServer(RunningData rd) {
this.serverData = rd;
this.dataListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
// TODO Auto-generated method stub
//takeMaster();
if (masterData!=null && masterData.getName().equals(serverData.getName())){
takeMaster();
}else{
delayExector.schedule(new Runnable(){
public void run(){
takeMaster();
}
}, delayTime, TimeUnit.SECONDS);
}
}
public void handleDataChange(String dataPath, Object data)
throws Exception {
// TODO Auto-generated method stub
}
};
}
public ZkClient getZkClient() {
return zkClient;
}
public void setZkClient(ZkClient zkClient) {
this.zkClient = zkClient;
}
public void start() throws Exception {
if (running) {
throw new Exception("server has startup...");
}
running = true;
zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
takeMaster();
}
public void stop() throws Exception {
if (!running) {
throw new Exception("server has stoped");
}
running = false;
delayExector.shutdown();
zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
releaseMaster();
}
private void takeMaster() {
if (!running)
return;
try {
zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
masterData = serverData;
System.out.println(serverData.getName()+" is master");
delayExector.schedule(new Runnable() {
public void run() {
// TODO Auto-generated method stub
if (checkMaster()){
releaseMaster();
}
}
}, 5, TimeUnit.SECONDS);
} catch (ZkNodeExistsException e) {
RunningData runningData = zkClient.readData(MASTER_PATH, true);
if (runningData == null) {
takeMaster();
} else {
masterData = runningData;
}
} catch (Exception e) {
// ignore;
}
}
private void releaseMaster() {
if (checkMaster()) {
zkClient.delete(MASTER_PATH);
}
}
private boolean checkMaster() {
try {
RunningData eventData = zkClient.readData(MASTER_PATH);
masterData = eventData;
if (masterData.getName().equals(serverData.getName())) {
return true;
}
return false;
} catch (ZkNoNodeException e) {
return false;
} catch (ZkInterruptedException e) {
return checkMaster();
} catch (ZkException e) {
return false;
}
}
}
(2)RunningData:描述workServer的信息
package com.jike.mastersel;
import java.io.Serializable;
/**描述workServer的信息**/
public class RunningData implements Serializable {
/**
*
*/
private static final long serialVersionUID = 4260577459043203630L;
private Long cid;
private String name;
public Long getCid() {
return cid;
}
public void setCid(Long cid) {
this.cid = cid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
(3)leaderSelectorZkClinet:作为启动和停止WorkServer的调度器
package com.jike.mastersel;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
/**作为启动和停止WorkServer的调度器**/
public class LeaderSelectorZkClient
{
//启动的服务个数
private static final int CLIENT_QTY = 10;
//zookeeper服务器的地址
private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
public static void main(String[] args) throws Exception
{
//保存所有zkClient的列表
List<ZkClient> clients = new ArrayList<ZkClient>();
//保存所有服务的列表
List<WorkServer> workServers = new ArrayList<WorkServer>();
try
{
for ( int i = 0; i < CLIENT_QTY; ++i )
{
//创建zkClient
ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
clients.add(client);
//创建serverData
RunningData runningData = new RunningData();
runningData.setCid(Long.valueOf(i));
runningData.setName("Client #" + i);
//创建服务
WorkServer workServer = new WorkServer(runningData);
workServer.setZkClient(client);
workServers.add(workServer);
workServer.start();
}
System.out.println("敲回车键退出!\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
}
finally
{
System.out.println("Shutting down...");
for ( WorkServer workServer : workServers )
{
try {
workServer.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
for ( ZkClient client : clients )
{
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}