声明:我的大部分篇幅都讲的分布式锁的原理和实现,如果想直接用Redisson框架实现分布式锁,可以直接翻至最后面
关于分布式锁,适用于并发量特别大的微服务集群,能做到同步的实现资源的获取
我其实没有经过真实项目的分布式锁的实践,以下的作为我学习的参考,但据我了解一般使用
redis
作为分布式锁的公司具体实现也如我下述的redisson
框架,只要不是像淘宝、京东那样并发量特别高的项目都基本适用,如果以后有机会使用到了分布式锁的应该场景我也会更新本文我会从分布式锁的原理解析、代码、框架一一解析,本文解析代码部分仅供参考
需要用到的知识点(必须会用)
一、JMeter用于做压力测试: jmeter安装与使用,全图文讲解
二、Nginx用于做负载均衡(多服务):Windows安装Nginx并配置负载均衡
三、Redis用于存储Mock数据:Windows安装Redis做到双击启动
四、启动两个相同服务但端口不同的项目:idea实现同时启动两个相同服务但不同端口的项目,全图解
五、redis工具类:Redis工具类(redisTemplate)以及 redisTemplate 的用法
图解分布式锁(秒杀场景)
图解为什么需要用到分布式锁(我都忘记画返回用户的数据操作了,但不影响,知道最后需要返还给用户消息就可以了)
场景一、单服务不需要分布式锁(为了省事,我就只画两个用户)
1、不加锁就会造成数据的脏读
2、使用synchronized实现加锁处理
场景二、集群(分布式锁实现)(多个相同服务但端口不同的项目,经过负载均衡到不同的服务)(为了省事,我就只画两个服务)
1、只在服务层面加锁是达不到效果的
2、需要用到redis的setnx请求
原理
redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> GET mykey "Hello"
如上面代码所示
setnx
请求是指,先查找查找是否有mykey
这个key
,如果没有则放入一个Hello
的value
并返回1
(就是表名插入成功),如果已经存在mykey
这个key
则返回0
(表示插入失败)由于redis是单线程的所有在redis方法会让进来的请求进行排队,下面用户一的请求比用户二的请求快一丢丢访问redis,实现分布式锁
用户一在服务一使用
setnx
这个方法插入一个key
返回成功,并表示用户一拿到分布式锁同一时间用户二在服务二使用
setnx
这个方法插入一个相同的key
,这时提示插入失败,返回失败,然后服务二就自旋拿锁或者直接返回用户稍后重试(直接返回用户不友好)之后用户一处理完请求,修改redis数据,最后把
key
删除掉,这样其他的服务就可以拿到锁了,就可以使用setnx
命令尝试加锁,拿到锁后操作redis
的数据,拿不到锁的执行上一个步骤(这里不是有三个点吗,就是执行第二个点后面的内容)
代码解析分布式锁原理(秒杀场景)
初始化Redis数据(用于存储秒杀使用的商品)
1、使用redis配置一个用于秒杀服务的mock数据,我这里设置key为goods,value为50,每次使用完数据,需要重新让goods数据变为50(工具:Another Redis Desktop Manager)
场景一、单服务下的秒杀实现(加锁)(不需要分布式锁)(对应图解场景一)
SpringBoot
作为实现分布式的基本框架,只跑单个服务的时候,使用的是一个jvm
来控制代码。我们假设一个秒杀的项目实现:
1、写一个接口用于测试秒杀
@RestController
public class DemoController {
@Autowired
private RedisUtil redisUtil;
@RequestMapping("/demo")
public String demo(){
try {
// 取goods的值
Integer goods = (Integer) redisUtil.get("goods");
if (goods <= 0){
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "商品已经取完");
return "商品已经取完";
}
// 这里模拟一下延时 0.1秒 (因为数据量太少,这样可以很直观的看出加锁和不加锁的区别)
Thread.sleep(100);
// 用户拿到了这个商品,所以这个商品需要自减一
// 使用System.err输出一下数据,这样显示控制台输出是红色比较好观察
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "取出了第" + goods-- + "商品");
// 由于用户已经取到了商品,所以redis中的数据也需要更新
redisUtil.set("goods",goods);
} catch (InterruptedException e) {
return "错误";
}
return "你已经成功获取商品";
}
}
2、配置JMeter
一瞬间有20个用户去抢goods
这个商品
3、测试
按理说,每个用户只抢一个商品,那么会剩余30个商品,我们来看看下述情况,发现全部的用户都拿到的是第50这个数据,所以减1操作,后都在redis中存储的是49,就造成了数据的脏读,而修改此处代码非常的简单
控制台:
redis:
4、解决单服务下的脏读问题(加锁),记得修改redis中的goods数据至50,还是请求20次
只需要修改代码,加入锁就可以了,这样就实现了基于jvm层面的加锁了
@RestController
public class DemoController {
@Autowired
private RedisUtil redisUtil;
@RequestMapping("/demo")
public String demo(){
try {
// 加锁
synchronized (this){
// 取goods的值
Integer goods = (Integer) redisUtil.get("goods");
if (goods <= 0){
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "商品已经取完");
return "商品已经取完";
}
// 这里模拟一下延时 0.1秒 (因为数据量太少,这样可以很直观的看出加锁和不加锁的区别)
Thread.sleep(100);
// 用户拿到了这个商品,所以这个商品需要自减一
// 使用System.err输出一下数据,这样显示控制台输出是红色比较好观察
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "取出了第" + goods-- + "商品");
// 由于用户已经取到了商品,所以redis中的数据也需要更新
redisUtil.set("goods",goods);
}
} catch (InterruptedException e) {
return "错误";
}
return "你已经成功获取商品";
}
}
5、测试
控制台:
redis:
场景二、集群下的秒杀实现(分布式锁实现)(对应图解场景二)
1、启动Nginx,配置负载均衡,并启动两个服务(8080端口、9090端口),测试 场景一 4 的代码
会出现怎样的错误,记得修改redis中的goods数据至50(以下操作不知道怎么处理的,可以看需要用到的知识点)
Nginx配置(nginx.conf),并启动:
worker_processes 1;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
# 负载均衡配置访问路径 serverList名字随便取
upstream serverList{
# 这个是tomcat的访问路径
server localhost:8080;
server localhost:9090;
}
server {
listen 80;
server_name localhost;
location / {
root html;
proxy_pass http://serverList;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
重新配置JMeter:
让两个服务分别处理25个请求
两个项目启动,并清空控制台:
2、测试
后面有很多两个项目取出了相同的商品,我这里就不拉开展示了,知道有问题就好了
8080端口:
9090端口:
redis:并不是我们想象的0,所以使用单个jvm下面的加锁是行不通的
3、修改代码,实现加redis分布式锁
@RestController
public class DemoController {
@Autowired
private RedisUtil redisUtil;
@RequestMapping("/demo")
public String demo(){
// 设置自旋超时时间
long timeoutAt = System.currentTimeMillis();
try {
// 自旋
while (true){
long now = System.currentTimeMillis();
// 5秒超时,退出
if (now - timeoutAt > 5000){
System.out.println("连接超时请重试");
return "连接超时请重试";
}
// 加锁
boolean lock = redisUtil.setnx("lock", "先随便输入什么东西都可以啦~");
if (!lock){
continue;
}
// 取goods的值
Integer goods = (Integer) redisUtil.get("goods");
if (goods <= 0){
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "商品已经取完");
return "商品已经取完";
}
// 这里加了锁就不需要
// Thread.sleep(100);
// 用户拿到了这个商品,所以这个商品需要自减一
// 使用System.err输出一下数据,这样显示控制台输出是红色比较好观察
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "取出了第" + goods-- + "商品");
// 由于用户已经取到了商品,所以redis中的数据也需要更新
redisUtil.set("goods",goods);
// 解锁
redisUtil.delete("lock");
return "你已经成功获取商品";
}
} catch (Exception e) {
return "错误";
}
}
}
4、测试,记得修改redis中的goods数据至50
8080端口:
9090端口:
redis:
一个redis分布式锁的基础已经完成了,但是这样实现的分布式锁有很多的问题,在高并发的条件下根本不够看,下面是进阶教学(以下方案均经过测试)
问题一、在解锁之前出现异常,导致不能解锁,那么其他的服务都不可以访问redis
解决:修改解锁的代码至finally代码块中
@RestController
public class DemoController {
@Autowired
private RedisUtil redisUtil;
@RequestMapping("/demo")
public String demo(){
// 设置自旋超时时间
long timeoutAt = System.currentTimeMillis();
// 自旋
while (true){
long now = System.currentTimeMillis();
// 5秒超时,退出
if (now - timeoutAt > 5000){
System.out.println("连接超时请重试");
return "连接超时请重试";
}
// 加锁
boolean lock = redisUtil.setnx("lock", "先随便输入什么东西都可以啦~");
if (!lock){
continue;
}
try {
// 取goods的值
Integer goods = (Integer) redisUtil.get("goods");
if (goods <= 0){
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "商品已经取完");
return "商品已经取完";
}
// 用户拿到了这个商品,所以这个商品需要自减一
// 使用System.err输出一下数据,这样显示控制台输出是红色比较好观察
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "取出了第" + goods-- + "商品");
// 由于用户已经取到了商品,所以redis中的数据也需要更新
redisUtil.set("goods",goods);
return "你已经成功获取商品";
} catch (Exception e) {
e.printStackTrace();
return "请重试";
} finally {
// 解锁
redisUtil.delete("lock");
}
}
}
}
问题二、如果在执行到解锁之前,服务直接挂掉了,那么其他的服务都不可以访问redis
解决:设置缓存时间(10s)
@RestController
public class DemoController {
@Autowired
private RedisUtil redisUtil;
@RequestMapping("/demo")
public String demo(){
// 设置自旋超时时间
long timeoutAt = System.currentTimeMillis();
// 自旋
while (true){
long now = System.currentTimeMillis();
// 5秒超时,退出
if (now - timeoutAt > 5000){
System.out.println("连接超时请重试");
return "连接超时请重试";
}
// 加锁,并设置缓存时间 10秒
boolean lock = redisUtil.setnx("lock", "先随便输入什么东西都可以啦~",10, TimeUnit.SECONDS);
if (!lock){
continue;
}
try {
// 取goods的值
Integer goods = (Integer) redisUtil.get("goods");
if (goods <= 0){
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "商品已经取完");
return "商品已经取完";
}
// 用户拿到了这个商品,所以这个商品需要自减一
// 使用System.err输出一下数据,这样显示控制台输出是红色比较好观察
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "取出了第" + goods-- + "商品");
// 由于用户已经取到了商品,所以redis中的数据也需要更新
redisUtil.set("goods",goods);
return "你已经成功获取商品";
} catch (Exception e) {
e.printStackTrace();
return "请重试";
} finally {
// 解锁
redisUtil.delete("lock");
}
}
}
}
问题三、假如第一个服务跑了11秒,但10秒后,然后锁的时间到了,那么第二个服务就可以拿到锁并访问redis了,这样在第二个服务期间,第一个服务完成了,那么第一个服务会释放掉第二个服务的锁,这样就导致了锁的失效问题
解决一:设置一个唯一标识(uuid),删除的时候判断是不是该请求设置的锁就可以了,但是,这里有个问题,就是10秒后,虽然第一个服务不可以删除除自己以外的锁,但是有其他的服务拿到这把锁进行redis操作,如果第一个服务在第二个服务修改redis数据之后再去修改redis,那么第三个服务拿到redis的数据就是有问题的,这样也会导致锁失效问题
@RestController
public class DemoController {
@Autowired
private RedisUtil redisUtil;
@RequestMapping("/demo")
public String demo(){
// 设置自旋超时时间
long timeoutAt = System.currentTimeMillis();
// 设置唯一标识
String uuid = UUID.randomUUID().toString();
// 自旋
while (true){
long now = System.currentTimeMillis();
// 5秒超时,退出
if (now - timeoutAt > 5000){
System.out.println("连接超时请重试");
return "连接超时请重试";
}
// 加锁,设置超时时间和唯一标识
boolean lock = redisUtil.setnx("lock", uuid,10, TimeUnit.SECONDS);
if (!lock){
continue;
}
try {
// 取goods的值
Integer goods = (Integer) redisUtil.get("goods");
if (goods <= 0){
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "商品已经取完");
return "商品已经取完";
}
// 用户拿到了这个商品,所以这个商品需要自减一
// 使用System.err输出一下数据,这样显示控制台输出是红色比较好观察
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "取出了第" + goods-- + "商品");
// 由于用户已经取到了商品,所以redis中的数据也需要更新
redisUtil.set("goods",goods);
return "你已经成功获取商品";
} catch (Exception e) {
e.printStackTrace();
return "请重试";
} finally {
if (uuid.equals(redisUtil.get("lock"))){
// 解锁
redisUtil.delete("lock");
}
}
}
}
}
解决二:设置心跳检测,可以去看看这篇Redis分布式锁如何解决锁超时问题?
问题四、串行太慢了怎么办
解决:拆分:使用不同的锁和不同的关键字,比如goods为50个商品,可以拆分为:goods_1:10,goods_2:10,goods_3:10,goods_4:10,goods_5:10
问题五、使用Redis集群时,主节点挂了,而子节点刚好没有同步到刚刚上传的key,导致锁失效
解决:使用zookeeper实现分布式锁
等等等等。。。。。。
使用Redisson框架实现分布式锁
Redisson原理图
加入jar包
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.20.0</version>
</dependency>
配置Bean
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redisson(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0).setPassword("123456");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
测试(别看代码少,其实这个是经过多年实战,很有保障的)
@RestController
public class DemoController {
@Autowired
private RedisUtil redisUtil;
@Resource
private Redisson redisson;
@RequestMapping("/test")
public String Test(){
RLock lock = redisson.getLock("lock");
// 设置超时时间30秒
lock.lock(30,TimeUnit.SECONDS);
try {
// 取goods的值
Integer goods = (Integer) redisUtil.get("goods");
if (goods <= 0){
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "商品已经取完");
return "商品已经取完";
}
// 用户拿到了这个商品,所以这个商品需要自减一
// 使用System.err输出一下数据,这样显示控制台输出是红色比较好观察
System.err.println(Thread.currentThread().getName() +
Thread.currentThread().getId() + "取出了第" + goods-- + "商品");
// 由于用户已经取到了商品,所以redis中的数据也需要更新
redisUtil.set("goods",goods);
return "你已经成功获取商品";
} catch (Exception e) {
e.printStackTrace();
return "请重试";
}finally {
lock.unlock();
}
}
}