目录
一、锁的种类
二、分布式锁具备的条件与刚需
三、springboot+redis+ngnix单机实现案例
四、Nginx配置负载均衡
4.1、修改nginx配置文件
4.2、执行启动命令
4.3、启动微服务程序测试
五、使用redis分布式锁
5.1、方法递归重试
5.2、自旋方式
5.3、添加key过期时间,防死锁
5.4、防止误删除
5.6、使用Lua脚本优化
5.7、可重入锁
5.7.1、隐式锁(synchronized)
5.7.2、显式锁(Lock)
5.7.3、redis的Map类型替代可重入锁的计数问题
5.8、自动续期
一、锁的种类
1、单机版同一个JVM虚拟机内,synchronized或者Lock接口
2、分布式多个不同JVM虚拟机,单机的线程机制不再起作用,资源类不在同一服务器上,不在共享。
二、分布式锁具备的条件与刚需
独占性:
OnlyOne,任何时刻只能有且仅有一个线程持有。
高可用:
在redis集群环境下,不能因为某一个节点挂掉而出现获取锁和释放锁失败的情况,高并发请求下,依旧性能好用。
防死锁:
杜绝死锁,必须有超时控制机制或者撤销操作,必须有最终解决方案。
不乱抢:
自己的锁只能自己释放,不能释放不属于自己的锁。
重入性:
同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁。
三、springboot+redis+ngnix单机实现案例
场景:
多个服务间保证同一时刻同一时间段内同一用户只能有一个请求 (防止关键业务出现并发攻击)
采用微服务创建:
test1与test2两台pom导入jar一致
<dependencies>
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--swagger2-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<!--swagger-ui-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.13</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
springboot配置文件
server.port=8081
spring.application.name=redis7test1
#swagger2
spring.swagger2.enabled=true
spring.mvc.pathmatch.matching-strategy=ant_path_matcher
#redis配置
spring.redis.database=0
spring.redis.host=192.168.200.110
spring.redis.port=6379
spring.redis.password=123456
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.pool.max-wait=-1ms
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.min-idle=0
配置类
@Configuration
public class RedisConfig {
/**
* redis序列化配置类
*
*/
@Bean
public RedisTemplate<String , Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory){
RedisTemplate<String , Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
//设置key序列化方式string
redisTemplate.setKeySerializer(new StringRedisSerializer());
//设置value的序列化方式json,使用GenericJackson2JsonRedisSerializer替换默认序列化
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
@Component
@EnableSwagger2
public class SwaggerConfig {
@Value("${spring.swagger2.enabled}")
private Boolean enabled;
@Bean
public Docket createRestApi(){
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.enable(enabled)
.select()
.apis(RequestHandlerSelectors.basePackage("com.cjc.redis7test1"))
.paths(PathSelectors.any())
.build();
}
public ApiInfo apiInfo(){
return new ApiInfoBuilder()
.title("springboot利用swagger2构建api接口文档"+"\t"+ DateTimeFormatter.ofPattern("yyyy-MM-dd").format(LocalDateTime.now()))
.description("springboot+redis整合,有问题联系客服小丽:123456789")
.version("1.0")
.termsOfServiceUrl("yyyyyyyyy")
.build();
}
}
service层
@Service
@Slf4j
public class InventoryServiceImpl implements InventoryService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String port;
public static final String IN_KEY = "inventory";
private Lock lock = new ReentrantLock();
@Override
public String sale() {
String resultMessage = "";
//加锁
lock.lock();
try {
//查询redis里的库存
String result = stringRedisTemplate.opsForValue().get(IN_KEY + 1);
//判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//卖出商品扣减库存
if (inventoryNumber > 0){
//存入redis
stringRedisTemplate.opsForValue().set(IN_KEY + 1,String.valueOf(--inventoryNumber));
resultMessage = "成功卖出商品,库存剩余:" + inventoryNumber;
log.info(resultMessage + "\t服务端口号:{}",port);
}else {
resultMessage = "商品已售空";
}
} finally {
//解锁
lock.unlock();
}
return resultMessage;
}
}
controller层
@RestController
@Slf4j
@Api(tags = "redis分布式锁测试")
public class InventoryController {
@Autowired
private InventoryService inventoryService;
@ApiOperation("扣减库存,一次卖一个")
@RequestMapping(value = "/sale",method = RequestMethod.GET)
public String sale(){
return inventoryService.sale();
}
}
上述代码在test2上拷贝,修改端口号
四、Nginx配置负载均衡
上面的代码在分布式部署后,单机锁将会出现超卖现象,需要分布式锁。
4.1、修改nginx配置文件
cd /usr/local/nginx/conf目录下的nginx.conf文件
4.2、执行启动命令
在/usr/local/nginx/sbin目录下启动
启动命令:
./nginx -c /usr/local/nginx/conf/nginx.conf
关闭
./nginx -s stop
重启
./ngnix -s reload
4.3、启动微服务程序测试
手动测试,程序正常
使用jmeter工具进行压测
原因如下:
在单机环境下,可以使用synchronized或lock来实现
在分布式系统中,因为竞争的线程可能不在同一个节点上(即同一个jvm中)所以锁就失效了,就需要利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程。
结论:
单机版加锁配合nginx和jmeter压测后,不满足高并发分布式锁的性能要求
五、使用redis分布式锁
5.1、方法递归重试
@Override
public String sale(){
String resultMessage = "";
String key = "redisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
//没有抢到锁将进行重试
if (!flag){
//暂停20毫秒,进行递归重试,
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
sale();
}else {
//拿到锁进行业务处理
try {
//查询redis里的库存
String result = stringRedisTemplate.opsForValue().get(IN_KEY + 1);
//判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//卖出商品扣减库存
if (inventoryNumber > 0){
//存入redis
stringRedisTemplate.opsForValue().set(IN_KEY + 1,String.valueOf(--inventoryNumber));
resultMessage = "成功卖出商品,库存剩余:" + inventoryNumber;
log.info(resultMessage + "\t服务端口号:{}",port);
}else {
resultMessage = "商品已售空";
}
} finally {
stringRedisTemplate.delete(key);
}
}
return resultMessage;
}
使用方法递归重试测试通过,但并发量一高,容易出现StackOverflowError,高并发唤醒后推荐使用while判断,不推荐使用。
5.2、自旋方式
@Override
public String sale() {
String resultMessage = "";
String key = "redisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
//使用自旋代替递归,使用while进行判断
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)){
//暂停20毫秒,进行递归重试,
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//拿到锁进行业务处理
try {
//查询redis里的库存
String result = stringRedisTemplate.opsForValue().get(IN_KEY + 1);
//判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//卖出商品扣减库存
if (inventoryNumber > 0){
//存入redis
stringRedisTemplate.opsForValue().set(IN_KEY + 1,String.valueOf(--inventoryNumber));
resultMessage = "成功卖出商品,库存剩余:" + inventoryNumber;
log.info(resultMessage + "\t服务端口号:{}",port);
}else {
resultMessage = "商品已售空";
}
} finally {
stringRedisTemplate.delete(key);
}
return resultMessage;
}
上面代码在程序挂掉的情况下且没有走到finally这块,由于该key没有设置过期时间,导致会没办法删除。
5.3、添加key过期时间,防死锁
@Override
public String sale() {
String resultMessage = "";
String key = "redisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
//使用自旋代替递归,使用while进行判断
//加锁和过期时间设置必须是同一行,保证原子性
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,20L,TimeUnit.SECONDS)){
//暂停20毫秒,进行递归重试,
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//拿到锁进行业务处理
try {
//查询redis里的库存
String result = stringRedisTemplate.opsForValue().get(IN_KEY + 1);
//判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//卖出商品扣减库存
if (inventoryNumber > 0){
//存入redis
stringRedisTemplate.opsForValue().set(IN_KEY + 1,String.valueOf(--inventoryNumber));
resultMessage = "成功卖出商品,库存剩余:" + inventoryNumber;
log.info(resultMessage + "\t服务端口号:{}",port);
}else {
resultMessage = "商品已售空";
}
} finally {
stringRedisTemplate.delete(key);
}
return resultMessage;
}
问题当A线程处理业务时间超过过期时间,B线程也建立了一道锁,会导致A线程误删除B线程的锁。
5.4、防止误删除
@Override
public String sale() {
String resultMessage = "";
String key = "redisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
//使用自旋代替递归,使用while进行判断
//加锁和过期时间设置必须是同一行,保证原子性
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,20L,TimeUnit.SECONDS)){
//暂停20毫秒,进行递归重试,
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//拿到锁进行业务处理
try {
//查询redis里的库存
String result = stringRedisTemplate.opsForValue().get(IN_KEY + 1);
//判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//卖出商品扣减库存
if (inventoryNumber > 0){
//存入redis
stringRedisTemplate.opsForValue().set(IN_KEY + 1,String.valueOf(--inventoryNumber));
resultMessage = "成功卖出商品,库存剩余:" + inventoryNumber;
log.info(resultMessage + "\t服务端口号:{}",port);
}else {
resultMessage = "商品已售空";
}
} finally {
//删除锁时,判断该锁是否是同一客户端的
if (stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue)){
stringRedisTemplate.delete(key);
}
}
return resultMessage;
}
在判断与删除时不是一个原子操作,可能会因为一些特性原因删除失败,需要使用Lua脚本进行优化
5.6、使用Lua脚本优化
@Override
public String sale() {
String resultMessage = "";
String key = "redisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
//使用自旋代替递归,使用while进行判断
//加锁和过期时间设置必须是同一行,保证原子性
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,20L,TimeUnit.SECONDS)){
//暂停20毫秒,进行递归重试,
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//拿到锁进行业务处理
try {
//查询redis里的库存
String result = stringRedisTemplate.opsForValue().get(IN_KEY + 1);
//判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//卖出商品扣减库存
if (inventoryNumber > 0){
//存入redis
stringRedisTemplate.opsForValue().set(IN_KEY + 1,String.valueOf(--inventoryNumber));
resultMessage = "成功卖出商品,库存剩余:" + inventoryNumber;
log.info(resultMessage + "\t服务端口号:{}",port);
}else {
resultMessage = "商品已售空";
}
} finally {
//使用lua脚本的redis分布式调用
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else" +
"\treturn 0 end";
stringRedisTemplate.execute(new DefaultRedisScript(luaScript,Boolean.class), Arrays.asList(key),uuidValue);
}
return resultMessage;
}
上述代码不满足可重入性
5.7、可重入锁
在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(锁对象是同一对象),不会因为之前已经获取过还没有释放而阻塞。Java中ReentrantLoak和synchronized都是可重入锁,可重入锁的一个优点是一定程度上避免了死锁。
5.7.1、隐式锁(synchronized)
Object o = new Object();
@Test
void contextLoads() {
new Thread(()->{
synchronized (o){
System.out.println(Thread.currentThread().getName() + "\t外层调用");
synchronized (o){
System.out.println(Thread.currentThread().getName() + "\t中层调用");
synchronized (o){
System.out.println(Thread.currentThread().getName() + "\t内层调用");
}
}
}
},"a").start();
}
@Test
void entry(){
m1();
}
private synchronized void m1() {
System.out.println(Thread.currentThread().getName() + "\t外层调用");
m2();
}
private synchronized void m2() {
System.out.println(Thread.currentThread().getName() + "\t中层调用");
m3();
}
private synchronized void m3() {
System.out.println(Thread.currentThread().getName() + "\t内层调用");
}
原理:
每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。
当执行monitorenter时,如果目标锁对象的计数器为0,那么它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将计数器加1。
在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么Java虚拟机可以将其计数器加1,否则需要等待,直到持有线程释放该锁。
当执行monitorexit时,Java虚拟机则需要将锁对象的计数器减1,计数器为零代表锁已被释放。
5.7.2、显式锁(Lock)
@Test
void lockTest(){
Lock lock = new ReentrantLock();
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t外层调用");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t中层调用");
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
},"a1").start();
try {
TimeUnit.MILLISECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t内层调用");
} finally {
lock.unlock();
}
},"a2").start();
}
5.7.3、redis的Map类型替代可重入锁的计数问题
加锁Lua脚本:
if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then
redis.call('hincrby',KEYS[1],ARGV[1],1)
redis.call('expire',KEYS[1],ARGV[2])
return 1
else
return 0
end解锁Lua脚本:
if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then
return nil
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then
return redis.call('del',KEYS[1])
else
return 0
end
/**
* 自定义redis分布式锁
*/
@Slf4j
public class MyRedisLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuidValue;
private Long expireTime;
public MyRedisLock(StringRedisTemplate stringRedisTemplate, String lockName,String uuid) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue = uuid + ":" + Thread.currentThread().getId();
this.expireTime = 50L;
}
@Override
public void lock() {
tryLock();
}
@Override
public boolean tryLock() {
try {
tryLock(-1L,TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time == -1L){
String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
log.info("lockName:{},uuidValue:{}",lockName,uuidValue);
//加锁失败重试
Boolean execute = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
while (!execute){
TimeUnit.MILLISECONDS.sleep(60);
}
return true;
}
return false;
}
@Override
public void unlock() {
String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then\t" +
"return nil\t" +
"elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then\t" +
"return redis.call('del',KEYS[1])\t" +
"else\t" +
"return 0\t" +
"end";
Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue,String.valueOf(expireTime));
if (flag == null){
throw new RuntimeException("this lock doesn't exists");
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Component
public class DistributedLockFactory {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuid;
public DistributedLockFactory() {
this.uuid = IdUtil.simpleUUID();
}
public Lock getDistributedLock(String lockType){
if (lockType == null){
return null;
}
if (lockType.equalsIgnoreCase("REDIS")){
lockName = "redisLock";
return new MyRedisLock(stringRedisTemplate,lockName,uuid);
}
if (lockType.equalsIgnoreCase("MySQL")){
this.lockName = "mysqlLock";
//TODO 完善
return null;
}
return null;
}
}
@Override
public String sale() {
String resultMessage = "";
//加锁
Lock redisLock = distributedLockFactory.getDistributedLock("REDIS");
redisLock.lock();
try {
//查询redis里的库存
String result = stringRedisTemplate.opsForValue().get(IN_KEY + 1);
//判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//卖出商品扣减库存
if (inventoryNumber > 0){
//存入redis
stringRedisTemplate.opsForValue().set(IN_KEY + 1,String.valueOf(--inventoryNumber));
resultMessage = "成功卖出商品,库存剩余:" + inventoryNumber;
log.info(resultMessage + "\t服务端口号:{}",port);
}else {
resultMessage = "商品已售空";
}
} finally {
//解锁
redisLock.unlock();
}
return resultMessage;
}
5.8、自动续期
Lua脚本:
if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then
return redis.call('expire',KEYS[1],ARGV[2])
else
return 0
end
public void renewExpire(){
String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then\t" +
"return redis.call('expire',KEYS[1],ARGV[2])\t" +
"else\t" +
"return 0\t" +
"end";
//定期扫描
new Timer().schedule(new TimerTask() {
@Override
public void run() {
Boolean execute = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
if (execute){
renewExpire();
}
}
},(this.expireTime * 1000/3));
}