什么是Zookeeper
分布式开源协调系统,数据模型简单,可以实现同步,配置管理,分组管理,分命名空间管理等。
技术本质
一个原子消息传递系统,它使所有服务器保持同步
FLP(3个科学家名字命名) 理论角度,Zookeeper是 SF的 ,使用超时机制来保证Livessnes
扩展FLP:
基于消息的异步通信系统,即使只有一个进程失败,也没有一个确定性算法,保证存活节点达到一致
确定性算法: 给定一个输入,一定会产生相同的输出
基于消息的异步通信系统 : 没有统一时钟,不能时间同步,不能使用超时,不能探测失败,消息可以任意延迟,消息可以乱序
扩展FLP不可能三角
Safety 系统中非故障节点达成了一致和合法的共识
Liveness 系统中非故障节点在有效时间能够达成共识
Fault Tolerance 协议必须在节点故障的时候同样有效
CAP理论角度 zookeeper 是CP的:
当leader节点down,剩余节点则会发起选举,选举期间导致短暂不可用。保证了强一致性而无法保证高可用性
数据模型
Watches监控节点变化
Data Access,Znode数据原子读写
Ephemeral Nodes临时节点,生命周期,与创建节点的进程一样
Sequence Nodes 创建的节点有序
client开发步骤
设计 path
选择znode类型
设计znode存储的内容
设计Watch client 关注什么事件,事件发生后如何处理
集群选举算法 - 最小节点获胜
算法说明
当 Leader 节点挂掉的时候,持有最小编号 znode 的集群节点成为新的 Leader
设计path
集群公用父节点,例如:/examples/leader
设计Znode类型
所有服务,需要在父节点下创建znode, 由于持有最小编号 znode是leader,选择用ephemeral_sequential 类型 znode
设计Znode存储的内容
一般服务部署在多台机子上,这里我们可以按需定制,一般会存储ip
设计Watch
节点启动或者重连后,在 parent 目录下创建自己的 ephemeral_sequntial znode
如果自己的 znode 编号是最小的,则成为Leader,否则 watch parent 目录;
当 parent 目录有节点删除的时候,首先判断其是否是 Leader 节点,然后再看其编号是否正好比自己小1,如果是则自己成为 Leader,如果不是继续 watch。
Curator Framework实现
Curator Framework简化zookeeper,原始api的工具
内部实现: LeaderSelector 最小节点获胜。
pom如下:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.1.0</version>
</dependency>
实例代码:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.*;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 从Curator Framework 的sample中fork
*/
public class LeaderSelectClient extends LeaderSelectorListenerAdapter
implements Closeable {
private final String name;
private final LeaderSelector leaderSelector;
private final AtomicInteger leaderCount = new AtomicInteger();
public LeaderSelectClient(
CuratorFramework client,
String path,
String name) {
this.name = name;
leaderSelector = new LeaderSelector(client, path, this);
// 重新选举时自动入队
leaderSelector.autoRequeue();
}
public void start() throws IOException {
// 调用这个方法开启后台选举
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
@Override
public void takeLeadership(CuratorFramework client)
throws Exception {
// 如果,不想开启下次选举,这个方法应该一致 不返回
final int waitSeconds = (int) (5 * Math.random()) + 1;
long sessionId = client.getZookeeperClient()
.getZooKeeper().getSessionId();
System.out.println(name + " is now the leader. Waiting "
+ waitSeconds + " seconds...");
System.out.println(name + " has been leader "
+ leaderCount.getAndIncrement()
+ " time(s) before.sessionId:"+ sessionId);
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
while(true);
} catch (InterruptedException e) {
System.err.println(name + " was interrupted.");
Thread.currentThread().interrupt();
} finally {
System.out.println(name + " relinquishing leadership.\n");
}
}
}
测试代码:
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;
public class LeaderSelectorExample {
private static final int CLIENT_QTY = 10;
private static final String PATH = "/examples/leader";
public static void main( String[] args ) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderSelectClient> examples = Lists.newArrayList();
try {
for ( int i = 0; i < CLIENT_QTY; ++i ) {
CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.01:2181",
new ExponentialBackoffRetry( 1000, 3 )
);
LeaderSelectClient example =
new LeaderSelectClient(
client,
PATH,
"Client #" + i );
clients.add( client );
examples.add( example );
client.start();
example.start();
}
System.out.println( "Press enter/return to quit\n" );
new BufferedReader( new InputStreamReader( System.in ) )
.readLine();
} finally {
System.out.println( "Shutting down..." );
examples.forEach( CloseableUtils::closeQuietly );
clients.forEach( CloseableUtils::closeQuietly );
}
}
}
本地运行结果
sessionId: 72059991158751452
_c_5ba01e37-5e80-4f95-9fdd-a51a6b8e4ad0-lock-0000000239,
_c_71ebbb62-5b64-4808-9fae-b7af6c5a701a-lock-0000000236,
_c_7a7bbfcf-1178-40f2-a736-e13eb6459c90-lock-0000000241,
_c_a6ada0c2-afe6-49cf-8cb3-1d2fb95821dc-lock-0000000235,
_c_ab20582d-e5b8-4d56-b047-ae3ee4bef4c6-lock-0000000240,
_c_cdbf26bd-2b3f-4c6e-9386-a9b258c5f0c9-lock-0000000234,
_c_d79cdca1-24b4-495a-a198-11c9118017c2-lock-0000000243,
_c_f7f4632c-e1bb-4b0b-9aa8-42192e366637-lock-0000000238,
_c_f8e0889c-58bd-4d89-93d3-d397b6648a01-lock-0000000242,
_c_fd8d076d-abea-4138-826d-7fc7f395c108-lock-0000000237
是最小的,所有节点都是顺序节点,都有序号,最小的是0000000234
0x100022e1f8900dc 对应十进制为: 72059991158751452 , 与java程序中的sessionId一致
zonode 存储的是IP