分布式锁 — Redisson 全面解析!

news2024/11/22 16:48:16

前言

分布式锁主要是解决集群,分布式下数据一致性的问题。在单机的环境下,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatileReentrantLocksynchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。

分布式锁的实现主要有以下方式:

  • 基于数据库

  • 基于分布式协调系统

  • 基于缓存

    • 基于redis命令。如:setnx等操作

    • 基于redis Lua脚本能力(本文介绍的实现方式 redisson)

一、Redisson的使用

Redisson 支持单点模式、主从模式、哨兵模式、集群模式,这里以单点模式为例

引入maven依赖

<!-- maven版本号根据项目版本自行调整 -->
<!--redis-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.4.0</version>
</dependency>
<!--使用redisson作为分布式锁-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.8</version>
</dependency>

yml配置

spring:
  redis:
    # Redis数据库索引(默认为0)
    database: 0
    # Redis服务器地址
    host: 127.0.0.1
    # Redis服务器连接端口
    port: 6379
    # Redis服务器链接密码(默认为空)
    password:
    jedis:
      pool:
        # 连接池最大链接数(负值表示没有限制)
        max-active: 20
        # 连接池最大阻塞等待时间(负值表示没有限制)
        max-wait: -1
        # 链接池中最大空闲链接
        max-idle: 10
        # 连接池中最小空闲链接
        min-idle: 0
    # 链接超市时间(毫秒)
    timeout: 1000

Redisson配置

@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String redisHost;
    @Value("${spring.redis.password}")
    private String redisPassword;
    @Value("${spring.redis.port}")
    private String port;

    @Bean
    @ConditionalOnMissingBean
    public RedissonClient redissonClient() {
        Config config = new Config();
        //单机模式  依次设置redis地址和密码
        System.out.println(redisHost);
        // config.useSingleServer().setAddress("redis://" + redisHost + ":" + port).setPassword(redisPassword);
        // 没有配置redis密码
        config.useSingleServer().setAddress("redis://" + redisHost + ":" + port);
        return Redisson.create(config);
    }
}

测试用例

package com.example.aopdemo;

import com.example.aopdemo.springbootaopdemo.SpringBootDemoxzApplication;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @ClassName RedissonTest
 * @Description Redisson测试用例
 * @Author 阿Q
 * @Date 2022/11/26
 */
@Slf4j
@SpringBootTest(classes = SpringBootDemoxzApplication.class)
public class RedissonTest {

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private ThreadPoolTaskExecutor executor;

    // redisson分布式锁的key
    private static final String LOCK_TEST_KEY = "redisson:lock:test";

    int n = 500;

    /**
     * 分布式锁测试用例
     */
    @Test
    public void lockTest() {
        // 利用 循环+多线程 模仿高并发请求
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                // 这里获取公平锁,遵循先进先出原则,方便测试
                RLock fairLock = redissonClient.getFairLock(LOCK_TEST_KEY);

                try {
                    // 尝试加锁
                    // waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
                    // leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
                    boolean lock = fairLock.tryLock(3000, 30, TimeUnit.MILLISECONDS);
                    if (lock){
                        log.info("线程:" + Thread.currentThread().getName() + "获得了锁");
                        log.info("剩余数量:{}", --n);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();

                } finally {
                    log.info("线程:" + Thread.currentThread().getName() + "准备释放锁");
                    // 注意,无论出现任何情况,都要主动解锁
                    fairLock.unlock();
                }
            });
        }

        try {
            // ->_-> 这里使当前方法占用的线程休息10秒,不要立即结束
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

二、Redisson源码分析

redisson这个框架的实现依赖了Lua脚本和Netty,以及各种Future及FutureListener的异步、同步操作转换,加锁和解锁过程中还巧妙地利用了redis的发布订阅功能

简单了解Lua脚本(Redisson 版本 3.16.8)

想要真正读懂redisson底层的加锁解锁实现,基本的lua脚本还是要了解一下的,这里作者就不深入了(毕竟我也是门外汉),大家有兴趣的可以去深入一下,这里只针对锁的实现来解释一下。另外,搜索公众号Linux就该这样学后台回复“猴子”,获取一份惊喜礼包。

加锁脚本
  • KEYS[1] 锁的名字

  • ARGV[1] 锁自动失效时间(毫秒,默认30s(看门狗续期时长))

  • ARGV[2] hash子项的key(uuid+threadId)

--如果锁不存在
if (redis.call('exists', KEYS[1]) == 0) then
--重入次数初始为0后加一
redis.call('hincrby', KEYS[1], ARGV[2], 1);
--设锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
--返回null-代表加锁成功
return nil;
--结束符
end;
--如果加锁的进程已存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
--重入次数加一
redis.call('hincrby', KEYS[1], ARGV[2], 1);
--更新锁的过期时间(毫秒)
redis.call('pexpire', KEYS[1], ARGV[1]);
--返回null-代表重入成功
return nil;
--结束符
end;
--返回锁的剩余时间(毫秒)-代表加锁失败
return redis.call('pttl', KEYS[1]);

图片

结论:当且仅当返回nil,才表示加锁成功;

解锁脚本

  • KEYS[1] 锁的名字

  • KEYS[2] 发布订阅的channel=redisson_lock__channel:{lock_name}

  • ARGV[1] 发布订阅中解锁消息=0

  • ARGV[2] 看门狗续期时间

  • ARGV[3] hash子项的key=uuid+threadId

--如果锁不存在
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
--返回null-代表解锁成功
return nil;
end;
--重入次数减一
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
--如果重入次数不为0,对锁进行续期(使用看门狗的续期时间,默认续期30s)
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
--返回0-代表锁的重入次数减一,解锁成功
return 0;
--否则重入次数<=0
else
--删除key
redis.call('del', KEYS[1]);
--向channel中发布删除key的消息
redis.call('publish', KEYS[2], ARGV[1]);
--返回1-代表锁被删除,解锁成功
return 1;
end;
return nil;

图片

结论:当且仅当返回1,才表示当前请求真正解锁;

看门口续期lua脚本

io.netty.util.TimerTask每10秒执行一次(30(续期时间)/3)

  • KEYS[1] 锁的名字

  • ARGV[1]

--自己加的锁存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
--续期
redis.call('pexpire', KEYS[1], ARGV[1]);
--1代表续期成功
return 1;
end;
--自己加的锁不存在,后续不需要再续期
return 0;

源码鉴赏

加锁逻辑

// tryLock 是Redisson加锁的核心代码,在这里,我们基本可以了解加锁的整个逻辑流程
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 获取锁能容忍的最大等待时长
    long time = unit.toMillis(waitTime);
    // 获取当前系统时间 - 节点一
    long current = System.currentTimeMillis();
    // 获取当前线程id
    long threadId = Thread.currentThread().getId();
    
    // 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
        return true;
    }
    
   // 剩余等待时长 =   最大等待时长-(当前时间-节点一)
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    
    current = System.currentTimeMillis();
    
    // 【核心点2】订阅解锁消息
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        // 以阻塞的方式获取订阅结果,最大等待时常time
        subscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);
    } catch (ExecutionException | TimeoutException e) {
        // 判断异步任务是否不存在
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.whenComplete((res, ex) -> {
                // 异步任务出现异常,取消订阅
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    try {
        // 剩余等待时长
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
    
    // 循环获取锁      
        while (true) {
            long currentTime = System.currentTimeMillis();
            // 再次获取锁,成功则返回
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            currentTime = System.currentTimeMillis();
            
            // 【核心点3】阻塞等待信号量唤醒或者超时,接收到订阅时唤醒
         // 使用的是Semaphore#tryAcquire()
           // 判断 锁的占有时间(ttl)是否小于等待时间  
            if (ttl >= 0 && ttl < time) {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 因为是同步操作,所以无论加锁成功或失败,都取消订阅
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
}

接下来,我们再一起看一下 tryAcquire()

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // get方法就是以线程阻塞的方式获取结果,这里不再展示,有兴趣的朋友可以自行查看源码
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
/**
 * 异步的方式尝试获取锁
 */
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
     
        // 占有时间等于 -1 表示会一直持有锁,直到业务进行完成,主动解锁(这里就显示出了finally的重要性)
        if (leaseTime != -1) {
            // 【核心点4】这里就是直接使用lua脚本
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            // lock acquired
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }
/**
 * 兄弟们看到这里应该就恍然大悟了吧,redisson最底层就是lua脚本的直接调用
 * 这里就不作说明了,上面的lua脚本分析已经说的很明白了
 */
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

解锁逻辑

/**
* 解锁逻辑
*/
@Override
public void unlock() {
    try {
        // 相信看来上面的逻辑,兄弟们应该已经很熟悉redisson作者的编码习惯了,照例get还是以线程阻塞的方式获取结果
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}
// 异步解锁
@Override
public RFuture<Void> unlockAsync(long threadId) {
    // 调用异步解锁方法
 // unlockInnerAsync() 是抽象方法,所有的实现方法都是直接调用lua脚本,这里不做展示,感兴趣的同学可以自行查看
 // 调用真正的实现取决于上面我们创建什么样的lock对象        
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1565058.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM_垃圾收集器

GC垃圾收集器 文章目录 GC垃圾收集器GC垃圾回收算法和垃圾收集器关系GC算法主要有以下几种四种主要的垃圾收集器SerialParallelCMSG1垃圾收集器总结查看默认垃圾收集器 默认垃圾收集器有哪些各垃圾收集器的使用范围部分参数说明 新生代下的垃圾收集器并行GC(ParNew)并行回收GC&…

[Python GUI PyQt] PyQt5快速入门

PyQt5快速入门 PyQt5的快速入门0. 写在前面1. 思维导图2. 第一个PyQt5的应用程序3. PyQt5的常用基本控件和布局3.1 PyQt5的常用基本控件3.1.1 按钮控件 QPushButton3.1.2 文本标签控件 QLabel3.1.3 单行输入框控件 QLineEdit3.1.4 A Quick Widgets Demo 3.2 PyQt5的常用基本控件…

morkdown语法转微信公众号排版(免费)

morkdown语法转微信公众号排版&#xff08;免费&#xff09; 源码来自githab&#xff0c;有些简单的问题我都修复了。大家可以直接去找原作者的源码&#xff0c;如果githab打不开就从我下载的网盘里下载吧。 效果

在制定OKR的过程中,应该怎么确定目标O的来源或方向?

在制定OKR&#xff08;Objectives and Key Results&#xff0c;目标与关键成果&#xff09;的过程中&#xff0c;确定目标O的来源或方向是至关重要的一步。一个明确、合理的目标能够为团队指明方向&#xff0c;激发团队成员的积极性和创造力&#xff0c;进而推动公司的整体发展…

【嵌入式智能产品开发实战】(十五)—— 政安晨:通过ARM-Linux掌握基本技能【GNU C标准与编译器】

目录 GNU C 什么是C语言标准 C语言标准的内容 C语言标准的发展过程 1.K&R C 2.ANSI C 3.C99标准 4.C11标准 编译器对C语言标准的支持 编译器对C语言标准的扩展 政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: 嵌入式智能产品…

信息技术学院大数据技术专业开展专业实训周

四川城市职业学院讯&#xff08;信息技术学院 陈天伟&#xff09;日前&#xff0c;为提升学生的工匠精神和职业认知&#xff0c;信息技术学院邀请企业专家入驻眉山校区大数据实训基地&#xff0c;开展数据标识专业实训周。 数据标识是大数据专业的核心技术&#xff0c;数据标识…

在CentOS 7上安装Python 3.7.7

文章目录 一、实战步骤1. 安装编译工具2. 下载Python 3.7.7安装包3. 上传Python 3.7.7安装包4. 解压缩安装包5. 切换目录并编译安装6. 配置Python环境变量7. 使配置生效8. 验证安装是否成功 二、实战总结 一、实战步骤 1. 安装编译工具 在终端中执行以下命令 yum -y groupin…

24年大一训练一(东北林业大学)

前言&#xff1a; 周五晚上的训练赛&#xff0c;以后应该每两周都会有一次。 正文&#xff1a; Problem:A矩阵翻转&#xff1a; #include<bits/stdc.h> using namespace std; int a[55][55]; int main(){int n,m;while(cin>>n>>m){for(int i1;i<n;i){for…

1.Git是用来干嘛的

本文章学习于【GeekHour】一小时Git教程&#xff0c;来自bilibili Git就是一个文件管理系统&#xff0c;这样说吧&#xff0c;当多个人同时在操作一个文件的同时&#xff0c;很容易造成紊乱&#xff0c;git就是保证文件不紊乱产生的 包括集中式管理系统和分布式管理系统 听懂…

每日一题:用c语言写(输入n个数(n小于等于100),输出数字2的出现次数)

目录 一、要求 二、代码 三、结果 ​四、注意 一、要求 二、代码 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> int main() {//输入n个数&#xff08;n小于等于100&#xff09;&#xff0c;输出数字2的出现次数;int n[100] ;int num 0;int count 0;/…

加域报错:找不到网络路径

在尝试将计算机加入Windows域时&#xff0c;如果收到“找不到网络路径”的错误提示&#xff0c;可能的原因及解决方法如下&#xff1a; 网络连接问题&#xff1a;确保计算机与域控制器之间的物理网络连接是正常的&#xff0c;可以通过ping命令测试与域控制器的连通性。例如&…

【黑马头条】-day05延迟队列文章发布审核-Redis-zSet实现延迟队列-Feign远程调用

文章目录 昨日回顾今日内容1 延迟任务1.1 概述1.2 技术对比1.2.1 DelayQueue1.2.2 RabbitMQ1.2.3 Redis实现1.2.4 总结 2 redis实现延迟任务2.0 实现思路2.1 思考2.2 初步配置实现2.2.1 导入heima-leadnews-schedule模块2.2.2 在Nacos注册配置管理leadnews-schedule2.2.3 导入表…

【单片机家电产品学习记录--红外线】

单片机家电产品学习记录–红外线 红外手势驱动电路&#xff0c;&#xff08;手势控制的LED灯&#xff09; 原理 通过红外线对管&#xff0c;IC搭建的电路&#xff0c;实现灯模式转换。 手势控制灯模式转换&#xff0c;详细说明 转载 1《三色调光LED台灯电路》&#xff0c…

大数据学习第十一天(复习linux指令3)

1、su和exit su命令就是用于账户切换的系统命令 基本语法&#xff1a;su[-] [用户名] 1&#xff09;-表示是否在切换用户后加载变量&#xff0c;建议带上 2&#xff09;参数&#xff1a;用户名&#xff0c;表示切换用户 3&#xff09;切换用户后&#xff0c;可以通过exit命令退…

Redhat 7.9 安装dm8配置文档

Redhat 7.9 安装dm8配置文档 一 创建用户 groupadd -g 12349 dinstall useradd -u 12345 -g dinstall -m -d /home/dmdba -s /bin/bash dmdba passwd dmdba二 创建目录 mkdir /dm8 chown -R dmdba:dinstall /dm8三 配置/etc/security/limits.conf dmdba soft nproc 163…

二叉树结点关键字输出的递归算法实现

在计算机科学中&#xff0c;二叉树是一种重要的数据结构&#xff0c;广泛应用于各种算法和程序设计中。二叉树的遍历是二叉树操作中的基础问题之一&#xff0c;其目的是以某种规则访问二叉树的每个结点&#xff0c;使得每个结点被且仅被访问一次。给定一个具有n个结点的二叉树&…

idea端口占用

报错&#xff1a;Verify the connector‘s configuration, identify and stop any process that‘s listening on port XXXX 翻译&#xff1a; 原因&#xff1a; 解决&#xff1a; 一、重启大法 二、手动关闭 启动spring项目是控制台报错&#xff0c;详细信息如下&#xff…

C++的并发世界(四)——线程传参

1.全局函数作为传参入口 #include <iostream> #include <thread> #include <string>void ThreadMain(int p1,float p2,std::string str) {std::cout << "p1:" << p1 << std::endl;std::cout << "p2:" <<…

css3之动画animation

动画animation 一.优点二.定义和使用三.动画序列和解释四.常见属性及解释1.常见属性及应用2.速度曲线细节 五.简写&#xff08;名字和时间不能省略&#xff09;&#xff08;持续时间在何时开始的时间前&#xff09;&#xff08;简写中无animation-play-state)六.例子1.大数据热…

图神经网络实战(7)——图卷积网络(Graph Convolutional Network, GCN)详解与实现

图神经网络实战&#xff08;7&#xff09;——图卷积网络详解与实现 0. 前言1. 图卷积层2. 比较 GCN 和 GNN2.1 数据集分析2.2 实现 GCN 架构 小结系列链接 0. 前言 图卷积网络 (Graph Convolutional Network, GCN) 架构由 Kipf 和 Welling 于 2017 年提出&#xff0c;其理念是…