8. 【Redisson源码】分布式信号量RSemaphore

news2025/1/13 15:50:49

目录

一、RSemaphore的使用

二、RSemaphore设置许可数量

三、RSemaphore的加锁流程

四、RSemaphore的解锁流程


【本篇文章基于redisson-3.17.6版本源码进行分析】

基于Redis的Redisson的分布式信号量RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。

一、RSemaphore的使用

@Test
public void testRSemaphore() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    RedissonClient redissonClient = Redisson.create(config);
    RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
    // 设置5个许可,模拟五个停车位
    rSemaphore.trySetPermits(5);

    // 创建10个线程,模拟10辆车过来停车
    for (int i = 1; i <= 10; i++) {
        new Thread(() -> {
            try {
                rSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "进入停车场...");
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
                System.out.println(Thread.currentThread().getName() + "离开停车场...");
                rSemaphore.release();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "A" + i).start();
    }

    try {
        TimeUnit.MINUTES.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

二、RSemaphore设置许可数量

初始化RSemaphore,需要调用trySetPermits()设置许可数量:

/**
 * 尝试设置许可数量,设置成功,返回true,否则返回false
 */
boolean trySetPermits(int permits);

trySetPermits()内部调用了trySetPermitsAsync():

// 异步设置许可
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 判断分布式信号量的key是否存在,如果不存在,才设置
            "local value = redis.call('get', KEYS[1]); " +
                    "if (value == false) then "
                    // set "semaphore" permits
                    // 使用String数据结构设置信号量的许可数
                    + "redis.call('set', KEYS[1], ARGV[1]); "
                    // 发布一条消息到redisson_sc:{semaphore}通道
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    // 设置成功,返回1
                    + "return 1;"
                    + "end;"
                    // 否则返回0
                    + "return 0;",
            Arrays.asList(getRawName(), getChannelName()), permits);

    if (log.isDebugEnabled()) {
        future.thenAccept(r -> {
            if (r) {
                log.debug("permits set, permits: {}, name: {}", permits, getName());
            } else {
                log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
            }
        });
    }
    return future;
}

可以看到,设置许可数量底层使用LUA脚本,实际上就是使用redis的String数据结构,保存了我们指定的许可数量。如下图:

参数说明:

  • KEYS[1]: 我们指定的分布式信号量key,例如redissonClient.getSemaphore("semaphore")中的"semaphore")
  • KEYS[2]: 释放锁的channel名称,redisson_sc:{分布式信号量key},在本例中,就是redisson_sc:{semaphore}
  • ARGV[1]: 设置的许可数量

总结设置许可执行流程为:

  • get semaphore,获取到semaphore信号量的当前的值
  • 第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3。(注意到,如果之前设置过了信号量,将无法再次设置,直接返回0。想要更改信号量总数可以使用addPermits方法)
  • 然后redis发布一些消息,返回1

三、RSemaphore的加锁流程

许可数量设置好之后,我们就可以调用acquire()方法获取了,如果未传入许可数量,默认获取一个许可。

public void acquire() throws InterruptedException {
    acquire(1);
}

public void acquire(int permits) throws InterruptedException {
    // 尝试获取锁成功,直接返回
    if (tryAcquire(permits)) {
        return;
    }

    // 对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息
    CompletableFuture<RedissonLockEntry> future = subscribe();
    semaphorePubSub.timeout(future);
    RedissonLockEntry entry = commandExecutor.getInterrupted(future);
    try {
        // 不断循环尝试获取许可
        while (true) {
            if (tryAcquire(permits)) {
                return;
            }

            entry.getLatch().acquire();
        }
    } finally {
        // 取消订阅
        unsubscribe(entry);
    }
//        get(acquireAsync(permits));
}

可以看到,获取许可的核心逻辑在tryAcquire()方法中,如果tryAcquire()返回true说明获取许可成功,直接返回;如果返回false,说明当前没有许可可以使用,则对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息,并通过死循环不断尝试获取锁。

我们看一下tryAcquire()方法的逻辑,内部调用了tryAcquireAsync()方法:

// 异步获取许可
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>(true);
    }

    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              // 获取当前剩余的许可数量
              "local value = redis.call('get', KEYS[1]); " +
              // 许可不为空,并且许可数量 大于等于 当前线程申请的许可数量        
              "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                  // 通过decrby减少剩余可用许可    
                  "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                  // 返回1    
                  "return 1; " +
              "end; " +
              // 其它情况,返回0        
              "return 0;",
              Collections.<Object>singletonList(getRawName()), permits);
}

从源码可以看到,获取许可就是操作redis中的数据,首先获取到redis中剩余的许可数量,只有当剩余的许可数量大于线程申请的许可数量时,才获取成功,返回1;否则获取失败,返回0;

总结加锁执行流程为:

  • get semaphore,获取到一个当前的值,比如说是3,3 > 1
  • decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
  • decrby semaphore 1
  • decrby semaphore 1
  • 执行3次加锁后,semaphore值为0
  • 此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁

四、RSemaphore的解锁流程

通过前面对RSemaphore获取锁的分析,我们很容易能猜到,释放锁,无非就是归还许可数量到redis中。我们查看具体的源码:

public RFuture<Void> releaseAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>((Void) null);
    }

    RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
            // 通过incrby增加许可数量
            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                    // 发布一条消息到redisson_sc:{semaphore}中
                    "redis.call('publish', KEYS[2], value); ",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(o -> {
            log.debug("released, permits: {}, name: {}", permits, getName());
        });
    }
    return future;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/169642.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从首个「数实融合」公益球场,看元宇宙奏响创新「三重奏」

作者 | 曾响铃 文 | 响铃说 2022年的元宇宙&#xff0c;一半是海水&#xff0c;一半是火焰。 一边是刮起元宇宙热潮的Roblox股价跌去大半&#xff0c;Meta也因元宇宙亏损深陷泥潭。另一边&#xff0c;经过2021年元宇宙概念落地和普及&#xff0c;2022年却也是元宇宙相关产业…

分享86个PHP源码,总有一款适合您

PHP源码 分享86个PHP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 86个PHP源码下载链接&#xff1a;https://pan.baidu.com/s/1fsoGdkr_-wZUaJvVMOlihQ?pwdlhyo 提取码&#xff…

Java 泛型是什么?一文带你吃透泛型

文章目录1. Java 泛型2. 泛型类3. 泛型接口4. 泛型方法5. 泛型集合Java编程基础教程系列1. Java 泛型 Java 泛型是 JDK1.5 中引入的一个新特性&#xff0c;其本质是参数化类型&#xff0c;把类型作为参数传递。其主要的形式有泛型类&#xff0c;泛型接口和泛型方法。泛型概念的…

sqoop安装(linux)

一、前期准备安装好hadoop伪分布安装好MySQL下载sqoop压缩文件实验环境&#xff1a;实验环境版本CentOS 6.5MySQL5.7.37hadoop3.3.0sqoop1.4.7sqoop1.4.7 下载链接&#xff1a;https://pan.baidu.com/s/16AUdtBmSv7OG2PTyA1XcgQ?pwdqu7lmysql驱动包下载地址&#xff1a;https:…

易于设置的倒计时页面Easy countdown

今天开始放假了 什么是 Easy countdown &#xff1f; Easy countdown 是一个易于设置的倒计时页面。可以设置为倒计时或计时器。 先看看官方提供的动图 安装 在群晖上以 Docker 方式安装。 在注册表中搜索 easy-countdown &#xff0c;选择第一个 yooooomi/easy-countdown&am…

【前端学习指南】基础开发环境搭建

&#x1f36d; Hello&#xff0c;我是爱吃糖的范同学 邻近春节&#xff0c;虽然学校的事情已经处理的差不多了&#xff0c;又开始要忙着找实习......时间安排上还是有很多问题&#xff0c;希望大家多多包涵&#xff0c;我已经加班加点在写作了&#x1f602;&#x1f602;&…

高盐废水如何处理,离子交换树脂在高盐废水中的应用

什么是高盐废水&#xff1f; 高盐废水是工业废水中较常见的一种&#xff0c;它是指总含盐量(以NaCl计&#xff09;至少为1%的废水&#xff0c;属于难处理的废水之一。 高盐废水中的总溶解固体物TDS&#xff0c;多在10000-25000mg/L&#xff0c;含盐成分复杂&#xff0c;有Na、…

vue3中echarts组件的最佳封装形式

项目中经常用到echarts&#xff0c;不做封装直接拿来使用也行&#xff0c;但不可避免要写很多重复的配置代码&#xff0c;封装稍不注意又会过度封装&#xff0c;丢失了扩展性和可读性。始终没有找到一个好的实践&#xff0c;偶然看到一篇文章&#xff0c;给了灵感。找到了一个目…

【数据结构】并查集

目录1.概述2.代码实现3.应用本文参考&#xff1a; LABULADONG 的算法网站 《数据结构教程》&#xff08;第 5 版&#xff09;李春葆主编 1.概述 &#xff08;1&#xff09;并查集支持查找一个元素所属的集合以及两个元素各自所属的集合的合并运算。当给出两个元素的一个无序对…

氨氮废水如何处理,离子交换树脂在氨氮废水中的应用点

近几年来重点污染源考核结果及地表水监测结果表明&#xff0c;氨氮超标现象仍较严重。认清氨氮的来源&#xff0c;了解其危害&#xff0c;采取有效的处理措施成为保护水环境不被氨氮污染的必要环节。 北京科海思科技有限公司利用离子交换特种树脂可以做到有针对性的氨氮的去除…

vue2之生命周期

生命周期 生命周期是指组件从创建&#xff0c;运行到销毁的阶段。而生命周期函数&#xff08;也叫生命周期钩子&#xff09;是vue在关键的时刻帮我们调用的一些特殊名称的函数&#xff0c;会根据生命周期的阶段&#xff0c;依次执行。 beforeCreatecreatedbeforeMountmountedb…

基本的SELECT语句与显示表结构

文章目录基本的SELECT语句SELECT...SELECT ... FROM列的别名去除重复行空值参与运算着重号查询常数(查询同时添加常数字段)显示表结构过滤数据练习题基本的SELECT语句 SELECT… SELECT 11, 22;# 直接这样写相当于下面这句 SELECT 11, 22 FROM DUAL; # 这里DUAL&#xff1a;伪…

【云攻防系列】从攻击者视角聊聊K8S集群安全(上)

前言 作为云原生管理与编排系统的代表&#xff0c;Kubernetes&#xff08;简称K8S&#xff09;正受到越来越多的关注&#xff0c;有报告[1]显示&#xff0c;96% 的组织正在使用或评估 K8S&#xff0c;其在生产环境下的市场占有率可见一斑。 K8S 的功能十分强大&#xff0c;其…

day38【代码随想录】动态规划之斐波那契数、爬楼梯、使用最小花费爬楼梯

文章目录前言一、斐波那契数&#xff08;力扣509&#xff09;二、爬楼梯&#xff08;力扣70&#xff09;三、使用最小花费爬楼梯&#xff08;力扣746&#xff09;总结前言 1、斐波那契数 2、爬楼梯 3、使用最小花费爬楼梯 一、斐波那契数&#xff08;力扣509&#xff09; 思路…

详解C语言预处理

个人主页&#xff1a;平行线也会相交 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 平行线也会相交 原创 收录于专栏【C/C】 本文目录程序的翻译环境和执行环境翻译环境&#xff08;C语言程序的编译链接&#xff09;执行&#xff08;运行&#xff09;环境…

扩展Linux根目录磁盘空间

问题&#xff1a;如果一开始创建虚拟机&#xff0c;挂载给虚拟机根目录&#xff08;/&#xff09;的磁盘空间太小了&#xff0c;所以磁盘空间很快就会填满。如果根目录的磁盘空间占用超过90%&#xff0c;会导致无法再新安装软件。 查看根目录磁盘空间&#xff1a; 可以--右键…

导入若依项目数据库脚本到mysql数据库

使用DBeaver工具连接本地mysql数据库 在之前的文章中&#xff0c;已经介绍过&#xff0c;怎么样去寻找某款软件的替代软件了&#xff0c;如果不知道怎么找的&#xff0c;可以再看看之前的文章&#xff1a;为大家介绍一个我常用的搜索同类替代软件的网站 大家都知道&#xff0c;…

day18集合

1.Map集合 1.1Map集合概述和特点【理解】 Map集合概述 interface Map<K,V> K&#xff1a;键的类型&#xff1b;V&#xff1a;值的类型Map集合的特点 双列集合,一个键对应一个值键不可以重复,值可以重复 Map集合的基本使用 public class MapDemo01 {public static void…

Linux常用命令——trap命令

在线Linux命令查询工具(http://www.lzltool.com/LinuxCommand) trap 指定在接收到信号后将要采取的动作 补充说明 trap命令用于指定在接收到信号后将要采取的动作&#xff0c;常见的用途是在脚本程序被中断时完成清理工作。当shell接收到sigspec指定的信号时&#xff0c;ar…

数据类型(个人学习笔记)

这里写自定义目录标题数据类型浮点型数据浮点型常量浮点型变量字符串数据字符串型常量混合运算与printf()printf模型进制转换数据类型 常量&#xff1a;整形、实型&#xff08;浮点&#xff09;、字符型和字符串型 变量&#xff1a;变量名、变量值 整型数据 define 直接将字…