目录
Zookeeper 1
第一章 Zookeeper简介... 1
1.1 Zookeeper概述和功能... 1
1.2 Zookeper安装... 1
1.3 Zookeper数据模型... 3
第二章 Zookeeper命令操作... 4
1.1 Zookeeper Client 4
1.2 Zookeeper JavaClient 6
第三章 集群角色... 28
Zookeeper
第一章 Zookeeper简介
1.1 Zookeeper概述和功能
Zookeeper 是 Apache Hadoop 项目下的一个子项目, 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小猪)的管理员.简称zk.
Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。其本质上就是提供一种集中式信息存储服务.就是一个将数据存放到内存中,以树形结构存储数据的服务.我们根据其存储数据的特点可以实现分布式统一配置中心,分布式锁等功能,主要用于分布式应用程序的高性能协调,集群高可靠等.
主要功能:
配置管理
分布式锁
3.服务器动态上下线
1.2 Zookeper安装
上传解压
#进入到apps文件夹 使用rz上传 也可以直接拖拽到crt上传 cd /opt/apps #上传后解压 tar -zxvf zookeeper-3.5.6.tar.gz
配置
#1.进入到解压后的zk文件夹下 创建一个文件夹zkData cd /opt/apps/zookeeper-3.5.6 mkdir zkData #2.进入到zk的conf文件夹下 将zoo_sample.cfg改为zoo.cfg cd conf mv zoo_sample.cfg zoo.cfg #3.修改zoo.cfg的配置 vi zoo.cfg #将第12行dataDir的值改为我们刚才创建的文件夹 12+shift+G 直接跳转 dataDir=/opt/apps/zookeeper-3.5.6/zkData #在文件的最后配置server的信息和端口 server.1=linux01:2888:3888 server.2=linux02:2888:3888 server.3=linux03:2888:3888 #4.进入到zkData文件夹下 添加linux01的myid为1 cd ../zkData echo 1 > myid
分发
#进入到linux01的/opt/apps文件夹下 scp -r zookeeper-3.5.6 linux02:$PWD scp -r zookeeper-3.5.6 linux03:$PWD #设置linux02上zk的myid为2 echo 2 > /opt/apps/zookeeper-3.5.6/zkData/myid cat /opt/apps/zookeeper-3.5.6/zkData/myid #设置linux03上zk的myid为3 echo 3 > /opt/apps/zookeeper-3.5.6/zkData/myid cat /opt/apps/zookeeper-3.5.6/zkData/myid
启动
#zk没有一键启动我们需要挨个启动 /opt/apps/zookeeper-3.5.6/bin/zkServer.sh start #zk查看状态 /opt/apps/zookeeper-3.5.6/bin/zkServer.sh status
一键启动脚本
#!/bin/bash for hostname in linux01 linux02 linux03 do echo "当前正在操作的主机$hostname" ssh $hostname "source /etc/profile ; /opt/apps/zookeeper-3.5.6/bin/zkServer.sh $1 ;exit" done
1.3 Zookeper数据模型
ZooKeeper是一个树形目录服务,其数据模型和linux的文件系统目录树很类似,拥有一个层次化结构。
这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息.节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
节点可以分为四大类:
- PERSISTENT持久化节点 - EPHEMERAL临时节点:-e - PERSISTENT_SEQUENTIAL持久化顺序节点:-s - EPHEMERAL_SEQUENTIAL临时顺序节点: -es
第二章 Zookeeper命令操作
1.1 Zookeeper Client
1.1.1 服务端命令
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh start /opt/apps/zookeeper-3.5.6/bin/zkServer.sh stop /opt/apps/zookeeper-3.5.6/bin/zkServer.sh status /opt/apps/zookeeper-3.5.6/bin/zkServer.sh restart
1.1.2 客户端命令
连接服务端 /opt/apps/zookeeper-3.5.6/bin/zkCli.sh -server linux001:2181 连接本地 /opt/apps/zookeeper-3.5.6/bin/zkCli.sh 查看命令帮助 help 查看 ls 目录 创建节点 create /节点名 描述 create -e /节点名 描述 临时节点 create -s /节点名 描述 顺序节点 create -e -s /节点 描述 临时顺序节点 查看节点 get /节点名 设置节点描述 set /节点名 描述 删除节点 delete /节点名 删除非空节点 deleteAll /节点名
监听
ls -w 路径 -- 监听根目录 所有子节点的变化 ls - w / WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/ get -w 路径 -- 监听 /app3 节点数据的变化 WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/app3
1.2 Zookeeper JavaClient
1.2.1 Cuator介绍
Curator是 ApacheZooKeeper的Java客户端库.
常见的ZooKeeper Java APl :
-
原生JavaAPI
-
zkClient
-
Curator
Curator项目的目标是简化ZooKeeper客户端的使用。
Curator最初是Netfix研发的,后来捐献了Apache基金会,目前是Apache的顶级项目。
http://curator.apache.org
坐标
<!--curator--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.0.0</version> </dependency>
1.2.2 建立连接
/* CuratorFrameworkFactory 静态方法获取连接对象 static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) String connectString:连接的字符串 zkserver地址:端口 int sessionTimeoutMs:会话超时时间 单位ms 默认 60 * 1000 int connectionTimeoutMs 连接超时时间 单位ms 默认 15 * 1000 RetryPolicy retryPolicy 重试策略 接口 实现类ExponentialBackoffRetry static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) CuratorFramework 方法 void start() 开始连接 */ public class Test { public static void main(String[] args) { String connectString = "linux001:2181"; // //基础等待时间 20 重试次数 10 RetryPolicy r = new ExponentialBackoffRetry(20,10); // CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, r); // curatorFramework.start();; //第二种方式 CuratorFramework client = CuratorFrameworkFactory.builder(). connectString(connectString) .retryPolicy(r).build(); client.start(); } }
1.2.3 创建节点
/* CuratorFramework 创建节点 create().forPath(节点) 创建节点 不指定数据 默认为 ip地址 create().forPath(节点,byte[] data) 创建节点 指定数据 字节数组 create().withMode().forPath(节点,byte[] data) 创建节点 指定节点类型 默认类型 持久化 CreateMode.EPHEMERAL 临时节点 CreateMode.PERSISTENT 持久节点 CreateMode.PERSISTENT_SEQUENTIAL 持久顺序节点 CreateMode.EPHEMERAL_SEQUENTIAL 临时顺序节点 create().creatingParentsIfNeeded().forPath(节点,byte[] data) 创建多级节点 */ public class Test { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorUtils.getClient(); client.start(); //创建节点 由于指定了命名空间 所以会在 /doit下创建 //没有指定数据 默认数据是当前客户端的ip地址 // client.create().forPath("/app1"); //创建节点 指定数据 //client.create().forPath("/app2","abc".getBytes()); //创建临时节点 指定数据 // client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","aa".getBytes()); //Thread.sleep(10000); //创建多级节点 client.create().creatingParentsIfNeeded().forPath("/app3/p1/p", "abc".getBytes()); client.close(); } }
1.2.4 查看节点
/* 查看节点 * 1. 查询数据:get: getData().forPath() * 2. 查询子节点: ls: getChildren().forPath() * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath() */ public class Test02 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorUtils.getClient(); client.start(); //查询节点数据 byte[] bytes = client.getData().forPath("/app1"); System.out.println(new String(bytes)); //查询子节点 List<String> list = client.getChildren().forPath("/"); for (String s : list) { System.out.println(s); } //查询节点状态信息 ls -s Stat s = new Stat(); client.getData().storingStatIn(s).forPath("/app1"); System.out.println(s.getCzxid()); client.close(); } }
1.2.5 修改数据
/** * 修改数据 * 1. 基本修改数据:setData().forPath() * 2. 根据版本修改: setData().withVersion().forPath() * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。 */ public class Test03 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorUtils.getClient(); client.start(); //基本修改数据 // client.setData().forPath("/app1","v1".getBytes()); // byte[] bytes = client.getData().forPath("/app1"); // System.out.println(new String(bytes)); //查询版本 Stat s = new Stat(); client.getData().storingStatIn(s).forPath("/app1"); int version = s.getVersion(); //使用版本进行设置 client.setData().withVersion(version).forPath("/app1"); byte[] bytes = client.getData().forPath("/app1"); System.out.println(new String(bytes)); client.close(); } }
1.2.6 删除节点
/** * 删除节点: delete deleteall * 1. 删除单个节点:delete().forPath("/app1"); * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1"); * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2"); * 4. 回调:inBackground */ public class Test04 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorUtils.getClient(); client.start(); //删除单个节点 // client.delete().forPath("/app1"); //删除带子节点的节点 // client.delete().deletingChildrenIfNeeded().forPath("/app3"); //必须成功的删除:为了防止网络抖动。本质就是重试。 // client.delete().guaranteed().forPath("/aa"); client.delete().guaranteed().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("我被删除了"); System.out.println(event); } }).forPath("/app2"); client.close(); } }
1.2.7 注册监听
监听数据变化
public class Test05 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorUtils.getClient(); client.start(); //创建 NodeCache对象 NodeCache nodeCache = new NodeCache(client,"/app1"); //2.注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了"); byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据 nodeCache.start(true); // Thread.sleep(1000000); } }
监听子节点变化
public class Test06 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorUtils.getClient(); client.start(); PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app3",true); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println("子节点变化了"); System.out.println(pathChildrenCacheEvent); PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ byte[] data = pathChildrenCacheEvent.getData().getData(); System.out.println(new String(data)); } } }); pathChildrenCache.start(); Thread.sleep(1000000); } }
/* TreeCache 相当于 NodeCache与PathChildrenCache组合 监听当前节点 与 子节点 */ public class Test07 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorUtils.getClient(); client.start(); TreeCache treeCache = new TreeCache(client,"/app4"); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { System.out.println(treeCacheEvent); } }); treeCache.start(); Thread.sleep(10000000); } }
第三章 集群角色
在ZooKeeper集群服中务中有三个角色:
Leader 领导者 :
1. 处理事务请求 2. 集群内部各服务器的调度者
Follower 跟随者 :
-
处理客户端非事务请求,转发事务请求给Leader服务器
-
参与Leader选举投票
Observer 观察者:
-
处理客户端非事务请求,转发事务请求给Leader服务器
选举机制
假设3台linux linux01 启动 投自己一票 广播 linux02 启动 收到linux01 广播 投自己一票 linux01 发现 linux02的id比自己的id大 投linux02一票 linux02 投票超过半数 选为 leader linux03 启动 由于已经有了leader follower
如果leader宕机
看谁的版本号新 谁为leader 如果版本号相同 谁id大 谁是leader