第二章_基于redis实现分布式锁

news2024/10/7 14:31:36

基本实现

借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。 

  1. 多个客户端同时获取锁(setnx)
  2. 获取成功,执行业务逻辑,执行完成释放锁(del)
  3. 其他客户端等待重试

改造StockService方法

@Service
public class StockService {

    @Autowired
    private StockMapper stockMapper;

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 数据库分布式锁
     */
    public void checkAndLock() {
        // 加锁,获取锁失败重试
        while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx")){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);
        
        // 再减库存
        if (stock != null && stock.getCount() > 0){
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
        }

        // 释放锁
        this.redisTemplate.delete("lock");
    }

}

其中,加锁

// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx")){
    try {
         Thread.sleep(100);
    } catch (InterruptedException e) {
         e.printStackTrace();
    }
}

解锁

// 释放锁
this.redisTemplate.delete("lock");

使用Jmeter压力测试如下

查看mysql数据库

 防死锁

解决:给锁设置过期时间,自动释放锁。

设置过期时间两种方式:

  1. 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
  2. 使用set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)

压力测试肯定也没有问题

问题:可能会释放其他服务器的锁。

场景:如果业务逻辑的执行时间是7s。执行流程如下

  1. index1业务逻辑没执行完,3秒后锁被自动释放。
  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
  3. index3获取到锁,执行业务逻辑。
  4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。

最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁。

防误删

实现如下

问题:删除操作缺乏原子性。

场景:

  1. index1执行删除时,查询到的lock值确实和uuid相等。
  2. index1执行删除前,lock刚好过期时间已到,被redis自动释放。
  3. index2获取了lock。
  4. index1执行删除,此时会把index2的lock删除。

解决方案:没有一个命令可以同时做到判断 + 删除,所有只能通过其他方式实现(LUA脚本)

redis中的lua脚本

现实问题 

redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。

在串行场景下:A和B的值肯定都是3

在并发场景下:A和B的值可能在0-6之间。

极限情况下1:

则A的结果是0,B的结果是3

极限情况下2

AB的结果都是6 

如果redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。 这和使用 MULTI/ EXEC 包围的事务很类似。

但是MULTI/ EXEC方法来使用事务功能,将一组命令打包执行,无法进行业务逻辑的操作。这期间有某一条命令执行报错(例如给字符串自增),其他的命令还是会执行,并不会回滚。

lua介绍

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

设计目的

其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Lua 特性

  • 轻量级:它用标准C语言编写并以源代码形式开放,编译后仅仅一百余K,可以很方便的嵌入别的程序里。
  • 可扩展:Lua提供了非常易于使用的扩展接口和机制:由宿主语言(通常是C或C++)提供这些功能,Lua可以使用它们,就像是本来就内置的功能一样。
  • 其它特性: 

    ○支持面向过程(procedure-oriented)编程和函数式编程(functional programming);

    ○自动内存管理;只提供了一种通用类型的表(table),用它可以实现数组,哈希表,集合,对象;

    ○语言内置模式匹配;闭包(closure);函数也可以看做一个值;提供多线程(协同进程,并非操作系统所支持的线程)支持;

    ○通过闭包和table可以很方便地支持面向对象编程所需要的一些关键机制,比如数据抽象,虚函数,继承和重载等。

lua基本语法

对lua脚本感兴趣的,可以到官方教程或者《菜鸟教程》。这里仅以redis中可能会用到的部分语法作介绍。

a = 5 -- 全局变量
local b = 5 -- 局部变量, redis只支持局部变量
a, b = 10, 2*x -- 等价于 a=10; b=2*x

流程控制

if( 布尔表达式 1)
then
   --[ 在布尔表达式 1 为 true 时执行该语句块 --]
elseif( 布尔表达式 2)
then
   --[ 在布尔表达式 2 为 true 时执行该语句块 --]
else
   --[ 如果以上布尔表达式都不为 true 则执行该语句块 --]
end

redis执行lua脚本 - EVAL指令

在redis中需要通过eval命令执行lua脚本。

格式

EVAL script numkeys key [key ...] arg [arg ...]
script:lua脚本字符串,这段Lua脚本不需要(也不应该)定义函数。
numkeys:lua脚本中KEYS数组的大小
key [key ...]:KEYS数组中的元素
arg [art ...]:ARGV数组中的元素

案例1:基本案例

EVAL "return 10" 0

案例2:动态传参

EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 10 20 30 40 50 60 70 80 90
# 输出:10 20 60 70
EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 10 20
# 输出:0
EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 20 10
# 输出:1

传入了两个参数10和20,KEYS的长度是1,所以KEYS中有一个元素10,剩余的一个20就是ARGV数组的元素。

redis.call()中的redis是redis中提供的lua脚本类库,仅在redis环境中可以使用该类库。

案例3:执行redis类库方法

set aaa 10 -- 设置一个aaa值为10
EVAL "return redis.call('get', 'aaa')" 0
## 通过return把call方法返回给redis客户端,打印:"10"

注意:脚本里使用的所有键都应该由 KEYS 数组来传递。但并不是强制性的,代价是这样写出的脚本不能被 Redis 集群所兼容。

案例4:给redis类库方法动态传参

EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 bbb 20 1

学到这里基本可以应付redis分布式锁所需要的脚本知识了。

案例5:pcall函数的使用(了解)

-- 当call() 在执行命令的过程中发生错误时,脚本会停止执行,并返回一个脚本错误,输出错误信息
EVAL "return redis.call('sets', KEYS[1], ARGV[1]), redis.call('set', KEYS[2],
ARGV[2])" 2 bbb ccc 20 30
-- pcall函数不影响后续指令的执行
EVAL "return redis.pcall('sets', KEYS[1], ARGV[1]), redis.pcall('set',
KEYS[2], ARGV[2])" 2 bbb ccc 20 30

 注意:set方法写成了sets,肯定会报错。

性能优化 - EVALSHA指令 

EVAL 命令要求你在每次执行脚本的时候都发送一次脚本主体(script body)。Redis 有一个内部的缓存机制,因此它不会每次都重新编译脚本,不过在很多场合,付出无谓的带宽来传送脚本主体并不是最佳选择。

为了减少带宽的消耗, Redis 实现了 EVALSHA命令,它的作用和 EVAL 一样,都用于执行lua脚
本,但它接受的第一个参数不是脚本,而是脚本的 SHA1 编码。

EVALSHA 命令的表现如下:

如果服务器存在SHA1编码对应的的脚本,那么就会执行这个脚本;如果服务器不存在SHA1编码对应的脚本,那么会返回一个特殊的错误,提醒用户使用 EVAL 代替 EVALSHA。

-- script load会对redis脚本进行sha1加密生成加密字符串,无论脚本多长,密文长度固定,会以密文为key缓存lua基本
SCRIPT LOAD "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}"
-- 通过密文方式执行缓存的lua脚本
EVALSHA a42059b356c875f0717db19a51f6aaca9ae659ea 2 aa bb cc dd
-- 判断缓存中是否存在某个lua脚本,有返回1,无返回0
SCRIPT EXISTS a42059b356c875f0717db19a51f6aaca9ae659ea
-- 删除缓存中的lua脚本
SCRIPT FLUSH

测试

使用lua保证删除原子性 

删除LUA脚本

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',
KEYS[1]) else return 0 end

代码实现

@Service
public class StockService {

    @Autowired
    private StockMapper stockMapper;

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 数据库分布式锁
     */
    public void checkAndLock() {
        // 加锁,获取锁失败重试
        String uuid = UUID.randomUUID().toString();
        
        while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS)) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);

        // 再减库存
        if (stock != null && stock.getCount() > 0) {
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
        }

        // 释放锁
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

        this.redisTemplate.execute(new DefaultRedisScript<>(script,
                Long.class), Arrays.asList("lock"), uuid);
    }

}

压力测试

库存量也没有问题

可重入锁 

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行 。

用一段 Java 代码解释可重入

public synchronized void a() {
   b();
}

public synchronized void b() {
   // pass
}

假设 X 线程在 a 方法获取锁之后,继续执行 b 方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。

锁明明是被 X 线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己~。

可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加1,然后再执行方法逻辑。退出加锁方法之后,加锁次数再减 1,当加锁次数为 0 时,锁才被真正的释放。

可以看到可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。

解决方案:redis + Hash

加锁脚本

Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑。

if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1],
ARGV[1]) == 1)
then
   redis.call('hincrby', KEYS[1], ARGV[1], 1);
   redis.call('expire', KEYS[1], ARGV[2]);
   return 1;
else
   return 0;
end

假设值为:KEYS:[lock], ARGV[uuid, expire]

如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1。

解锁脚本

-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
   return nil;
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then
   return 0;
else
   redis.call('del', KEYS[1]);
   return 1;
end;

这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:

  • 1 代表解锁成功,锁被释放
  • 0 代表可重入次数被减 1
  • null 代表其他线程尝试解锁,解锁失败

代码实现

由于加解锁代码量相对较多,这里可以封装成一个工具类

具体实现

public class RedisDistributeLock {

    private final StringRedisTemplate redisTemplate;

    /**
     * 线程局部变量,可以在线程内共享参数
     */
    private final String lockName;

    private String uuid;

    private Integer expire = 30;

    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public RedisDistributeLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();

        if (StringUtils.isBlank(uuid)) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
    }

    public void lock() {
        this.lock(expire);
    }

    public void lock(Integer expire) {
        this.expire = expire;
        String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1)" +
                "then" +
                " redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
                " redis.call('expire', KEYS[1], ARGV[2]);" +
                " return 1;" +
                "else" +
                " return 0;" +
                "end";

        if (Boolean.FALSE.equals(this.redisTemplate.execute(new DefaultRedisScript<>(script,
                Boolean.class), Collections.singletonList(lockName), uuid, expire.toString()))) {
            try {
                // 没有获取到锁,重试
                Thread.sleep(60);
                lock(expire);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void unlock() {
        String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
                " return nil; " +
                "elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then " +
                " return 0; " +
                "else " +
                " redis.call('del', KEYS[1]); " +
                " return 1; " +
                "end;";

        // 如果返回值没有使用Boolean,Spring-data-redis 进行类型转换时将会把 null转为 false,这就会影响我们逻辑判断
        // 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功。
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>
                (script, Long.class), Collections.singletonList(lockName), uuid);

        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName:"
                    + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
    }

}

使用及测试

在业务代码中使用

public void checkAndLock() {
    // 加锁,获取锁失败重试
    RedisDistributeLock lock = new RedisDistributeLock(this.redisTemplate, "lock");

    lock.lock();

    // 先查询库存是否充足
    Stock stock = this.stockMapper.selectById(1L);

    // 再减库存
    if (stock != null && stock.getCount() > 0){
        stock.setCount(stock.getCount() - 1);
        this.stockMapper.updateById(stock);
    }

    // this.testSubLock();
    // 释放锁
    lock.unlock();

}

测试

测试可重入性

自动续期 

lua脚本

if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then
   redis.call('expire', KEYS[1], ARGV[2]);
   return 1;
else
   return 0;
end

 RedisDistributeLock中添加renewExpire方法

public class RedisDistributeLock {

    private final StringRedisTemplate redisTemplate;

    /**
     * 线程局部变量,可以在线程内共享参数
     */
    private final String lockName;

    private String uuid;

    private Integer expire = 30;

    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    /**
     * 这里也可以换成线程池
     * private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
     * EXECUTOR_SERVICE.schedule();
     */
    private static final Timer TIMER = new Timer();

    public RedisDistributeLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();

        if (StringUtils.isBlank(uuid)) {
            this.uuid = UUID.randomUUID().toString();
            System.out.println("---------:" + uuid);
            THREAD_LOCAL.set(uuid);
        }
    }

    public void lock() {
        this.lock(expire);
    }

    public void lock(Integer expire) {
        this.expire = expire;
        String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1)" +
                "then" +
                " redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
                " redis.call('expire', KEYS[1], ARGV[2]);" +
                " return 1;" +
                "else" +
                " return 0;" +
                "end";

        if (Boolean.FALSE.equals(this.redisTemplate.execute(new DefaultRedisScript<>(script,
                Boolean.class), Collections.singletonList(lockName), uuid, expire.toString()))) {
            try {
                // 没有获取到锁,重试
                Thread.sleep(60);
                lock(expire);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //获取锁成功后自动续期
        renewExpire();
    }

    public void unlock() {
        String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
                " return nil; " +
                "elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then " +
                " return 0; " +
                "else " +
                " redis.call('del', KEYS[1]); " +
                " return 1; " +
                "end;";

        // 如果返回值没有使用Boolean,Spring-data-redis 进行类型转换时将会把 null转为 false,这就会影响我们逻辑判断
        // 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功。
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>
                (script, Long.class), Collections.singletonList(lockName), uuid);

        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName:"
                    + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }

        //设置锁之后,把uuid置未空,停止定时任务
        this.uuid = null;
    }

    /**
     * 开启定时器,自动续期
     */
    private void renewExpire() {
        String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1; else return 0; end";

        TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                // 如果uuid为空,则终止定时任务
                if (StringUtils.isNotBlank(uuid)) {
                    redisTemplate.execute(new DefaultRedisScript<>(script,
                                    Boolean.class), Collections.singletonList(lockName), RedisDistributeLock.this.uuid,
                            expire.toString());
                    renewExpire();
                }
            }
        }, expire * 1000 / 3);
    }

}

在lock方法中使用

在unlock方法中添加红框中的代码

红锁算法

redis集群状态下的问题:

  1. 客户端A从master获取到锁
  2. 在master将锁同步到slave之前,master宕掉了。
  3. slave节点被晋级为master节点
  4. 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。 

安全失效

解决集群下锁失效,参照redis官方网站针对redlock文档:https://redis.io/topics/distlock,在算法的分布式版本中,我们假设有N个Redis服务器。这些节点是完全独立的,因此我们不使用复制或
任何其他隐式协调系统。前几节已经描述了如何在单个实例中安全地获取和释放锁,在分布式锁算法中,将使用相同的方法在单个实例中获取和释放锁。将N设置为5是一个合理的值,因此需要在不同的计算机或虚拟机上运行5个Redis主服务器,确保它们以独立的方式发生故障。

为了获取锁,客户端执行以下操作

  1. 客户端以毫秒为单位获取当前时间的时间戳,作为起始时间
  2. 客户端尝试在所有N个实例中顺序使用相同的键名、相同的随机值来获取锁定。每个实例尝试获取锁都需要时间,客户端应该设置一个远小于总锁定时间的超时时间。例如,如果自动释放时间为10秒,则尝试获取锁的超时时间可能在5到50毫秒之间。这样可以防止客户端长时间与处于故障状态的Redis节点进行通信:如果某个实例不可用,尽快尝试与下一个实例进行通信。
  3. 客户端获取当前时间 减去在步骤1中获得的起始时间,来计算获取锁所花费的时间。当且仅当客户端能够在大多数实例(至少3个)中获取锁时,并且获取锁所花费的总时间小于锁有效时间,则认为已获取锁。
  4. 如果获取了锁,则将锁有效时间减去获取锁所花费的时间,如步骤3中所计算。
  5. 如果客户端由于某种原因(无法锁定N / 2 + 1个实例或有效时间为负)而未能获得该锁,它将尝试解锁所有实例(即使没有锁定成功的实例)。

每台计算机都有一个本地时钟,我们通常可以依靠不同的计算机来产生很小的时钟漂移。只有在拥有锁的客户端将在锁有效时间内(如步骤3中获得的)减去一段时间(仅几毫秒)的情况下终止工作,才能保证这一点。以补偿进程之间的时钟漂移。当客户端无法获取锁时,它应该在随机延迟后重试,以避免同时获取同一资源的多个客户端之间不同步(这可能会导致脑裂的情况:没人胜)。同样,客户端在大多数Redis实例中尝试获取锁的速度越快,出现裂脑情况(以及需要重试)的窗口就越小,因此理想情况下,客户端应尝试将SET命令发送到N个实例同时使用多路复用。

值得强调的是,对于未能获得大多数锁的客户端,尽快释放(部分)获得的锁有多么重要,这样就不必等待锁定期满才能再次获得锁(但是,如果发生了网络分区,并且客户端不再能够与Redis实例进行通信,则在等待密钥到期时需要付出可用性损失)。

 redisson中的分布式锁

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅
提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap,SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock,AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache,Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。 

官方文档地址:https://github.com/redisson/redisson/wiki

可重入锁(Reentrant Lock) 

基于Redis的Redisson分布式可重入锁 RLock Java对象实现了 java.util.concurrent.locks.Lock
接口。

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间是30秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。

RLock 对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException 错误。

另外Redisson还通过加锁的方法提供了 leaseTime 的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);

if (res) {
   try {
      ...
   } finally {
      lock.unlock();
   }
}

1. 引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.11.2</version>
</dependency>

2. 添加配置

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        // 可以用"rediss://"来启用SSL连接
        config.useSingleServer().setAddress("redis://172.16.116.100:6379");
        return Redisson.create(config);
    }

}

3. 代码中使用

@Autowired
private RedissonClient redissonClient;

public void checkAndLock() {
    // 加锁,获取锁失败重试
    RLock lock = this.redissonClient.getLock("lock");
    lock.lock();
    // 先查询库存是否充足
    Stock stock = this.stockMapper.selectById(1L);

    // 再减库存
    if (stock != null && stock.getCount() > 0){
        stock.setCount(stock.getCount() - 1);
        this.stockMapper.updateById(stock);
    }

    // 释放锁
    lock.unlock();
}

4. 压力测试

性能跟我们手写的区别不大。

数据库也没有问题

公平锁(Fair Lock) 

基于Redis的Redisson分布式可重入公平锁也是实现了 java.util.concurrent.locks.Lock 接口的一
种 RLock 对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();

联锁(MultiLock)

基于Redis的Redisson分布式联锁 RedissonMultiLock 对象可以将多个 RLock 对象关联为一个联锁,每个 RLock 对象实例可以来自于不同的Redisson实例

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();

 红锁(RedLock)

基于Redis的Redisson红锁 RedissonRedLock 对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个 RLock 对象关联为一个红锁,每个 RLock 对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();

读写锁(ReadWriteLock)

基于Redis的Redisson分布式可重入读写锁 RReadWriteLock Java对象实java.util.concurrent.locks.ReadWriteLock 接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

添加StockController方法

@GetMapping("/test/read")
public String testRead(){
    String msg = stockService.testRead();
    return "测试读";
}

@GetMapping("/test/write")
public String testWrite(){
    String msg = stockService.testWrite();
    return "测试写";
}

添加StockService方法

public String testRead() {
    RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");
    rwLock.readLock().lock(10, TimeUnit.SECONDS);
    System.out.println("测试读锁。。。。");
    // rwLock.readLock().unlock();
    return null;
}

public String testWrite() {
    RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");
    rwLock.writeLock().lock(10, TimeUnit.SECONDS);
    System.out.println("测试写锁。。。。");
    // rwLock.writeLock().unlock();
    return null;
}

打开开两个浏览器窗口测试:

  • 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始
  • 同时访问读:不用等待
  • 先写后读:读要等待(约10s)写完成
  • 先读后写:写要等待(约10s)读完成

信号量(Semaphore)

基于Redis的Redisson的分布式信号量(Semaphore)Java对象 RSemaphore 采用了与java.util.concurrent.Semaphore 相似的接口和用法。同时还提供了异步(Async)、反射式 (Reactive)和RxJava2标准的接口。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
semaphore.release();

在StockController添加方法

@GetMapping("test/semaphore")
public String testSemaphore(){
    this.stockService.testSemaphore();
    return "测试信号量";
}

在StockService添加方法

public void testSemaphore() {
    RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");
    semaphore.trySetPermits(3);

    try {
        semaphore.acquire();

        TimeUnit.SECONDS.sleep(5);
        System.out.println(System.currentTimeMillis());

        semaphore.release();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

添加测试用例:并发10次,循环一次

控制台效果

由此可知:
1606960790秒有3次请求进来:每个控制台各1次
1606960795秒有3次请求进来:控制台2有1次,控制台3有2次
1606960800秒有3次请求进来:控制台1有2次,控制台2有1次
1606960805秒有1次请求进来:控制台1有1次 

闭锁(CountDownLatch)

基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象 RCountDownLatch 采用了与
java.util.concurrent.CountDownLatch 相似的接口和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

需要两个方法:一个等待,一个计数countDown

给StockController添加测试方法

@GetMapping("test/latch")
public String testLatch(){
    this.stockService.testLatch();
    return "班长锁门。。。";
}

@GetMapping("test/countdown")
public String testCountDown(){
    this.stockService.testCountDown();
    return "出来了一位同学";
}

给StockService添加测试方法

public void testLatch() {
    RCountDownLatch latch = this.redissonClient.getCountDownLatch("latch");
    latch.trySetCount(6);

    try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

public void testCountDown() {
    RCountDownLatch latch = this.redissonClient.getCountDownLatch("latch");
    latch.trySetCount(6);
    latch.countDown();
}

重启测试,打开两个页面:当第二个请求执行6次之后,第一个请求才会执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/650243.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序WE分析----事件分析

目录 web分析-小程序 事件分析概述 1.新增事件管理 事件参数说明 (1) 填写事件配置 (2)小程序添加上报代码&#xff1a;将上报代码添加到小程序中 (3)测试事件数据上报&#xff1a;测试事件上报的数据是否正确。 属性管理 字典管理 新增事件分析 创建事件分析 添加事件指…

Java+Swing+mysql员工工资管理系统2.0

JavaSwingmysql员工工资管理系统2.0 一、系统介绍二、功能展示1.用户登陆2.主页3.员工工资查询4.员工工资添加5.员工工资修改6.员工工资删除 三、系统实现1.salary.java 四、其它系统五、获取源码 一、系统介绍 该系统实现了简单的增删查改、用户登陆、员工工资查询、员工工资…

美联储缩表意味着什么?

What does the Feds balance sheet reduction mean? 这里的表是资产负债表&#xff0c;Balance sheet. 美联储&#xff08;Federal Reserve&#xff09;作为全球影响力最大的央行&#xff0c;其在货币政策上做出的一些调整&#xff0c;可能就会引起全球经济和金融市场动荡&am…

项目测试排期的正确方法是什么?

测试排期是项目排期里面的一部分&#xff0c;所以了解项目排期对整体产品的全貌会有一个宏观的认知&#xff0c;甘特图能很好的体现项目排期&#xff0c;里面包含了参与角色和每个角色对应的排期。项目参与者和项目责任人都可以清晰的看到项目当前进展和项目耗时等。 甘特图可…

智能监控系统:在线培训考试系统的保障

随着互联网技术的不断发展&#xff0c;越来越多的培训机构和教育机构采用在线学习和考试的方式进行教学。然而&#xff0c;考试中的作弊问题也随之产生&#xff0c;给教育质量和学术诚信带来了挑战。为了解决这一问题&#xff0c;许多在线培训考试系统引入了智能监控系统。 智…

邓铎:探索书法艺术的新境界

中国书画院院士邓铎&#xff0c;是一位在书法艺术领域拥有深刻理解和丰富实践经验的老者。他的作品随心所欲&#xff0c;个性鲜明&#xff0c;具备独特的审美品味和艺术手法&#xff0c;更有重要的理论创新&#xff0c;让书法艺术大放光彩。 邓铎的书法作品在形式上追求“形似象…

【无标题】面试常考算法(3):二叉树遍历(创建、遍历、销毁)

这部分不够熟悉的话&#xff0c;面试直接递归就行。不过实际中虽然递归在某些情况下可以提供简洁和优雅的解决方案&#xff0c;但可能占用大量的内存空间和导致额外时间开销&#xff0c;所以还是尽量使用非递归。因为每次递归调用时&#xff0c;函数的局部变量和参数都需要在栈…

迭代器模式(十九)

相信自己&#xff0c;请一定要相信自己 上一章简单介绍了访问者模式(十八), 如果没有看过, 请观看上一章 一. 迭代器模式 引用 菜鸟教程里面迭代器模式介绍: https://www.runoob.com/design-pattern/iterator-pattern.html 迭代器模式&#xff08;Iterator Pattern&#xff…

实战:私有化部署ngin+文件步骤记录

这里写目录标题 背景准备总结 背景 出差到某国企进行私有化部署&#xff0c;一波三折。没想到是那种最麻烦的部署&#xff0c;导入文件需要刻光盘&#xff0c;进入电脑房需要上交手机&#xff0c;不允许有人以及拍摄设备&#xff0c;内部有监控摄像头。 有问题怎么办&#xf…

SYSU程设c++(第十六周)

set set<int> st; 会自动排序升序 如果降序可以set<int, greater<int>> s; map map<string,int> m; 会按键进行升序 m["uiui"]100; map<string,int>::iterator it; for(itm.begin();it!m.end();it){ cout<<"键&qu…

Spark大数据处理学习笔记(3.3)掌握RDD分区

该文章主要为完成实训任务&#xff0c;详细实现过程及结果见【http://t.csdn.cn/OmCQ8】 文章目录 一、概念二、自定义分区器2.1 提出问题2.2 解决问题1. 准备数据文件2. 新建科目分区器3. 测试科目分区器 三、课后作业 一、概念 在Spark中&#xff0c;RDD&#xff08;弹性分布…

长文|基于Zabbix的可观测性监控

01 可观测性与可观测性监控 02 基于ZABBIX的可观测性监控 03 可观测性监控的探索 ——王小东&#xff0c;多年运维老兵&#xff0c;《nginx应用与运维实战》作者 本文整理自王小东在2022Zabbix峰会演讲分享。ppt可在公众号后台回复“ppt"。 1、可观测性与可观测性监控…

拷贝构造函数

拷贝构造函数 以值传递的方式调用函数时&#xff0c;如果实参为对象&#xff0c;会调用拷贝构造函数。函数以值的方式返回对象时&#xff0c;可能会调用拷贝构造函数&#xff08;VS会调用&#xff0c;Linux不会&#xff0c;g编译器做了优化&#xff09; 类似于构造函数和析构函…

Python控制流程盘点及高级用法、神秘技巧大揭秘!

在这篇文章中我们将全面深入地介绍 Python 的控制流程&#xff0c;包括条件语句、循环结构和异常处理等关键部分&#xff0c;尤其会将列表解析、生成器、装饰器等高级用法一网打尽。此外&#xff0c;我还将分享一些独特的见解和研究发现&#xff0c;希望能给你带来新的启发。文…

让你不再疑惑图片翻译怎么弄

你是否曾遇到过在阅读外语文章或资料时&#xff0c;遇到了图片上的文字无法翻译的困扰&#xff1f;别担心&#xff0c;如果你还不知道如何翻译图片上的文字的话&#xff0c;接下来我将教你三种图片翻译的实用小技巧&#xff0c;一起来看看吧。 翻译图片的实用方法一&#xff1a…

Python语法基础01(列表,元组,字典)

Python基础语法 变量的命名与使用 变量名只能包含字母、数字和下划线&#xff0c;只能以字母和下划线为开头不能包含空格不能使用python保留字 列表&#xff0c;元组&#xff0c;字典 列表 定义列表(元素之间可以没有任何关系)&#xff1a;[] 例如 fruits["apple&qu…

oracle如何才能卸载干净

windows系统下oracle如何才能卸载干净 1.关闭oracle所有的服务。2.删除注册表中相关信息3.删除注册表中相关Oracle安装信息4.删除注册的oracle事件日志5.删除环境变量path中关于oracle的内容6.重新启动操作系统7.删除Oracle_Home下的所有数据8.删除oracle安装目录。9.删除开始菜…

灰度图像逻辑运算之逻辑或

目录 note code test note out max(x1,x2) code void img_logic_or_fun(uchar& in1, uchar& in2, uchar& out) {out in1 > in2 ? in1 : in2; } void img_logic_or(Mat& src1, Mat& src2, Mat& res) {if (src1.size() ! src2.size()) {retur…

基于html+css的图展示128

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

【微信支付】微信v3支付案例,SpringBoot集成IJPay实现微信v3支付

前言 这篇文章主要实现一下通过IJPay来实现微信v3支付案例&#xff0c;本篇文章使用的是JSAPI即小程序支付 IJPay码云仓库&#xff1a;https://gitee.com/javen205/IJPay/tree/dev IJPay官方文档&#xff1a;https://javen205.gitee.io/ijpay/ 准备工作 导入依赖 <depen…