1 什么是Redis
Redis(Remote Dictionary Server) 是一个使用 C 语言编写的,开源的(BSD许可)高性能非关系型(NoSQL)的键值对数据库。
Redis 可以存储键和五种不同类型的值之间的映射。键的类型只能为字符串,值支持五种数据类型:字符串、列表、集合、散列表、有序集合。
与传统数据库不同的是 Redis 的数据是存在内存中的,所以读写速度非常快,因此 redis 被广泛应用于缓存方向,每秒可以处理超过 10万次读写操作,是已知性能最快的Key-Value DB。另外,Redis 也经常用来做分布式锁。除此之外,Redis 支持事务 、持久化、LUA脚本、LRU驱动事件、多种集群方案。
8、 持久化
8.1、什么是Redis持久化?
持久化就是把内存的数据写到磁盘中去,防止服务宕机了内存数据丢失。
Redis 的持久化机制是什么?各自的优缺点?
Redis 提供两种持久化机制 RDB(默认) 和 AOF 机制:
RDB:是Redis DataBase缩写快照
RDB是Redis默认的持久化方式。按照一定的时间将内存的数据以快照的形式保存到硬盘中,对应产生的数据文件为dump.rdb。通过配置文件中的save参数来定义快照的周期。
优点:
1)、只有一个文件 dump.rdb,方便持久化。
2)、容灾性好,一个文件可以保存到安全的磁盘。
3、性能最大化,fork 子进程来完成写操作,让主进程继续处理命令,所以是 IO 最大化。使用单独子进程来进行持久化,主进程不会进行任何 IO 操作,保证了 redis 的高性能
4).相对于数据集大时,比 AOF 的启动效率更高。
缺点:
1)、数据安全性低。RDB 是间隔一段时间进行持久化,如果持久化之间 redis 发生故障,会发生数据丢失。所以这种方式更适合数据要求不严谨的时候)
2)、AOF(Append-only file)持久化方式: 是指所有的命令行记录以 redis 命令请 求协议的格式完全持久化存储)保存为 aof 文件。
AOF:持久化
AOF持久化(即Append Only File持久化),则是将Redis执行的每次写命令记录到单独的日志文件中,当重启Redis会重新将持久化的日志中文件恢复数据。
当两种方式同时开启时,数据恢复Redis会优先选择AOF恢复。
优点:
1)、数据安全,aof 持久化可以配置 appendfsync 属性,有 always,每进行一次 命令操作就记录到 aof 文件中一次。
2)、通过 append 模式写文件,即使中途服务器宕机,可以通过 redis-check-aof 工具解决数据一致性问题。
3)、AOF 机制的 rewrite 模式。AOF 文件没被 rewrite 之前(文件过大时会对命令 进行合并重写),可以删除其中的某些命令(比如误操作的 flushall))
缺点:
1)、AOF 文件比 RDB 文件大,且恢复速度慢。
2)、数据集大的时候,比 rdb 启动效率低。
优缺点是什么?
AOF文件比RDB更新频率高,优先使用AOF还原数据。
AOF比RDB更安全也更大
RDB性能比AOF好
如果两个都配了优先加载AOF
如何选择合适的持久化方式
一般来说, 如果想达到足以媲美PostgreSQL的数据安全性,你应该同时使用两种持久化功能。在这种情况下,当 Redis 重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
如果你非常关心你的数据, 但仍然可以承受数分钟以内的数据丢失,那么你可以只使用RDB持久化。
有很多用户都只使用AOF持久化,但并不推荐这种方式,因为定时生成RDB快照(snapshot)非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比AOF恢复的速度要快,除此之外,使用RDB还可以避免AOF程序的bug。
如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化方式。
8.2、 Redis持久化数据和缓存怎么做扩容?
如果Redis被当做缓存使用,使用一致性哈希实现动态扩容缩容。
如果Redis被当做一个持久化存储使用,必须使用固定的keys-to-nodes映射关系,节点的数量一旦确定不能变化。否则的话(即Redis节点需要动态变化的情况),必须使用可以在运行时进行数据再平衡的一套系统,而当前只有Redis集群可以做到这样。
9、过期键的删除策略
9.1、Redis的过期键的删除策略
我们都知道,Redis是key-value数据库,我们可以设置Redis中缓存的key的过期时间。Redis的过期策略就是指当Redis中缓存的key过期了,Redis如何处理,过期策略通常有以下三种:
1) 定时过期:每个设置过期时间的key都需要创建一个定时器,到过期时间就会立即清除。该策略可以立即清除过期的数据,对内存很友好;但是会占用大量的CPU资源去处理过期的数据,从而影响缓存的响应时间和吞吐量。
2) 惰性过期:只有当访问一个key时,才会判断该key是否已过期,过期则清除。该策略可以最大化地节省CPU资源,却对内存非常不友好。极端情况可能出现大量的过期key没有再次被访问,从而不会被清除,占用大量内存。
3) 定期过期:每隔一定的时间,会扫描一定数量的数据库的expires字典中一定数量的key,并清除其中已过期的key。该策略是前两者的一个折中方案。通过调整定时扫描的时间间隔和每次扫描的限定耗时,可以在不同情况下使得CPU和内存资源达到最优的平衡效果。
(expires字典会保存所有设置了过期时间的key的过期时间数据,其中,key是指向键空间中的某个键的指针,value是该键的毫秒精度的UNIX时间戳表示的过期时间。键空间是指该Redis集群中保存的所有键。)
Redis中同时使用了惰性过期和定期过期两种过期策略。
9.2、Redis key的过期时间和永久有效分别怎么设置?
EXPIRE和PERSIST命令。
9.3、对过期的数据怎么处理呢?
除了缓存服务器自带的缓存失效策略之外(Redis默认的有6中策略可供选择),我们还可以根据具体的业务需求进行自定义的缓存淘汰,常见的策略有两种:定时去清理过期的缓存;当有用户请求过来时,再判断这个请求所用到的缓存是否过期,过期的话就去底层系统得到新数据并更新缓存。
两者各有优劣,第一种的缺点是维护大量缓存的key是比较麻烦的,第二种的缺点就是每次用户请求过来都要判断缓存失效,逻辑相对比较复杂!具体用哪种方案,大家可以根据自己的应用场景来权衡。
10、 内存相关
MySQL里有2000w数据,redis中只存20w的数据,如何保证redis中的数据都是热点数据
redis内存数据集大小上升到一定大小的时候,就会施行数据淘汰策略。
Redis的内存淘汰策略有哪些
Redis的内存淘汰策略是指在Redis的用于缓存的内存不足时,怎么处理需要新写入且需要申请额外空间的数据。
全局的键空间选择性移除
noeviction:当内存不足以容纳新写入数据时,新写入操作会报错。
allkeys-lru:当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的key。(这个是最常用的)
allkeys-random:当内存不足以容纳新写入数据时,在键空间中,随机移除某个key。设置过期时间的键空间选择性移除
volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的key。
volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个key。
volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的key优先移除。
总结
Redis的内存淘汰策略的选取并不会影响过期的key的处理。内存淘汰策略用于处理内存不足时的需要申请额外空间的数据;过期策略用于处理过期的缓存数据。
Redis主要消耗什么物理资源?
内存。
Redis的内存用完了会发生什么?
如果达到设置的上限,Redis的写命令会返回错误信息(但是读命令还可以正常返回。)或者你可以配置内存淘汰机制,当Redis达到内存上限时会冲刷掉旧的内容。
Redis如何做内存优化?
可以好好利用Hash,list,sorted set,set等集合类型数据,因为通常情况下很多小的Key-Value可以用更紧凑的方式存放到一起。尽可能使用散列表(hashes),散列表(是说散列表里面存储的数少)使用的内存非常小,所以你应该尽可能的将你的数据模型抽象到一个散列表里面。比如你的web系统中有一个用户对象,不要为这个用户的名称,姓氏,邮箱,密码设置单独的key,而是应该把这个用户的所有信息存储到一张散列表里面
11、 线程模型
11.1、Redis线程模型
Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器(file event handler)。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以Redis才叫单线程模型。
文件事件处理器使用 I/O 多路复用(multiplexing)程序来同时监听多个套接字, 并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关闭(close)等操作时, 与操作相对应的文件事件就会产生, 这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。
虽然文件事件处理器以单线程方式运行, 但通过使用 I/O 多路复用程序来监听多个套接字, 文件事件处理器既实现了高性能的网络通信模型, 又可以很好地与 redis 服务器中其他同样以单线程方式运行的模块进行对接, 这保持了 Redis 内部单线程设计的简单性。
11.2、事务
什么是事务?
事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。
Redis事务的概念
Redis 事务的本质是通过MULTI、EXEC、WATCH等一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。
总结说:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。
Redis事务的三个阶段
事务开始 MULTI
命令入队
事务执行 EXEC
事务执行过程中,如果服务端收到有EXEC、DISCARD、WATCH、MULTI之外的请求,将会把请求放入队列中排队
Redis事务相关命令
Redis事务功能是通过MULTI、EXEC、DISCARD和WATCH 四个原语实现的
Redis会将一个事务中的所有命令序列化,然后按顺序执行。
redis 不支持回滚,“Redis 在事务失败时不进行回滚,而是继续执行余下的命令”, 所以 Redis 的内部可以保持简单且快速。
如果在一个事务中的命令出现错误,那么所有的命令都不会执行;
如果在一个事务中出现运行错误,那么正确的命令会被执行。
WATCH 命令是一个乐观锁,可以为 Redis 事务提供 check-and-set (CAS)行为。 可以监控一个或多个键,一旦其中有一个键被修改(或删除),之后的事务就不会执行,监控一直持续到EXEC命令。
MULTI命令用于开启一个事务,它总是返回OK。 MULTI执行之后,客户端可以继续向服务器发送任意多条命令,这些命令不会立即被执行,而是被放到一个队列中,当EXEC命令被调用时,所有队列中的命令才会被执行。
EXEC:执行所有事务块内的命令。返回事务块内所有命令的返回值,按命令执行的先后顺序排列。 当操作被打断时,返回空值 nil 。
通过调用DISCARD,客户端可以清空事务队列,并放弃执行事务, 并且客户端会从事务状态中退出。
UNWATCH命令可以取消watch对所有key的监控。
事务管理(ACID)概述
原子性(Atomicity)
原子性是指事务是一个不可分割的工作单位,事务中的操作要么都发生,要么都不发生。
一致性(Consistency)
事务前后数据的完整性必须保持一致。
隔离性(Isolation)
多个事务并发执行时,一个事务的执行不应影响其他事务的执行
持久性(Durability)
持久性是指一个事务一旦被提交,它对数据库中数据的改变就是永久性的,接下来即使数据库发生故障也不应该对其有任何影响
Redis的事务总是具有ACID中的一致性和隔离性,其他特性是不支持的。当服务器运行在AOF持久化模式下,并且appendfsync选项的值为always时,事务也具有耐久性。
Redis事务支持隔离性吗
Redis 是单进程程序,并且它保证在执行事务时,不会对事务进行中断,事务可以运行直到执行完所有事务队列中的命令为止。因此,Redis 的事务是总是带有隔离性的。
Redis事务保证原子性吗,支持回滚吗
Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。
Redis事务其他实现
基于Lua脚本,Redis可以保证脚本内的命令一次性、按顺序地执行,
其同时也不提供事务运行错误的回滚,执行过程中如果部分命令运行错误,剩下的命令还是会继续运行完
基于中间标记变量,通过另外的标记变量来标识事务是否执行完成,读取数据时先读取该标记变量判断是否事务执行完成。但这样会需要额外写代码实现,比较繁琐
26、分布式问题
26.1、Redis实现分布式锁
Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系Redis中可以使用SETNX命令实现分布式锁。
当且仅当 key 不存在,将 key 的值设为 value。 若给定的 key 已经存在,则 SETNX 不做任何动作
SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
返回值:设置成功,返回 1 。设置失败,返回 0 。
使用SETNX完成同步锁的流程及事项如下:
使用SETNX命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功
为了防止获取锁后程序出现异常,导致其他线程/进程调用SETNX命令总是返回0而进入死锁状态,需要为该key设置一个“合理”的过期时间
释放锁,使用DEL命令将锁数据删除
27、如何解决 Redis 的并发竞争 Key 问题
27.1、为什么使用分布式锁
以商品减库存为例子,先来看看单机锁的场景下的一小段代码。
首先在redis中set key为“stock”,value为10。当程序执行时,判断stock是否大于0,大于0 则进行减1操作, 减完后重新赋值到stock中去。
synchronized同步锁是单机锁,在只有一个线程访问时,redis中的stock是不会出现问题的。 但本文阐述的是在并发场景下,所以本次案例采用 Jmeter 压测工具进行并发测试(有兴趣的小伙伴自行压测,这里就不展示结果了。。) 发现stock会出现负数的情况。
如今大部分互联网公司的业务应用基本都是是微服务架构, 在多线程的环境中,如果要避免同时操作一个共享变量产生资源争抢,数据问题,一般都会引入分布式锁来互斥,以保证共享变量的正确性,其使用范围是在同一个进程中。
如图所示,加上一个锁服务,所有进程在访问服务端之前都需要去这个服务上申请加锁。只有一个进程返回成功,其余的都会返回失败或阻塞等待,达到了互斥的效果。
27.2、 Redis锁
想要实现分布式锁,必须要求 Redis 有互斥的能力,我们可以使用 SETNX 命令,这个命令表示 SET if Not eXists,即如果 key 不存在,才会设置它的值,否则什么也不做。
多个进程访问服务端,加锁成功的客户端,就可以去操作共享资源。
操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。释放锁直接使用DEL 命令删除这个 key就可以了。
27.2.1、 redis 实现分布式锁,SETNX
这个逻辑非常简单,就是在每次访问资源之前, 先去redis中加一把锁, 加锁成功返回true, 失败返回false, 则会返回前端一个error。 失败证明当前有其他线程占有这把key为lockKey的锁, 则进行阻塞等待。 成功则可以之前后续业务代码,将stock减一, 等业务全部执行完后,将锁释放,执行delete操作,让后续的线程能拿到锁。
但是,它存在一个很大的问题,当客户端 1 拿到锁后,在中间业务突然发生逻辑异常,导致无法及时释放锁,即没走到delete操作,此时就会造成死锁。那怎么办? 看下一段进化。
27.2.2、 释放锁放到finaly
嘿嘿,是不是加上try catch,在finally后面做释放锁的操作就行了!
但有没有想过,我在代码中间留着那么大的一行,有没有想到呢?没错,如果客户端1拿到锁了,刚想往下执行业务代码,此时程序突然挂了, 是不是我连finally都走不到那里去,导致锁就无法进行释放,客户端1就会一直占用着这把锁。这也会造成死锁。
27.2.3、 如何避免死锁?
在 Redis 中实现时,就是给这个 key 设置一个过期时间。借助StringRedisTemplate的api Expire
给我们的锁加上一个10s的过期时间,10s后锁就会进行自动释放。
但如果发生以下场景:
1).客户端1 SETNX执行成功了,然后执行 EXPIRE 时由于网络问题,执行失败
2).或者 SETNX 执行成功,Redis 异常宕机,EXPIRE 没有机会执行
3).又或者 SETNX 执行成功,客户端异常崩溃,EXPIRE 也没有机会执行
好在Redis 2.6.12 之后,Redis 扩展了 SET 命令的参数,用这一条命令就可以了:
于是乎,我们解决了死锁的问题。以为这样redis锁的代码就算完成了吗? 还没那么简单,我们需要再进行完善优化。
试想这样一种场景:
如果客户端1加锁成功,开始执行业务代码,但是他执行的业务逻辑超时了, 超过了锁的过期时间10s,此时锁被自动释放,此时来了客户端2,它发现没人占用这把锁,于是乎它加锁成功也去执行业务代码了,然后客户端1刚好执行完业务代码后,去释放锁,但是它此时释放的是客户端2的锁
如何防止锁被别人释放?
客户端在加锁时,设置一个唯一标识。
我们在锁lockKey的value设置为UUID,在释放锁的前先判断锁是否是自己持有的。 但是还是有问题。。所以说实现redis锁是不是考虑的点比较多。
27.2.4、 如何正确评估锁过期时间?
前面我们提到,锁的过期时间如果评估不好,这个锁就会有提前过期的风险。
是否可以设计这样的方案:加锁时,先设置一个过期时间,然后我们开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间。
redis早就帮我们想好了。接下来介绍下redisson。
Redisson 是一个 Java 语言实现的 Redis SDK 客户端,在使用分布式锁时,它就采用了自动续期的方案来避免锁过期,这个守护线程我们一般也把它叫做看门狗线程。
讲一下redisson lock方法的底层代码
默认是30s的超时时间,也可以自己进行设值。再看关键代码。
其实底层采用的是lua脚本,lua脚本保证了原子性。在执行lua脚本过程中,有一部分命令成功了,有一部分失败了,也不会出现回滚的操作。因为 Redis 处理每一个请求是单线程执行的,在执行一个 Lua 脚本时,其它请求必须等待,直到这个 Lua 脚本处理完成,这样一来,GET + DEL 之间就不会插入其它命令了。而且执行效率非常快!
之前分析的场景都是,锁在单个Redis 实例中可能产生的问题,并没有涉及到 Redis 的部署架构细节。
而我们在使用 Redis 时,一般会采用主从集群 + 哨兵的模式部署,这样做的好处在于,当主库异常宕机时,哨兵可以实现故障自动切换,把从库提升为主库,继续提供服务,以此保证可用性。
那当「主从发生切换」时,这个分布锁会依旧安全吗?用上面的图为例子
当客户端1在主库上执行 SET 命令,加锁成功,此时,主库异常宕机,SET 命令还未同步到从库上(主从复制是异步的),从库被哨兵提升为新主库,这个锁在新的主库上丢失了!
为此,Redis 的作者提出一种解决方案,就是我们经常听到的 Redlock(红锁)。
27.2.5、 Redlock红锁
Redlock 的方案基于 2 个前提:
1). 不再需要部署 从库 和 哨兵 实例,只部署 主库
2). 但主库要部署多个,官方推荐至少 5 个实例
整体的流程是这样的,一共分为 5 步:
客户端先获取当前时间戳 T1
客户端依次向这 5 个 Redis 实例发起加锁请求(用前面讲到的 SET 命令),且每个请求会设置请求的超时时间(毫秒级,要远小于锁的有效时间),这是为了 如果某一个实例加锁失败(包括网络超时、锁被其它人持有等各种异常情况),就立即向下一个 Redis 实例申请加锁
如果客户端从 >=3 个(大多数)以上 Redis 实例加锁成功,则再次获取当前时间戳 T2,如果 T2 - T1 < 锁的过期时间,此时,认为客户端加锁成功,否则认为加锁失败
加锁成功,去操作共享资源(例如修改 MySQL 某一行,或发起一个 API 请求)
加锁失败,向全部节点发起释放锁请求(前面讲到的 Lua 脚本释放锁)
1) 为什么要在多个实例上加锁?
本质上是为了「容错」,部分实例异常宕机,剩余的实例加锁成功,整个锁服务依旧可用。
2) 为什么大多数加锁成功,才算成功?
多个 Redis 实例一起来用,其实就组成了一个「分布式系统」。
在分布式系统中,总会出现「异常节点」,所以,在谈论分布式系统问题时,需要考虑异常节点达到多少个,也依旧不会影响整个系统的「正确性」。
这是一个分布式系统「容错」问题,这个问题的结论是:如果只存在「故障」节点,只要大多数节点正常,那么整个系统依旧是可以提供正确服务的。
这个问题的模型,就是我们经常听到的拜占庭将军问题,感兴趣可以去看算法的推演过程。
3) 为什么步骤 3 加锁成功后,还要计算加锁的累计耗时?
因为操作的是多个节点,所以耗时肯定会比操作单个实例耗时更久,而且,因为是网络请求,网络情况是复杂的,有可能存在延迟、丢包、超时等情况发生,网络请求越多,异常发生的概率就越大。
所以,即使大多数节点加锁成功,但如果加锁的累计耗时已经「超过」了锁的过期时间,那此时有些实例上的锁可能已经失效了,这个锁就没有意义了。
4) 为什么释放锁,要操作所有节点?
在某一个 Redis 节点加锁时,可能因为网络原因导致加锁失败。
例如,客户端在一个 Redis 实例上加锁成功,但在读取响应结果时,网络问题导致读取失败,那这把锁其实已经在 Redis 上加锁成功了。
所以,释放锁时,不管之前有没有加锁成功,需要释放所有节点的锁,以保证清理节点上残留的锁。
27.3 Zookeeper分布式锁
Zookeeper节点( Znode )
持久节点 (PERSISTENT)
默认的节点类型,创建节点的客户端与zookeeper断开连接后,该节点依旧存在 。
持久节点顺序节点(PERSISTENT_SEQUENTIAL)
创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号。
临时节点(EPHEMERAL)
和持久节点相反, 创建节点的客户端与zookeeper断开连接后,临时节点会被删除。
临时顺序节点(EPHEMERAL_SEQUENTIAL)
临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。
27.3.1、Zookeeper分布式锁的原理
如图所示, 如果多个线程获取锁失败处于等待状态,当加锁成功的线程释放锁,会造成羊群效应,所有等待的线程都会去争抢锁,这会对服务器造成较大的影响。
所以在3.0版本后有了以下的改革
获取锁
首先,在Zookeeper当中创建一个持久节点ParentLock。当第一个客户端想要获得锁时,需要在ParentLock这个节点下面创建一个临时顺序节点 Lock1。
之后,Client1查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock1是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。
这时候,如果再有一个客户端 Client2 前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock2。
Client2查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock2是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的。
于是,Client2向排序仅比它靠前的节点Lock1注册Watcher,用于监听Lock1节点是否存在。这意味着Client2抢锁失败,进入了等待状态。
这时候,如果又有一个客户端Client3前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock3。
Client3查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock3是不是顺序最靠前的一个,结果同样发现节点Lock3并不是最小的。
于是,Client3向排序仅比它靠前的节点Lock2注册Watcher,用于监听Lock2节点是否存在。这意味着Client3同样抢锁失败,进入了等待状态。
释放锁分为两种情况:
任务完成,客户端显示释放。当任务完成时,Client1会显示调用删除节点Lock1的指令。
任务执行过程中,客户端崩溃
获得锁的Client1在任务执行过程中,如果Duang的一声崩溃,则会断开与Zookeeper服务端的链接。根据临时节点的特性,相关联的节点Lock1会随之自动删除。
由于Client2一直监听着Lock1的存在状态,当Lock1节点被删除,Client2会立刻收到通知。这时候Client2会再次查询ParentLock下面的所有节点,确认自己创建的节点Lock2是不是目前最小的节点。如果是最小,则Client2顺理成章获得了锁。
同理,如果Client2也因为任务完成或者节点崩溃而删除了节点Lock2,那么Client3就会接到通知。
最终,Client3成功得到了锁。
安全性
场景:
1. 客户端 1 和 2 都尝试创建「临时节点」,例如 /lock2. 假设客户端 1 先到达,则加锁成功,客户端 2 加锁失败
3. 客户端 1 操作共享资源
4. 客户端 1 删除 /lock 节点,释放锁
Zookeeper 不像 Redis 那样,需要考虑锁的过期时间问题,它是采用了「临时节点」,保证客户端 1 拿到锁后,只要连接不断,就可以一直持有锁。
客户端 1 此时会与 Zookeeper 服务器维护一个 Session,这个 Session 会依赖客户端「定时心跳」来维持连接。如果 Zookeeper 长时间收不到客户端的心跳,就认为这个 Session 过期了,也会把这个临时节点删除。
同样地,基于此问题,我们也讨论一下 GC 问题对 Zookeeper 的锁有何影响:
客户端 1 创建临时节点 /lock 成功,拿到了锁
客户端 1 发生长时间 GC
客户端 1 无法给 Zookeeper 发送心跳,Zookeeper 把临时节点「删除」
户端 2 创建临时节点 /lock 成功,拿到了锁客户端 1 GC 结束,它仍然认为自己持有锁(冲突)
可见,即使是使用 Zookeeper,也无法保证进程 GC、网络延迟异常场景下的安全性。
Zookeeper 的优点:
1)不需要考虑锁的过期时间
2)watch 机制,加锁失败,可以 watch 等待锁释放,实现乐观锁
Zookeeper 的劣势:
1)性能不如 Redis
2)部署和运维成本高
3)客户端与 Zookeeper 的长时间失联,锁被释放问题
27.3.2、基于curator实现zookeeper分布式锁
看下加锁的底层源码
总结
28、 分布式Redis是前期做还是后期规模上来了再做好?为什么?
既然Redis是如此的轻量(单实例只使用1M内存),为防止以后的扩容,最好的办法就是一开始就启动较多实例。即便你只有一台服务器,你也可以一开始就让Redis以分布式的方式运行,使用分区,在同一台服务器上启动多个实例。
一开始就多设置几个Redis实例,例如32或者64个实例,对大多数用户来说这操作起来可能比较麻烦,但是从长久来看做这点牺牲是值得的。
这样的话,当你的数据不断增长,需要更多的Redis服务器时,你需要做的就是仅仅将Redis实例从一台服务迁移到另外一台服务器而已(而不用考虑重新分区的问题)。一旦你添加了另一台服务器,你需要将你一半的Redis实例从第一台机器迁移到第二台机器。
29 什么是 RedLock
分布式应用在逻辑 处理中经常会遇到并发问题。如一个操作要修改用户的状态,需要先读出用户的状态,再在内存中进行修改,改完了再还回去。但是如果有多个这样的操作同时进行,就会出现并发问题,,因为读取和修改这两个操作不是原子操作(原子操作是指不会被线程调度机制打断的操作,原子操作一旦开始,就会一直运行结束,中间不会有任何线程切换。)
29.1、什么是分布式锁
分布式锁就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。
如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。
简单来说:分布式锁就是在分布式系统下用来控制同一时刻,只有一个 JVM 进程中的一个线程可以访问被保护的资源。
29.2、合格分布式锁的特征:
1)、互斥性:任意时刻,只有一个客户端能持有锁。
2)、锁超时释放:持有锁超时,可以释放,防止不必要的资源浪费,也可以防止死锁。
3)、可重入性:一个线程如果获取了锁之后,可以再次对其请求加锁。
4)、高性能和高可用:加锁和解锁需要开销尽可能低,同时也要保证高可用,避免分布式锁失效。
5)、安全性:锁只能被持有的客户端删除,不能被其他客户端删除
29.3、分布式锁的原理
分布式锁本质上就是在Redis里面占一个坑,当别的线程也要来占坑时,发现已经被占了,只好放弃或者稍后再试。
占坑一般使用setnx(set if not exists)指令,如果 key 不存在,则设置 value 给这个key,返回1;否则啥都不做,返回0。只允许一个客户端占坑,先来先占,完成操作再调用del命令释放坑。
需要注意:
1)、一定要用 SET key value [EX seconds|PX milliseconds|KEEPTTL] [NX|XX] [GET] 执行,如SET key value EX 60 NX来保证setnx和expire指令原子执行
2)、value要具有唯一性。这个是为了在解锁的时候,需要验证value是和加锁的一致才删除key(解铃还须系铃人)。同时,验证value和释放锁也要保证原子性,可以通过lua脚本来实现。如:
// 获取锁的 value 与 ARGV[1] 是否匹配,匹配则执行 del
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
29.4、死锁问题
如果逻辑执行中出现异常,del指令没有被调用,导致锁不能释放,就会造成死锁问题,锁永远得不到释放。因此需在拿到锁时设置过期时间,这样即使出现异常也能保证在到期之后释放锁。
redis1.x版本需要用两个指令来获取锁和设置过期时间,分别是setnx和expire,但是setnx和expire是两条指令而不是原子指令,如果在setnx和expire两个指令之间,服务器挂掉了也会导致expire得不到执行,也会造成死锁。解决这个问题需要使用lua脚本来使这两个指令变成一个原子操作。
Redis2.8版本中加入了set指令的拓展参数,可以使得setnx和expire指令可以原子执行。如:SET key value EX 60 NX
29.5、超时问题
如果在加锁后的逻辑处理执行时间太长,以至于超过了锁的超时机制,就会出现问题,因为这个时候,A线程持有的锁过期了,但A线程的逻辑还未处理完,这时候B线程获得了锁,仍然存在并发问题。如果这时A线程执行完成了任务,然后去释放锁,这时释放的就是B线程创建和持有的锁。
为了避免这个问题:
1)、Redis分布式锁不要用来执行较长时间的任务
2)、加锁的value是个特殊值(如uuid),只有持有锁的线程知道(可使用ThreadLocal存放在线程本地变量中),释放锁前先对比value是否相同,相同的话再释放锁。
为了防止对比时,释放锁前当前锁超时,其他线程再创建新的锁,需要使获取锁value和释放锁是一个原子操作,用lua脚本来解决。
29.6、分布式锁之过期时间到了锁失效但任务还未执行完毕
某个线程在申请分布式锁的时候,为了应对极端情况,比如机器宕机,那么这个锁就一直不能被释放。一个比较好的解决方案是,申请锁的时候,预估一个程序的执行时间,然后给锁设置一个超时时间,这样,即使机器宕机,锁也能自动释放。
但是这也带来了一个问题,就是在有时候负载很高,任务执行的很慢,锁超时自动释放了任务还未执行完毕,这时候其他线程获得了锁,导致程序执行的并发问题。
对这种情况的解决方案是:
①:锁的超时时间放大为平均执行时间的3~5倍
②:在获得锁之后,就开启一个守护线程,定时去查询Redis分布式锁的到期时间,如果发现将要过期了,就进行续期(Redisson对此做了封装,建议使用)。
29.7、代码中加锁和释放锁使用
加锁代码放在try代码块中
释放锁的代码一定放在finally代码块中,保证出现异常,一定会释放锁
29.8、分布式锁之Redlock(红锁)算法
我们通常使用Cluster集群或者哨兵集群部署来保证Redis的高可用,这两种模式都是基于主从架构数据同步复制实现的数据同步,而Redis的主从复制默认是异步的。
29.8.1、集群环境下分布式锁的问题:
在Sentinel集群中,当主节点挂掉时,从节点会取而代之,但客户端上并没有明显感知。比如第一个客户端在主节点上申请成功了一把锁,但是这把锁还没有来得及同步到从节点,主节点突然挂掉了,然后从节点变成了主节点,这个新的主节点内部没有这个锁,所以当另一个客户端过来请求加锁时,立即就批准了。这样导致系统中同样一把锁被两个客户端同时持有,不安全性由此产生。这种不安全仅在主从发生failover(失效接管)的情况下才会产生,持续的时间极短,业务系统多数情况下可以容忍。
29.8.2、Redlock的出现就是为了解决这个问题。
Redlock 红锁是为了解决主从架构中当出现主从切换导致多个客户端持有同一个锁而提出的一种算法,要使用Redlock,需要提供多个Redis Master实例,这些实例之间相互独立,没有主从关系。同很多分布式算法一样,Redlock也使用 “大多数机制“;加锁时,它会向过半节点发送 set(key,value,nx=True,ex=xxx)指令,只要过半节点set成功,就认为加锁成功。释放锁时,需要向所有节点发送del指令。
不过Redlock算法还需要考虑出错重试、时钟漂移(时钟抖动频率在10hz一下)等很多细节问题。同时因为Redlock需要向多个节点进行读写,意味着其相比单实例Redis的性能会下降一些
Redlock使用场景:非常看重高可用性,即使Redis挂了一台也完全不受影响就使用Redlock。代价是需要更多的Redis实例,性能也会下降,因此不推荐使用。
29.8.3、Redisson
git官方地址:https://github.com/redisson/redisson
Redisson是一个企业级的开源Redis Client,也提供了分布式锁的支持
上面说了为了避免死锁问题,需要加锁的同时设置有效期。但是又存在超时问题,如果超时锁失效了,任务还未执行完毕,其他线程可能获得锁,又会造成安全问题。
29.8.4、Redisson分布式锁的实现:
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://ip:port")
.addNodeAddress("redis://ip:port")
...;
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("key");
lock.lock(); // 获得锁
lock.unlock(); // 释放锁
只需要通过它的api中的lock和unlock即可完成分布式锁,具体细节交给Redisson去实现:
1)redisson所有指令都通过lua脚本执行,redis支持lua脚本原子性执行
2)watch dog自动延期机制
redisson设置一个key的默认过期时间为30s,如果某个客户端持有一个锁超过了30s,还想继续持有这把锁,怎么办?
redisson中有一个watchdog的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔10秒检查一下锁是否释放,如果没有释放,则帮你把key的超时时间重新设为30s。这样的话,就算一直持有锁也不会出现key过期了,其他线程获取到锁的问题了。
redisson的“看门狗”逻辑保证了没有死锁发生。
如果机器宕机了,看门狗也就没了。此时就不会延长key的过期时间,到了30s之后就会自动过期了,其他线程可以获取到锁)
注意:
1):watchDog 只有在未显示指定加锁超时时间(leaseTime)时才会生效
2):lockWatchdogTimeout 设定的时间leaseTime(过期时间)不要太小 ,比如设置的是 100 毫秒,由于网络直接导致加锁完后,watchdog 去延期时,这个 key 在 redis 中已经被删除了
3)支持可重入锁
4)加锁几次,释放几次
29.8.5、Redisson实践
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.0</version>
</dependency>
配置连接Redis
// 1. Create config object
Config config = new Config();
config.useClusterServers().addNodeAddress("redis://127.0.0.1:7181");
config = Config.fromYAML(new File("config-file.yaml"));
创建Redisson实例
RedissonClient redisson = Redisson.create(config);
获取map缓存,通过Redisson封装的ConcurrentMap的实现
RMap<MyKey, MyValue> map = redisson.getMap("myMap");
获取分布式锁,通过Redisson封装的Lock的实现
RLock lock = redisson.getLock("myLock");
获取基于Redis的java.util.concurrent.ExecutorService的实现
RExecutorService executor = redisson.getExecutorService("myExecutorService");
加锁和释放锁
lock.lock(); // 获得锁
lock.unlock(); // 释放锁
29.8.6、与Spring整合
引入依赖
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<!-- for Spring Data Redis v.2.4.x -->
<artifactId>redisson-spring-data-24</artifactId>
<version>3.15.0</version>
</dependency>
配置 RedissonConfig,注册RedissonConnectionFactory到Spring容器中
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Redis分布式锁,Redisson
*
* @date 2022/11/18
*/
@Configuration
public class RedissonConfig {
@Bean
public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redissonClient) {
return new RedissonConnectionFactory(redissonClient);
}
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config redissonConfig = new Config();
// 单机模式
SingleServerConfig singleServerConfig = redissonConfig.useSingleServer();
singleServerConfig.setPassword("xxx");
// Redis url should start with redis:// or rediss:// (for SSL connection)
singleServerConfig.setAddress("redis://host:port");
// 主从模式
// redissonConfig.useMasterSlaveServers()
// .setMasterAddress("redis://127.0.0.1:6379")
// .addSlaveAddress(" redis://127.0.0.1:6379");
// 哨兵模式
/* redissonConfig.useSentinelServers()
.addSentinelAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:6389")
.setMasterName("master")
.setPassword("password")
.setDatabase(0);*/
// 集群模式
/* redissonConfig.useClusterServers()
// use "rediss://" for SSL connection
.addNodeAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:7181", "redis://127.0.0.1:7182");*/
// Redisson客户端
RedissonClient redissonClient = Redisson.create(redissonConfig);
// 为RedissonUtil注入RedissonClient 依赖
RedissonUtil.setRedissonClient(redissonClient);
return redissonClient;
}
}
使用 RedissonClient 加解锁
需要注意,tryLock、lock、unlock 在执行失败(获取锁/释放锁)时,就会抛出异常,需要进行异常处理判断
1)在需要加锁的服务里注入RedissonClient 依赖,即可使用
@Autowired
private RedissonClient redissonClient;
@Test
void redissonTest() {
RLock lock = redissonClient.getLock("myRedissonLock");
try {
lock.lock();
// do something
} finally {
lock.unlock();
}
}
封装 RedissonUtil,为 RedissonUtil 注入 RedissonClient 依赖 RedissonUtil
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class RedissonUtil {
private RedissonUtil() {
}
private static final Logger LOGGER = LoggerFactory.getLogger(RedissonUtil.class);
private static RedissonClient redissonClient;
public static void setRedissonClient(RedissonClient redissonClient) {
RedissonUtil.redissonClient = redissonClient;
}
/**
* 尝试获取锁
*
* @param lockKey
* @return 成功true,失败false
*/
public static Boolean tryLock(String lockKey) {
try {
RLock rLock = redissonClient.getLock(lockKey);
return rLock.tryLock();
} catch (Exception e) {
LOGGER.error("tryLock error: " + e.getMessage(), e);
return false;
}
}
/**
* 超时尝试获取锁
*
* @param lockKey
* @param waitTime 超时等待时间,单位秒
* @return 成功true,失败false
*/
public static Boolean tryLock(String lockKey, long waitTime) {
try {
RLock rLock = redissonClient.getLock(lockKey);
return rLock.tryLock(waitTime, TimeUnit.SECONDS);
} catch (Exception e) {
LOGGER.error("tryLock error: " + e.getMessage(), e);
return false;
}
}
/**
* 加锁,默认持有锁30秒
*
* @param lockKey
* @return 成功true,失败false
*/
public static Boolean lock(String lockKey) {
try {
RLock rLock = redissonClient.getLock(lockKey);
// 默认加锁生存时间为30s
rLock.lock();
return true;
} catch (Exception e) {
LOGGER.error("lock error: " + e.getMessage(), e);
return false;
}
}
/**
* 加锁,自定义持有时间,单位秒
*
* @param lockKey
* @param holdTime 持有时长
* @return 成功true,失败false
*/
public static Boolean lock(String lockKey, long holdTime) {
try {
RLock rLock = redissonClient.getLock(lockKey);
// 默认加锁生存时间为30s
rLock.lock(holdTime, TimeUnit.SECONDS);
return true;
} catch (Exception e) {
LOGGER.error("lock error: " + e.getMessage(), e);
return false;
}
}
/**
* 释放锁
*
* @param lockKey
* @return 成功true,失败false
*/
public static Boolean unlock(String lockKey) {
try {
RLock rLock = redissonClient.getLock(lockKey);
rLock.unlock();
return true;
} catch (Exception e) {
LOGGER.error("unlock error: " + e.getMessage(), e);
return false;
}
}
}
为 RedissonUtil 注入 RedissonClient 依赖(建议在RedissonConfig中在创建完RedissonClient对象之后即为RedissonUtil注入RedissonClient 依赖):
@PostConstruct
public void initRedissonClient() {
RedissonUtil.setRedissonClient((RedissonClient)
applicationContext.getBean("redissonClient"));
}
String lockKey = "myRedissonLock";
try {
Boolean isLock = RedissonUtil.lock(lockKey);
// do something
} finally {
RedissonUtil.unlock(lockKey);
}
29.8.7、Redisson源码
加锁:
/**
* KEYS[1] 表示的是 getName() ,代表的是锁名
* ARGV[1] 表示的是 internalLockLeaseTime 默认值是30s
* ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用锁对象id+线程id, 表示当前访问线程,用于区分不同服务器上的线程
*
* @param waitTime -1
* @param leaseTime 30
* @param unit
* @param threadId
* @param command
* @param <T>
* @return
*/
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 如果锁空闲
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 则向redis中添加一个key为指定锁名的set,并且向set中添加一个field为线程id,值=1的键值对,表示此线程的重入次数为1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置生存时间
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果锁存在,且是当前线程持有的
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 重入次数+1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置生存时间
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", // 锁存在, 但不是当前线程加的锁,则返回锁的过期时间
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
解锁:
···/**
* KEYS[1] 表示的是getName() 代表锁名test_lock
* KEYS[2] 表示getChanelName() 表示的是发布订阅过程中使用的Chanel
* ARGV[1] 表示的是LockPubSub.unLockMessage 是解锁消息,实际代表的是数字 0,代表解锁消息
* ARGV[2] 表示的是internalLockLeaseTime 默认的有效时间 30s
* ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是当前锁id+线程id
*
* @param threadId
* @return
*/
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 如果锁不存在;或者锁存在,但不是当前线程锁加的锁,则返回nil结束
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 锁是当前线程持有,重入次数减1
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " + // 重入次数减1后仍大于0,则续生存时间
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " + // 删除锁
"redis.call('publish', KEYS[2], ARGV[1]); " + // 发布锁删除的消息,channel为 redisson_lock__channel:{lock_name}
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
总结:
Redission锁的结构为hash,key为锁的key
Redission加锁会在锁的hash中添加field为当前线程id,value为1的元素
Redission解锁会获取当前线程的id判断是不是当前线程加的锁,会则释放成功,否则释放失败
注意:Redission适合自动续期,任务处理完手动释放锁的场景。不适合超时自动释放的场景,超时自动释放的场景使用setnx+expire
分布式锁的应用
解决优先查询,然后减少数据,然后在更新的操作
37、 Redis与Memcached的区别
两者都是非关系型内存键值数据库,现在公司一般都是用 Redis 来实现缓存,而且 Redis 自身也越来越强大了!Redis 与 Memcached 主要有以下不同:
1) memcached所有的值均是简单的字符串,redis作为其替代者,支持更为丰富的数据类型
2) redis的速度比memcached快很多
3) redis可以持久化其数据
38、如何保证缓存与数据库双写时的数据一致性?
在做系统优化时,想到了将数据进行分级存储的思路。因为在系统中会存在一些数据,有些数据的实时性要求不高,比如一些配置信息。基本上配置了很久才会变一次。而有一些数据实时性要求非常高,比如订单和流水的数据。所以这里根据数据要求实时性不同将数据分为三级。
第1级:订单数据和支付流水数据;这两块数据对实时性和精确性要求很高,所以不添加任何缓存,读写操作将直接操作数据库。
第2级:用户相关数据;这些数据和用户相关,具有读多写少的特征,所以我们使用redis进行缓存。
第3级:支付配置信息;这些数据和用户无关,具有数据量小,频繁读,几乎不修改的特征,所以我们使用本地内存进行缓存。
但是只要使用到缓存,无论是本地内存做缓存还是使用 redis 做缓存,那么就会存在数据同步的问题,因为配置信息缓存在内存中,而内存时无法感知到数据在数据库的修改。这样就会造成数据库中的数据与缓存中数据不一致的问题。接下来就讨论一下关于保证缓存和数据库双写时的数据一致性。
38.1、解决方案
那么我们这里列出来所有策略,并且讨论他们优劣性。
先更新数据库,后更新缓存
先更新数据库,后删除缓存
先更新缓存,后更新数据库
先删除缓存,后更新数据库
38.1.1、先更新数据库,后更新缓存
这种场景一般是没有人使用的,主要原因是在更新缓存那一步,为什么呢?因为有的业务需求缓存中存在的值并不是直接从数据库中查出来的,有的是需要经过一系列计算来的缓存值,那么这时候后你要更新缓存的话其实代价是很高的。如果此时有大量的对数据库进行写数据的请求,但是读请求并不多,那么此时如果每次写请求都更新一下缓存,那么性能损耗是非常大的。
举个例子比如在数据库中有一个值为 1 的值,此时我们有 10 个请求对其每次加一的操作,但是这期间并没有读操作进来,如果用了先更新数据库的办法,那么此时就会有十个请求对缓存进行更新,会有大量的冷数据产生,如果我们不更新缓存而是删除缓存,那么在有读请求来的时候那么就会只更新缓存一次。
38.1.2、先更新缓存,后更新数据库
这一种情况应该不需要我们考虑了吧,和第一种情况是一样的。而且存在弊端,更新数据库失败后,缓存和数据不一致
38.1.3、先删除缓存,后更新数据库
该方案也会出问题,具体出现的原因如下。
此时来了两个请求,请求 A(更新操作) 和请求 B(查询操作)
请求 A 会先删除 Redis 中的数据,然后去数据库进行更新操作
此时请求 B 看到 Redis 中的数据时空的,会去数据库中查询该值,补录到 Redis 中
但是此时请求 A 并没有更新成功,或者事务还未提交
那么这时候就会产生数据库和 Redis 数据不一致的问题。如何解决呢?其实最简单的解决办法就是延时双删的策略。
但是上述的保证事务提交完以后再进行删除缓存还有一个问题,就是如果你使用的是 Mysql 的读写分离的架构的话,那么其实主从同步之间也会有时间差。
此时来了两个请求,请求 A(更新操作) 和请求 B(查询操作)
请求 A 更新操作,删除了 Redis
请求主库进行更新操作,主库与从库进行同步数据的操作
请 B 查询操作,发现 Redis 中没有数据
去从库中拿去数据
此时同步数据还未完成,拿到的数据是旧数据
此时的解决办法就是如果是对 Redis 进行填充数据的查询数据库操作,那么就强制将其指向主库进行查询。
38.1.3、先更新数据库,后删除缓存
问题:这一种情况也会出现问题,比如更新数据库成功了,但是在删除缓存的阶段出错了没有删除成功,那么此时再读取缓存的时候每次都是错误的数据了。
此时解决方案就是利用消息队列进行删除的补偿。具体的业务逻辑用语言描述如下:
请求 A 先对数据库进行更新操作
在对 Redis 进行删除操作的时候发现报错,删除失败
此时将Redis 的 key 作为消息体发送到消息队列中
系统接收到消息队列发送的消息后再次对 Redis 进行删除操作
但是这个方案会有一个缺点就是会对业务代码造成大量的侵入,深深的耦合在一起,所以这时会有一个优化的方案,我们知道对 Mysql 数据库更新操作后再 binlog 日志中我们都能够找到相应的操作,那么我们可以订阅 Mysql 数据库的 binlog 日志对缓存进行操作。
每种方案各有利弊,比如在第二种先删除缓存,后更新数据库这个方案我们最后讨论了要更新 Redis 的时候强制走主库查询就能解决问题,那么这样的操作会对业务代码进行大量的侵入,但是不需要增加的系统,不需要增加整体的服务的复杂度。最后一种方案我们最后讨论了利用订阅 binlog 日志进行搭建独立系统操作 Redis,这样的缺点其实就是增加了系统复杂度。其实每一次的选择都需要我们对于我们的业务进行评估来选择,没有一种技术是对于所有业务都通用的。没有最好的,只有最适合我们的。
39、Redis常见性能问题和解决方案?
Redis是一个高性能的内存数据库,它可以提供快速的数据存储和访问,支持多种数据结构和功能。但是在实际使用过程中,也会遇到一些性能问题,比如内存不足、持久化开销、主从复制延迟等。这些问题可能会影响Redis的稳定性和可用性,甚至导致数据丢失或服务中断。那么,如何优化Redis的性能,避免或解决这些问题呢?本文将介绍一些常见的Redis性能问题和解决方案,希望对你有所帮助。
39.1、内存不足问题
Redis是一个基于内存的数据库,如果内存不足,就会导致Redis崩溃。为了防止这种情况发生,我们可以采取以下措施:
39.1.1、设置合理的内存上限
Redis提供了maxmemory参数,可以限制Redis使用的最大内存。当内存达到上限时,Redis会根据maxmemory-policy参数指定的策略,自动淘汰一些数据,释放内存空间。常用的策略有volatile-lru(淘汰最近最少使用的带过期时间的键)、allkeys-lru(淘汰最近最少使用的任意键)、volatile-random(随机淘汰带过期时间的键)、allkeys-random(随机淘汰任意键)等。我们可以根据业务需求,选择合适的策略。
39.1.2、优化数据结构
Redis支持多种数据结构,比如字符串、列表、集合、哈希、有序集合等。不同的数据结构占用的内存空间不同,我们可以根据数据特点,选择合适的数据结构,减少内存消耗。例如,如果数据是简单的键值对,可以使用字符串;如果数据是多个字段组成的对象,可以使用哈希;如果数据是有序的数值集合,可以使用有序集合等。此外,Redis还提供了一些特殊的编码方式,比如intset、ziplist等,可以进一步压缩数据大小。
39.1.3、清理过期数据
Redis支持给数据设置过期时间,当数据过期后,Redis会自动删除它们。但是,在高并发场景下,Redis可能没有足够的时间来及时清理过期数据,导致内存占用增加。为了解决这个问题,我们可以调整Redis清理过期数据的策略。Redis提供了两种策略:定时删除和惰性删除。定时删除是指每隔一段时间,Redis会扫描一部分键,删除其中已经过期的键;惰性删除是指每次访问一个键时,Redis会检查它是否已经过期,如果是,则删除它。我们可以通过修改hz参数(定时删除频率)和maxmemory-samples参数(每次扫描键的数量)来调整定时删除策略;我们也可以通过修改activerehashing参数(是否启用主动重哈希)来减少惰性删除导致的哈希表碎片
39.1.3、持久化开销问题
Redis提供了两种持久化方式:RDB和AOF。RDB是指定期生成内存快照文件;AOF是记录每次写操作日志。这两种方式各有优缺点,我们可以根据实际需求,选择合适的方式或者同时使用两种方式。
RDB的优点有:
RDB文件是一个紧凑的二进制文件,它保存了Redis在某个时间点上的数据集,非常适合用于备份和灾难恢复。
RDB文件的生成和载入不会阻塞主进程,因为它们是由子进程来完成的,保证了Redis的高性能。
RDB文件的恢复速度比AOF文件快,因为它只需要载入数据到内存即可。
RDB的缺点有:
RDB文件是按照一定的时间间隔来生成的,如果在生成期间发生故障,可能会丢失一部分数据。
RDB文件的生成需要fork出一个子进程,如果数据集很大,fork操作可能会耗时较长,造成服务器在一段时间内停止处理请求。
RDB文件的压缩和校验可能会消耗一些CPU和内存资源。
AOF的优点有:
AOF文件可以保证数据的安全性,因为它可以记录每次写操作,并且可以配置同步策略来控制写入磁盘的频率。
AOF文件可以通过重写机制来移除冗余命令,减少文件大小,并且可以在重写过程中删除一些误操作的命令。
AOF文件是一个纯文本文件,可以方便地查看和编辑。
AOF的缺点有:
AOF文件通常比RDB文件大,因为它需要记录更多的信息,并且可能包含一些重复或无效的命令。
AOF文件的恢复速度比RDB文件慢,因为它需要重新执行所有的命令,并且可能遇到一些错误或不兼容的情况。
AOF文件的写入和同步可能会影响Redis的性能,尤其是在高并发场景下。
主从复制延迟问题
Redis支持主从复制功能,可以让一个或多个从服务器复制主服务器的数据。这样可以提高数据的可用性和读取性能,也可以实现故障转移和负载均衡。但是,在主从复制过程中,也可能会出现一些延迟问题,导致主从服务器之间的数据不一致。这些问题可能有以下原因:
网络带宽不足或者网络延迟过高,导致从服务器接收主服务器发送的数据包速度慢或者丢包率高。
主服务器处理写请求过多或者执行慢命令过多,导致从服务器积累了大量待处理的数据包或者命令。
从服务器处理能力不足或者执行慢命令过多,导致从服务器无法及时处理主服务器发送过来的数据包或者命令。
为了解决主从复制延迟问题,我们可以采取以下措施:
1)优化网络环境,选择高速稳定的网络连接主从服务器,并且尽量减少网络距离和中间节点。
2)优化主服务器的写性能,尽量减少慢命令的使用,比如keys、hgetall等,可以使用scan、hscan等替代;也可以使用管道或者事务来批量执行命令,减少网络开销。
3)优化从服务器的读性能,尽量减少慢命令的使用,比如sort、sunion等,可以使用有序集合或者集合交集等替代;也可以使用懒惰删除或者异步删除来处理过期键,避免阻塞读操作。
4)监控主从服务器的延迟情况,可以使用info replication命令查看主从服务器之间的偏移量和延迟时间,也可以使用psync命令查看主从服务器之间的复制进度。
40、Redis官方为什么不提供Windows版本?
因为目前Linux版本已经相当稳定,而且用户量很大,无需开发windows版本,反而会带来兼容性等问题。
Redis官方没有提供Windows版本有几个原因。
1).Redis的开发团队规模较小,由三四名核心开发者组成。他们更加熟悉和习惯Unix-like系统,在这些系统上进行开发和测试可以更高效地进行。然而,提供Windows版本会消耗较多资源,可能会影响其他开发进度。
2).Redis利用了Unix系统的特性,例如fork()调用来实现持久化和主从复制等功能。在Windows上实现这些功能会更加复杂,并且需要额外的开发工作量。因此,为了保持团队高效,Redis团队专注于在Unix-like系统上进行开发和测试。
3).一个字懒,多一事不如少一事,Redis是开源软件。
Redis的Windows版本目前稳定版为3.0,最初由微软维护,后来由tporadowski接手维护。
41、一个字符串类型的值能存储最大容量是多少?
512M
42、Redis如何做大量数据插入?
42.1、Redis管道(pipeline)流操作
总的来说Redis的管道可以在大量数据需要一次性操作完成的时候,使用Pipeline进行批处理,将多次操作合并成一次操作,可以减少链路层的时间消耗。
流水线:
redis的读写速度十分快,所以系统的瓶颈往往是在网络通信中的延迟。
redis可能会在很多时候处于空闲状态而等待命令的到达。
为了解决这个问题,可以使用redis的流水线,流水线是一种通讯协议,类似一个队列批量执行一组命令。
42.2、redis的管道 pipeline批量set
@RequestMapping(value = "/redisPipeline", method = RequestMethod.POST)
@ApiOperation(value = "redis的管道 pipeline 添加数据测试")
public void redistest(){
log.info("redistest开始");
// 开始时间
long start = System.currentTimeMillis();
RedisSerializer stringSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringSerializer);
redisTemplate.setValueSerializer(stringSerializer);
List<String> result = redisTemplate.executePipelined(new SessionCallback() {
//执行流水线
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
//批量处理的内容
for (int i = 0; i < 10000; i++) {
operations.opsForValue().set("redistest:" + "k" + i, "v" + i);
}
//注意这里一定要返回null,最终pipeline的执行结果,才会返回给最外层
return null;
}
});
// 结束时间
long end = System.currentTimeMillis();
log.info("运行时间:"+(end-start));
}
//耗时:309;
42.2、批量操作multi和pipeline效率的比较
multi和pipeline的区别在于multi会将操作都即刻的发送至redis服务端queued(队列)起来,每条指令放入queued的操作都有一次通信开销,执行exec时redis服务端再一口气执行queued队列里的指令,pipeline则是在客户端本地queued起来,执行exec时一次性的发送给redis服务端,这样只有一次通信开销。比如我有5个incr操作,multi的话这5个incr会有5次通信开销,但pipeline只有一次。
所以在批量操作使用pipeline效率会更高。
43、Redis里面有1亿个key,找出10w个key固定前缀?
可以用keys和scan来查看key的情况。keys只能在我们很清楚查找的内容很少时使用,如果keys *查出来有几千万条数据,对于客户端来说我们很难一次性处理这么多信息;对于服务器来说,这种规模的查找会因为O(n)的复杂度造成服务器卡顿。因为Redis是单线程程序,顺序执行所有指令,所以其它客户端的指令就只能等待。
43.1、scan指令的使用
提供了limit参数,我们可以控制返回数据的规模,要注意的是,返回的结果可能会有重复,需要客户端去重。首先插入100条数据,如图:
而后我们用scan这个指令去查找:
当这个指令返回的数字标号为0时,说明已经遍历完成。注意如果返回的是empty list or set时,只要标号不为0,那么遍历就还没有结束。
43.2、一个规律
为了后面描述字典的结构和scan遍历跳转的原则便于理解,这里讲一个规律。
任何自然数与2^n进行求余(模)运算,最终的结果其实跟这个数字和(2^n - 1)进行位'&'运算的结果是相同的。这是因为,对2^n求余或者说mod运算的本质,是找到这个数字在除开2^n整数倍之后,还能剩下的数字是几,那么对于2^n - 1来说,刚好就是比2^n小,且二进制全为1,这个全为1的二进制数,和这个自然数进行'&'运算可以真实的复刻出自然数中比2^n小的位还剩多少(对于大于2^n的高位来说,其实都是2^n的整数倍,在和全1的低位进行与的过程中,这一部分数据都会归0),两者本质相同。如下图描述了14005对8进行求余运算,结果其实和14005与7的'&'运算结果一致:
43.3、扩容与缩容
首先,当我们有一个长度为8的数组,我们需要把一大堆元素存储在这个长度为8的数组上,怎么办呢?最简单的办法就是求余,因为任何一个数字,对8进行求余的结果,必定为0-7的任何一个数,这个时候我们可以根据这个余数安排这些元素放在数组上的位置。如果两个元素对8求与的结果是相同的,比如11和19,对8求余的结果都为3(二进制为011),那么我们可以把11和19都放在3这个元素的位置上,这种现象我们可以叫做元素的碰撞。这两个元素在3这个位置将会由一个链表进行维护。
此时,如果这个长度为8的数组进行了扩容,长度增加一倍,变为16,再用对16求余的办法对现有的元素重新进行位置摆放,这个过程我们叫做rehash,那么11和19的位置又在哪里呢?对16求余我们发现,他们的余数为11和3,注意换算为二进制就是1011和0011,那么在新的数组结构中,11和19就会被安放在11和3这两个位置上。
而缩容的过程,与上述相反,也就是之前在1011和0011这两个位置上的元素又会合并到0011这个位置上。
43.4、scan的遍历策略以及原因
scan遍历正是利用了上述的规律和hash的规律,找到了一种独特的遍历路径,使得scan在遍历过程中,无论是发生扩容还是缩容,都尽量避免了历史链表元素的重复遍历和完全避免了遗漏。
这种策略不同于我们一般对数组进行从0到length-1的遍历,也不同于从length-1到0的遍历,而是采用二进制高位反向进位加法,计算下一次遍历的槽点。如图:
而理解了上述的两个规律,我们会发现,这种高位进位法就能够保证在扩容或者缩容时达到完全避漏尽量不重的效果:
如图,假如scan完成了原数组10位的所有元素遍历,此时发生了数组的扩容,形成了下方的数组结构,那么按照scan的遍历策略,他就会去遍历原数组01位置,即现数组001位置,也就是说,它遍历的下一个槽点没有因为数组的扩容而发生变化。
这里需要说明的是,redis是单线程的,扩容和缩容是发生在本次scan结束之后,所以对于扩容来说,遍历历史的所有数据结果不会因为这个扩容而发生改变。
对于缩容来说,如果此时遍历完了010的所有元素,此刻发生了缩容,那么原本即将遍历的110的槽点变成了新数组的10位置(去掉高位),就会重复遍历原010位置的元素,所以说会造成一定的重复。所以对于缩容来说,是会有少量的重复元素,但量占整体元素的比例应该是很低的。
43.5、渐进式rehash
Java的 HashMap在扩容时会一次性将旧数组下挂接的元素全部转移到新数组下面。如果HashMap中元素特别多,线程就会出现卡顿现象。Redis为了解决这个问题,采用“渐进式rehash”。
它会同时保留旧数组和新数组,然后在定时任务中以及后续对 hash 的指令操作中渐渐地将旧数组中挂接的元素迁移到新数组上。这意味着要操作处于rehash中的字典,需要同时访问新旧两个数组结构。如果在旧数组下面找不到元素,还需要去新数组下面寻找。
scan也需要考虑这个问题,对于rehash中的字典,它需要同时扫描新旧槽位,然后将结果融合后返回给客户端。
43.6、系列指令
scan指令是一系列指令,除了可以遍历所有的key之外,还可以对指定的容器集合进行遍历。比如zscan遍历zset集合元素,hscan遍历 hash字典的元素,sscan遍历set集合的元素。
43.7、避免大key
出现大key,会造成内存分配与回收时服务的卡顿,对集群迁移也会造成卡顿,所以在平时的业务开发中,要尽量避免大key的产生。
如果你观察到Redis 的内存大起大落,这极有可能是因为大key导致的,这时候你就需要定位出具体是哪个key,进一步定位出具体的业务来源,然后再改进相关业务代码设计。
对于大key的量化定义:
·一个STRING类型的Key,它的值为5MB(数据过大)
·一个LIST类型的Key,它的列表数量为20000个(列表数量过多)
.一个ZSET类型的Key,它的成员数量为10000个(成员数量过多)
·一个HASH格式的Key,它的成员数量虽然只有1000个但这些成员的value总大小为100MB(成员体积过大)
如图,可以使用指令进行大key的查询:
44、使用Redis做过异步队列吗,是如何实现的
45、Redis如何实现延时队列
使用Redis实现延迟队列
实现思路
redis作为一款高性能的NoSQL数据库,具备快熟读写,高并发,数据持久化等特点,非常适用与实现延迟队列 ,redis提供了丰富的数据结构.
其中利用redis的ZSET集合 (有序集合)数据结构就可以实现一个简单的延迟队列
redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,
将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),
任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小
(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,
直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。
详细步骤
本文将介绍如何使用Redis的Sorted Set数据结构来实现延迟队列,并提供一个完整的示例代码。同时,我们还将会给出对应的测试用例和测试结果。
如下我先给同学们概括下,针对Spring Boot项目,如何利用Redis实现延迟队列的一些实现步骤?
45.1、引入相关依赖 (集成redis)
<!--集成redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
45.2、配置redis
#redis配置
Spring:
redis:
database: 0 #Redis数据库索引(默认为0)
host: 127.0.0.1 #redis服务器ip,由于我是搭建在本地,固指向本地ip
port: 6379 #redis服务器连接端口
password: #redis服务器连接密码(默认为空)
# 连接池配置
jedis.pool:
max-active: 20 #连接池最大连接数(使用负值表示没有限制)
max-wait: -1 #连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 10 #连接池中的最大空闲连接
min-idle: 0 #连接池中的最小空闲连接
timeout: 1000 #连接超时时间(毫秒)。我设置的是1秒
45.3、创建redis配置
@Configuration
public class RedisConfig {
/**
* RedisTemplate配置
*/
@Bean("redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
// 使用fastjson进行序列化处理,提高解析效率
FastJsonRedisSerializer<Object> serializer = new FastJsonRedisSerializer<Object>(Object.class);
// value值的序列化采用fastJsonRedisSerializer
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
// key的序列化采用StringRedisSerializer
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setConnectionFactory(redisConnectionFactory);
// 使用fastjson时需设置此项,否则会报异常not support type
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
return template;
}
/**
* redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*
* @param connectionFactory
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
45.4、序列化
/**
* @Description:使用fastjson实现redis的序列化
*/
public class FastJsonRedisSerializer<T> implements RedisSerializer<T> {
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
private Class<T> clazz;
public FastJsonRedisSerializer(Class<T> clazz) {
super();
this.clazz = clazz;
}
@Override
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
}
return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
}
@Override
public T deserialize(byte[] bytes) throws SerializationException {
if (bytes == null || bytes.length <= 0) {
return null;
}
String str = new String(bytes, DEFAULT_CHARSET);
return (T) JSON.parseObject(str, clazz);
}
}
45.5、创建消息类 DelayMessage
这里定义一个消息类 , 包含消息的id,消息内容,以及到期时间(消息的执行时间) , 代码如下
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DelayMessage implements Serializable {
/**
* 切记实例化
*/
private static final long serialVersionUID = -7671756385477179547L;
/**
* 消息 id
*/
private String id;
/**
* 消息内容
*/
private String content;
/**
* 消息到期时间(指定当前消息在什么时间开始消费(时间戳))
*/
private long expireTime;
}
45.6、创建延迟队列类 DelayQueue
创建一个延迟队列类 , 提供,添加消息,删除消息,和获取消息的方法 , 具体代码如下
@Component
public class DelayQueue {
/**
* key后面拼接当前机器的内网ip : 用于集群区分,解决集群出现的并发问题
*/
private static final String KEY = "delay_queue:" + getHostAddress();
@Autowired
private RedisTemplate redisTemplate;
/**
* 添加消息到延时队列中
*/
public void put(DelayMessage message) {
redisTemplate.opsForZSet().add(KEY, message, message.getExpireTime());
}
/**
* 从延时队列中删除消息
*/
public Long remove(DelayMessage message) {
Long remove = redisTemplate.opsForZSet().remove(KEY, message);
return remove;
}
/**
* 获取延时队列中已到期的消息
*/
public List<DelayMessage> getExpiredMessages() {
// 1 : 获取到开始时间
long minScore = 0;
// 2 : 获取到结束时间
long maxScore = System.currentTimeMillis();
// 3 : 获取到指定范围区间的数据列表
Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(KEY, minScore, maxScore);
if (messages == null || messages.isEmpty()) {
return Collections.emptyList();
}
// 4 : 把对象进行封装,返回
List<DelayMessage> result = new ArrayList<>();
for (Object message : messages) {
DelayMessage delayMessage = JSONObject.parseObject(JSON.toJSONString(message), DelayMessage.class);
result.add(delayMessage);
}
return result;
}
/**
* 获取地址(服务器的内网地址)(内网ip)
*
* @return
*/
public static String getHostAddress() {
InetAddress localHost = null;
try {
localHost = InetAddress.getLocalHost();
} catch (
UnknownHostException e) {
e.printStackTrace();
}
return localHost.getHostAddress();
}
}
45.7、DelayMessageHandler 消息处理类
创建一个消息处理累, 添加一个处理过期的消息,写个定时任务,间隔1s轮询延时队列中已到期的任务,如果获取不到为空,
则不进行消息处理的逻辑 , 反之继续轮询
@Component
public class DelayMessageHandler {
public static SimpleDateFormat dateTimeFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private DelayQueue delayQueue;
/**
* 处理已到期的消息(轮询)
*/
@Scheduled(fixedDelay = 1000)
public void handleExpiredMessages() {
String currentTime = getCurrentTime();
// 1 : 扫描任务,并将需要执行的任务加入到任务队列中
List<DelayMessage> messages = delayQueue.getExpiredMessages();
List<DelayMessage> messages_2 = delayQueue.getExpiredMessages();
System.out.println(currentTime + " 待处理消息数量:" + messages.size());
// 2 : 开始处理消息
if (!messages.isEmpty()) {
for (DelayMessage message : messages) {
System.out.println(message.getId() + " --> 消息开始处理");
try {
// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(message.getId() + " --> 消息处理结束");
// 2.2 : 处理完消息,删除消息
delayQueue.remove(message);
}
}
}
/**
* 获取到的当前时分秒
*
* @return
*/
public static String getCurrentTime() {
String format = dateTimeFormater.format(new Date());
return format;
}
}
执行结果 : (我们可以看到 , 消息正在慢慢的被消费)
2023-11-03 15:06:01 待处理消息数量:0
2023-11-03 15:06:02 待处理消息数量:0
2023-11-03 15:06:03 待处理消息数量:0
2023-11-03 15:06:04 待处理消息数量:0
# 此处开始调用接口 , 往延迟队列中添加消息
2023-11-03 15:06:05 待处理消息数量:4
2023-11-03 15:06:05 :1 --> 消息开始处理
2023-11-03 15:06:05 :1 --> 消息处理结束
2023-11-03 15:06:05 :13 --> 消息开始处理
2023-11-03 15:06:05 :13 --> 消息处理结束
2023-11-03 15:06:05 :5 --> 消息开始处理
2023-11-03 15:06:05 :5 --> 消息处理结束
2023-11-03 15:06:05 :9 --> 消息开始处理
2023-11-03 15:06:05 :9 --> 消息处理结束
2023-11-03 15:06:18 待处理消息数量:12
2023-11-03 15:06:18 :10 --> 消息开始处理
2023-11-03 15:06:18 :10 --> 消息处理结束
2023-11-03 15:06:18 :14 --> 消息开始处理
2023-11-03 15:06:18 :14 --> 消息处理结束
2023-11-03 15:06:18 :2 --> 消息开始处理
2023-11-03 15:06:18 :2 --> 消息处理结束
2023-11-03 15:06:18 :6 --> 消息开始处理
此处我们会发现一个问题 , @Scheduled 注解是轮询执行的 , 如果上一个任务没执行完毕 , 定时器会等待 , 等待上一次执行完毕
也就是说 , @Scheduled 注解表示异步执行的 , 那么就会出现一个问题 , 每一个消息处理都会耗时3秒,
假设有 A B 两条消息 , 消息的过期时间是一致的 , 那么这两个消息会被同时从缓存中取出准备消费 ,假设A消息第一个开始消费 ,
那么B消息,就要等待3秒 , 等A消息执行完成,才开始消费B消息 , 那么就会出现消息堆积,延迟消费的情况 , 本来14:00就要消费的消息,等到了 14:10 才开始消费(可能会更晚) ,
如果消息量足够大的情况下 , 就会出现问题 , 内存泄漏 , 消息堆积 , 延迟消费等情况
45.8、解决消息延迟
解决办法 : 开线程去执行 (使用线程池) , 使用以下代码 , 我们消费一条消息,就需要创建一个线程去后台消费 , 就会解决了上面的问题 ,
(这里需要用到线程池,我为了偷懒 ,就简单模拟了一下)
/**
* 处理已到期的消息(轮询)
*/
@Scheduled(fixedDelay = 1000)
public void handleExpiredMessages() {
String currentTime = getCurrentTime();
// 1 : 扫描任务,并将需要执行的任务加入到任务队列中
List<DelayMessage> messages = delayQueue.getExpiredMessages();
System.out.println(currentTime + " 待处理消息数量:" + messages.size());
// 2 : 开始处理消息
if (!messages.isEmpty()) {
for (DelayMessage message : messages) {
// 2.1 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程
new Thread(() -> {
System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理");
try {
// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束");
// 2.2 : 处理完消息,删除消息
delayQueue.remove(message);
}).start();
}
}
}
执行结果 : 开启线程异步执行消息
2023-11-03 15:18:33 待处理消息数量:0
2023-11-03 15:18:34 待处理消息数量:0
2023-11-03 15:18:35 待处理消息数量:0
2023-11-03 15:18:36 待处理消息数量:4
2023-11-03 15:18:36 :1 --> 消息开始处理
2023-11-03 15:18:36 :13 --> 消息开始处理
2023-11-03 15:18:36 :5 --> 消息开始处理
2023-11-03 15:18:36 :9 --> 消息开始处理
2023-11-03 15:18:37 待处理消息数量:4
2023-11-03 15:18:37 :1 --> 消息开始处理 // 注意:(此消息被重复消费了)
2023-11-03 15:18:37 :13 --> 消息开始处理
2023-11-03 15:18:37 :5 --> 消息开始处理
2023-11-03 15:18:37 :9 --> 消息开始处理
2023-11-03 15:18:38 待处理消息数量:8
2023-11-03 15:18:38 :1 --> 消息开始处理
2023-11-03 15:18:38 :5 --> 消息开始处理
2023-11-03 15:18:38 :9 --> 消息开始处理
2023-11-03 15:18:38 :13 --> 消息开始处理
2023-11-03 15:18:38 :10 --> 消息开始处理
2023-11-03 15:18:38 :6 --> 消息开始处理
2023-11-03 15:18:38 :2 --> 消息开始处理
2023-11-03 15:18:38 :14 --> 消息开始处理
2023-11-03 15:18:36 :9 --> 消息处理结束
2023-11-03 15:18:36 :5 --> 消息处理结束
2023-11-03 15:18:36 :1 --> 消息处理结束
2023-11-03 15:18:36 :13 --> 消息处理结束
我们使用了开启新线程的方式来消费消息 , 消息延迟的问题解决了 , 但是又出现了新的问题 , 消息会出现重复消费的情况
问题的原因 : 我们第一次定时 , 取出了符合条件的4条过期的消息 , 我们开启了4个线程去执行 , 当第二秒 , 我们又获取了符合条件的消息 ,
因为第一次获取的消息执行需要时间 , 那么我们第二次拿消息的时候 , 就会有可能把第一次的4条消息 , 也拿出来 , 然后开线程再次消费 , 就会出现重复消费的情况了
45.9、解决重复消费方案
这个问题出现原因是 , 当前线程不知道这个消息已经被其他线程正在处理了 ,只要解决这个问题 ,
当前线程开始处理这个消息,先判断当前消息有没有被其他线程处理 , 如果正在处理,则不进行处理了 , 如果没处理,则开始进行处理
我们知道 redis删除元素的 remove() 方法 , 有一个返回值 , 表示删除的状态 ,
我们可以在消息处理前 , 先 remove() 这个消息 , 如果 remove()成功,则表示当前消息没有被消费 , 如果 remove()失败,则表示该消息已经被消费了
/**
* 处理已到期的消息(轮询)
*/
@Scheduled(fixedDelay = 1000)
public void handleExpiredMessages() {
String currentTime = getCurrentTime();
// 1 : 扫描任务,并将需要执行的任务加入到任务队列中
List<DelayMessage> messages = delayQueue.getExpiredMessages();
System.out.println(currentTime + " 待处理消息数量:" + messages.size());
// 2 : 开始处理消息
if (!messages.isEmpty()) {
for (DelayMessage message : messages) {
// 2.1 : 处理消息:先删除消息,获取当前消息是否已经被其他人消费
Long remove = delayQueue.remove(message);
if (remove > 0) {
// 2.2 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程
new Thread(() -> {
System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理");
try {
// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束");
}).start();
}
}
}
}
执行结果 : 我们会发现 , 重复消费的问题 , 解决了
2023-11-03 15:31:36 待处理消息数量:4
2023-11-03 15:31:36 :1 --> 消息开始处理
2023-11-03 15:31:36 :13 --> 消息开始处理
2023-11-03 15:31:36 :5 --> 消息开始处理
2023-11-03 15:31:36 :9 --> 消息开始处理
2023-11-03 15:31:37 待处理消息数量:0
2023-11-03 15:31:38 待处理消息数量:4
2023-11-03 15:31:38 :10 --> 消息开始处理
2023-11-03 15:31:38 :14 --> 消息开始处理
2023-11-03 15:31:38 :2 --> 消息开始处理
2023-11-03 15:31:38 :6 --> 消息开始处理
2023-11-03 15:31:36 :9 --> 消息处理结束
2023-11-03 15:31:36 :5 --> 消息处理结束
2023-11-03 15:31:36 :13 --> 消息处理结束
2023-11-03 15:31:36 :1 --> 消息处理结束
2023-11-03 15:31:39 待处理消息数量:0
2023-11-03 15:31:40 待处理消息数量:0
2023-11-03 15:31:38 :10 --> 消息处理结束
2023-11-03 15:31:38 :2 --> 消息处理结束
2023-11-03 15:31:38 :6 --> 消息处理结束
2023-11-03 15:31:38 :14 --> 消息处理结束
2023-11-03 15:31:41 待处理消息数量:4
2023-11-03 15:31:41 :11 --> 消息开始处理
2023-11-03 15:31:41 :15 --> 消息开始处理
2023-11-03 15:31:41 :3 --> 消息开始处理
2023-11-03 15:31:41 :7 --> 消息开始处理
2023-11-03 15:31:42 待处理消息数量:0
2023-11-03 15:31:43 待处理消息数量:0
2023-11-03 15:31:41 :7 --> 消息处理结束
2023-11-03 15:31:41 :11 --> 消息处理结束
2023-11-03 15:31:41 :3 --> 消息处理结束
2023-11-03 15:31:41 :15 --> 消息处理结束
但是还会出现问题 , 如果服务重启 , 或者服务宕机 , 那么当前执行中的消息 , 在下次服务启动的时候 , 就会出现消息丢失的情况
我给出的解决方案就是 : 创建一张临时数据表 , 当消息开始消费的时候 ,在表中添加一条记录,当消息消费成功,则把临时表中的记录删除
当服务重启 , 则把临时表中的记录,读到延迟队列中 , 就解决了消息丢失的情况
关键点
使用 缓存的key带内网ip的方式,解决了集群,多机器会出现的所有问题.
使用 后台线程,线程池,解决了消息堆积,延迟消费的问题.
使用 先删除key的方法 , 解决了消息重复消费的问题.
把当前处理的消息进行持久化,解决了消息丢失的问题.
这个只是我给出的解决方案 , 并不是完美的 , 如果想实现消息队列 , 最好是使用 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等
46、Redis回收进程如何工作的?(过期策略采用的什么)
Redis数据删除策略-惰性删除
惰性删除:设置该key过期时间后,我们不去管他,当需要该key时,我们在检查是否过期,如果过期,我们就删掉它,反之返回该key。
例子: set name zhangsan 10 get name //发现name过期了,直接删除key
优点:对CPU友好,只会在使用该key时才会进行过期检查,对于很多用不到的key不用浪费时间进行过期检查
缺点:对内存不友好,如果一个key已经过期,但是一直没有使用,那么该key就会一直在内存,内存永远不会释放。
Redis数据删除策略—定期删除
Redis数据删除策略-定期删除
每隔一段时间,我们就对一些key进行检查,删除里面过期的key(从一定数量的数据库中取出一定数量的随机key进行检查,并删除其中的过期key)。
定期清理有两种模式:
1)、SLOW模式是定时任务,执行频率默认为10hz,每次不超过25ms,以通过修改配置文件redis.conf的hz选项来调整这个次数
2)、FAST模式执行频率不固定,但两次间隔不低于2ms,每次耗时不超过1ms
优点:可以通过限制删除操作执行的时长和频率来减少删除操作对CPU的影响。另外定期删除,也能有效释放过期键占用的内存。
缺点:难以确定删除操作执行的时长和频率。
Redis的过期删除策略:惰性删除+定期删除两种策略进行配合使用
47、Redis回收使用的是什么算法?
3.0之前 LRU算法,3.0及之后noeviction