一、Zookeeper概述
1.1 Zookeeper的定义
Zookeeper是一个开源的分布式协调服务,主要用于分布式应用程序中的协调管理。它由Apache软件基金会维护,是Hadoop生态系统中的重要成员。Zookeeper提供了一个高效且可靠的分布式锁服务,以及群集管理功能,在分布式系统中起到了“守护神”的作用。
1.2 Zookeeper的核心理念
Zookeeper基于以下关键概念构建:
-
数据模型:Zookeeper的数据模型是一个层次结构,这个层次类似于一个文件系统,与liunx的文件系统类似,整体可以看作为一棵树。它由节点组成,节点也成为ZNode,每个节点可以有子节点。节点可以存储数据,但数据尺寸有限默认存储为1MB的数据。
-
节点(ZNode):Zookeeper中的每个数据单元称为ZNode。ZNode有两种类型:持久(Persistent)和临时(Ephemeral)。持久节点在客户端断开连接后仍存在,而临时节点在客户端断开连接后会被自动删除。
-
观察者(Watcher):客户端可以在ZNode上设置观察者,当ZNode的数据或子节点发生变化时,Watcher会通知对应的客户端。
-
有序性(Orderliness):Zookeeper通过全局顺序来确保所有操作的顺序一致。
-
数据一致性 :每个server保存一份相同的数据拷贝,客户端无论请求到被集群中哪个server处理,得到的数据都是一致的。
-
集群服务:在Zookeeper集群服务由一个领导者(leader),多个跟随者(follower)组成的集群。领导者(leader)负责进行投票的发起和决议,更新集群服务状态。跟随者用于接收客户请求并向客户端返回结果,在选举Leader过程中参与投票。集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。
二、Zookeeper的应用场景
2.1 分布式锁服务
Zookeeper能够非常有效地实现分布式锁。这在需要同步或并发控制的分布式系统中尤为重要。通过利用Zookeeper的临时ZNode特性,可以实现锁的自动释放,防止死锁。
2.2 统一配置管理
在分布式系统中,应用程序的配置管理成为一个复杂的问题。Zookeeper提供了一种集中式管理配置的方式,所有的配置文件可以存储在Zookeeper中,并且可以动态更新。当配置变化时,Zookeeper可以通知到所有客户端,从而使应用程序能够立即响应变化。
2.3 命名服务
Zookeeper可以作为分布式系统的命名服务,通过维护名称和元数据的映射关系,提供高效的名称解析能力。
2.4 集群管理
Zookeeper能够监控集群中各个节点的状态,决定节点是否健康,而节点的加入和离开能够动态调整。
三、Zookeeper的安全管理操作方法
3.1 基本安全措施
-
验证(Authentication):Zookeeper支持基于客户端和服务器之间的认证机制。通过设置用户和密码,可以限制对ZNode的访问。
3.1.1认证方式
- world:默认方式,开放的权限,意解为全世界都能随意访问。
- auth:已经授权且认证通过的用户才可以访问。
- digest:用户名:密码方式认证,实际业务开发中最常用的方式。
- IP白名单:授权指定的Ip地址,和指定的权限点,控制访问。
-
ACL(Access Control Lists):通过设定ACL,能够控制ZNode的读写权限。ACL规则可以根据不同的需求设定,如只读、完全控制等。
3.1.2 ACL授权流程
- 添加认证用户
addauth digest 用户名:密码
- 设置权限
setAcl /path auth:用户名:密码:权限
- 查看Acl设置
getAcl /path
完整的操作如下代码
-- 添加授权用户
[zk: localhost:2181] addauth digest user1:123456
-- 创建节点
[zk: localhost:2181] create /testNode testNode
-- 节点授权
[zk: localhost:2181] setAcl /testNode auth:user1:123456:cdrwa
-- 查看授权
[zk: localhost:2181] getAcl /testNode
3.2 数据加密
在Zookeeper的配置文件中,可以启用数据传输加密(例如SSL/TLS)来保证数据在网络传输中的安全性。
3.3 安全配置示例
# zookeeper configuration
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
digest.authenticationHandler.sasl.clientAllowedProtocols=GSSAPI:CRAM-MD5
四、Zookeeper与Spring Boot 2的整合
4.1 引入依赖
在Spring Boot 2项目中,首先需要引入Curator依赖,这是用于简化Zookeeper操作的一个高层次API。Curator框架在Zookeeper原生API接口上进行二次包装。提供ZooKeeper各种应用场景:比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等API封装。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
4.2 Springboot项目yml配置
在application.properties
中,配置Zookeeper的连接信息:
zoo:
keeper:
#开启标志
enabled: true
#服务器地址
server: 127.0.0.1:2181
#命名空间,被称为ZNode
namespace: testNode
#权限控制,加密
digest: user1:123456
#会话超时时间
sessionTimeoutMs: 3000
#连接超时时间
connectionTimeoutMs: 60000
#最大重试次数
maxRetries: 2
#初始休眠时间
baseSleepTimeMs: 1000
4.3 编写配置类
编写Zookeeper配置类,用于初始化Zookeeper客户端:
@Configuration
public class ZookeeperConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperConfig.class) ;
//注入Zookeeper配置文件类,用于获取yml的配置项值
@Autowired
private ZookeeperParam zookeeperParam ;
private static CuratorFramework client = null ;
/**
* 初始化
*/
@PostConstruct
public void init (){
//重试策略,初试时间1秒,重试10次
RetryPolicy policy = new ExponentialBackoffRetry(
zookeeperParam.getBaseSleepTimeMs(),
zookeeperParam.getMaxRetries());
//通过工厂创建Curator
client = CuratorFrameworkFactory.builder()
.connectString(zookeeperParam.getServer()) //链接的服务的地址
.authorization("digest",zookeeperParam.getDigest().getBytes()) //认证方式
.connectionTimeoutMs(zookeeperParam.getConnectionTimeoutMs())
.sessionTimeoutMs(zookeeperParam.getSessionTimeoutMs())
.retryPolicy(policy).build();
//开启连接
client.start();
LOGGER.info("zookeeper 初始化完成...");
}
public static CuratorFramework getClient (){
return client ;
}
public static void closeClient (){
if (client != null){
client.close();
}
}
}
4.4 示例代码
示例代码展示如何在Spring Boot 2项目中使用Zookeeper:
Zookeeper接口类
public interface ZookeeperService {
/**
* 判断节点是否存在
*/
boolean isExistNode (final String path) ;
/**
* 创建节点
*/
void createNode (CreateMode mode,String path ) ;
/**
* 设置节点数据
*/
void setNodeData (String path, String nodeData) ;
/**
* 创建节点
*/
void createNodeAndData (CreateMode mode, String path , String nodeData) ;
/**
* 获取节点数据
*/
String getNodeData (String path) ;
/**
* 获取节点下数据
*/
List<String> getNodeChild (String path) ;
/**
* 是否递归删除节点
*/
void deleteNode (String path,Boolean recursive) ;
/**
* 获取读写锁
*/
InterProcessReadWriteLock getReadWriteLock (String path) ;
}
Zookeeper接口实现类IMPL
@Service
public class ZookeeperServiceImpl implements ZookeeperService {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServiceImpl.class);
@Override
public boolean isExistNode(String path) {
CuratorFramework client = ZookeeperConfig.getClient();
client.sync() ;
try {
Stat stat = client.checkExists().forPath(path);
return client.checkExists().forPath(path) != null;
} catch (Exception e) {
LOGGER.error("isExistNode error...", e);
e.printStackTrace();
}
return false;
}
@Override
public void createNode(CreateMode mode, String path) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
// 递归创建所需父节点
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
} catch (Exception e) {
LOGGER.error("createNode error...", e);
e.printStackTrace();
}
}
@Override
public void setNodeData(String path, String nodeData) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
// 设置节点数据
client.setData().forPath(path, nodeData.getBytes("UTF-8"));
} catch (Exception e) {
LOGGER.error("setNodeData error...", e);
e.printStackTrace();
}
}
@Override
public void createNodeAndData(CreateMode mode, String path, String nodeData) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
// 创建节点,关联数据
client.create().creatingParentsIfNeeded().withMode(mode)
.forPath(path,nodeData.getBytes("UTF-8"));
} catch (Exception e) {
LOGGER.error("createNode error...", e);
e.printStackTrace();
}
}
@Override
public String getNodeData(String path) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
// 数据读取和转换
byte[] dataByte = client.getData().forPath(path) ;
String data = new String(dataByte,"UTF-8") ;
if (StringUtils.isNotEmpty(data)){
return data ;
}
}catch (Exception e) {
LOGGER.error("getNodeData error...", e);
e.printStackTrace();
}
return null;
}
@Override
public List<String> getNodeChild(String path) {
CuratorFramework client = ZookeeperConfig.getClient() ;
List<String> nodeChildDataList = new ArrayList<>();
try {
// 节点下数据集
nodeChildDataList = client.getChildren().forPath(path);
} catch (Exception e) {
LOGGER.error("getNodeChild error...", e);
e.printStackTrace();
}
return nodeChildDataList;
}
@Override
public void deleteNode(String path, Boolean recursive) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
if(recursive) {
// 递归删除节点
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
} else {
// 删除单个节点
client.delete().guaranteed().forPath(path);
}
} catch (Exception e) {
LOGGER.error("deleteNode error...", e);
e.printStackTrace();
}
}
@Override
public InterProcessReadWriteLock getReadWriteLock(String path) {
CuratorFramework client = ZookeeperConfig.getClient() ;
// 写锁互斥、读写互斥
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
return readWriteLock ;
}
}
Zookeeper业务API场景使用
@Api("Zookeeper接口使用实例")
@RestController
public class ZookeeperController {
@Autowired
private ZookeeperService zookeeperService ;
@ApiOperation(value="查询节点数据")
@GetMapping("/getNodeData")
public HttpResult getNodeData (String path) {
return HttpResult.create(HttpStatus.SUCCESS,zookeeperService.getNodeData(path));
}
@ApiOperation(value="判断节点是否存在")
@GetMapping("/isExistNode")
public HttpResult isExistNode (final String path){
return HttpResult.create(HttpStatus.SUCCESS,zookeeperService.isExistNode(path));
}
@ApiOperation(value="创建节点")
@GetMapping("/createNode")
public HttpResult createNode (CreateMode mode, String path ){
zookeeperService.createNode(mode,path) ;
return HttpResult.create(HttpStatus.SUCCESS);
}
@ApiOperation(value="设置节点数据")
@GetMapping("/setNodeData")
public HttpResult setNodeData (String path, String nodeData) {
zookeeperService.setNodeData(path,nodeData) ;
return HttpResult.create(HttpStatus.SUCCESS);
}
@ApiOperation(value="创建并设置节点数据")
@GetMapping("/createNodeAndData")
public HttpResult createNodeAndData (CreateMode mode, String path , String nodeData){
zookeeperService.createNodeAndData(mode,path,nodeData) ;
return HttpResult.create(HttpStatus.SUCCESS);
}
@ApiOperation(value="递归获取节点数据")
@GetMapping("/getNodeChild")
public HttpResult getNodeChild (String path) {
return HttpResult.create(HttpStatus.SUCCESS,zookeeperService.getNodeChild(path));
}
@ApiOperation(value="是否递归删除节点")
@GetMapping("/deleteNode")
public HttpResult deleteNode (String path,Boolean recursive) {
zookeeperService.deleteNode(path,recursive) ;
return HttpResult.create(HttpStatus.SUCCESS);
}
}
接口返回HttpResult统一类
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author Lqzhang
* @date 2020/5/19
*/
@Data
@ApiModel(value = "HttpResult", description = "统一返回数据结构")
public class HttpResult<T> {
@ApiModelProperty(value = "返回状态码")
private Integer code;
@ApiModelProperty(value = "返回信息")
private String msg;
@ApiModelProperty(value = "返回数据")
@JsonInclude(value = JsonInclude.Include.NON_NULL)
private T data;
public static <E> HttpResult<E> create(HttpStatus httpStatus) {
HttpResult<E> httpResult = new HttpResult<>();
httpResult.setCode(httpStatus.getCode());
httpResult.setMsg(httpStatus.getMessage());
return httpResult;
}
public static <E> HttpResult<E> create(HttpStatus httpStatus, String msg) {
HttpResult<E> httpResult = new HttpResult<>();
httpResult.setCode(httpStatus.getCode());
httpResult.setMsg(msg);
return httpResult;
}
public static <E> HttpResult<E> create(HttpStatus httpStatus, E data) {
HttpResult<E> httpResult = new HttpResult<>();
httpResult.setCode(httpStatus.getCode());
httpResult.setMsg(httpStatus.getMessage());
httpResult.setData(data);
return httpResult;
}
public static <E> HttpResult<E> create(HttpStatus httpStatus, String msg, E data) {
HttpResult<E> httpResult = new HttpResult<>();
httpResult.setCode(httpStatus.getCode());
httpResult.setMsg(msg);
httpResult.setData(data);
return httpResult;
}
public static <E> HttpResult<E> create(Integer code, String msg, E data) {
HttpResult<E> httpResult = new HttpResult<>();
httpResult.setCode(code);
httpResult.setMsg(msg);
httpResult.setData(data);
return httpResult;
}
public static <E> HttpResult<E> success() {
return success(null);
}
public static <E> HttpResult<E> success(E data) {
HttpResult<E> httpResult = new HttpResult<>();
httpResult.setCode(200);
httpResult.setMsg("操作成功");
httpResult.setData(data);
return httpResult;
}
public static <E> HttpResult<E> fail() {
return fail(HttpStatus.FAIL.getMessage());
}
public static <E> HttpResult<E> fail(String message) {
HttpResult<E> httpResult = new HttpResult<>();
httpResult.setCode(HttpStatus.FAIL.getCode());
httpResult.setMsg(message);
return httpResult;
}
}
/**
* 请求结果状态枚举常量类
*
* @author Lqzhang
*/
public enum HttpStatus {
SUCCESS(200, "请求成功"),
NO_DATA(201, "没有查询到对应的数据"),
FAIL(203, "请求异常"),
PARAM_ERROR(204, "参数名错误或参数为空,请检查"),
NO_LOGIN(205, "没有授权"),
SAVE_ERROR(206, "操作失败"),
NO_DATA_IN_AUTH(207, "权限范围内没有查询到数据"),
ARREARS(208, "账户可用余额已不足,请充值"),
UNBOUND_PHONE(210, "用户账户未绑定微信手机"),
USER_NOT_EXITS(211, "用户不存在"),
PASSWORD_ERROR(212, "密码错误"),
TOKEN_EXPIRED(403, "当前登录凭证已失效,请重新登录"),
SERVICE_NOT_OPENED(215, "该功能未开通,联系管理员开通使用"),;
/**
* 状态码
*/
private int code;
/**
* 状态信息
*/
private String message;
HttpStatus(int code, String message) {
this.code = code;
this.message = message;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public int getStatusTypeCode(HttpStatus httpStatus) {
return httpStatus.getCode();
}
public String getStatusTypeMessage(HttpStatus httpStatus) {
return httpStatus.getMessage();
}
public static HttpStatus getStatusTypeByCode(int code) {
HttpStatus httpStatus = null;
for (HttpStatus status : values()) {
if (status.getCode() == code) {
httpStatus = status;
break;
}
}
return httpStatus;
}
public static HttpStatus getStatusTypeByMessage(String message) {
HttpStatus httpStatus = null;
for (HttpStatus status : values()) {
if (status.getMessage().equals(message)) {
httpStatus = status;
break;
}
}
return httpStatus;
}
}
结论
Zookeeper作为分布式系统中的重要组件,提供了多种功能和强大的协调能力。在实际应用中,可以利用Zookeeper实现分布式锁、统一配置管理、命名服务及集群管理等功能。通过与Spring Boot 2的整合,能更好地在应用中利用Zookeeper这些功能,以提升系统的可用性和可靠性。希望通过本文的介绍,您对Zookeeper有更加深入的了解,并能够在实际项目中加以应用。