文章目录
- 什么是锁?
- 场景描述
- 代码示例
- 总结
- Java中单体应用锁的局限性&分布式锁
- 前言
- 互联网系统架构的演进
- 单体应用锁的局限性
- 什么是分布式锁
- 分布式锁的设计思路
- 目前存在的分布式的方案
- Java中锁的解决方案
- 前言
- 乐观锁 与 悲观锁
- 公平锁 与 非公平锁
- 总结
- 分布式锁设计
- 如何使用锁解决电商项目中超卖的问题?
- 什么是超卖?
- 超卖现象一
- 超卖现象一:产生原因
- 超卖现象一:解决方法
- 代码示例
- 初始代码
- 解决超卖现象一
- 超卖现象二&解决思路
- 超卖现象二
- 超卖现象二:产生原因
- 超卖现象二:解决方法一(不使用锁)
- 超卖现象二:解决方法二(使用锁)
- 基于Synchronized锁解决超卖问题(方法锁)
- 解决直接在方法上加Synchronized锁依然存在的问题(手动控制事务)
- 基于Synchronized锁解决超卖问题(块锁)
- 基于ReentrantLock锁解决超卖问题
- 基于数据库悲观锁实现分布式锁
- 单体应用锁的局限性
- 基于数据库悲观锁实现分布式锁的操作步骤
- 数据库客户端会话操作演示
- 基于数据库实现分布式锁的优缺点
- 代码示例
- 测试代码
- mapper及xml层
- 测试方法和结果
- 基于redis的setnx实现分布式锁
- 基于redis的setnx实现分布式锁原理
- value不校验导致的问题原理图解
- 具体编码步骤
- 详细代码
- 锁的封装
- 使用锁演示
- 使用锁解决分布式定时任务
- 测试方法和结果
- 扩展:基于分布式锁解决定时任务重复问题的几种方式
- 基于Redis的Java客户端Redisson实现分布式锁
- Redisson概述
- Redisson特性
- 支持的Redis的配置
- 支持的Java实体
- Java分布式锁与同步器
- 分布式Java集合
- 与Spring框架的整合
- 基于Redisson实现分布式锁具体编码步骤
- 通过JAVA API方式引入Redisson
- Spring项目使用xml引入Redisson
- Spring Boot项目引入Redisson
- 基于Zookeeper的瞬时节点实现分布式锁
- Zookeeper的数据结构
- Zookeeper相关命令
- zookeeper分布式锁原理
- Zookeeper的观察器
- Zookeeper实现分布式锁原理
- zookeeper分布式锁代码实现
- 锁的封装
- 使用锁演示
- 测试方法和结果
- 基于Zookeeper的Curator客户端实现分布式锁
- Curator介绍
- 具体编码步骤
- 详细代码
- pom文件
- 测试代码
- 测试方法和结果
- 分布式锁的对比
- 分布式锁实现方案优缺点分析
- 对于实际工作中的总结
什么是锁?
场景描述
锁在JAVA中是一个非常重要的概念,尤其是在当今的互联网时代,高并发的场景下,更是离不开锁。那么锁到底是什么呢?在计算机科学中,锁(lock)或互斥(mutex)是一种同步机制,用于在有许多执行线程的环境中强制对资源的访问限制。锁旨在强制实施互斥排他、并发控制策略。咱们举一个生活中的例子:大家都去过超市买东西,如果你随身带了包呢,要放到储物柜里。咱们把这个例子再极端一下,假如柜子只有一个,现在同时来了3个人A,B,C,都要往这个柜子里放东西。这个场景就构造了一个多线程,多线程自然离不开锁。如下图所示:
A,B,C都要往柜子里放东西,可是柜子只能放一件东西,那怎么办呢?这个时候呢就引出了锁的概念,3个人中谁抢到了柜子的锁,谁就可以使用这个柜子,其他的人只能等待。比如:C抢到了锁,C可以使用这个柜子。A和B只能等待,等C使用完了,释放锁以后,A和B再争抢锁,谁抢到了,再继续使用柜子。
代码示例
我们再将上面的场景反应到程序中,首先创建一个柜子的类:
public class Cabinet {
// 柜子中存储的数字
private int storeNumber;
public void setStoreNumber(int storeNumber){
this.storeNumber = storeNumber;
}
public int getStoreNumber(){
return this.storeNumber;
}
}
柜子中存储的是数字。
然后我们将3个用户抽象成一个类:
public class User {
// 柜子
private Cabinet cabinet;
// 存储的数字
private int storeNumber;
public User(Cabinet cabinet,int storeNumber){
this.cabinet = cabinet;
this.storeNumber = storeNumber;
}
// 使用柜子
public void useCabinet(){
cabinet.setStoreNumber(storeNumber);
}
}
在用户的构造方法中,需要传入两个参数,一个是要使用的柜子,另一个是要存储的数字。到这里,柜子和用户都已经抽象成了类,接下来我们再写一个启动类,模拟一下3个用户使用柜子的场景:
public class Starter {
public static void main(String[] args){
Cabinet cabinet = new Cabinet();
ExecutorService es = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
final int storeNumber = i;
es.execute(()->{
User user = new User(cabinet, storeNumber);
user.useCabinet();
System.out.println("我是用户"+storeNumber+",我存储的数字是:"+cabinet.getStoreNumber());
});
}
es.shutdown();
}
}
我们仔细的看一下这个main函数的过程,
- 首先创建一个柜子的实例,由于场景中只有一个柜子,所以我们只创建了一个柜子实例。
- 然后我们新建一个线程池,线程池中有3个线程,每个线程执行一个用户的操作。
- 再来看看每个线程具体的执行过程,新建用户实例,传入的是用户使用的柜子,我们这里只有一个柜子,所以传入这个柜子的实例,然后传入这个用户要存储的数字,分别是1,2,3,也分别对应着用户A,用户B,和用户C。
- 再调用使用柜子的操作,也就是向柜子中放入要存储的数字,然后立刻从柜子中取出数字,并打印出来。
我们运行一下main函数,看看打印的结果是什么?
我是用户0,我存储的数字是:2
我是用户2,我存储的数字是:2
我是用户1,我存储的数字是:2
从结果中我们可以看出,3个用户在柜子中存储的数字都变成了2。我们再次运行程序,结果如下:
我是用户1,我存储的数字是:1
我是用户2,我存储的数字是:1
我是用户0,我存储的数字是:1
这次又变成了1。这是为什么呢?问题就出在user.useCabinet()
这个方法上,这是因为柜子这个实例没有加锁的原因,3个用户并行的执行,向柜子中存储他们的数字,虽然是3个用户并行的同时操作,但是在具体赋值时,也是有顺序的,因为变量storeNumber
只占有一块内存,storeNumber
只存储一个值,存储最后的线程所设置的值。至于哪个线程排在最后,则完全不确定。赋值语句执行完成后,进入到打印语句,打印语句取storeNumber
的值并打印,这时storeNumber
存储的是最后一个线程所设置的值,3个线程取到的值是相同的,就像上面打印的结果一样。
那么如何解决这个问题?这就引出了我们本文的重点内容——锁。我们在赋值语句上加锁,这样当多个线程(本文当中的多个用户)同时赋值时,谁抢到了这把锁,谁才能赋值。这样保证同一时刻只能有一个线程进行赋值操作,避免了之前的混乱的情况。
那么在程序中如何加锁呢?这就要使用JAVA中的一个关键字了——synchronized
。synchronized
分为synchronized
方法和synchronized
同步代码块。下面我们看一下两者的具体用法:
synchronized
方法,顾名思义,是把synchronized
关键字写在方法上,它表示这个方法是加了锁的,当多个线程同时调用这个方法时,只有获得锁的线程才可以执行。我们看一下下面的例子:
public synchronized String getTicket(){
return "xxx";
}
我们可以看到getTicket()
方法加了锁,当多个线程并发执行的时候,只有获得到锁的线程才可以执行,其他的线程只能等待。
- 我们再来看看
synchronized
块,synchronized
块的语法是:
synchronized (对象锁){
……
}
我们将需要加锁的语句都写在synchronized
块内,而在对象锁的位置,需要填写加锁的对象,它的含义是,当多个线程并发执行时,只有获得你写的这个对象的锁,才能执行后面的语句,其他的线程只能等待。synchronized
块通常的写法是synchronized(this)
,这个this
是当前类的实例,也就是说获得当前这个类的对象的锁,才能执行这个方法,这样写的效果和synchronized
方法是一样的。
再回到我们的示例当中,如何解决storeNumber
混乱的问题呢?咱们可以在设置storeNumber
的方法上加上锁,这样保证同时只有一个线程能调用这个方法。如下所示:
public class Cabinet {
//柜子中存储的数字
private int storeNumber;
public synchronized void setStoreNumber(int storeNumber){
this.storeNumber = storeNumber;
}
public int getStoreNumber(){
return this.storeNumber;
}
}
我们在set方法上加了synchronized
关键字,这样在存储数字时,就不会并行的去执行了,而是哪个用户抢到锁,哪个用户执行存储数字的方法。我们再运行一下main函数,看看运行的结果:
我是用户1,我存储的数字是:1
我是用户2,我存储的数字是:1
我是用户0,我存储的数字是:1
咦?! 结果还是混乱的,为什么?我再检查一下代码:
es.execute(()->{
User user = new User(cabinet,storeNumber);
user.useCabinet();
System.out.println("我是用户"+storeNumber+",我存储的数字是:"+cabinet.getStoreNumber());
});
我们可以看到在useCabinet
和打印的方法是两个语句,并没有保持原子性,虽然在set
方法上加了锁,但是在打印时又存在了一个并发,打印语句是有锁的,但是不能确定哪个线程去执行。所以这里,我们要保证useCabinet
和打印的方法的原子性,我们使用synchronized
块,但是synchronized
块里的对象我们使用谁的?这又是一个问题,user
?还是cabinet
? 当然是cabinet
,因为每个线程都初始化了user
,总共有3个user
对象了,而cabinet
对象只有一个,所以synchronized
要用cabinet
对象。如下:
synchronized (cabinet) {
user.useCabinet();
System.out.println("我是用户"+storeNumber+",我存储的数字是:"+cabinet.getStoreNumber());
}
我们再去运行一下:
我是用户1,我存储的数字是:1
我是用户2,我存储的数字是:2
我是用户0,我存储的数字是:0
由于我们加了synchronized
块,保证了存储和取出的原子性,这样用户存储的数字,和取出的数字就对应上了,不会造成混乱。
最后我们通过一张图上面示例的整体情况。
如上图所示,线程A,线程B,线程C同时调用Cabinet
类的setStoreNumber
方法,线程B获得了锁,所以线程B可以执行setStoreNumber
的方法,useCabinet
和打印的方法的原子性,所以线程B获得了锁后执行setStoreNumber
的方法并打印结果,线程A和线程C只能等待。
总结
通过上面的场景与示例,我们可以了解多线程情况下,造成的变量值前后不一致的问题,以及锁的作用。在使用了锁以后,可以避免这种混乱的现象。在下一节中,我们将给大家介绍JAVA中都有哪些关于锁的解决方案。
Java中单体应用锁的局限性&分布式锁
前言
JDK官方提供的锁的解决方案,这些锁只能在一个JVM进程内有效,我们把这种锁叫做单体应用锁。但是,在互联网高速发展的今天,单体应用锁能够满足我们的需求吗?
互联网系统架构的演进
在互联网系统发展之初,系统比较简单,消耗资源小,用户访问量也比较少,我们只部署一个Tomcat应用就可以满足需求。系统架构图如下:
一个Tomcat可以看作是一个JVM进程,当大量的请求并发到达系统时,所有的请求都落在这唯一的一个Tomcat上,如果某些请求方法是需要加锁的,比如:秒杀扣减库存,是可以满足需求的,这和我们前面章节所讲的内容是一样的。但是随着访问量的增加,导致一个Tomcat难以支撑,这时我们就要集群部署Tomcat,使用多个Tomcat共同支撑整个系统。系统架构图如下:
上图中,我们部署了两个Tomcat,共同支撑系统。当一个请求到达系统时,首先会经过Nginx,Nginx主要是做负载转发的,它会根据自己配置的负载均衡策略将请求转发到其中的一个Tomcat中。当大量的请求并发访问时,两个Tomcat共同承担所有的访问量,这时,我们同样在秒杀扣减库存的场景中,使用单体应用锁,还能够满足要求吗?
单体应用锁的局限性
如上图所示,在整个系统架构中,存在两个Tomcat,每个Tomcat是一个JVM。在进行秒杀业务的时候,由于大家都在抢购秒杀商品,大量的请求同时到达系统,通过Nginx分发到两个Tomcat上。我们通过一个极端的案例场景,可以更好地理解单体应用锁的局限性。假如,秒杀商品的数量只有1个,这时,这些大量的请求当中,只有一个请求可以成功的抢到这个商品,这就需要在扣减库存的方法上加锁,扣减库存的动作只能一个一个去执行,而不能同时去执行,如果同时执行,这1个商品可能同时被多个人抢到,从而产生超卖现象。加锁之后,扣减库存的动作一个一个去执行,凡是将库存扣减为负数的,都抛出异常,提示该用户没有抢到商品。通过加锁看似解决了秒杀的问题,但是事实上真的是这样吗?
我们看到系统中存在两个Tomcat,我们加的锁是JDK提供的锁,这种锁只能在一个JVM下起作用,也就是在一个Tomcat内是没有问题的。当存在两个或两个以上的Tomcat时,大量的并发请求分散到不同的Tomcat上,在每一个Tomcat中都可以防止并发的产生,但是在多个Tomcat之间,每个Tomcat中获得锁的这个请求,又产生了并发,从而产生超卖现象。这也就是单体应用锁的局限性,它只能在一个JVM内加锁,而不能从这个应用层面去加锁。
那么这个问题如何解决呢?这就需要使用分布式锁了,在整个应用层面去加锁。什么是分布式锁呢?我们怎么去使用分布式锁呢?
什么是分布式锁
在说分布式锁之前,我们看一看单体应用锁的特点,单体应用锁是在一个JVM进程内有效,无法跨JVM、跨进程。那么分布式锁的定义就出来了,分布式锁就是可以跨越多个JVM、跨越多个进程的锁,这种锁就叫做分布式锁。
分布式锁的设计思路
在上图中,由于Tomcat是由Java启动的,所以每个Tomcat可以看成一个JVM,JVM内部的锁是无法跨越多个进程的。所以,我们要实现分布式锁,我们只能在这些JVM之外去寻找,通过其他的组件来实现分布式锁。系统的架构如图所示:
两个Tomcat通过第三方的组件实现跨JVM、跨进程的分布式锁。这就是分布式锁的解决思路,找到所有JVM可以共同访问的第三方组件,通过第三方组件实现分布式锁。
目前存在的分布式的方案
分布式锁都是通过第三方组件来实现的,目前比较流行的分布式锁的解决方案有:
- 数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
- Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
- Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同。
这3种方式具体的实现方法,会在后面的做详细的介绍。
Java中锁的解决方案
前言
前面介绍了什么是锁,以及锁的使用场景,相信对锁的定义,以及锁的重要性都有了比较清晰的认识。接下来给大家继续做深入的介绍,介绍JAVA为我们提供的不同种类的锁。
JAVA为我们提供了种类丰富的锁,每种锁都有不同的特性,锁的使用场景也各不相同。由于篇幅有限,在这里只给大家介绍比较常用的几种锁。我们会通过锁的定义,核心代码剖析,以及使用场景来给大家介绍JAVA中主流的几种锁。
乐观锁 与 悲观锁
乐观锁与悲观锁应该是每个开发人员最先接触的两种锁。小编最早接触的就是这两种锁,但是不是在JAVA中接触的,而是在数据库当中。当时的应用场景主要是在更新数据的时候,更新数据这个场景也是使用锁的非常主要的场景之一。更新数据的主要流程如下:
- 检索出要更新的数据,供操作人员查看;
- 操作人员更改需要修改的数值;
- 点击保存,更新数据;
这个流程看似简单,但是我们用多线程的思维去考虑,这也应该算是一种互联网思维吧,就会发现其中隐藏着问题。我们具体看一下,
- A检索出数据;
- B检索出数据;
- B修改了数据;
- A修改数据,系统会修改成功吗?
当然啦,A修改成功与否,要看程序怎么写。咱们抛开程序,从常理考虑,A保存数据的时候,系统要给提示,说“您修改的数据已被其他人修改过,请重新查询确认”。那么我们程序中怎么实现呢?
- 在检索数据,将数据的版本号(version)或者最后更新时间一并检索出来;
- 操作员更改数据以后,点击保存,在数据库执行update操作;
- 执行update操作时,用步骤1检索出的版本号或者最后更新时间与数据库中的记录作比较;
- 如果版本号或最后更新时间一致,则可以更新;
- 如果不一致,就要给出上面的提示;
上述的流程就是乐观锁的实现方式。在JAVA中乐观锁并没有确定的方法,或者关键字,它只是一个处理的流程、策略。咱们看懂上面的例子之后,再来看看JAVA中乐观锁。
乐观锁呢,它是假设一个线程在取数据的时候不会被其他线程更改数据,就像上面的例子那样,但是在更新数据的时候会校验数据有没有被修改过。它是一种比较交换的机制,简称CAS(Compare And Swap)机制。一旦检测到有冲突产生,也就是上面说到的版本号或者最后更新时间不一致,它就会进行重试,直到没有冲突为止。
乐观锁的机制如图所示:
咱们看一下JAVA中最常用的i++
,咱们思考一个问题,i++
它的执行顺序是什么样子的?它是线程安全的吗?当多个线程并发执行i++
的时候,会不会有问题?接下来咱们通过程序看一下:
public class Test {
private int i=0;
public static void main(String[] args) {
Test test = new Test();
// 线程池:50个线程
ExecutorService es = Executors.newFixedThreadPool(50);
// 闭锁
CountDownLatch cdl = new CountDownLatch(5000);
for (int i = 0; i < 5000; i++){
es.execute(()->{
test.i++;
cdl.countDown();
});
}
es.shutdown();
try {
// 等待5000个任务执行完成后,打印出执行结果
cdl.await();
System.out.println("执行完成后,i="+test.i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上面的程序中,我们模拟了50个线程同时执行i++
,总共执行5000次,按照常规的理解,得到的结果应该是5000,我们运行一下程序,看看执行的结果如何?
执行完成后,i=4975
执行完成后,i=4986
执行完成后,i=4971
这是我们运行3次以后得到的结果,可以看到每次执行的结果都不一样,而且不是5000,这是为什么呢?这就说明i++
并不是一个原子性的操作,在多线程的情况下并不安全。我们把i++
的详细执行步骤拆解一下:
- 从内存中取出i的当前值;
- 将i的值加1;
- 将计算好的值放入到内存当中;
这个流程和我们上面讲解的数据库的操作流程是一样的。在多线程的场景下,我们可以想象一下,线程A和线程B同时从内存取出i的值,假如i的值是1000,然后线程A和线程B再同时执行+1的操作,然后把值再放入内存当中,这时,内存中的值是1001,而我们期望的是1002,正是这个原因导致了上面的错误。那么我们如何解决呢?在JAVA1.5以后,JDK官方提供了大量的原子类,这些类的内部都是基于CAS机制的,也就是使用了乐观锁。我们将上面的程序稍微改造一下,如下:
public class Test {
private AtomicInteger i = new AtomicInteger(0);
public static void main(String[] args) {
Test test = new Test();
ExecutorService es = Executors.newFixedThreadPool(50);
CountDownLatch cdl = new CountDownLatch(5000);
for (int i = 0; i < 5000; i++){
es.execute(()->{
test.i.incrementAndGet();
cdl.countDown();
});
}
es.shutdown();
try {
cdl.await();
System.out.println("执行完成后,i="+test.i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
我们将变量i的类型改为AtomicInteger
,AtomicInteger
是一个原子类。我们在之前调用i++
的地方改成了i.incrementAndGet()
,incrementAndGet()
方法采用了CAS机制,也就是说使用了乐观锁。我们再运行一下程序,看看结果如何。
执行完成后,i=5000
执行完成后,i=5000
执行完成后,i=5000
我们同样执行了3次,3次的结果都是5000,符合了我们预期。这个就是乐观锁。我们对乐观锁稍加总结,乐观锁在读取数据的时候不做任何限制,而是在更新数据的时候,进行数据的比较,保证数据的版本一致时再更新数据。根据它的这个特点,可以看出乐观锁适用于读操作多,而写操作少的场景。
悲观锁与乐观锁恰恰相反,悲观锁从读取数据的时候就显示的加锁,直到数据更新完成,释放锁为止。在这期间只能有一个线程去操作,其他的线程只能等待。在JAVA中,悲观锁可以使用synchronized
关键字或者ReentrantLock
类来实现。还是上面的例子,我们分别使用这两种方式来实现一下。首先是使用synchronized
关键字来实现:
public class Test {
private int i=0;
public static void main(String[] args) {
Test test = new Test();
ExecutorService es = Executors.newFixedThreadPool(50);
CountDownLatch cdl = new CountDownLatch(5000);
for (int i = 0;i < 5000; i++){
es.execute(()->{
//修改部分 开始
synchronized (test){
test.i++;
}
//修改部分 结束
cdl.countDown();
});
}
es.shutdown();
try {
cdl.await();
System.out.println("执行完成后,i="+test.i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
我们唯一的改动就是增加了synchronized
块,它锁住的对象是test
,在所有线程中,谁获得了test
对象的锁,谁才能执行i++
操作。我们使用了synchronized
悲观锁的方式,使得i++
线程安全。我们运行一下,看看结果如何。
执行完成后,i=5000
执行完成后,i=5000
执行完成后,i=5000
我们运行3次,结果都是5000,符合预期。接下来,我们再使用ReentrantLock
类来实现悲观锁。代码如下:
public class Test {
// 添加了ReentrantLock锁
Lock lock = new ReentrantLock();
private int i=0;
public static void main(String[] args) {
Test test = new Test();
ExecutorService es = Executors.newFixedThreadPool(50);
CountDownLatch cdl = new CountDownLatch(5000);
for (int i = 0; i < 5000; i++){
es.execute(()->{
//修改部分 开始
test.lock.lock();
test.i++;
test.lock.unlock();
//修改部分 结束
cdl.countDown();
});
}
es.shutdown();
try {
cdl.await();
System.out.println("执行完成后,i="+test.i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
我们在类中显示的增加了Lock lock = new ReentrantLock();
,而且在i++
之前增加了lock.lock()
,加锁操作,在i++
之后增加了lock.unlock()
释放锁的操作。我们同样运行3次,看看结果。
执行完成后,i=5000
执行完成后,i=5000
执行完成后,i=5000
3次运行结果都是5000,完全符合预期。我们再来总结一下悲观锁,悲观锁从读取数据的时候就加了锁,而且在更新数据的时候,保证只有一个线程在执行更新操作,没有像乐观锁那样进行数据版本的比较。所以悲观锁适用于读相对少,写相对多的操作。
公平锁 与 非公平锁
前面我们介绍了乐观锁与悲观锁,这一小节我们将从另外一个维度去讲解锁——公平锁与非公平锁。从名字不难看出,公平锁在多线程情况下,对待每一个线程都是公平的;而非公平锁恰好与之相反。从字面上理解还是有些晦涩难懂,我们还是举例说明,场景还是去超市买东西,在储物柜存储东西的例子。储物柜只有一个,同时来了3个人使用储物柜,这时A先抢到了柜子,A去使用,B和C自觉进行排队。A使用完以后,后面排队中的第一个人将继续使用柜子,这就是公平锁。在公平锁当中,所有的线程都自觉排队,一个线程执行完以后,排在后面的线程继续使用。
非公平锁则不然,A在使用柜子的时候,B和C并不会排队,A使用完以后,将柜子的钥匙往后一抛,B和C谁抢到了谁用,甚至可能突然跑来一个D,这个D抢到了钥匙,那么D将使用柜子,这个就是非公平锁。
公平锁如图所示:
多个线程同时执行方法,线程A抢到了锁,A可以执行方法。其他线程则在队列里进行排队,A执行完方法后,会从队列里取出下一个线程B,再去执行方法。以此类推,对于每一个线程来说都是公平的,不会存在后加入的线程先执行的情况。
非公平锁入下图所示:
多个线程同时执行方法,线程A抢到了锁,A可以执行方法。其他的线程并没有排队,A执行完方法,释放锁后,其他的线程谁抢到了锁,谁去执行方法。会存在后加入的线程,反而先抢到锁的情况。
公平锁与非公平锁都在ReentrantLock
类里给出了实现,我们看一下ReentrantLock
的源码。
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock
有两个构造方法,默认的构造方法中,sync = new NonfairSync();
我们可以从字面意思看出它是一个非公平锁。再看看第二个构造方法,它需要传入一个参数,参数是一个布尔型,true
是公平锁,false
是非公平锁。从上面的源码我们可以看出sync
有两个实现类,分别是FairSync
和NonfairSync
,我们再看看获取锁的核心方法,首先是公平锁FairSync
的,
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
然后是非公平锁NonfairSync
的,
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
通过对比两个方法,我们可以看出唯一的不同之处在于!hasQueuedPredecessors()
这个方法,很明显这个方法是一个队列,由此可以推断,公平锁是将所有的线程放在一个队列中,一个线程执行完成后,从队列中取出下一个线程,而非公平锁则没有这个队列。这些都是公平锁与非公平锁底层的实现原理,我们在使用的时候不用追到这么深层次的代码,只需要了解公平锁与非公平锁的含义,并且在调用构造方法时,传入true
和false
即可。
总结
JAVA中锁的种类非常多,在这一节中,我们找了非常典型的几个锁的类型给大家做了介绍。乐观锁与悲观锁是最基础的,也是大家必须掌握的。大家在工作中不可避免的都要使用到乐观锁和悲观锁。从公平锁与非公平锁这个维度上看,大家平时使用的都是非公平锁,这也是默认的锁的类型。如果要使用公平锁,可以在秒杀的场景下使用,在秒杀的场景下,是遵循先到先得的原则,是需要排队的,所以这种场景下是最适合使用公平锁的。
接下来,讲解如何使用锁解决电商中的超卖现象。
分布式锁设计
如何使用锁解决电商项目中超卖的问题?
什么是超卖?
举例:某件商品库存数量10件,结果卖出了15件。商品卖出数量超过了库存数量。超卖导致商家没有商品发货、发货时间延长、买卖双方易发生纠纷。
超卖现象一
系统中库存为1,但是产生了两笔订单。卖家在商品发货时,发现只有1件商品,但是有2笔订单。
假如商品只剩下最后1件,A和B同时看到这个商品,加入购物车,并同时提交订单,此时就产生超卖的现象。
超卖现象一:产生原因
- 扣减库存的动作,在程序中进行,在程序中计算剩余库存
- 并发场景下,导致库存计算错误
超卖现象一:解决方法
- 扣减库存不在程序中进行,而是通过数据库
- 向数据库传递库存增量,扣减1个库存,增量为-1
- 在数据库update语句计算库存,通过update行锁解决并发
代码示例
初始代码
@Resource
private OrderMapper orderMapper;
@Resource
private OrderItemMapper orderItemMapper;
@Resource
private ProductMapper productMapper;
// 购买商品id
private int purchaseProductId = 100100;
// 购买商品数量
private int purchaseProductNum = 1;
/**
* 初始代码
* @return
* @throws Exception
*/
@Transactional(rollbackFor = Exception.class)
public Integer createOrder1() throws Exception {
Product product = productMapper.selectByPrimaryKey(purchaseProductId);
if (product == null) {
throw new Exception("购买商品:"+purchaseProductId+"不存在");
}
// 商品当前库存
Integer currentCount = product.getCount();
// 校验库存
if (purchaseProductNum > currentCount) {
throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
}
// 计算剩余库存
Integer leftCount = currentCount - purchaseProductNum;
// 更新库存
product.setCount(leftCount);
product.setUpdateTime(new Date());
product.setUpdateUser("xxx");
productMapper.updateByPrimaryKeySelective(product);
Order order = new Order();
order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
order.setOrderStatus(1);//待处理
order.setReceiverName("xxx");
order.setReceiverMobile("13311112222");
order.setCreateTime(new Date());
order.setCreateUser("xxx");
order.setUpdateTime(new Date());
order.setUpdateUser("xxx");
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
orderItem.setProductId(product.getId());
orderItem.setPurchasePrice(product.getPrice());
orderItem.setPurchaseNum(purchaseProductNum);
orderItem.setCreateUser("xxx");
orderItem.setCreateTime(new Date());
orderItem.setUpdateTime(new Date());
orderItem.setUpdateUser("xxx");
orderItemMapper.insertSelective(orderItem);
return order.getId();
}
解决超卖现象一
@Resource
private OrderMapper orderMapper;
@Resource
private OrderItemMapper orderItemMapper;
@Resource
private ProductMapper productMapper;
// 购买商品id
private int purchaseProductId = 100100;
// 购买商品数量
private int purchaseProductNum = 1;
/**
* 解决超卖现象一
* @return
* @throws Exception
*/
@Transactional(rollbackFor = Exception.class)
public Integer createOrder2() throws Exception {
Product product = productMapper.selectByPrimaryKey(purchaseProductId);
if (product == null) {
throw new Exception("购买商品:"+purchaseProductId+"不存在");
}
// 商品当前库存
Integer currentCount = product.getCount();
// 校验库存
if (purchaseProductNum > currentCount) {
throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
}
// // 计算剩余库存
// Integer leftCount = currentCount - purchaseProductNum;
// // 更新库存
// product.setCount(leftCount);
// product.setUpdateTime(new Date());
// product.setUpdateUser("xxx");
// productMapper.updateByPrimaryKeySelective(product);
// 超卖现象一:解决方法
// updateProductCount方法的SQL:update product set count = count - #{purchaseProductNum,jdbcType=INTEGER},update_user = #{updateUser,jdbcType=VARCHAR},update_time = #{updateTime,jdbcType=TIME} where id = #{id,jdbcType=INTEGER}
productMapper.updateProductCount(purchaseProductNum, "xxx", new Date(), product.getId());
Order order = new Order();
order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
order.setOrderStatus(1);//待处理
order.setReceiverName("xxx");
order.setReceiverMobile("13311112222");
order.setCreateTime(new Date());
order.setCreateUser("xxx");
order.setUpdateTime(new Date());
order.setUpdateUser("xxx");
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
orderItem.setProductId(product.getId());
orderItem.setPurchasePrice(product.getPrice());
orderItem.setPurchaseNum(purchaseProductNum);
orderItem.setCreateUser("xxx");
orderItem.setCreateTime(new Date());
orderItem.setUpdateTime(new Date());
orderItem.setUpdateUser("xxx");
orderItemMapper.insertSelective(orderItem);
return order.getId();
}
ps:数据库update语句到底是行锁还是表锁?无索引的情况下,如果不走主键,那么update为表锁;有索引的情况下,走索引或者走主键(效果一样),那么update变为行锁。
超卖现象二&解决思路
超卖现象二
系统中库存变为-1。卖家不知所措,询问平台客服。
超卖现象二:产生原因
并发检验库存,会造成多个线程都认为库存充足的现象。之后,多个线程执行到update更新库存操作,由于update存在行锁,只能顺序的更新库存,一个线程一个线程地执行导致库存为负数。
超卖现象二:解决方法一(不使用锁)
在每次更新库存后,再次校验库存是否为负数,如果是负数,抛出异常,方法回滚,更新库存失败。
超卖现象二:解决方法二(使用锁)
校验库存、扣减库存统一加锁,使之成为原子性的操作。并发时,只有获得锁的线程才能执行该操作。扣减库存结束后,释放锁。下一个线程获得锁校验库存,库存不够就不会通过,从而确保库存不会扣成负数。
具体实现可以通过以下两种锁:
- 基于synchronized锁解决超卖问题(最原始的锁)
- 基于ReentrantLock锁(可重入锁)解决超卖问题(并发包中的锁)
基于Synchronized锁解决超卖问题(方法锁)
@Resource
private OrderMapper orderMapper;
@Resource
private OrderItemMapper orderItemMapper;
@Resource
private ProductMapper productMapper;
// 购买商品id
private int purchaseProductId = 100100;
// 购买商品数量
private int purchaseProductNum = 1;
/**
* 解决超卖现象二:基于synchronized锁解决超卖问题(最原始的锁)
* 依然存在的问题:第一个线程执行完,事务还没有提交,第二个线程就开始执行,此时商品库存还是1,从而第一个和第二个线程都执行了减少库存和创建订单操作,导致库存为-1
* 依然存在的问题的原因:@Transactional是通过aop的方式执行的,由于spring的aop,会在createOrder方法之前开启事务,之后再加锁,当锁住的代码执行完成后,再提交事务,因此锁住的代码块执行是在事务之内执行的,可以推断在代码块执行完时,事务还未提交,锁已经被释放,此时其他线程拿到锁之后进行锁住的代码块,读取的库存数据不是最新的
* @return
* @throws Exception
*/
@Transactional(rollbackFor = Exception.class)
public synchronized Integer createOrder3() throws Exception {
Product product = productMapper.selectByPrimaryKey(purchaseProductId);
if (product == null) {
throw new Exception("购买商品:"+purchaseProductId+"不存在");
}
// 商品当前库存
Integer currentCount = product.getCount();
System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount);
// 校验库存
if (purchaseProductNum > currentCount) {
throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
}
// 超卖现象一:解决方法
productMapper.updateProductCount(purchaseProductNum, "xxx", new Date(), product.getId());
Order order = new Order();
order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
order.setOrderStatus(1);//待处理
order.setReceiverName("xxx");
order.setReceiverMobile("13311112222");
order.setCreateTime(new Date());
order.setCreateUser("xxx");
order.setUpdateTime(new Date());
order.setUpdateUser("xxx");
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
orderItem.setProductId(product.getId());
orderItem.setPurchasePrice(product.getPrice());
orderItem.setPurchaseNum(purchaseProductNum);
orderItem.setCreateUser("xxx");
orderItem.setCreateTime(new Date());
orderItem.setUpdateTime(new Date());
orderItem.setUpdateUser("xxx");
orderItemMapper.insertSelective(orderItem);
return order.getId();
}
解决直接在方法上加Synchronized锁依然存在的问题(手动控制事务)
@Resource
private OrderMapper orderMapper;
@Resource
private OrderItemMapper orderItemMapper;
@Resource
private ProductMapper productMapper;
// 购买商品id
private int purchaseProductId = 100100;
// 购买商品数量
private int purchaseProductNum = 1;
@Autowired
private PlatformTransactionManager platformTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
/**
* 解决上述方法依然存在的问题,手动控制事务
* @return
* @throws Exception
*/
// @Transactional(rollbackFor = Exception.class)
public synchronized Integer createOrder4() throws Exception {
TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
Product product = productMapper.selectByPrimaryKey(purchaseProductId);
if (product == null) {
// 事务回滚
platformTransactionManager.rollback(transaction);
throw new Exception("购买商品:"+purchaseProductId+"不存在");
}
// 商品当前库存
Integer currentCount = product.getCount();
System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount);
// 校验库存
if (purchaseProductNum > currentCount) {
// 事务回滚
platformTransactionManager.rollback(transaction);
throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
}
// 超卖现象一:解决方法
productMapper.updateProductCount(purchaseProductNum, "xxx", new Date(), product.getId());
Order order = new Order();
order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
order.setOrderStatus(1);// 待处理
order.setReceiverName("xxx");
order.setReceiverMobile("13311112222");
order.setCreateTime(new Date());
order.setCreateUser("xxx");
order.setUpdateTime(new Date());
order.setUpdateUser("xxx");
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
orderItem.setProductId(product.getId());
orderItem.setPurchasePrice(product.getPrice());
orderItem.setPurchaseNum(purchaseProductNum);
orderItem.setCreateUser("xxx");
orderItem.setCreateTime(new Date());
orderItem.setUpdateTime(new Date());
orderItem.setUpdateUser("xxx");
orderItemMapper.insertSelective(orderItem);
// 事务提交
platformTransactionManager.commit(transaction);
return order.getId();
}
基于Synchronized锁解决超卖问题(块锁)
@Resource
private OrderMapper orderMapper;
@Resource
private OrderItemMapper orderItemMapper;
@Resource
private ProductMapper productMapper;
// 购买商品id
private int purchaseProductId = 100100;
// 购买商品数量
private int purchaseProductNum = 1;
@Autowired
private PlatformTransactionManager platformTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
private Object object = new Object();
/**
* 解决超卖现象二:基于Synchronized锁解决超卖问题(块锁)
* @return
* @throws Exception
*/
// @Transactional(rollbackFor = Exception.class)
public synchronized Integer createOrder5() throws Exception {
// // 块锁写法一:五个线程中谁最先抢到OrderService(单例)这个实例对象锁才能执行块锁里面的内容
// synchronized (this) {
//
// }
//
// // 块锁写法二:新建一个object对象,五个线程中谁最先抢到object对象锁才能执行块锁里面的内容,与块锁写法一同理
// synchronized (object) {
//
// }
//
// // 块锁写法三:类锁,五个线程中谁最先抢到OrderService.class类锁才能执行块锁里面的内容。
// // 上述两种对象锁的写法都存在这样的问题,有可能OrderService不是单例的,此时可能存在多个OrderService实例对象,锁就存在多个,无法保证锁的唯一性
// // 但是类锁就不存在这种问题,因为类只能有一个,只能有一个线程获取到
// synchronized (OrderService.class) {
//
// }
Product product = null;
synchronized (this) {
TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
product = productMapper.selectByPrimaryKey(purchaseProductId);
if (product == null) {
// 事务回滚
platformTransactionManager.rollback(transaction1);
throw new Exception("购买商品:"+purchaseProductId+"不存在");
}
// 商品当前库存
Integer currentCount = product.getCount();
System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount);
// 校验库存
if (purchaseProductNum > currentCount) {
// 事务回滚
platformTransactionManager.rollback(transaction1);
throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
}
// 超卖现象一:解决方法
productMapper.updateProductCount(purchaseProductNum, "xxx", new Date(), product.getId());
// 事务提交
platformTransactionManager.commit(transaction1);
}
TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
Order order = new Order();
order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
order.setOrderStatus(1);//待处理
order.setReceiverName("xxx");
order.setReceiverMobile("13311112222");
order.setCreateTime(new Date());
order.setCreateUser("xxx");
order.setUpdateTime(new Date());
order.setUpdateUser("xxx");
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
orderItem.setProductId(product.getId());
orderItem.setPurchasePrice(product.getPrice());
orderItem.setPurchaseNum(purchaseProductNum);
orderItem.setCreateUser("xxx");
orderItem.setCreateTime(new Date());
orderItem.setUpdateTime(new Date());
orderItem.setUpdateUser("xxx");
orderItemMapper.insertSelective(orderItem);
// 事务提交
platformTransactionManager.commit(transaction);
return order.getId();
}
基于ReentrantLock锁解决超卖问题
/**
* 解决超卖现象二:基于ReentrantLock锁(可重入锁)解决超卖问题(并发包中的锁)
* @return
* @throws Exception
*/
// @Transactional(rollbackFor = Exception.class)
public Integer createOrder6() throws Exception {
Product product = null;
lock.lock();
try {
TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
product = productMapper.selectByPrimaryKey(purchaseProductId);
if (product == null) {
// 事务回滚
platformTransactionManager.rollback(transaction1);
throw new Exception("购买商品:"+purchaseProductId+"不存在");
}
// 商品当前库存
Integer currentCount = product.getCount();
System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount);
// 校验库存
if (purchaseProductNum > currentCount) {
// 事务回滚
platformTransactionManager.rollback(transaction1);
throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
}
// 超卖现象一:解决方法
productMapper.updateProductCount(purchaseProductNum, "xxx", new Date(), product.getId());
// 事务提交
platformTransactionManager.commit(transaction1);
} finally {
lock.unlock();
}
TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
Order order = new Order();
order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
order.setOrderStatus(1);//待处理
order.setReceiverName("xxx");
order.setReceiverMobile("13311112222");
order.setCreateTime(new Date());
order.setCreateUser("xxx");
order.setUpdateTime(new Date());
order.setUpdateUser("xxx");
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
orderItem.setProductId(product.getId());
orderItem.setPurchasePrice(product.getPrice());
orderItem.setPurchaseNum(purchaseProductNum);
orderItem.setCreateUser("xxx");
orderItem.setCreateTime(new Date());
orderItem.setUpdateTime(new Date());
orderItem.setUpdateUser("xxx");
orderItemMapper.insertSelective(orderItem);
// 事务提交
platformTransactionManager.commit(transaction);
return order.getId();
}
基于数据库悲观锁实现分布式锁
单体应用锁的局限性
单体应用锁的局限性:不能跨jvm,为解决这一问题出现了分布式锁。
基于数据库悲观锁实现分布式锁的操作步骤
- 多个进程、多个线程访问共同组件数据库
- 通过select…for update访问同一条数据
- for update,其他线程不能修改锁定的数据,不能给锁定的数据加锁,只能等待
数据库客户端会话操作演示
-- 查看本次查询会话数据库自动提交状态,1代表自动提交,0代表不自动提交
select @@autocommit;
-- 需要将本次查询会话数据库自动提交状态设置为0,执行一条select......for update语句锁定数据后,事务自动提交,锁也就释放了,其他会话还可以给该条数据加锁
set @@autocommit=0;
-- 将business_code='demo'这条数据检索出来并加锁,其他会话就无法给这个数据加锁或者修改了。因为本会话将数据加锁在事务没有提交之前时无法给这个数据加锁或者修改了
select * from distribute_lock where business_code = 'demo' for update;
-- 提交该会话事务
commit;
基于数据库实现分布式锁的优缺点
- 优点:简单方便、易于理解、易于操作
- 缺点:并发量大时,对数据库压力较大
- 建议:作为锁的数据库和业务数据库分开
代码示例
测试代码
public static final Logger log = LoggerFactory.getLogger(DemoController.class);
@Resource
private DistributeLockMapper distributeLockMapper;
@RequestMapping("distributeLockByDB")
@Transactional(rollbackFor = Exception.class) // spring默认的回滚异常是RuntimeException,这样想自定义的异常throw new Exception("分布式锁找不到");是不会回滚的
public String distributeLockByDB() throws Exception {
log.info("我进入了方法!");
DistributeLock distributeLock = distributeLockMapper.selectDistributeLock("demo");
if (distributeLock == null)
throw new Exception("分布式锁找不到");
log.info("我进入了锁!");
try {
// 模拟业务代码执行耗时
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "我已经执行完成!";
}
mapper及xml层
DistributeLock selectDistributeLock(@Param("businessCode") String businessCode);
<select id="selectDistributeLock" resultType="com.example.distributelock.model.DistributeLock">
select * from distribute_lock
where business_code = #{businessCode,jdbcType=VARCHAR}
for update
</select>
测试方法和结果
可以通过启动两个服务,同时调用该接口测试,会发现只有一个服务能获取到锁执行业务代码,另一个服务只能等待锁释放后拿到锁才能执行业务代码。
基于redis的setnx实现分布式锁
基于redis的setnx实现分布式锁原理
-
获取锁的Redis命令
-
SET resource_name my_random_value NX PX 30000
-
resource_name:对应redis的key,资源名称,可根据不同的业务区分不同的锁
-
my_random_value:对应redis的value,随机值,要确保每个线程的随机值都不同,最好使用uuid,用于释放锁时的校验
-
NX:key不存在时设置成功,key存在则设置失败
- 基于redis的setnx实现分布式锁主要用的就是NX上述特性,setnx是一个原子性的操作。而redis是单线程的,当多个线程同时并发的给key设置值时,只有一个线程会设置成功,并发请求到redis里都变为顺序的,即并行变串行,只有第一个线程会设置成功,设置成功的线程即获取到锁,执行后续操作,设置不成功的即抛出异常
-
PX:自动失效时间,出现异常情况,锁可以过期失效,即自动释放
-
设置成功的线程获取到锁,执行后续操作完成后,会释放锁,这样其他线程才能获取锁。如果没有设置自动失效时间,执行后续操作或者释放锁的操作出现异常,redis里面这个key就永远存在,后续线程就永远设置不成功,即获取不到锁
-
释放锁采用redis的delete命令
-
释放锁时校验之前设置的随机数,相同才能释放。证明redis的key对应的value是该线程设置的,value是程序生成的随机数,删除时需要校验之前生成的随机数和redis的key对应的value是否相同,如果相同才能保证这把锁是该线程设置的,才可以释放,确保不会释放别的线程的锁
-
释放锁的LUA脚本(官网有介绍),因为delete命令没有提供value校验功能
-
if redis.call(“get”,KEYS[1])==ARGV[1] then
return redis.call(“del”,KEYS[1])
else
return 0
end
-
-
value不校验导致的问题原理图解
具体编码步骤
- 根据上述原理,编写redis分布式锁
- 定时任务集群部署,任务重复执行?
- 利用分布式锁解决重复执行的问题
详细代码
锁的封装
package com.example.distributelock.lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.types.Expiration;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
// AutoCloseable是jdk1.7版本推出的自动关闭的接口,实现这个接口重写close()方法在close()方法中关闭锁即可直接在try()的()中获取锁并释放锁不用在finally中手动释放锁,
// 解决了可能由于个人原因忘记写finally而忘记释放锁的问题
public class RedisLock implements AutoCloseable {
public static final Logger log = LoggerFactory.getLogger(RedisLock.class);
private final RedisTemplate redisTemplate;
private final String key;
private final String value;
// 单位:秒
private final int expireTime;
public RedisLock(RedisTemplate redisTemplate, String key, int expireTime){
this.redisTemplate = redisTemplate;
this.key = key;
this.expireTime = expireTime;
this.value = UUID.randomUUID().toString();
}
/**
* 获取分布式锁
* @return
*/
public boolean getLock() {
RedisCallback<Boolean> redisCallback = connection -> {
// 设置NX
RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
// 设置过期时间
Expiration expiration = Expiration.seconds(expireTime);
// 序列化key
byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
// 序列化value
byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
// 执行setnx操作
Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
return result;
};
// 获取分布式锁
Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
return Boolean.TRUE.equals(lock);
}
public void unLock() {
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
RedisScript<Boolean> redisScript = RedisScript.of(script, Boolean.class);
List<String> keys = Collections.singletonList(key);
Boolean result = (Boolean) redisTemplate.execute(redisScript, keys, value);
log.info("释放锁的结果:" + result);
}
@Override
public void close() {
unLock();
}
}
使用锁演示
package com.example.distributelock.controller;
import com.example.distributelock.lock.RedisLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RedisLockController {
public static final Logger log = LoggerFactory.getLogger(RedisLockController.class);
@Autowired
private RedisTemplate redisTemplate;
@RequestMapping("redisLock")
public String redisLock(){
log.info("我进入了方法!");
try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){
if (redisLock.getLock()) {
log.info("我进入了锁!!");
Thread.sleep(15000);
}
} catch (Exception e) {
e.printStackTrace();
}
log.info("方法执行完成");
return "方法执行完成";
}
}
使用锁解决分布式定时任务
package com.example.distributelock.service;
import com.example.distributelock.lock.RedisLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SchedulerService {
public static final Logger log = LoggerFactory.getLogger(SchedulerService.class);
@Autowired
private RedisTemplate redisTemplate;
@Scheduled(cron = "0/5 * * * * ?")
public void sendSms(){
try (RedisLock redisLock = new RedisLock(redisTemplate,"autoSms",30)) {
if (redisLock.getLock()) {
log.info("向138xxxxxxxx发送短信!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试方法和结果
可以通过启动两个服务,同时调用该接口测试,会发现只有一个服务能获取到锁执行业务代码,另一个服务只能等待锁释放后拿到锁才能执行业务代码。
ps:也可以使用redis的其他客户端实现分布式锁(如:jedis)
扩展:基于分布式锁解决定时任务重复问题的几种方式
-
只允许集群中的一台执行定时任务
-
数据库中设置定时任务表,并设置两个字段 执行状态(0-未执行 1-执行中) 和下次执行时间。每次定时任务执行的时候,先判断有没有到执行时间,然后判断执行状态,如果是1,则不执行。如果是0则执行。这里可以用update table set status = 1 where status = 0。这条语句只有一台机器选更新成功。更新成功的执行定时任务。
操作步骤:首先集成quartz,主要是依赖quartz中的corn表达式,然后自定义表,并且写个公共业务方法即可。 -
使用redis或者zk的分布式锁
-
quartz的另一种集成方式。quarz的相关资料,网上信息比较少。可以参考这个博客试试:https://blog.csdn.net/weixin_38192427/article/details/121111677
-
当当网的elastic-job。ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成,一般来说用ElasticJob-Lite(其实不懂)
ps:使用elastic job 需要提前安装好ZK环境。
基于Redis的Java客户端Redisson实现分布式锁
Redisson概述
Redis有很多Java客户端,我们比较常用有Jedis,spring-data-redis,lettuce等。另外一个非常好用的Redis的Java客户端——Redisson。先看一下Redis官网中介绍的Java客户端列表:
在这个列表中,我们可以看到Redisson的后面有笑脸,有星,说明还是比较受欢迎的。再看看后面的简介,Redisson是一个在Redis服务之上的,分布式、可扩展的Java数据结构。我们进入到Redisson的官网,看看官网是怎么介绍的。
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。
上面一段话看起来有点晦涩难懂,总结起来可以归结为一下几点:
- Redisson提供了使用Redis的最简单和最便捷的方法;
- 开发人员不需过分关注Redis,集中精力关注业务即可;
- 基于Redis,提供了在Java中具有分布式特性的工具类;
- 使Java中的并发工具包获得了协调多机多线程并发的能力;
Redisson特性
上面我们对Redisson有了一个整体的印象,接下来我们看看它都有哪些特点。
支持的Redis的配置
Redisson支持多种Redis配置,无论你的Redis是单点、集群、主从还是哨兵模式,它都是支持的。只需要在Redisson的配置文件中,增加相应的配置就可以了。
支持的Java实体
Redisson支持多种Java实体,使其具有分布式的特性。我们比较常用的有:AtomicLong(原子Long)、AtomicDouble(原子Double)、PublishSubscribe(发布订阅)等。
Java分布式锁与同步器
Redisson支持Java并发包中的多种锁,比如:Lock(可重入锁)、FairLock(公平锁)、MultiLock(联锁)、RedLock(红锁)、ReadWriteLock(读写锁)、Semaphore(信号量)、CountDownLatch(闭锁)等。我们注意到这些都是Java并发包中的类,Redisson借助于Redis又重新实现了一套,使其具有了分布式的特性。以后我们在使用Redisson中的这些类的时候,可以跨进程跨JVM去使用。
分布式Java集合
Redisson对Java的集合类也进行了封装,使其具有分布式的特性。如:Map、Set、List、Queue、Deque、BlockingQueue等。以后我们就可以在分布式的环境中使用这些集合了。
与Spring框架的整合
Redisson可以与Spring大家族中的很多框架进行整合,其中包括:Spring基础框架、Spring Cache、Spring Session、Spring Data Redis、Spring Boot等。在项目中我们可以轻松的与这些框架整合,通过简单的配置就可以实现项目的需求。
基于Redisson实现分布式锁具体编码步骤
-
引入Redisson的Jar包
-
进行Redisson与Redis的配置
-
使用分布式锁
-
三种使用Redisson整合java的方式
-
通过JAVA API方式引入Redisson
-
Spring项目使用xml引入Redisson
-
Spring Boot项目引入Redisson
-
通过JAVA API方式引入Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
</dependency>
@RequestMapping("redissonLock")
public String redissonLock() {
Config config = new Config();
// redis是单点就使用useSingleServer()
config.useSingleServer().setAddress("redis://192.168.218.20:6379");
RedissonClient redisson = Redisson.create(config);
log.info("我进入了方法!!");
try {
rLock.lock(30, TimeUnit.SECONDS);
log.info("我获得了锁!!!");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
log.info("我释放了锁!!");
rLock.unlock();
}
log.info("方法执行完成!!");
return "方法执行完成!!";
}
Spring项目使用xml引入Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
</dependency>
@ImportResource("classpath*:redisson.xml") // 引入xml文件,注入xml文件中的bean
public class DistributeLockApplication {...}
redisson.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:redisson="http://redisson.org/schema/redisson"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://redisson.org/schema/redisson
http://redisson.org/schema/redisson/redisson.xsd
">
<redisson:client>
<redisson:single-server address="redis://192.168.218.20:6379"/>
</redisson:client>
</beans>
@Resource
private RedissonClient redisson;
@RequestMapping("redissonLock")
public String redissonLock() {
RLock rLock = redisson.getLock("order");
log.info("我进入了方法!!");
try {
rLock.lock(30, TimeUnit.SECONDS);
log.info("我获得了锁!!!");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
log.info("我释放了锁!!");
rLock.unlock();
}
log.info("方法执行完成!!");
return "方法执行完成!!";
}
Spring Boot项目引入Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.19.3</version>
</dependency>
spring.redis.host=192.168.218.20
spring.redis.database=0
spring.redis.port=6379
spring.redis.password=123456
@Resource
private RedissonClient redisson;
@RequestMapping("redissonLock")
public String redissonLock() {
RLock rLock = redisson.getLock("order");
log.info("我进入了方法!!");
try {
rLock.lock(30, TimeUnit.SECONDS);
log.info("我获得了锁!!!");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
log.info("我释放了锁!!");
rLock.unlock();
}
log.info("方法执行完成!!");
return "方法执行完成!!";
}
基于Zookeeper的瞬时节点实现分布式锁
Zookeeper的数据结构
如图,ZooKeeper 中的数据是由多个数据节点最终构成的一个层级的树状结构,和我们在创建 MySOL 数据表时会定义不同类型的数据列字段,ZooKeeper 中的数据节点也分为持久节点、临时节点和有序节点三种类型:
持久节点
我们第一个介绍的是持久节点,这种节点也是在 ZooKeeper 最为常用的,几乎所有业务场景中都会包含持久节点的创建。之所以叫作持久节点是因为一旦将节点创建为持久节点,该数据节点会一直存储在 ZooKeeper 服务器上,即使创建该节点的客户端与服务端的会话关闭了,该节点依然不会被删除。如果我们想删除持久节点,就要显式调用 delete 函数进行删除操作。
临时节点
接下来我们来介绍临时节点。从名称上我们可以看出该节点的一个最重要的特性就是临时性。所谓临时性是指,如果将节点创建为临时节点,那么该节点数据不会一直存储在 ZooKeeper 服务器上。当创建该临时节点的客户端会话因超时或发生异常而关闭时,该节点也相应在 ZooKeeper 服务器上被删除。同样,我们可以像删除持久节点一样主动删除临时节点。需要注意的是,临时节点不可以再有子节点。
在平时的开发中,我们可以利用临时节点的这一特性来做服务器集群内机器运行情况的统计,将集群设置为“/servers”节点,并为集群下的每台服务器创建一个临时节点“/servers/host”,当服务器下线时该节点自动被删除,最后统计临时节点个数就可以知道集群中的运行情况。
有序节点
最后我们再说一下有序节点,其实有序节点并不算是一种单独种类的节点,而是在之前提到的持久节点和临时节点特性的基础上,增加了一个节点有序的性质。所谓节点有序是说在我们创建有序节点的时候,ZooKeeper 服务器会自动使用一个单调递增的数字作为后缀,追加到我们创建节点的后边。例如一个客户端创建了一个路径为 works/task- 的有序节点,那么 ZooKeeper 将会生成一个序号并追加到该节点的路径后,最后该节点的路径为 works/task-1。通过这种方式我们可以直观的查看到节点的创建顺序。
Zookeeper相关命令
ls / ## 查看Zookeeper根目录下有哪些节点
ls /zookeeper ## 查看zookeeper节点下有哪些节点
get /zookeeper ## 获取zookeeper节点下的数据
create /lock distribute-lock ## 创建lock节点里面存储distribute-lock这条数据
zookeeper分布式锁原理
Zookeeper的观察器
- 作用:检测Zookeeper里面某个节点的变化,随后立刻通知到连接该Zookeeper的客户端。变化的情况有:节点被删除、节点里面内容发生变化、节点的子节点发生变化
- 可设置观察器的三个方法:
- getData():获取节点内容
- getChildren():获取子节点
- exists():判断当前节点是否存在
- 特点:观察器只能监控一次,再监控需重新设置
Zookeeper实现分布式锁原理
- 利用Zookeeper的瞬时有序节点的特性
- 多线程并发创建瞬时节点,得到有序的序列
- 序号最小的线程获得锁
- 其他线程则监听自己序号的前一个序号
- 前一个线程执行完成,删除自己序号的节点
- 因为设置了观察器,下一个序号的线程得到通知,继续执行
- 以此类推
- 创建节点时,已经确定了线程的执行顺序
zookeeper分布式锁代码实现
锁的封装
package com.example.distributelock.lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* Watcher:Zookeeper的观察器接口
*/
public class ZkLock implements Watcher, AutoCloseable {
public static final Logger log = LoggerFactory.getLogger(ZkLock.class);
private final ZooKeeper zooKeeper;
private final String businessName;
private String znode;
public ZkLock(String connectString, String businessName) throws IOException {
this.zooKeeper = new ZooKeeper(connectString,30000,this);
this.businessName = businessName;
}
public boolean getLock() throws KeeperException, InterruptedException {
Stat existsNode = zooKeeper.exists("/" + businessName, false);
if (existsNode == null){
/**
* create()方法:
* 参数一:节点路径
* 参数二:节点初始化内容
* 参数三:权限
* 参数四:创建模式,临时、持久、临时有序、持久有序四种模式
*/
// 创建业务根节点
zooKeeper.create("/" + businessName, businessName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建临时有序的锁节点,例如:/order/order_00000001
znode = zooKeeper.create("/" + businessName + "/" + businessName + "_", businessName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
znode = znode.substring(znode.lastIndexOf("/") + 1);
// 获取业务根节点下所有子节点
List<String> childrenNodes = zooKeeper.getChildren("/" + businessName, false);
// 子节点排序
Collections.sort(childrenNodes);
// 获取序号最小的(第一个)子节点
String firstNode = childrenNodes.get(0);
// 如果创建的节点不是第一个子节点,则监听前一个节点
if (!firstNode.equals(znode)) {
String lastNode = firstNode;
for (String node : childrenNodes) {
if (!znode.equals(node)) {
lastNode = node;
} else {
zooKeeper.exists("/" + businessName + "/" + lastNode,true);
break;
}
}
// wait()方法使用的时候一定要加上synchronized锁,这是java里面固定的写法
// 等待前面的节点被删除,调用process()方法唤醒线程
synchronized (this) {
wait();
}
}
// 如果创建的节点是第一个子节点,则获得锁
return true;
}
/**
* 如果监听器的节点消失就会调用该方法唤醒线程
* @param watchedEvent
*/
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
@Override
public void close() throws Exception {
zooKeeper.delete("/" + businessName + "/" + znode, -1);
zooKeeper.close();
log.info("我释放了锁");
}
}
使用锁演示
package com.example.distributelock.controller;
import com.example.distributelock.lock.ZkLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ZkLockController {
public static final Logger log = LoggerFactory.getLogger(ZkLockController.class);
@RequestMapping("zkLock")
public String zkLock() {
log.info("我进入了方法!");
try (ZkLock zkLock = new ZkLock("192.168.218.21:2181,192.168.218.22:2181,192.168.218.23:2181", "order")) {
if (zkLock.getLock()) {
log.info("我进入了锁!!");
Thread.sleep(15000);
}
} catch (Exception e) {
e.printStackTrace();
}
log.info("方法执行完成");
return "方法执行完成";
}
}
测试方法和结果
可以通过启动两个服务,同时调用该接口测试,会发现只有一个服务能获取到锁执行业务代码,另一个服务只能等待锁释放后拿到锁才能执行业务代码。
基于Zookeeper的Curator客户端实现分布式锁
Curator介绍
Curator是Netflix公司开源的一套zookeeper客户端框架,Curator是对Zookeeper支持最好的客户端框架。Curator封装了大部分Zookeeper的功能,比如Leader选举、分布式锁等,减少了技术人员在使用Zookeeper时的底层细节开发工作。
具体编码步骤
- 引入curator客户端
- curator已经实现了分布式锁的方法
- 直接调用即可
详细代码
pom文件
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
测试代码
package com.example.distributelock.controller;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@RestController
public class ZkLockController {
public static final Logger log = LoggerFactory.getLogger(ZkLockController.class);
@Resource
private CuratorFramework client;
@RequestMapping("curatorLock")
public String curatorLock(){
log.info("我进入了方法!");
InterProcessMutex lock = new InterProcessMutex(client, "/order");
try{
if (lock.acquire(30, TimeUnit.SECONDS)){
log.info("我获得了锁!!");
Thread.sleep(10000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
log.info("我释放了锁!!");
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
log.info("方法执行完成!");
return "方法执行完成!";
}
}
测试方法和结果
可以通过启动两个服务,同时调用该接口测试,会发现只有一个服务能获取到锁执行业务代码,另一个服务只能等待锁释放后拿到锁才能执行业务代码。
分布式锁的对比
多种分布式锁实现方案,我将如何选择?
分布式锁实现方案优缺点分析
-
数据库
- 优点:实现简单、易于理解
- 缺点:对数据库压力大,如果要使用,建议锁的数据库和业务数据库分开
-
Redis
-
优点:易于理解
-
缺点:
-
自己实现,可能存在BUG
-
不支持阻塞,获取不到锁的线程立刻返回并没有等待
-
-
-
Zookeeper
- 优点:支持阻塞
- 缺点:实际工作中自己基于Zookeeper编程的情况非常少,所以需要理解Zookeeper,程序复杂
-
Curator
- 优点:提供锁的方法,支持阻塞
- 缺点:依赖Zookeeper,强一致,即如果Zookeeper是集群,获取锁是需要将锁这个瞬时节点同步到Zookeeper的所有节点上,对Zookeeper集群压力比较大
-
Redisson
- 优点:提供锁的方法,可阻塞
对于实际工作中的总结
- 不推荐使用自己编写的分布式锁
- 推荐使用Redisson和Curator实现的分布式锁