Redis进阶(三)--Redis高性能底层原理

news2024/12/23 6:09:13

文章目录

  • 第三章、Redis高性能底层原理
    • 一、持久化
      • 1、RDB
        • (1)给哪些内存数据做快照?
        • (2)RDB文件的生成是否会阻塞主线程
        • (3)bgsave执的行流程
        • (4)RDB文件
        • (5)RDB的优缺点
          • RDB的优点
          • RDB的缺点
        • (6)Redis中RDB导致的数据丢失问题
      • 2、AOF
        • (1)使用AOF
        • (2)AOF的工作流程
          • 命令写入
          • 重写机制
          • 重启加载
          • 文件校验
      • 3、RDB-AOF混合持久化
      • 4、Redis持久化相关的问题
        • (1)主线程、子进程和后台线程的联系与区别?
        • (2)Redis持久化过程中有没有其他潜在的阻塞风险?
        • (3)为什么主从库间的复制不使用 AOF?
    • 二、分布式锁
      • 1、Redis分布式锁最简单的实现
      • 2、如何避免死锁?
      • 3、锁被别人释放怎么办?
      • 4、Java代码实现分布式锁
      • 5、锁过期时间不好评估怎么办?
        • (1)分布式锁加入看门狗
        • (2)分布式锁加入看门狗代码实现
      • 6、Redisson中的分布式锁
      • 7、集群下的锁还安全么?
      • 8、Redlock真的安全吗?
        • (1)Redlock实现整体流程
        • (2)RedLock的是是非非
        • (3)RedLock总结

第三章、Redis高性能底层原理

一、持久化

Redis虽然是个内存数据库,但是Redis支持RDB和AOF两种持久化机制,将数据写往磁盘,可以有效地避免因进程退出造成的数据丢失问题,当下次重启时利用之前持久化的文件即可实现数据恢复。

1、RDB

RDB持久化是把当前进程数据生成快照保存到硬盘的过程。所谓内存快照,就是指内存中的数据在某一个时刻的状态记录。这就类似于照片,当你给朋友拍照时,一张照片就能把朋友一瞬间的形象完全记下来。RDB 就是Redis DataBase 的缩写。

(1)给哪些内存数据做快照?

Redis 的数据都在内存中,为了提供所有数据的可靠性保证,它执行的是全量快照,也就是说,把内存中的所有数据都记录到磁盘中。但是,RDB 文件就越大,往磁盘上写数据的时间开销就越大。

(2)RDB文件的生成是否会阻塞主线程

Redis 提供了两个手动命令来生成 RDB 文件,分别是 save 和 bgsave。

save:在主线程中执行,会导致阻塞;对于内存比较大的实例会造成长时间阻塞,线上环境不建议使用。
bgsave:创建一个子进程,专门用于写入 RDB 文件,避免了主线程的阻塞,这也是Redis RDB 文件生成的默认配置。

命令实战演示

image.png

image.png

image.pngimage.png

除了执行命令手动触发之外,Redis内部还存在自动触发RDB 的持久化机制,例如以下场景:

1)使用save相关配置,如“save m n”。表示m 秒内数据集存在n 次修改时,自动触发bgsave。

image.png

2)如果从节点执行全量复制操作,主节点自动执行bgsave生成RDB文件并发送给从节点。

3)执行debug reload命令重新加载Redis 时,也会自动触发save操作。

image.png

4)默认情况下执行shutdown命令时,如果没有开启AOF持久化功能则自动执行bgsave。

image.png

关闭RDB 持久化,在的Redis版本(6.2.4)上,是将配置文件中的save 配置改为 save “”

image.png

(3)bgsave执的行流程

为了快照而暂停写操作,肯定是不能接受的。所以这个时候,Redis 就会借助操作系统提供的写时复制技术(Copy-On-Write, COW),在执行快照的同时,正常处理写操作。

image.png

bgsave 子进程是由主线程 fork 生成的,可以共享主线程的所有内存数据。bgsave 子进程运行后,开始读取主线程的内存数据,并把它们写入 RDB 文件。

如果主线程对这些数据也都是读操作(例如图中的键值对 A),那么,主线程和bgsave 子进程相互不影响。但是,如果主线程要修改一块数据(例如图中的键值对 B),那么,这块数据就会被复制一份,生成该数据的副本。然后,bgsave 子进程会把这个副本数据写入 RDB 文件,而在这个过程中,主线程仍然可以直接修改原来的数据。

这既保证了快照的完整性,也允许主线程同时对数据进行修改,避免了对正常业务的影响。

(4)RDB文件

RDB文件保存在dir 配置指定的目录下,文件名通过dbfilename 配置指定。

image.png

可以通过执行config set dir {newDir} 和config set dbfilename (newFileName} 运行期动态执行,当下次运行时RDB文件会保存到新目录。

image.png

Redis 默认采用LZF 算法对生成的RDB文件做压缩处理,压缩后的文件远远小于内存大小,默认开启,可以通过参数config set rdbcompression { yes |no} 动态修改。
虽然压缩RDB会消耗CPU,但可大幅降低文件的体积,方便保存到硬盘或通过网维示络发送给从节点,因此线上建议开启。
如果 Redis加载损坏的RDB文件时拒绝启动,并打印如下日志:

Short read or OOM loading DB. Unrecoverable error,aborting now.

这时可以使用Redis提供的redis-check-rdb工具(老版本是redis-check-dump)检测RDB文件并获取对应的错误报告。

image.png

(5)RDB的优缺点
RDB的优点

RDB是一个紧凑压缩的二进制文件,代表Redis在某个时间点上的数据快照。非常适用于备份,全量复制等场景。

比如每隔几小时执行bgsave 备份,并把 RDB文件拷贝到远程机器或者文件系统中(如hdfs),用于灾难恢复。

Redis加载RDB恢复数据远远快于AOF的方式。

RDB的缺点

RDB方式数据没办法做到实时持久化/秒级持久化。因为bgsave每次运行都要执行fork 操作创建子进程,属于重量级操作,频繁执行成本过高。

RDB文件使用特定二进制格式保存,Redis版本演进过程中有多个格式的RDB版本,存在老版本Redis服务无法兼容新版RDB格式的问题。

(6)Redis中RDB导致的数据丢失问题

针对RDB不适合实时持久化的问题,Redis提供了AOF持久化方式来解决。

如下图所示,我们先在 T0 时刻做了一次快照(下一次快照是T4时刻),然后在T1时刻,数据块 5 和 8 被修改了。如果在T2时刻,机器宕机了, 那么,只能按照 T0 时刻的快照进行恢复。此时,数据块 5 和 8 的修改值因为没有快照记录,就无法恢复了。

image.png

所以这里可以看出,如果想丢失较少的数据,那么T4-T0就要尽可能的小,但是如果频繁地执行全量
快照,也会带来两方面的开销:

1、频繁将全量数据写入磁盘,会给磁盘带来很大压力,多个快照竞争有限的磁盘带宽,前一个快照还没有做完,后一个又开始做了,容易造成恶性循环。

2、另一方面,bgsave 子进程需要通过 fork 操作从主线程创建出来。虽然子进程在创建后不会再阻塞主线程,但是,fork 这个创建过程本身会阻塞主线程,而且主线程的内存越大,阻塞时间越长。如果频繁fork出bgsave 子进程,这就会频繁阻塞主线程了。

所以基于这种情况,我们就需要AOF的持久化机制。

2、AOF

AOF(append only file)持久化:以独立日志的方式记录每次写命令,重启时再重新执行AOF文件中的命令达到恢复数据的目的。AOF的主要作用是解决了数据持久化的实时性,目前已经是Redis持久化的主流方式。理解掌握好AOF持久化机制对我们兼顾数据安全性和性能非常有帮助。

(1)使用AOF

开启AOF功能需要设置配置:appendonly yes,默认不开启。

image.png

AOF文件名通过appendfilename配置设置,默认文件名是appendonly.aof。保存路径同RDB持久化方式一致,通过dir配置指定。

image.png

(2)AOF的工作流程

AOF的工作流程主要是4个部分:命令写入( append)、文件同步( sync)、文件重写(rewrite)、重启加载( load)。

image.png

命令写入

AOF命令写入的内容直接是RESP文本协议格式。例如lpush lijin A B这条命令,在AOF缓冲区会追加如下文本:

*3\r\n$6\r\nlupush\r\n$5\r\nlijin\r\n$3\r\nA B

看看 AOF 日志的内容。其中,“*3”表示当前命令有三个部分,每部分都是由“$+数字”开头,后面紧跟着具体的命令、键或值。这里,“数字”表示这部分中的命令、键或值一共有多少字节。例如,“$3 set”表示这部分有 3 个字节,也就是“set”命令。

1)AOF为什么直接采用文本协议格式?

文本协议具有很好的兼容性。开启AOF后,所有写入命令都包含追加操作,直接采用协议格式,避免了二次处理开销。文本协议具有可读性,方便直接修改和处理。

2)AOF为什么把命令追加到aof_buf中?

Redis使用单线程响应命令,如果每次写AOF文件命令都直接追加到硬盘,那么性能完全取决于当前硬盘负载。先写入缓冲区aof_buf中,还有另一个好处,Redis可以提供多种缓冲区同步硬盘的策略,在性能和安全性方面做出平衡。

Redis提供了多种AOF缓冲区同步文件策略,由参数appendfsync控制。

image.png

always

同步写回:每个写命令执行完,立马同步地将日志写回磁盘;

everysec

每秒写回:每个写命令执行完,只是先把日志写到 AOF 文件的内存缓冲区,每隔一秒把缓冲区中的内容写入磁盘;

no

操作系统控制的写回:每个写命令执行完,只是先把日志写到 AOF 文件的内存缓冲区,由操作系统决定何时将缓冲区内容写回磁盘,通常同步周期最长30秒。

很明显,配置为always时,每次写入都要同步AOF文件,在一般的SATA 硬盘上,Redis只能支持大约几百TPS写入,显然跟Redis高性能特性背道而驰,不建议配置。

配置为no,由于操作系统每次同步AOF文件的周期不可控,而且会加大每次同步硬盘的数据量,虽然提升了性能,但数据安全性无法保证。

配置为everysec,是建议的同步策略,也是默认配置,做到兼顾性能和数据安全性。理论上只有在系统突然宕机的情况下丢失1秒的数据。(严格来说最多丢失1秒数据是不准确的)

想要获得高性能,就选择 no 策略;如果想要得到高可靠性保证,就选择always 策略;如果允许数据有一点丢失,又希望性能别受太大影响的话,那么就选择everysec 策略。

重写机制

随着命令不断写入AOF,文件会越来越大,为了解决这个问题,Redis引入AOF重写机制压缩文件体积。AOF文件重写是把Redis进程内的数据转化为写命令同步到新AOF文件的过程。

重写后的AOF 文件为什么可以变小?有如下原因:

1)进程内已经超时的数据不再写入文件。

2)旧的AOF文件含有无效命令,如set a 111、set a 222等。重写使用进程内数据直接生成,这样新的AOF文件只保留最终数据的写入命令。

image.png

3)多条写命令可以合并为一个,如:lpush list a、lpush list b、lpush list c可以转化为:lpush list a b c。为了防止单条命令过大造成客户端缓冲区溢出,对于list、set、hash、zset等类型操作,以64个元素为界拆分为多条。

AOF重写降低了文件占用空间,除此之外,另一个目的是:更小的AOF文件可以更快地被Redis加载。

AOF重写过程可以手动触发和自动触发:

手动触发:直接调用bgrewriteaof命令。

image.png

自动触发:根据auto-aof-rewrite-min-size和 auto-aof-rewrite-percentage参数确定自动触发时机。

image.png

auto-aof-rewrite-min-size:表示运行AOF重写时文件最小体积,默认为64MB。

auto-aof-rewrite-percentage:代表当前AOF 文件空间(aof_currentsize)和上一次重写后AOF 文件空间(aof_base_size)的比值。

另外,如果在Redis在进行AOF重写时,有写入操作,这个操作也会被写到重写日志的缓冲区。这样,重写日志也不会丢失最新的操作。

重启加载

AOF和 RDB 文件都可以用于服务器重启时的数据恢复。redis重启时加载AOF与RDB的顺序是怎么样的呢?

image.png

1,当AOF和RDB文件同时存在时,优先加载AOF

2,若关闭了AOF,加载RDB文件

3,加载AOF/RDB成功,redis重启成功

4,AOF/RDB存在错误,启动失败打印错误信息

文件校验

加载损坏的AOF 文件时会拒绝启动,对于错误格式的AOF文件,先进行备份,然后采用redis-check-aof --fix命令进行修复,对比数据的差异,找出丢失的数据,有些可以人工修改补全。

AOF文件可能存在结尾不完整的情况,比如机器突然掉电导致AOF尾部文件命令写入不全。Redis为我们提供了aof-load-truncated 配置来兼容这种情况,默认开启。加载AOF时当遇到此问题时会忽略并继续启动,同时如下警告日志。

image.png

3、RDB-AOF混合持久化

通过 aof-use-rdb-preamble 配置项可以打开混合开关,yes则表示开启,no表示禁用,默认是禁用的,可通过config set修改

image.png

该状态开启后,如果执行bgrewriteaof命令,则会把当前内存中已有的数据弄成二进程存放在aof文件中,这个过程模拟了rdb生成的过程,然后Redis后面有其他命令,在触发下次重写之前,依然采用AOF追加的方式

image.png

4、Redis持久化相关的问题

(1)主线程、子进程和后台线程的联系与区别?

image.png

进程和线程的区别

从操作系统的角度来看,进程一般是指资源分配单元,例如一个进程拥有自己的堆、栈、虚存空间(页表)、文件描述符等;

而线程一般是指 CPU 进行调度和执行的实体。

一个进程启动后,没有再创建额外的线程,那么,这样的进程一般称为主进程或主线程。

Redis 启动以后,本身就是一个进程,它会接收客户端发送的请求,并处理读写操作请求。而且,接收请求和处理请求操作是 Redis 的主要工作,Redis 没有再依赖于其他线程,所以,我一般把完成这个主要工作的 Redis 进程,称为主进程或主线程。

主线程与子进程

通过fork创建的子进程,一般和主线程会共用同一片内存区域,所以上面就需要使用到写时复制技术确保安全。

后台线程

从 4.0 版本开始,Redis 也开始使用pthread_create 创建线程,这些线程在创建后,一般会自行执行一些任务,例如执行异步删除任务

(2)Redis持久化过程中有没有其他潜在的阻塞风险?

当Redis做RDB或AOF重写时,一个必不可少的操作就是执行fork操作创建子进程,对于大多数操作系统来说fork是个重量级错误。虽然fork创建的子进程不需要拷贝父进程的物理内存空间,但是会复制父进程的空间内存页表。例如对于10GB的Redis进程,需要复制大约20MB的内存页表,因此fork操作耗时跟进程总内存量息息相关,如果使用虚拟化技术,特别是Xen虚拟机,fork操作会更耗时。

fork耗时问题定位:

对于高流量的Redis实例OPS可达5万以上,如果fork 操作耗时在秒级别将拖慢Redis几万条命令执行,对线上应用延迟影响非常明显。正常情况下fork耗时应该是每GB消耗20毫秒左右。可以在info stats统计中查latest_fork_usec指标获取最近一次fork操作耗时,单位微秒。

image.png

如何改善fork操作的耗时:

1)优先使用物理机或者高效支持fork操作的虚拟化技术

2)控制Redis实例最大可用内存,fork耗时跟内存量成正比,线上建议每个Redis实例内存控制在10GB 以内。

3)降低fork操作的频率,如适度放宽AOF自动触发时机,避免不必要的全量复制等。

(3)为什么主从库间的复制不使用 AOF?

1、RDB 文件是二进制文件,无论是要把 RDB 写入磁盘,还是要通过网络传输 RDB,IO效率都比记录和传输 AOF 的高。

2、在从库端进行恢复时,用 RDB 的恢复效率要高于用 AOF。

二、分布式锁

1、Redis分布式锁最简单的实现

想要实现分布式锁,必须要求 Redis 有「互斥」的能力,我们可以使用 SETNX 命令,这个命令表示SET if Not Exists,即如果 key 不存在,才会设置它的值,否则什么也不做。

两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。

客户端 1 申请加锁,加锁成功:

客户端 2 申请加锁,因为它后到达,加锁失败:

image.png

此时,加锁成功的客户端,就可以去操作「共享资源」,例如,修改 MySQL 的某一行数据,或者调用一个 API 请求。

操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?

也很简单,直接使用 DEL 命令删除这个 key 即可,这个逻辑非常简单。

image.png

但是,它存在一个很大的问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成「死锁」:

1、程序处理业务逻辑异常,没及时释放锁

2、进程挂了,没机会释放锁

这时,这个客户端就会一直占用这个锁,而其它客户端就「永远」拿不到这把锁了。怎么解决这个问题呢?

2、如何避免死锁?

我们很容易想到的方案是,在申请锁时,给这把锁设置一个「租期」。

在 Redis 中实现时,就是给这个 key 设置一个「过期时间」。这里我们假设,操作共享资源的时间不会超过 10s,那么在加锁时,给这个 key 设置 10s 过期即可:

SETNX lock 1    // 加锁
EXPIRE lock 10  // 10s后自动过期

image.png

这样一来,无论客户端是否异常,这个锁都可以在 10s 后被「自动释放」,其它客户端依旧可以拿到锁。

但现在还是有问题:

现在的操作,加锁、设置过期是 2 条命令,有没有可能只执行了第一条,第二条却「来不及」执行的情况发生呢?例如:

  • SETNX 执行成功,执行EXPIRE 时由于网络问题,执行失败
  • SETNX 执行成功,Redis 异常宕机,EXPIRE 没有机会执行
  • SETNX 执行成功,客户端异常崩溃,EXPIRE也没有机会执行

总之,这两条命令不能保证是原子操作(一起成功),就有潜在的风险导致过期时间设置失败,依旧发生「死锁」问题。

在 Redis 2.6.12 之后,Redis 扩展了 SET 命令的参数,用这一条命令就可以了:

SET lock 1 EX 10 NX

image.png

3、锁被别人释放怎么办?

上面的命令执行时,每个客户端在释放锁时,都是「无脑」操作,并没有检查这把锁是否还「归自己持有」,所以就会发生释放别人锁的风险,这样的解锁流程,很不「严谨」!如何解决这个问题呢?

解决办法是:客户端在加锁时,设置一个只有自己知道的「唯一标识」进去。

例如,可以是自己的线程 ID,也可以是一个 UUID(随机且唯一),这里我们以UUID 举例:

SET lock $uuid EX 20 NX

之后,在释放锁时,要先判断这把锁是否还归自己持有,伪代码可以这么写:

if redis.get("lock") == $uuid:
    redis.del("lock")

这里释放锁使用的是 GET + DEL 两条命令,这时,又会遇到我们前面讲的原子性问题了。这里可以使用lua脚本来解决。

安全释放锁的 Lua 脚本如下:

if redis.call("GET",KEYS[1]) == ARGV[1]
then
    return redis.call("DEL",KEYS[1])
else
    return 0
end

好了,这样一路优化,整个的加锁、解锁的流程就更「严谨」了。

这里我们先小结一下,基于 Redis 实现的分布式锁,一个严谨的的流程如下:

1、加锁

SET lock_key $unique_id EX $expire_time NX

2、操作共享资源

3、释放锁:Lua 脚本,先 GET 判断锁是否归属自己,再DEL 释放锁

4、Java代码实现分布式锁

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 分布式锁的实现
 */
@Component
public class RedisDistLock implements Lock {

    private final static int LOCK_TIME = 5*1000;
    private final static String RS_DISTLOCK_NS = "tdln:";
    /*
     if redis.call('get',KEYS[1])==ARGV[1] then
        return redis.call('del', KEYS[1])
    else return 0 end
     */
    private final static String RELEASE_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                    "        return redis.call('del', KEYS[1])\n" +
                    "    else return 0 end";
    /*保存每个线程的独有的ID值*/
    private ThreadLocal<String> lockerId = new ThreadLocal<>();

    /*解决锁的重入*/
    private Thread ownerThread;
    private String lockName = "lock";

    @Autowired
    private JedisPool jedisPool;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    public Thread getOwnerThread() {
        return ownerThread;
    }

    public void setOwnerThread(Thread ownerThread) {
        this.ownerThread = ownerThread;
    }

    @Override
    public void lock() {
        while(!tryLock()){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException("不支持可中断获取锁!");
    }

    @Override
    public boolean tryLock() {
        Thread t = Thread.currentThread();
        if(ownerThread==t){/*说明本线程持有锁*/
            return true;
        }else if(ownerThread!=null){/*本进程里有其他线程持有分布式锁*/
            return false;
        }
        Jedis jedis = null;
        try {
            String id = UUID.randomUUID().toString();
            SetParams params = new SetParams();
            params.px(LOCK_TIME);
            params.nx();
            synchronized (this){/*线程们,本地抢锁*/
                if((ownerThread==null)&&
                "OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))){
                    lockerId.set(id);
                    setOwnerThread(t);
                    return true;
                }else{
                    return false;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("分布式锁尝试加锁失败!");
        } finally {
            jedis.close();
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("不支持等待尝试获取锁!");
    }

    @Override
    public void unlock() {
        if(ownerThread!=Thread.currentThread()) {
            throw new RuntimeException("试图释放无所有权的锁!");
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS+lockName),
                    Arrays.asList(lockerId.get()));
            if(result.longValue()!=0L){
                System.out.println("Redis上的锁已释放!");
            }else{
                System.out.println("Redis上的锁释放失败!");
            }
        } catch (Exception e) {
            throw new RuntimeException("释放锁失败!",e);
        } finally {
            if(jedis!=null) jedis.close();
            lockerId.remove();
            setOwnerThread(null);
            System.out.println("本地锁所有权已释放!");
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("不支持等待通知操作!");
    }

}

5、锁过期时间不好评估怎么办?

image.png

看上面这张图,加入key的失效时间是10s,但是客户端C在拿到分布式锁之后,然后业务逻辑执行超过10s,那么问题来了,在客户端C释放锁之前,其实这把锁已经失效了,那么客户端A和客户端B都可以去拿锁,这样就已经失去了分布式锁的功能了!!!

比较简单的妥协方案是,尽量「冗余」过期时间,降低锁提前过期的概率,但是这个并不能完美解决问题,那怎么办呢?

(1)分布式锁加入看门狗

加锁时,先设置一个过期时间,然后我们开启一个「守护线程」,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行「续期」,重新设置过期时间。

这个守护线程我们一般也把它叫做「看门狗」线程。

为什么要使用守护线程:

image.png

(2)分布式锁加入看门狗代码实现
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;

import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 分布式锁,附带看门狗线程的实现:加锁,保持锁1秒
 */
@Component
public class RedisDistLockWithDog implements Lock {

    private final static int LOCK_TIME = 1*1000;
    private final static String LOCK_TIME_STR = String.valueOf(LOCK_TIME);
    private final static String RS_DISTLOCK_NS = "tdln2:";

    private final static String RELEASE_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                    "        return redis.call('del', KEYS[1])\n" +
                    "    else return 0 end";
    /*还有并发问题,考虑ThreadLocal*/
    private ThreadLocal<String> lockerId = new ThreadLocal<>();

    private Thread ownerThread;
    private String lockName = "lock";

    @Autowired
    private JedisPool jedisPool;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    public Thread getOwnerThread() {
        return ownerThread;
    }

    public void setOwnerThread(Thread ownerThread) {
        this.ownerThread = ownerThread;
    }

    @Override
    public void lock() {
        while(!tryLock()){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException("不支持可中断获取锁!");
    }

    @Override
    public boolean tryLock() {
        Thread t=Thread.currentThread();
        /*说明本线程正在持有锁*/
        if(ownerThread==t) {
            return true;
        }else if(ownerThread!=null){/*说明本进程中有别的线程正在持有分布式锁*/
            return false;
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            /*每一个锁的持有人都分配一个唯一的id,也可采用snowflake算法*/
            String id = UUID.randomUUID().toString();

            SetParams params = new SetParams();
            params.px(LOCK_TIME); //加锁时间1s
            params.nx();
            synchronized (this){
                if ((ownerThread==null)&&
                        "OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))) {
                    lockerId.set(id);
                    setOwnerThread(t);
                    if(expireThread == null){//看门狗线程启动
                        expireThread = new Thread(new ExpireTask(),"expireThread");
                        expireThread.setDaemon(true);
                        expireThread.start();
                    }
                    //往延迟阻塞队列中加入元素(让看门口可以在过期之前一点点的时间去做锁的续期)
                    delayDog.add(new ItemVo<>((int)LOCK_TIME,new LockItem(lockName,id)));
                    System.out.println(Thread.currentThread().getName()+"已获得锁----");
                    return true;
                }else{
                    System.out.println(Thread.currentThread().getName()+"无法获得锁----");
                    return false;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("分布式锁尝试加锁失败!",e);
        } finally {
            jedis.close();
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("不支持等待尝试获取锁!");
    }

    @Override
    public void unlock() {
        if(ownerThread!=Thread.currentThread()) {
            throw new RuntimeException("试图释放无所有权的锁!");
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS+lockName),
                    Arrays.asList(lockerId.get()));
            System.out.println(result);
            if(result.longValue()!=0L){
                System.out.println("Redis上的锁已释放!");
            }else{
                System.out.println("Redis上的锁释放失败!");
            }
        } catch (Exception e) {
            throw new RuntimeException("释放锁失败!",e);
        } finally {
            if(jedis!=null) jedis.close();
            lockerId.remove();
            setOwnerThread(null);
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("不支持等待通知操作!");
    }

    /*看门狗线程*/
    private Thread expireThread;
    //通过delayDog 避免无谓的轮询,减少看门狗线程的轮序次数   阻塞延迟队列   刷1  没有刷2
    private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();
    //续锁逻辑:判断是持有锁的线程才能续锁
    private final static String DELAY_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
                    "        return redis.call('pexpire', KEYS[1],ARGV[2])\n" +
                    "    else return 0 end";
    private class ExpireTask implements Runnable{

        @Override
        public void run() {
            System.out.println("看门狗线程已启动......");
            while(!Thread.currentThread().isInterrupted()) {
                try {
                    LockItem lockItem = delayDog.take().getData();//只有元素快到期了才能take到  0.9s
                    Jedis jedis = null;
                    try {
                        jedis = jedisPool.getResource();
                        Long result = (Long)jedis.eval(DELAY_LOCK_LUA,
                                Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey ()),
                                Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));
                        if(result.longValue()==0L){
                            System.out.println("Redis上的锁已释放,无需续期!");
                        }else{
                            delayDog.add(new ItemVo<>((int)LOCK_TIME,
                                    new LockItem(lockItem.getKey(),lockItem.getValue())));
                            System.out.println("Redis上的锁已续期:"+LOCK_TIME);
                        }
                    } catch (Exception e) {
                        throw new RuntimeException("锁续期失败!",e);
                    } finally {
                        if(jedis!=null) jedis.close();
                    }
                } catch (InterruptedException e) {
                    System.out.println("看门狗线程被中断");
                    break;
                }
            }
            System.out.println("看门狗线程准备关闭......");
        }
    }

    @PreDestroy
    public void closeExpireThread(){
        if(null!=expireThread){
            expireThread.interrupt();
        }
    }
}

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 *类说明:存放到延迟队列的元素,比标准的delay的实现要提前一点时间
 */
public class ItemVo<T> implements Delayed{

    /*到期时刻  20:00:35,234*/
    private long activeTime;
    /*业务数据,泛型*/
    private T data;

	/*传入的数值代表过期的时长,单位毫秒,需要乘1000转换为毫秒和到期时间
	* 同时提前100毫秒续期,具体的时间可以自己决定*/
	public ItemVo(long expirationTime, T data) {
		super();
		this.activeTime = expirationTime+System.currentTimeMillis()-100;
		this.data = data;
	}

	public long getActiveTime() {
		return activeTime;
	}

	public T getData() {
		return data;
	}
	
	/**
	 * 返回元素到激活时刻的剩余时长
	 */
	public long getDelay(TimeUnit unit) {
		long d = unit.convert(this.activeTime
						- System.currentTimeMillis(),unit);
		return d;
	}

	/**按剩余时长排序*/
	public int compareTo(Delayed o) {
        long d = (getDelay(TimeUnit.MILLISECONDS)
				-o.getDelay(TimeUnit.MILLISECONDS));
        if (d==0){
        	return 0;
		}else{
        	if (d<0){
        		return -1;
			}else{
        		return  1;
			}
		}
	}

}

/**
 *类说明:Redis的key-value结构
 */
public class LockItem {
    private final String key;
    private final String value;

    public LockItem(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public String getValue() {
        return value;
    }
}
@SpringBootTest
public class TestRedisDistLockWithDog {

    @Autowired
    private RedisDistLockWithDog redisDistLockWithDog;
    private int count = 0;


    @Test
    public void testLockWithDog() throws InterruptedException {
        int clientCount =3;
        CountDownLatch countDownLatch = new CountDownLatch(clientCount);
        ExecutorService executorService = Executors.newFixedThreadPool(clientCount);
        for (int i = 0;i<clientCount;i++){
            executorService.execute(() -> {
                try {
                    redisDistLockWithDog.lock(); //锁的有效时间1秒
                    System.out.println(Thread.currentThread().getName()+"准备进行累加。");
                    Thread.sleep(2000);
                    count++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    redisDistLockWithDog.unlock();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println(count);
    }

    @Test
    public void testTryLock2() {
        int clientCount =1000;
        for (int i = 0;i<clientCount;i++) {
            if (redisDistLockWithDog.tryLock()) {
                System.out.println("已获得锁!");
                redisDistLockWithDog.unlock();
            } else {
                System.out.println("未能获得锁!");
            }
        }
    }

}

运行效果:

image.png

6、Redisson中的分布式锁

Redisson把这些工作都封装好了

     <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.12.3</version>
        </dependency>
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRedissonConfig {
    /**
     * 所有对Redisson的使用都是通过RedissonClient
     */
    @Bean(destroyMethod="shutdown")
    public RedissonClient redisson(){
        //1、创建配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");

        //2、根据Config创建出RedissonClient实例
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

import org.junit.jupiter.api.Test;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@SpringBootTest
public class TestRedissionLock {

    private int count = 0;
    @Autowired
    private RedissonClient redisson;

    @Test
    public void testLockWithDog() throws InterruptedException {
        int clientCount =3;
        RLock lock = redisson.getLock("RD-lock");
        CountDownLatch countDownLatch = new CountDownLatch(clientCount);
        ExecutorService executorService = Executors.newFixedThreadPool(clientCount);
        for (int i = 0;i<clientCount;i++){
            executorService.execute(() -> {
                try {
                    lock.lock(10, TimeUnit.SECONDS);
                    System.out.println(Thread.currentThread().getName()+"准备进行累加。");
                    Thread.sleep(2000);
                    count++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println(count);
    }
}

7、集群下的锁还安全么?

基于 Redis 的实现分布式锁,前面遇到的问题,以及对应的解决方案:

1、死锁:设置过期时间

2、过期时间评估不好,锁提前过期:守护线程,自动续期

3、锁被别人释放:锁写入唯一标识,释放锁先检查标识,再释放

之前分析的场景都是,锁在「单个」Redis实例中可能产生的问题,并没有涉及到 Redis 的部署架构细节。

而我们在使用 Redis 时,一般会采用主从集群 +哨兵的模式部署,这样做的好处在于,当主库异常宕机时,哨兵可以实现「故障自动切换」,把从库提升为主库,继续提供服务,以此保证可用性。

但是因为主从复制是异步的,那么就不可避免会发生的锁数据丢失问题(加了锁却没来得及同步过来)。从库被哨兵提升为新主库,这个锁在新的主库上,丢失了!

8、Redlock真的安全吗?

Redis 作者提出的 Redlock方案,是如何解决主从切换后,锁失效问题的。

Redlock 的方案基于一个前提:

不再需要部署从库和哨兵实例,只部署主库;但主库要部署多个,官方推荐至少 5 个实例。

注意:不是部署 Redis Cluster,就是部署 5 个简单的 Redis 实例。它们之间没有任何关系,都是一个个孤立的实例。

基于Redis的Redisson红锁 RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个 RLock对象关联为一个红锁,每个 RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();

大家都知道,如果负责储存某些分布式锁的某些Redis节点宕机以后,而且这些锁正好处于锁住的状态时,这些锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了 leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);

// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
(1)Redlock实现整体流程

1、客户端先获取「当前时间戳T1」

2、客户端依次向这 5 个 Redis 实例发起加锁请求

3、如果客户端从 >=3 个(大多数)以上Redis 实例加锁成功,则再次获取「当前时间戳T2」,如果 T2 - T1 < 锁的过期时间,此时,认为客户端加锁成功,否则认为加锁失败。

4、加锁成功,去操作共享资源

5、加锁失败/释放锁,向「全部节点」发起释放锁请求。

所以总的来说:客户端在多个 Redis 实例上申请加锁;必须保证大多数节点加锁成功;大多数节点加锁的总耗时,要小于锁设置的过期时间;释放锁,要向全部节点发起释放锁请求。

我们来看 Redlock 为什么要这么做?

  1. 为什么要在多个实例上加锁?

本质上是为了「容错」,部分实例异常宕机,剩余的实例加锁成功,整个锁服务依旧可用。

  1. 为什么大多数加锁成功,才算成功?

多个 Redis 实例一起来用,其实就组成了一个「分布式系统」。在分布式系统中,总会出现「异常节点」,所以,在谈论分布式系统问题时,需要考虑异常节点达到多少个,也依旧不会影响整个系统的「正确性」。

这是一个分布式系统「容错」问题,这个问题的结论是:如果只存在「故障」节点,只要大多数节点正常,那么整个系统依旧是可以提供正确服务的。

  1. 为什么步骤 3 加锁成功后,还要计算加锁的累计耗时?

因为操作的是多个节点,所以耗时肯定会比操作单个实例耗时更久,而且,因为是网络请求,网络情况是复杂的,有可能存在延迟、丢包、超时等情况发生,网络请求越多,异常发生的概率就越大。

所以,即使大多数节点加锁成功,但如果加锁的累计耗时已经「超过」了锁的过期时间,那此时有些实例上的锁可能已经失效了,这个锁就没有意义了。

  1. 为什么释放锁,要操作所有节点?

在某一个 Redis 节点加锁时,可能因为「网络原因」导致加锁失败。

例如,客户端在一个 Redis 实例上加锁成功,但在读取响应结果时,网络问题导致读取失败,那这把锁其实已经在 Redis 上加锁成功了。

所以,释放锁时,不管之前有没有加锁成功,需要释放「所有节点」的锁,以保证清理节点上「残留」的锁。

好了,明白了 Redlock 的流程和相关问题,看似Redlock 确实解决了 Redis 节点异常宕机锁失效的问题,保证了锁的「安全性」。

但事实真的如此吗?

(2)RedLock的是是非非

一个分布式系统,更像一个复杂的「野兽」,存在着你想不到的各种异常情况。

这些异常场景主要包括三大块,这也是分布式系统会遇到的三座大山:NPC。

N:Network Delay,网络延迟

P:Process Pause,进程暂停(GC)

C:Clock Drift,时钟漂移

比如一个进程暂停(GC)的例子

image.png

1)客户端 1 请求锁定节点 A、B、C、D、E

2)客户端 1 的拿到锁后,进入 GC(时间比较久)

3)所有 Redis 节点上的锁都过期了

4)客户端 2 获取到了 A、B、C、D、E 上的锁

5)客户端 1 GC 结束,认为成功获取锁

6)客户端 2 也认为获取到了锁,发生「冲突」

GC 和网络延迟问题:这两点可以在红锁实现流程的第3步来解决这个问题。

但是最核心的还是时钟漂移,因为时钟漂移,就有可能导致第3步的判断本身就是一个BUG,所以当多个 Redis 节点「时钟」发生问题时,也会导致 Redlock 锁失效。

(3)RedLock总结

Redlock 只有建立在「时钟正确」的前提下,才能正常工作,如果你可以保证这个前提,那么可以拿来使用。

但是时钟偏移在现实中是存在的:

第一,从硬件角度来说,时钟发生偏移是时有发生,无法避免。例如,CPU 温度、机器负载、芯片材料都是有可能导致时钟发生偏移的。

第二,人为错误也是很难完全避免的。

所以,Redlock尽量不用它,而且它的性能不如单机版 Redis,部署成本也高,优先考虑使用主从+ 哨兵的模式
实现分布式锁(只会有很小的记录发生主从切换时的锁丢失问题)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2112443.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ios免签H5

1、windows下载mobileconfig文件制作工具&#xff0c;可在csdn搜索iPhone_Mobileconfig_Tool下载安装&#xff1b;IOS 从APP Store 下载Apple Configurator 2 2、用申请的域名SSL证书给mobieconfig文件签名&#xff0c;最好下载Apache证书&#xff0c;里面包含 AE86211.crt…

zabbix-高级应用(主被动监控、邮件告警、企业微信告警)

文章目录 zabbix-高级应用监控路由器交换机SNMP简单网络管理协议测试案例配置网络设备创建主机创建监控项测试监控项 自动发现什么是自动发现Discovery&#xff1f;配置自动发现1、创建自动发现规则2、创建Action动作&#xff08;发现主机后自动执行什么动作&#xff09;3、通过…

Python画笔案例-037 绘制彩色格子台阶

1、绘制彩色格子台阶 通过 python 的turtle 库绘制彩色格子台阶&#xff0c;如下图&#xff1a; 2、实现代码 绘制彩色格子台阶&#xff0c;以下为实现代码&#xff1a; """彩色格子台阶.py """ import turtle from random import randomturtle…

小杨做题c++

题目描述 为了准备考试&#xff0c;小杨每天都要做题。第1天&#xff0c;小杨做了a道题;第2天&#xff0c;小杨做了b道题;从第3天起&#xff0c;小杨每天做的题目数量是前两天的总和。 此外&#xff0c;小杨还规定&#xff0c;当自己某一天做了大于或等于m题时&#xff0c;接下…

KRTSt内嵌Lua脚本

KRTSt内嵌Lua脚本 Lua 简介 Lua是一门强大、高效、轻量、可嵌入的脚本语言。它支持多种编程架构&#xff1a;过程编程、面向对象编程&#xff08;OOP&#xff09;、函数式编程、数据驱动编程及数据描述。 Lua结合了简洁的过程语法和强大的数据描述结构&#xff08;基于关联数…

什么是网络准入控制系统?网络准入控制系统七大品牌介绍!

在当今信息化时代&#xff0c;企业网络安全面临着前所未有的挑战。网络准入控制系统&#xff08;NAC, Network Access Control&#xff09;作为一种重要的网络安全技术&#xff0c;扮演着守护企业网络安全大门的关键角色。网络准入控制系统通过对接入网络的设备进行身份验证、安…

为什么现在不建议去电力设计院?终于有人把电力设计院说清楚了!

作者&#xff1a;电气哥 最近电气哥收到了许多面临就业的同学特别是硕士同学有关于电力设计院的咨询&#xff0c;那么现在电力设计院到底还值不值得去&#xff1f;电气哥带你来分析一下电力设计院的前世今生。 01 电力设计院的前世今生 曾经&#xff0c;在我国的大基建时代&…

java设计模式--(行为型模式:策略模式、命令模式、责任链模式)

6&#xff0c;行为型模式 行为型模式用于描述程序在运行时复杂的流程控制&#xff0c;即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务&#xff0c;它涉及算法与对象间职责的分配。 行为型模式分为类行为模式和对象行为模式&#xff0c;前者采用继承…

keepalived和lvs高可用集群

keepavlied和lvs高可用集群搭建 主备模式&#xff1a; 关闭防火墙和selinux systemctl stop firewalld setenforce 0部署master负载调度服务器 zyj86 安装ipvsadm keepalived yum install -y keepalived ipvsadm修改主节点配置 vim /etc/keepalived/keepalived.conf! Conf…

鹰眼雾炮适合在哪些场合使用

朗观视觉鹰眼雾炮由于其独特的功能和优势&#xff0c;适合在多种场合使用&#xff0c;主要包括但不限于以下几个方面&#xff1a; 建筑工地&#xff1a;建筑工地是粉尘污染的主要来源之一。鹰眼雾炮可以有效降低施工过程中的扬尘&#xff0c;改善工地及周边空气质量&#xff0c…

南卡科技“满分之选”全新开放式耳机发布,打造超越Pro的极致体验!

在音频技术的不断革新中&#xff0c;南卡品牌以其深厚的声学底蕴和对创新的不懈追求&#xff0c;再次为市场带来惊喜。今天&#xff0c;我们自豪地宣布&#xff0c;南卡OE Pro2开放式蓝牙耳机正式亮相&#xff0c;它不仅代表了南卡在开放式耳机领域的技术巅峰&#xff0c;更是对…

基本和复合逻辑运算

目录 基本逻辑运算 与运算 或运算 非运算 复合逻辑运算 与非运算 或非运算 异或运算 同或运算 基本逻辑运算 与运算 两个都为1才为1&#xff0c;否则为0&#xff0c;类似于编程语言里的&。 有0出0&#xff0c;全1出1。 逻辑表达式就是AB,可以省略中间的点。 逻辑符…

TimescaleDB-3 超表的维护

数采系统上线半年多了&#xff0c;产生了大概2亿条记录&#xff0c;这些数据其实是有时效性的&#xff0c;用来生成的二次数据可以永久保存&#xff0c;这种时序数据没有多大价值&#xff0c;又非常占用空间&#xff0c;所以定期清理超表的trunk是必要的 --1、查看分区表以及…

好用的AI编程助手[豆包]

欢迎来到 Marscode 的世界&#xff01;这里将为你揭秘 Marscode&#xff0c;它的独特之处、应用领域等相关精彩内容等你来探索。 一、打开VS Code 二、选择 Extensions,搜索marscode 三、点击安装 四、点击使用 五、输入需要编写的代码 六、根据自己的需求修改代码 MarsCode 注…

Pytorch多GPU分布式训练代码编写

Pytorch多GPU分布式训练代码编写 一、数据并行 1.单机单卡 模型拷贝 model.cuda() 原地操作 数据拷贝&#xff08;每步&#xff09; datadata.cuda() 非原地操作 基于torch.cuda.is_available()来判断是否可用 模型保存与加载 torch.save 来保存模型、优化器、其他变量tor…

spring security 中的授权使用

一、认证 身份认证&#xff0c;就是判断一个用户是否为合法用户的处理过程。Spring Security 中支持多种不同方式的认证&#xff0c;但是无论开发者使用那种方式认证&#xff0c;都不会影响授权功能使用。因为 SpringSecurity 很好做到了认证和授权解耦。 二、授权 授权&#x…

红黑树的旋转

红黑树的基本性质 红黑树与普通的二叉搜索树不同&#xff0c;它在每个节点上附加了一个额外的属性——颜色&#xff0c;该颜色可以是红色或黑色。通过引入这些颜色&#xff0c;红黑树能够维持以下 5 个基本性质&#xff0c;以确保树的平衡性&#xff1a; 每个节点是红色或黑色…

C++入门10——stack与queue的使用

目录 1.什么是stack&#xff1f; stack的使用 2.什么是queue&#xff1f; queue的使用 3.priority_queue 3.1 什么是priority_queue? 3.2 priority_queue的使用 1.什么是stack&#xff1f; 在官网中&#xff0c;对stack有这样的介绍&#xff1a; Stacks are a type o…

一台电脑对应一个IP地址吗?‌探讨两台电脑共用IP的可能性

在当今数字化时代&#xff0c;‌IP地址作为网络世界中的“门牌号”&#xff0c;‌扮演着至关重要的角色。‌它负责在网络上唯一标识每一台设备&#xff0c;‌使得数据能够在庞大的互联网中准确无误地传输。‌然而&#xff0c;‌对于IP地址与电脑之间的对应关系&#xff0c;‌许…

uni-appH5项目实现导航区域与内容区域联动效果

一、需求描述 将导航区域与内容区域实现联动&#xff0c;即点击导航区域&#xff0c;内容区滚动到对应位置&#xff0c;内容区滚动过程中根据内容定位到相对应的导航栏。 效果如下&#xff1a; 侧边导航与内容联动效果 二、功能实现思路分析汇总&#xff1a; 三、具体代码 1…