系列文章目录
文章目录
- 系列文章目录
- 前言
前言
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。
分布式锁服务宕机,ZooKeeper一般是以集群部署,如果出现ZooKeeper宕机,那么只要当前正常的服务器超过集群的半数,依然可以正常提供服务持有锁资源服务器宕机,假如一台服务器获取锁之后就宕机了, 那么就会导致其他服务器无法再获取该锁. 就会造成死锁问题, 在Curator中, 锁的信息都是保存在临时节点上, 如果持有锁资源的服务器宕机, 那么ZooKeeper 就会移除它的信息, 这时其他服务器就能进行获取锁操作。
zookeeper安装单机模式
http://www.javacui.com/opensource/445.html
SpringBoot集成Curator实现Zookeeper基本操作
http://www.javacui.com/tool/615.html
SpringBoot集成Curator实现Watch事件监听
http://www.javacui.com/tool/616.html
Zookeeper实现分布式锁的机制
使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。
创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。
如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。
比如当前线程获取到的节点序号为/lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。
如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。
比如/lock/001释放了,/lock/002监听到时间,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。
锁分类
InterProcessSemaphoreMutex:分布式不可重入排它锁
InterProcessMutex:分布式可重入排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:多重共享锁,将多个锁作为单个实体管理的容器
InterProcessSemaphoreV2:共享信号量
Shared Lock 分布式非可重入锁
官网地址:http://curator.apache.org/curator-recipes/shared-lock.html
InterProcessSemaphoreMutex是一种不可重入的互斥锁,也就意味着即使是同一个线程也无法在持有锁的情况下再次获得锁,所以需要注意,不可重入的锁很容易在一些情况导致死锁,比如你写了一个递归。
Shared Reentrant Lockf分布式可重入锁
官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html
此锁可以重入,但是重入几次需要释放几次。
InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid+递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。
Shared Reentrant Read Write Lock可重入读写锁
官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html
读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。
读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。
Multi Shared Lock 多共享锁
官网地址:http://curator.apache.org/curator-recipes/multi-shared-lock.html
多个锁作为一个锁,可以同时在多个资源上加锁。一个维护多个锁对象的容器。当调用acquire()时,获取容器中所有的锁对象,请求失败时,释放所有锁对象。同样调用release()也会释放所有的锁。
Shared Semaphore共享信号量
官网地址:http://curator.apache.org/curator-recipes/shared-semaphore.html
一个计数的信号量类似JDK的Semaphore,所有使用相同锁定路径的jvm中所有进程都将实现进程间有限的租约。此外,这个信号量大多是“公平的” - 每个用户将按照要求的顺序获得租约。
有两种方式决定信号号的最大租约数。一种是由用户指定的路径来决定最大租约数,一种是通过SharedCountReader来决定。
如果未使用SharedCountReader,则不会进行内部检查比如A表现为有10个租约,进程B表现为有20个。因此,请确保所有进程中的所有实例都使用相同的numberOfLeases值。
acuquire()方法返回的是Lease对象,客户端在使用完后必须要关闭该lease对象(一般在finally中进行关闭),否则该对象会丢失。如果进程session丢失(如崩溃),该客户端拥有的所有lease会被自动关闭,此时其他端能够使用这些lease。
编码测试
package com.example.springboot;
import com.example.springboot.tool.ZkConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
* @Auther: Java小强
* @Date: 2022/2/4 - 19:33
* @Decsription: com.example.springboot
* @Version: 1.0
*/
@SpringBootTest(classes = Application.class)
public class CuratorTest {
@Autowired
private ZkConfiguration zk;
// 共享信号量,多个信号量
@Test
public void testInterProcessSemaphoreV22() throws Exception {
CuratorFramework client = zk.curatorFramework();
// 创建一个信号量, Curator 以公平锁的方式进行实现
final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 3);
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
String threadName = Thread.currentThread().getName();
// 获取2个许可
Collection<Lease> acquire = semaphore.acquire(2);
System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(2 * 1000);
semaphore.returnAll(acquire);
System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
String threadName = Thread.currentThread().getName();
// 获取一个许可
Lease lease = semaphore.acquire();
System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(2 * 1000);
semaphore.returnLease(lease);
System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
while (true) {
}
}
// 共享信号量
@Test
public void testInterProcessSemaphoreV2() throws Exception {
CuratorFramework client = zk.curatorFramework();
// 创建一个信号量, Curator 以公平锁的方式进行实现
final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 1);
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
String threadName = Thread.currentThread().getName();
// 获取一个许可
Lease lease = semaphore.acquire();
System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(2 * 1000);
semaphore.returnLease(lease);
System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
String threadName = Thread.currentThread().getName();
// 获取一个许可
Lease lease = semaphore.acquire();
System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(2 * 1000);
semaphore.returnLease(lease);
System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
while (true) {
}
}
// 多重共享锁
@Test
public void testInterProcessMultiLock() throws Exception {
CuratorFramework client = zk.curatorFramework();
// 可重入锁
final InterProcessLock interProcessLock1 = new InterProcessMutex(client, "/lock");
// 不可重入锁
final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client, "/lock");
// 创建多重锁对象
final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));
new Thread(new Runnable() {
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
// 获取参数集合中的所有锁
lock.acquire();
// 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入
System.out.println(threadName + "----->" + lock.acquire(2, TimeUnit.SECONDS));
// interProcessLock1 是可重入锁, 所以可以继续获取锁
System.out.println(threadName + "----->" + interProcessLock1.acquire(2, TimeUnit.SECONDS));
// interProcessLock2 是不可重入锁, 所以获取锁失败
System.out.println(threadName + "----->" + interProcessLock2.acquire(2, TimeUnit.SECONDS));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
while (true) {
}
}
// 分布式读写锁
@Test
public void testReadWriteLock() throws Exception {
CuratorFramework client = zk.curatorFramework();
// 创建共享可重入读写锁
final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, "/lock");
final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock");
new Thread(new Runnable() {
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
locl1.writeLock().acquire(); // 获取锁对象
System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(1 * 1000);
locl1.readLock().acquire(); // 获取读锁,锁降级
System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(1 * 1000);
locl1.readLock().release();
System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");
locl1.writeLock().release();
System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
lock2.writeLock().acquire(); // 获取锁对象
System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(1 * 1000);
lock2.readLock().acquire(); // 获取读锁,锁降级
System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(1 * 1000);
lock2.readLock().release();
System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");
lock2.writeLock().release();
System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
while (true) {
}
}
// 分布式可重入排它锁
@Test
public void testInterProcessMutex() throws Exception {
CuratorFramework client = zk.curatorFramework();
// 分布式可重入排它锁
final InterProcessLock lock = new InterProcessMutex(client, "/lock");
final InterProcessLock lock2 = new InterProcessMutex(client, "/lock");
new Thread(new Runnable() {
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
lock.acquire(); // 获取锁对象
System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
lock.acquire(); // 测试锁重入
System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(1 * 1000);
lock.release();
System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
lock.release();
System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
lock.acquire(); // 获取锁对象
System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
lock.acquire(); // 测试锁重入
System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(1 * 1000);
lock.release();
System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
lock.release();
System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
while (true) {
}
// 顺序不一定,但是同一个线程可以多次获取,获取几次就必须释放几次,其他线程才能获取到锁
}
// 分布式不可重入排它锁
@Test
void testInterProcessSemaphoreMutex() throws Exception {
CuratorFramework client = zk.curatorFramework();
// 分布式不可重入排它锁
final InterProcessLock lock = new InterProcessSemaphoreMutex(client, "/lock");
final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, "/lock");
new Thread(new Runnable() {
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
lock.acquire(); // 获取锁对象
System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
// 测试锁重入
Thread.sleep(2 * 1000);
lock.release();
System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
String threadName = Thread.currentThread().getName();
lock2.acquire();
System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");
Thread.sleep(2 * 1000);
lock2.release();
System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
while (true) {
}
// 顺序不一定,但是必须是获取后再释放其他线程才能获取到锁
}
}