4.11)Zookeeper分布式锁-概念
•在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
•但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
•那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。
4.12)Zookeeper 分布式锁-zookeeper分布式锁原理
•核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
1.客户端获取锁时,在lock节点下创建临时顺序节点。
2.然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
3.如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
4.如果发现比自己小的那个节点被删除,则客户端的
Watcher会收到相应通知,此时再次判断自己创建的节点
是否是lock子节点中序号最小的,如果是则获取到了锁,
如果不是则重复以上步骤继续获取到比自己小的一个节点
并注册监听。
4.13)Zookeeper 分布式锁-模拟12306售票案例
Curator实现分布式锁API
-
在Curator中有五种锁方案:
-
InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
-
InterProcessMutex:分布式可重入排它锁
-
InterProcessReadWriteLock:分布式读写锁
-
InterProcessMultiLock:将多个锁作为单个实体管理的容器
-
InterProcessSemaphoreV2:共享信号量
-
Ticket12306.java
package com.itheima.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import sun.security.util.math.intpoly.IntegerPolynomial;
import java.util.concurrent.TimeUnit;
public class Ticket12306 implements Runnable{
private int tickets = 10; /// 相当于数据库的票数
private InterProcessMutex interProcessMutex; /// 分布式锁
public Ticket12306() {
/// 创建重试策略 休眠时间, 重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).build();
build.start();
/**
* @param client 当前客户端
* @param path 当前路径
*/
interProcessMutex = new InterProcessMutex(build,"/lock");
}
/**
* 模拟
*/
@Override
public void run() {
while (true){
/// 获取锁锁
/**
* @param time 时间
* @param unit 时间单位
*/
try {
interProcessMutex.acquire(3,TimeUnit.SECONDS);
if(tickets > 0) {
System.out.println(Thread.currentThread()+":"+tickets);
Thread.sleep(100);
tickets--;
}
} catch (Exception exception) {
exception.printStackTrace();
} finally {
/// 释放锁
try {
interProcessMutex.release();
} catch (Exception exception) {
exception.printStackTrace();
}
}
}
}
}
LockTest.java
package com.itheima.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
/// 创建客户端
Thread thread = new Thread(ticket12306,"携程");
Thread thread1 = new Thread(ticket12306,"飞猪");
// Thread[携程,5,main]:10
// Thread[飞猪,5,main]:9
// Thread[携程,5,main]:8
// Thread[飞猪,5,main]:7
// Thread[携程,5,main]:6
// Thread[飞猪,5,main]:5
// Thread[携程,5,main]:4
// Thread[飞猪,5,main]:3
// Thread[携程,5,main]:2
// Thread[飞猪,5,main]:1
thread.start();
thread1.start();
}
}