分布式锁之redis实现

news2025/1/10 16:28:47

docker安装redis

拉取镜像

docker pull redis:6.2.6

查看镜像

87d429bb8dfa467baedf8733e62ac37b.png

启动容器并挂载目录

需要挂在的data和redis.conf自行创建即可

docker run --restart always -d -v /usr/local/docker/redis/redis.conf:/usr/local/etc/redis/redis.conf -v /usr/local/docker/redis/data:/data --name redis -p 6379:6379 redis:6.2.6 redis-server /usr/local/etc/redis/redis.conf

查看运行状态 

不要忘记开放端口6379

b8ff2272d9354ac39d198b5819e62aef.png

进入容器内部使用redis-cli

docker exec -it 13829d3f335a /bin/bash

redis-cli

[可选]用密码登录 

修改redis.conf配置文件,设置 requirepass xxxxx

spring boot 集成redis

添加依赖

      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
      </dependency>

添加redis配置 


server.port= 10010

spring.datasource.driver-class-name= com.mysql.cj.jdbc.Driver
spring.datasource.url= jdbc:mysql://39.106.53.30:3306/lock_db?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root123456

spring.redis.host=39.106.53.30
spring.redis.port=6379

使用StringRedisTemplate

如果直接使用RedisTemplate使用的序列化器是jdk的,存的是二进制,使用StringRedisTemplate默认初始化序列化器就是String类型

    public StringRedisTemplate() {
        this.setKeySerializer(RedisSerializer.string());
        this.setValueSerializer(RedisSerializer.string());
        this.setHashKeySerializer(RedisSerializer.string());
        this.setHashValueSerializer(RedisSerializer.string());
    }

redis演示超卖问题

执行票数存入redis指令

set ticket 5000

 编写代码演示超卖问题

/**
 * @Author sl
 */
@Service

public class TicketServiceImpl implements TicketService {


    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public  void sellTicket(){

        //获取redis中的票数
        String ticket = redisTemplate.opsForValue().get("ticket");

        if(ticket!= null && ticket.length() != 0){
            // 扣减票数

            Integer integer = Integer.valueOf(ticket);

            if(integer >0){
                redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
            }

        }

    }
}

 5000请求压测,结果为4895,发生了超卖问题

a7fc4d7f1ee548169fd7f08b0fc6ca0a.png
e1d1f4af1d704938b8188b32b6ccbc36.png
redis解决超卖问题 

解决方案

解决方案

  •         本地jvm锁(这种情况仅限单机,不做介绍)
  •         redis乐观锁 watch  multi exec(性能低)
  •         分布式锁(redis+lua手动实现或者通过redission实现)

redis乐观锁实现 

watch: 监控一个或者多个key,如果这些key在提交事务(exec)之前被其他用户修改过,那么事务将执行失败,需要重新获取最新数据重头操作

multi: 开启事务,使用该命令,标记一个事务块的开始,redis会将这些操作放入队列中

exec: 执行事务

720c867464d145e68e27ad877dc0f155.png

 乐观锁的代码需要包在SessionCallback中实现

package com.test.lockservice.service.impl;

import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @Author sl
 */
@Service

public class TicketServiceImpl implements TicketService {


    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public  void sellTicket(){

        redisTemplate.execute(new SessionCallback<Object>() {
            @Override
            public  Object execute(RedisOperations redisOperations) throws DataAccessException {

                // 开启监听
                redisOperations.watch("ticket");
                //获取redis中的票数
                String ticket = redisTemplate.opsForValue().get("ticket");

                if(ticket!= null && ticket.length() != 0){
                    // 开启事务
                    redisOperations.multi();
                    Integer integer = Integer.valueOf(ticket);
                    // 扣减票数
                    redisOperations.opsForValue().set("ticket",String.valueOf(--integer));

                    // 提交事务
                    List exec = redisOperations.exec();

                    // 如果获取锁失败 ,重试
                    if(exec == null || exec.size() == 0){
                        try {
                            // 减少锁争抢,避免栈内存溢出
                            Thread.sleep(40);
                            sellTicket();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                }
                return null;
            }
        });


    }
}

 1000请求压测,结果为4000,没有发生超卖,但性能极低

b975be5edee2453597b57ccd66d557f1.pngredis实现分布式锁

分布式锁的实现方案中redis的实现主要思想就是独占排他使用,在redis中可以使用setnx命令进行独占排他使用

  • 加锁 setnx 
  • 解锁 del
  • 重试:递归(容易造成栈内存溢出),这里使用循环

 

package com.test.lockservice.service.impl;

import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

/**
 * @Author sl
 */
@Service

public class TicketServiceImpl implements TicketService {


    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public  void sellTicket(){

        // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", "111")){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数

                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            // 解锁操作
            redisTemplate.delete("lock");
        }

    }
}

压测1000,显示无超卖现象 

5629d991544d4b7c9b77472abca1c682.png042eaa1342124922a9d528dc660a16ec.png

添加过期时间防止死锁问题

当前代码存在问题,假如现在有4台服务器争抢锁,编号为1的服务器抢到了锁,但是没来得及释放锁,就宕机啦,其他2,3,4服务器就永远拿不到锁,这就是产生的死锁问题,解决方案是给锁添加过期时间来解决

4affdb3b239141e78d695451512263fb.png

要保证枷锁和设置过期时间具有原子性,否则加了锁,没来得及给过期时间就宕机啦,又会产生死锁问题

expire key 20指令和枷锁指令是两条指令不具有原子性,在这里使用 set key ex 20 nx命令设置过期时间来保证原子性

9d6becfe33a84ebf93433896a4d65bcf.png

添加过期时间和获取锁的原子性

redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)

 // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
 }

通过UUID防止误删

因为已经加了过期时间,如果加了3秒过期时间,第一个请求到了第3秒还没执行完毕,锁就失效了,这时第二个请求获取锁,执行1s的时候,第一个请求执行到del指令,就把第二个锁删除掉啦(误删)

解决方案:通过uuid标识是自己的锁,通过判断是自己的锁,在删除

84e258e5fb574c91a78b4244a41923d9.png

添加uuid防止误删

    public  void sellTicket(){

        String uuid = UUID.randomUUID().toString();
        // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数

                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            // 判断是自己的锁在删除
            if(uuid.equals(redisTemplate.opsForValue().get("lock"))){
                redisTemplate.delete("lock");
            }
            
        }

    }

使用Lua脚本解决防误删的原子性问题

判断和删除锁之间需要保证原子性第一个请求因为如果判断的时候,发现是自己的锁,然后此时锁超过了过期时间,此时,第二个请求获取到锁,第一个请求执行del指令,删除的是第二个请求的锁,所以需要在判断和删除锁之间保持原子性

解决方案:使用Lua脚本保证原子性,Lua脚本将多条命令一次性发给redis,redis单线程的特性可以保证原子性操作

Lua脚本介绍和redis执行Lua脚本 

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能,编译后仅仅一百余K,可以很方便的嵌入别的程序里

菜鸟地址:https://www.runoob.com/lua/lua-variables.html

Lua脚本流程控制和变量定义

--[ 定义全局变量a 局部变量用local a --]
a = 100;
--[ 检查条件 --]
if( a < 20 )
then
   --[ if 条件为 true 时执行该语句块 --]
   print("a 小于 20" )
else
   --[ if 条件为 false 时执行该语句块 --]
   print("a 大于 20" )
end
print("a 的值为 :", a)

在redis中执行Lua脚本

redis中继承了Lua脚本,lua-time-limit参数现在脚本最长运行时间,默认是5秒,执行指令为:

eval script numkeys key [key ...] arg [arg ...]

numkeys:标识key的数量 不能省略

hello word

eval "return 'hello world'" 0

分支语句KEYS和ARGV必须大写

eval "if KEYS[1]==1 then return KEYS[1] else return  ARGV[1] end" 1 0 3 

4c5257c42e834f25958f88987fdfb151.png解决判断和删除之间的原子性问题

// 如果是自己的锁,则删除,否则返回0为false
if redis.call('get',KEYS[1]) == ARGV[1]
then
    return redis.call('del',KEYS[1])
else
    return 0
end

keys:lock

argv: uuid
 public  void sellTicket(){

        String uuid = UUID.randomUUID().toString();
        // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数

                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            String script="if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";

            this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Collections.singletonList("lock"),uuid);

        }

    }

 压测1000 显示无超卖现象

daf44ce416a4420294d1bd923028b763.png

4b48b5d2be32427c9ec2bf1033598291.pnghash+Lua解决锁的可重复入问题

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行

第一个就是锁的重入问题

当前方法a获取锁,在方法之中调用b方法,b方法也需要获取锁,这个时候造成了死锁问题,采用hash+Lua脚本解决

第二个就是锁的自动续期问题:后续会解决续期问题

探讨ReentrantLock的可重入原理

ReentrantLock继承了aqs,aqs是锁的基石

可重入锁加锁流程

  • CAS获取锁,如果没有线程占用锁(state==0),加锁成功并记录当前线程是有锁线程
  • 如果state的值不为0,说明锁已经被占用。则判断当前线程是否是有锁线程,如果是则重入 (state + 1)
  • 否则加锁失败,入队等待

可重入锁解锁流程

  • 判断当前线程是否是有锁线程,不是则抛出异常
  • 对state的值减1之后,判断state的值是否为0,为0则解锁成功,返回true
  • 如果减1后的值不为0,则返回false

hash+Lua实现可重复入锁

参照ReentrantLock中的非公平可重入锁实现分布式可重入锁: hash + lua脚本
加锁

  •     判断锁是否存在 (exists),则直接获取锁 hset key field value
  •     如果锁存在则判断是否自己的锁 (hexists),如果是自己的锁则重入: hincrby key field increment
  •     否则重试:递归 循环
加锁
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次
数加1

if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 
then 
	redis.call('hincrby',KEYS[1], ARGV[1], 1)
	redis.call('expire',KEYS[1],ARGV[2])
	return 1
else 
	return 0
end

keys lock
argv uuid 30


解锁
判断 hash set 可重入 key 的值是否等于 0
如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
如果为 0 代表 可重入次数被减 1
如果为 1 代表 该可重入 key 解锁成功
1 代表解锁成功,锁被释放
0 代表可重入次数被减 1
null 代表其他线程尝试解锁,解锁失败
if redis.call('hexists',KEYS[1],ARGV[1])==0 
then 
	return nil 
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 
then 
	return 0 
else 
	redis.call('del',KEYS[1]) 
	return 1 
end

keys lock
argv uuid

exists判断lock是否存在,hexists lock uuid 判断filed是否存在

通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1,hincrby命令,如果增加的key filed 不存在则新增并加1

ef39ebc7bb9840cd8169db5e279dc80a.png

加锁工具类

package com.test.lockservice.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.UUID;

public class RedisLock {

    private StringRedisTemplate redisTemplate;

    private String lockName;

    private String uuid;

    private Integer expire = 30;
    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public RedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();
        if (uuid == null) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
        this.expire = expire;
    }

    public void lock(){
        this.lock(expire);
    }

    public void lock(Integer expire){
        this.expire = expire;
        String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        System.out.println(script);
        if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){
            try {
                // 没有获取到锁,重试
                Thread.sleep(60);
                lock(expire);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    public void unlock(){
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";
        /**
         * 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断
         * 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功
         */
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);

        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
    }
}

测试可重入 

@Override
    public  void checkAndLock(){
        RedisLock lock = new RedisLock(redisTemplate, "lock");
        lock.lock();
        // 查询票数
        Ticket ticket = ticketMapper.selectOne(new QueryWrapper<Ticket>().eq("sell_company", "12306"));
        // 判断不为空和票数大于0
        if(ticket!=null&& ticket.getCount() > 0){
            ticket.setCount(ticket.getCount()-1);
            ticketMapper.updateById(ticket);
        }
        // 测试可重入
        testRepeatEntry();
        lock.unlock();
    }

    public void testRepeatEntry(){
        RedisLock lock = new RedisLock(redisTemplate, "lock");
        lock.lock();
        System.out.println("redis分布式锁测试可重入");
        lock.unlock();
    }

 压测1000,未发现超卖问题,并解决可重入的问题

7dfd3282120641e2a34d09da730a07c6.png

d920dd12a6cb4e7dbce13ea0c1b15000.png锁的自动续期

如果在锁还在使用过程中,锁还未使用完,就失效了,也就产生了锁如何自动添加过期时间的问题 

实现方案: 定时器 + Lua脚本定时续期


自动续期

if redis.call('hexists',KEYS[1],ARGV[1])==1
then
	redis.call('expire',KEYS[1],ARGV[2]) 
	return 1 
else 
	return 0 
end

这里没有选用线程池的原因在于释放锁之后没有取消定时任务的方法,所以选用jdk自带的

Timer作为定时任务 

package com.test.lockservice.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.*;

public class RedisLock {

    private StringRedisTemplate redisTemplate;

    private String lockName;

    private String uuid;

    private Integer expire = 30;

    @SuppressWarnings("all")
    private static final Timer timer = new Timer();

    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public RedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();
        if (uuid == null) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
        this.expire = expire;
    }

    public void lock(){
        this.lock(expire);
    }

    public void lock(Integer expire){
        this.expire = expire;
        String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){
            try {
                // 没有获取到锁,重试
                Thread.sleep(60);
                lock(expire);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 自动续期
        renewExpire();

    }

    public void unlock(){
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";
        /**
         * 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断
         * 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功
         */
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);

        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
        // 释放锁成功
        this.uuid = null;
    }


    @SuppressWarnings("all")
    private void renewExpire() {
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                if (uuid != null) {
                    redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), RedisLock.this.uuid, expire.toString());
                    renewExpire();
                }
            }
        },expire * 1000 / 3);
    }
}

红锁算法 

利用红锁算法解决集群下锁的问题:

  • 1、应用程序获取当前系统时间
  • 2、应用程序以相同的kv值依次从多个redis实例中获取锁,如果某一个节点超过了一定时间(小于过期时间)没有获取到锁,则放弃,尽快从其他节点获取锁,避免一个节点宕机阻塞
  • 3、计算锁的消耗时间= 客户端当前时间-step1中的事件,获取锁的时间小于总的锁定时间,并且半数以上节点获取锁成功,认为获取锁成功
  • 4、如果获取锁失败,对所有节点释放锁

redis分布式锁小结

redis分布式锁最开始采用setnex+Lua脚本的方式,我们发现存在不可重入的问题,于是使用hash+Lua脚本解决可重入问题,并解决了自动续期问题,但是还存在一个重要问题,就是redis集群部署所带来的并发问题,所以使用Redission作为最终的分布式锁解决方案

redis集群状态下的问题:
  • 客户端A从master获取到锁
  • 在master将锁同步到slave之前,master宕掉了
  • slave节点被晋级为master节点
  • 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁

redisson中的分布式锁  

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅 提供了一系列的分布式的Java常用对象,还提供了许多分布式服务
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

Redisson引入依赖

 <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.11.2</version>
</dependency>

Redission配置 

package com.test.lockservice.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author sl
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
//        config.useClusterServers()
        config.useSingleServer().setAddress("redis://39.106.53.30:6379").setPassword("12345");
        return Redisson.create(config);
    }
}

Redission使用

@Autowired
    private RedissonClient redissonClient;


    public void userRedisson(){
        // 获取锁
        RLock lock = redissonClient.getLock("lock");
        try {
            // 加锁
            lock.lock();
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数
                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            // 解锁
            lock.unlock();
        }
    }

1000并发压测,发现并无超卖问题 

60440ac508af4380ba2ecc5de16754fa.png

c017a7803ee2469c9e89014ef00362c7.png

RLock原理

 RLock对象实现了 java.util.concurrent.locks.Lock 接口,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间 是30秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定

  • RLock 对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出 IllegalMonitorStateException 错误
  • 另外Redisson还通过加锁的方法提供了 leaseTime 的参数来指定加锁的时间。超过这个时间后锁便自动解开了

其实Redisson底层的实现思路同样是hash+Lua脚本的实现方式,在源码中可以看到,下面列举一下加锁的源码

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

 公平锁 

基于Redis的Redisson分布式可重入公平锁也是实现了 java.util.concurrent.locks.Lock 接口的一 种 RLock 对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了 当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队 列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个 线程都处于等待状态,那么后面的线程会等待至少25秒
public void useFairLock() {
        RLock fairLock = redissonClient.getFairLock("fairLock");
//        fairLock.lock();

        // 10秒钟以后自动解锁
        // 无需调用unlock方法手动解锁
        fairLock.lock(10, TimeUnit.SECONDS);
        System.out.println("加锁成功"+Thread.currentThread().getName());

        // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
//        boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
//        fairLock.unlock();
    }


加锁成功http-nio-10010-exec-5
加锁成功http-nio-10010-exec-10

可以看到,公平锁会维护一个队列,按发送顺序依次加锁

22641eac255c4cd78315c6cda75863ee.png 

联锁

   在多个redis实例上获取锁,联锁所有的锁都上锁成功才算成功

  @Override
    public void useMutiLock() {

        RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");
        //联锁所有的锁都上锁成功才算成功
        RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1);
        redissonMultiLock.lock();
        System.out.println("业务内容");
        redissonMultiLock.unlock();
    }

红锁

在多个节点上加锁,大部分节点获取锁成功就算成功


   public void useRedLock() {
        RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");
        RedissonRedLock readLock = new RedissonRedLock(lock1);
        // 红锁在大部分节点上加锁成功就算成功
        readLock.lock();
        System.out.println("业务内容");
        readLock.unlock();
    }

读写锁

对读和写上锁,RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口,读-读不阻塞

 public void useReadWriteLock() {
        /**
         * 读-读 不阻塞 读-写 阻塞 写-写 阻塞
         * RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口
         */
        RReadWriteLock rwlock = redissonClient.getReadWriteLock("readWrite");
        // 最常见的读锁
        rwlock.readLock().lock();
        // 写锁
        rwlock.writeLock().lock();
        // 10秒钟以后自动解锁无需调用unlock方法手动解锁
        rwlock.readLock().lock(10, TimeUnit.SECONDS);

        rwlock.writeLock().lock(10, TimeUnit.SECONDS);
        // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
        // boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);

        rwlock.readLock().unlock();
        rwlock.writeLock().unlock();
    }

 信号量

资源限流并发工具类,java.util.concurrent.semaphore是单机版限流,RSemaphore是分布式限流,下面的Semaphore会始终限流3个资源

单机版 

package com.test.lockservice.service.impl;

import java.util.concurrent.Semaphore;

/**
 * @Author sl
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        // 3个有限资源
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                try{
                    // 获取资源
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到车位");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName()  +"离开车位");
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    // 释放资源
                    semaphore.release();
                }

            }).start();
        }

    }
}

Thread-1抢到车位
Thread-0抢到车位
Thread-4抢到车位
Thread-0离开车位
Thread-1离开车位
Thread-4离开车位
Thread-3抢到车位
Thread-5抢到车位
Thread-2抢到车位
Thread-5离开车位
Thread-3离开车位
Thread-2离开车位

分布式版

  public void useSemaphore() {
        /**
         * RSemaphore 采用了与java.util.concurrent.semaphore相似的接口
         * 资源限流信号量, 3个资源 6个线程, semaphore是单机版限流,RSemaphore是分布式限流
         */
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        try{
            semaphore.acquire();
        }catch(Exception e){
            e.printStackTrace();
        }finally {
            semaphore.release();
        }
    }

闭锁(CountDownLatch

CountDownLatch并发工具类,一个线程等待一组线程结束是一个做减法的倒计时器,RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法,

 单机版

package com.test.lockservice.service.impl;

import java.util.concurrent.CountDownLatch;

/**
 * @Author sl
 */
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "\t上完自习");
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }
        // 班长等待所有线程同学走完在锁门
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t班长离开,锁门");
    }
}

1	上完自习
3	上完自习
4	上完自习
5	上完自习
2	上完自习
6	上完自习
main	班长离开,锁门

顺道介绍一下CyclicBarrier并发工具类,与CountDownLatch正好相反,它做的是加法

package com.test.lockservice.service.impl;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @Author sl
 */
public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("集齐了卡片,开始召唤神龙");
        });

        for (int i = 0; i < 7; i++) {
            String s = String.valueOf(i);
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "\t 收集到第"+s+"卡片");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();

        }
    }
}

0	 收集到第0卡片
6	 收集到第6卡片
2	 收集到第2卡片
1	 收集到第1卡片
5	 收集到第5卡片
4	 收集到第4卡片
3	 收集到第3卡片
集齐了卡片,开始召唤神龙

分布式版

 public void useCountDownLatch() {
        /**
         * RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法
         * 一个线程 等待一组线程完事
         * 班长等待所有同学走出门口在锁门 CountDownLatch是单机版 RCountDownLatch是分布式版
         */
        RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");
        latch.trySetCount(6);
        latch.countDown();
        try{
            latch.await();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

关于zookeeper实现分布式锁,在本专栏zookeeper章节中做了简单介绍,就是创建临时顺序节点,值最小的就是锁。 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/971352.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode986. 区间列表的交集(java)

区间列表的交集 题目描述贪心 - 合并区间代码演示 题目描述 难度 - 中等 leetcode986. 区间列表的交集 给定两个由一些 闭区间 组成的列表&#xff0c;firstList 和 secondList &#xff0c;其中 firstList[i] [starti, endi] 而 secondList[j] [startj, endj] 。每个区间列表…

【数学建模竞赛】超详细Matlab二维三维图形绘制

二维图像绘制 绘制曲线图 g 是表示绿色 b--o是表示蓝色/虚线/o标记 c*是表示蓝绿色(cyan)/*标记 ‘MakerIndices,1:5:length(y) 每五个点取点&#xff08;设置标记密度&#xff09; 特殊符号的输入 序号 需求 函数字符结构 示例 1 上角标 ^{ } title( $ a…

Arthas教程 - 命令篇 (二)

目录 一、Attach 黏附一个进程 1.1 准备代码 1.2 启动Demo 1.3 启动arthas 1.4 通过浏览器连接arthas 二、常用命令 2.1 dashboard 仪表盘 2.2 cls 清屏 2.3 thread 线程 2.4 jad 反编译类 2.5 watch 监视 2.6 退出arthas 三、基础命令 3.1 help 3.2 cat 3.3 …

小米新机代号“Manet”:搭载高通8 Gen 3 处理器 + 金属中框设计

根据数码闲聊站和体验more的消息爆料&#xff0c;小米Redmi K70 Pro被代号为“Manet”&#xff0c;将搭载高通SM8650处理器&#xff0c;这是骁龙8 Gen 3移动平台的一部分。该处理器基于台积电N4P工艺制程打造&#xff0c;具有强大的性能表现。 CPU包含1*3.19GHz X45*2.96GHz A7…

Python Opencv实践 - 矩形轮廓绘制(直边矩形,最小外接矩形)

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/stars.png") plt.imshow(img[:,:,::-1])img_gray cv.cvtColor(img, cv.COLOR_BGR2GRAY) #通过cv.threshold转换为二值图 ret,thresh cv.threshold(img_gray,…

Error from server (NotFound): pods “nginx-57d84f57dc-b866m“ not found

原因&#xff1a;机房断电&#xff0c;导致服务重启 [rootmaster1 logs]# kubectl get pod NAME READY STATUS RESTARTS AGE nginx-57d84f57dc-57fkf 1/1 Running 0 75s [rootmaster1 logs]# kubectl logs -f nginx-5…

nginx使用详解

文章目录 一、前言二、nginx使用详解2.1、nginx特点2.2 静态文件处理2.3 反向代理2.4 负载均衡2.5 高级用法2.5.1 正则表达式匹配2.5.2 重定向 三、总结 一、前言 本文将详细介绍nginx的各个功能使用&#xff0c;主要包括 二、nginx使用详解 2.1、nginx特点 高性能&#xff…

【JVM】垃圾收集算法

文章目录 分代收集理论标记-清除算法标记-复制算法标记-整理算法 分代收集理论 当前商业虚拟机的垃圾收集器&#xff0c;大多数都遵循了“分代收集”&#xff08;Generational Collection&#xff09;[1]的理论进 行设计&#xff0c;分代收集名为理论&#xff0c;实质是一套符…

C#循环定时上传数据,失败重传解决方案,数据库标识

有些时候我们需要定时的上传一些数据库的数据&#xff0c;在数据不完整的情况下可能上传失败&#xff0c;上传失败后我们需要定时在重新上传失败的数据&#xff0c;该怎么合理的制定解决方案呢&#xff1f;下面一起看一下&#xff1a; 当然本篇文章只是提供一个思路&#xff0…

windows系统bat脚本调用powershell脚本

前言 项目上有些项目既使用了bat脚本&#xff0c;又使用了powershell脚本&#xff1b; 需要两种脚本配合使用&#xff1b; bat调用powershell 不隐藏窗口运行 bat脚本代码&#xff1a;执行当前路径下的1.ps1脚本文件 start powershell .\1.ps1pause powershell脚本代码&…

025: vue父子组件中传递方法控制:$emit,$refs,$parent,$children

第025个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下&#xff0c;本专栏提供行之有效的源代码示例和信息点介绍&#xff0c;做到灵活运用。 &#xff08;1&#xff09;提供vue2的一些基本操作&#xff1a;安装、引用&#xff0c;模板使…

prometheus通过blackbox-exporter监控web站点证书

1 概述 线上站点普遍是https&#xff0c;因此监控https web站点的证书的过期时间&#xff0c;是一个基础性需求。例如&#xff0c;证书过期会导致tls握手失败&#xff0c;进而导致用户无法正常访问web站点。 blackbox-expoter是一个web服务&#xff0c;它暴露了一个接口&#…

如何在面试中处理竞争与压力

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

WebSocket与SSE区别

一&#xff0c;websocket WebSocket是HTML5下一种新的协议&#xff08;websocket协议本质上是一个基于tcp的协议&#xff09; 它实现了浏览器与服务器全双工通信&#xff0c;能更好的节省服务器资源和带宽并达到实时通讯的目的 Websocket是一个持久化的协议 websocket的原理 …

算法笔记:二叉树

1 基本二叉树 二叉树是一种树形数据结构&#xff0c;其中每个节点最多有两个子节点&#xff0c;通常称为“左子节点”和“右子节点”。 二叉树的根是唯一没有父节点的节点&#xff0c;而所有其他节点都有一个父节点和零个或两个子节点。 1.1 基础术语 节点&#xff08;Node&…

服务运营 | MSOR文章精选:远程医疗服务中的统计与运筹(二)

作者信息&#xff1a;王畅&#xff0c;陈盈鑫 编者按 在上一期中&#xff0c;我们分享了与远程医疗中运营管理问题相关的两篇文章。其一发表在《Stochastic Systems》&#xff0c;旨在使用排队论与流体近似的方法解决远程医疗中资源配置的问题&#xff1b;其二发表在《Managem…

R_I相关指令函数(SMART PLC梯形图代码)

大部分小型PLC可能并没有R_I(浮点数转单字)指令&#xff0c;这篇博客我们介绍简单实用的一些转换FC,这些FC其实并不复杂&#xff0c;但是可以大大简化我们的代码量&#xff0c;使代码阅读起来更简介明了。SMART PLC的ABS()指令请查看下面文章链接&#xff1a; PLC绝对值指令AB…

04 Linux补充|C/C++

目录 Linux补充 C语⾔ C语言中puts和printf的区别&#xff1f; Linux补充 (1)ubuntu安装ssh服务端openssh-server命令&#xff1a; ubuntu安装后默认只有ssh客户端&#xff0c;只能去连其它ssh服务器&#xff1b;其它客户端想要连接这个ubuntu系统&#xff0c;需要安装部署…

LLM大模型推理加速 vLLM

参考&#xff1a; https://github.com/vllm-project/vllm https://zhuanlan.zhihu.com/p/645732302 https://vllm.readthedocs.io/en/latest/getting_started/quickstart.html ##文档 加速原理&#xff1a; PagedAttention&#xff0c;主要是利用kv缓存 使用&#xff1a; #…

JVM | Java执行引擎结构及工作原理

引言 Java虚拟机&#xff08;JVM&#xff09;和其复杂性 在我们先前探讨的文章中&#xff0c;我们已经深入到了Java虚拟机&#xff08;JVM&#xff09;的内部&#xff0c;透视了其如何通过元空间存储类的元数据和字节码。JVM的设计初衷是为了实现跨平台兼容性&#xff0c;但随…