文章目录
- Java连接Zookeeper服务端
- 依赖
- 代码使用
- 应用场景
- 统一命名服务
- 统一配置管理
- 统一集群管理
- 服务器节点动态上下线
- 理解
- 实现
- 模拟服务提供者【客户端代码】-注册服务
- 模拟服务消费者【客户端代码】-获取服务信息进行请求消费
- 软负载均衡
- 分布式锁
- 理解
- 实现
- 生产集群安装N台机器合适
- 第三方基于zookeeper的包
- curator
- 依赖
- 代码
Java连接Zookeeper服务端
文档: https://zookeeper.apache.org/doc/r3.9.1/javaExample.html
依赖
依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.dromara.hutool/hutool-all -->
<dependency>
<groupId>org.dromara.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>6.0.0-M11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
</dependencies>
代码使用
Java代码
public class ZkClient {
@Test
@SneakyThrows
public void test1() {
String connectString = "192.168.19.107:2181"; // zookeeper服务端信息
int sessionTimeout = 2000; // 连接最大时长(毫秒)
ZooKeeper zooKeeperClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
Console.log("服务端推送给客户端的监听事件信息 == {}", event);
}
});
// 监听节点数据的变化 === 等价于get -w 命令
try {
String s = zooKeeperClient.create("/test", "testData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Console.log("创建节点成功:{}", s);
} catch (Exception e) {
Console.log("创建节点失败:{}", e.getMessage());
}
// 启动监听增删子节点的变化,然后在前面【Watcher】能收到监听事件 === 等价于ls -w 命令
List<String> children = zooKeeperClient.getChildren("/test", true);
Console.log("Zookeeper服务端当前/test所有的子节点名字:{}", children);
//启动节点的状态信息变化 === 等价于stat -w 命令
Stat statInfo = zooKeeperClient.exists("/test", true);
Console.log("Zookeeper服务端当前/test节点的状态信息:{}" , statInfo);
//程序永远不结束
while (true) {
try {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1. 前提背景
2. 开始执行代码
3. 命令增加节点
4. Java客户端监听到的消息
应用场景
统一命名服务
对应用、服务同意命名便于识别,比如一个对外服务接口的多集群,则需要统一的管理同一服务的所有IP
统一配置管理
- 场景:
- 一般要求一个集群中,所有节点的配置信息是一致的,比如Kafka集群。
- 对配置文件修改后,希望能够快速同步到各个节点上
- 实现:配置信息写入到Zookeeper一个节点中,客户端监听这个节点即可
统一集群管理
- 场景:
- 分布式环境,实时掌握每个节点状态是必要的
- 实现: 节点信息写入ZooKeeper_上的一个ZNode。客户端监听这个ZNode可获取它的实时状态变化
服务器节点动态上下线
理解
特点: 客户端能实时洞察到服务器上下线的变化
实现
前提: 运行代码前自行在Zookeeper客户端创建/service节点【create /service “service”】,因为zookeeper创建子节点前必须有父节点,否则创建子节点失败
模拟服务提供者【客户端代码】-注册服务
public class ServiceProviderZkClient {
private static String connecting = StrUtil.join(StrUtil.COMMA,"192.168.19.107:2181","192.168.19.108:2181","192.168.19.109:2181");
private static Integer timeout = 2000;
@SneakyThrows
public static void main(String[] args) {
Arrays.asList("application1","application2","application3").stream()
.parallel()
.forEach(applicationName -> {
serviceRegister(applicationName);
});
}
@SneakyThrows
public static void serviceRegister(String applicationName) {
ZooKeeper zooKeeper = new ZooKeeper(connecting, timeout, new Watcher() {
public void process(WatchedEvent event) {
Console.log("服务端推送的监听信息:{}", event);
}
});
String zookeeperPath = StrUtil.format("/service/{}", applicationName);
byte[] zookeeperPathData = Convert.toPrimitiveByteArray(StrUtil.format("{}应用的IP地址等信息", applicationName));
String newNodePath = zooKeeper.create(zookeeperPath, zookeeperPathData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
Console.log("【{}】在线" , applicationName);
//程序永远不结束
while (true) {
try {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
模拟服务消费者【客户端代码】-获取服务信息进行请求消费
public class ServiceProviderConsumerZkClient {
private static String connecting = StrUtil.join(StrUtil.COMMA,"192.168.19.107:2181","192.168.19.108:2181","192.168.19.109:2181");
private static Integer timeout = 2000;
private static AtomicReference children = new AtomicReference(ListUtil.of());
private static ZooKeeper zooKeeper = null;
@SneakyThrows
public static void main(String[] args) {
zooKeeper = new ZooKeeper(connecting, timeout, new Watcher() {
public void process(WatchedEvent event) {
Console.log("服务端推送的监听信息:{}", event);
//每次收到监听通知消息,同步服务在线状态
getServiceNode();
}
});
//获取在线中的提供提供者
getServiceNode();
while (true) {
String targetServiceName = "application2";
//在线的服务真实路径
String targetServiceNodeName = CollUtil.emptyIfNull((List<String>)children.get())
.stream()
.filter(childrenPath -> StrUtil.contains(childrenPath, targetServiceName))
.findFirst()
.orElse(null);
String targetServiceNamePath = StrUtil.format("/service/{}" , targetServiceNodeName);
boolean targetServiceNameExistFlag = StrUtil.isNotBlank(targetServiceNodeName);
if(targetServiceNameExistFlag) {
//获取服务的配置信息进行服务调用 == 节点里面一般包含当前服务提供者http,端口等等信息
String nodeData = Convert.toStr(zooKeeper.getData(targetServiceNamePath, false, null));
Console.log("【{}】第三方服务上线,调用接口成功", targetServiceName);
}else {
Console.log("【{}】第三方服务未上线,调用接口失败" , targetServiceName);
}
ThreadUtil.sleep(5000);
}
}
@SneakyThrows
public static void getServiceNode() {
children.set(zooKeeper.getChildren("/service", true));
Console.log("系统中的服务提供者节点:{}" , children.get());
}
}
服务提供者进行注册服务时
服务消费者进行消费时
软负载均衡
特点: 在Zookeepert中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求
分布式锁
理解
概念: 分布式系统中能保证多个进程有序地进行访问临界资源的锁,拿到锁的进程才可以访问资源,否则一直排队等待锁
实现
public class DistributedLock {
private static String connecting = StrUtil.join(StrUtil.COMMA, "192.168.19.107:2181", "192.168.19.108:2181", "192.168.19.109:2181");
private static Integer timeout = 2000;
private static ZooKeeper zooKeeper;
private static String parentPath = "/DistributedLock";
private static Map<String, CountDownLatch> threadIdToCountDownLatchMap = MapUtil.newSafeConcurrentHashMap();
static {
init();
}
@SneakyThrows
private static void init() {
zooKeeper = connectZooKeeper();
// 创建锁父节点
String realParentPath = zooKeeper.create(parentPath, Convert.toPrimitiveByteArray(parentPath), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 监听父节点下子节点增删的变化
// List<String> sonNodeNames = getParentSortedNodes(true);
}
@SneakyThrows
private static ZooKeeper connectZooKeeper() {
ZooKeeper connectedZooKeeper = new ZooKeeper(connecting, timeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
Console.log("zookeeper收到服务端监听通知消息:{}", event);
String dealNodePath = event.getPath();
Event.EventType eventType = event.getType();
if (eventType == Event.EventType.NodeDeleted) {
String nodeBelongThreadId = StrUtil.subBefore(StrUtil.subAfter(dealNodePath, "/", true), "_", false);
Console.log("收到删除节点通知,释放线程等待 == {}", nodeBelongThreadId);
// 释放锁
CountDownLatch countDownLatch = threadIdToCountDownLatchMap.get(nodeBelongThreadId);
if (countDownLatch != null) {
countDownLatch.countDown();
threadIdToCountDownLatchMap.remove(nodeBelongThreadId);
}
}
}
});
return connectedZooKeeper;
}
@SneakyThrows
public static List<String> getParentSortedNodes(Boolean watchFlag) {
List<String> sonNodeNames = CollUtil.emptyIfNull(zooKeeper.getChildren(parentPath, true))
.stream()
.sorted(CompareUtil::compare)
.collect(Collectors.toList());
return sonNodeNames;
}
/**
* 获取锁
*/
@SneakyThrows
public static void acquireLock() {
// 当前锁的节点前缀
String nodeNamePrefix = Thread.currentThread().getId() + "_";
// 当前锁的节点完整领前缀
String absolutenodeNamePathPrefix = StrUtil.format("{}/{}", parentPath, nodeNamePrefix);
// 完整的前缀
String realAbsolutenodeNamePath = zooKeeper.create(absolutenodeNamePathPrefix, Convert.toPrimitiveByteArray("absolutenodeNamePathPrefix"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取当前父路径下的所有节点
List<String> sonNodeNames = getParentSortedNodes(false);
if (CollUtil.size(sonNodeNames) == 1) { // 获取锁
Console.log("【子元素为1】获取锁【{}】",realAbsolutenodeNamePath);
return;
} else {
String firstNodeName = CollUtil.getFirst(sonNodeNames);
if (StrUtil.startWith(firstNodeName, nodeNamePrefix)) {
Console.log("【当前子节点在排序后序列为第一个】获取锁【{}】",realAbsolutenodeNamePath);
return;
} else {
// 一直等待直到有机会获取锁
CountDownLatch countDownLatch = new CountDownLatch(1);
// 监听该节点的前一个节点的增删变化
int currentNodeIndex = CollUtil.indexOf(sonNodeNames, nodeName -> StrUtil.endWith(realAbsolutenodeNamePath, nodeName));
String previousNodeName = CollUtil.get(sonNodeNames, currentNodeIndex - 1);
String previousNodePath = StrUtil.format("{}/{}", parentPath, previousNodeName);
zooKeeper.getData(previousNodePath, true, null);
threadIdToCountDownLatchMap.put(StrUtil.subBefore(previousNodeName,"_", false), countDownLatch);
countDownLatch.await();
Console.log("获取锁【{}】", realAbsolutenodeNamePath);
}
}
}
/**
* 释放锁
*/
@SneakyThrows
public static void releaseLock() {
String currentThreadId = Convert.toStr(Thread.currentThread().getId());
// CountDownLatch countDownLatch = threadIdToCountDownLatchMap.get(currentThreadId);
// if (ObjUtil.isNull(countDownLatch)) {
// Console.log("当前线程并没有等待锁的操作");
// return;
// }
// 当前锁的节点前缀
String nodeNamePrefix = currentThreadId + "_";
String realNodeName = getParentSortedNodes(false).stream()
.filter(nodeName -> StrUtil.startWith(nodeName, nodeNamePrefix))
.findFirst()
.orElse(null);
if (StrUtil.isBlank(realNodeName)) {
Console.log("当前线程并未有获取锁的操作");
return;
}
String completeNodePath = StrUtil.format("{}/{}", parentPath, realNodeName);
zooKeeper.delete(completeNodePath, -1);
Console.log("释放锁【{}】", completeNodePath);
}
public static void main(String[] args) {
String s = StrUtil.subAfter("fsd/fdsfsdfds", "/", true);
Console.log(s);
}
}
public class App2Test {
@Test
@SneakyThrows
public void test3() {
Bean publicBean = new Bean();
List<Thread> threadGroup = ListUtil.of();
for (int i = 0; i < 10; i++) {
Thread newThread = new Thread(() -> {
DistributedLock.acquireLock();
// 随机等待
try {
Thread.sleep(RandomUtil.randomInt(1000, 3000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
publicBean.num = ++publicBean.num;
Console.log("【{}】:数+1处理 == {}", Thread.currentThread().getId(), publicBean.num);
DistributedLock.releaseLock();
});
newThread.start();
threadGroup.add(newThread);
}
for (Thread runThread : threadGroup) {
// 等待线程运行完
runThread.join();
}
Console.log("公共数据最终的结果:{}", publicBean.num);
Assert.equals(publicBean.num, 10);
}
}
生产集群安装N台机器合适
特点: 好处提高可靠性、坏处数据同步有延迟
第三方基于zookeeper的包
curator
官网: https://curator.apache.org/docs/about
入门教程: https://curator.apache.org/docs/getting-started/
依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.6.0</version>
</dependency>
代码
发现: 运行代码可见curator的分布式锁的原理跟前面自己实现的逻辑差不多,都是通过增、删子节点,然后监控前一个节点被删释放锁的逻辑原理去做的
public class OtherTest {
@Test
@SneakyThrows
public void test2() {
String connectString = "192.168.19.107:2181,192.168.19.108:2181,192.168.19.109:2181";
RetryOneTime retryOneTime = new RetryOneTime(2000);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, 60 * 1000 * 10, 15 * 1000 * 10, retryOneTime);
curatorFramework.start();
//获取某父目录旗下的亲儿子节点名字信息
List<String> sonNodeNames = curatorFramework.getChildren().forPath("/");
Console.log(sonNodeNames);
// 分布式锁
InterProcessMutex interProcessMutexLock = new InterProcessMutex(curatorFramework, "/CuratorLock");
App2Test.Bean publicBean = new App2Test.Bean();
List<Thread> threadGroup = ListUtil.of();
for (int i = 0; i < 5; i++) {
Thread newThread = new Thread(() -> {
try {
interProcessMutexLock.acquire();
// 随机等待
try {
Thread.sleep(RandomUtil.randomInt(1000, 3000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
publicBean.num = ++publicBean.num;
Console.log("【{}】:数+1处理 == {}", Thread.currentThread().getId(), publicBean.num);
}catch (Exception e) {
}finally {
try {
interProcessMutexLock.release();
}catch (Exception e) {
}
}
});
newThread.start();
threadGroup.add(newThread);
}
for (Thread runThread : threadGroup) {
// 等待线程运行完
runThread.join();
}
Console.log("公共数据最终的结果:{}", publicBean.num);
Assert.equals(publicBean.num, 5);
}
}