NoSql 数据存储:Redis
- 前言
- 安装
- 在Windows上安装Redis
- 在Linux上源代码安装Redis
- 远程连接
- 基本操作
- 常用命令
- string
- hash
- list
- set
- zset
- SpringBoot整合Redis
- 基本使用
- 项目案例
- 排行榜应用,取浏览量最高TOP N数据的操作
- 需要精确设定过期时间的应用
- 计数器应用
- Uniq操作,获取某段时间所有数据排重值
- 实时系统,反垃圾系统
- Pub/Sub构建实时消息系统
- 缓存
- 限流
- 分布式锁
- Lua脚本实现分布式锁
前言
Redis是一个开源(BSD许可)的键值对内存数据结构存储,用作数据库、缓存、消息代理和流引擎。Redis提供了数据结构,如字符串、哈希、列表、集合、排序集合、范围查询、位图、超日志、地理空间索引和流。Redis具有内置复制、Lua脚本、LRU驱逐、事务和不同级别的磁盘持久性,并通过Redis Sentinel和Redis Cluster提供高可用性和自动分区。
你可以在这些类型上运行原子操作,比如追加字符串;递增哈希值;将元素压入列表;计算集合的交、并、差;或者得到排序集合中排名最高的元素。
为了达到最佳性能,Redis使用内存中的数据集。根据您的使用情况,Redis可以通过定期将数据集转储到磁盘或通过将每个命令附加到基于磁盘的日志来持久化您的数据。如果您只需要一个功能丰富、联网的内存缓存,也可以禁用持久性。
Redis支持异步复制,具有快速的非阻塞同步和自动重连接,可以在网拆分时进行部分重同步。
安装
最新的稳定版本总是可以在固定的https://download.redis.io/redis-stable.tar.gz URL上获得,以及它的SHA-256总和。
在Windows上安装Redis
Redis在Windows上不受官方支持。
进入https://github.com/tporadowski/redis/releases/tag/v5.0.14.1可以找到其他大神提供的Windows的安装包,
- 选择你想要的版本,点击下载对应文件,比如
.zip
或者.msi
。 - 解压文件
- 双击redis-server.exe启动服务端
- 双击redis-cli.exe启动客户端连接服务端
- 在客户端输入
“ping”
,出现“PONG”
,即证明连接成功
在Linux上源代码安装Redis
有两种方式获取源代码包,一种使用wget
命令进行下载
wget https://download.redis.io/releases/redis-7.0.15.tar.gz
如图所示
另一种是去官网(https://redis.io/download/)下载后上传到服务器
- 解压文件
然后就是解压文件(切换到root用户,执行命令,方式再操作过程中权限不够)
tar -axvf redis-7.0.15.tar.gz
如图所示
- 编译
解压完成后本地就会有两个文件,如图所示
进入解压后的目录,使用命令编译源文件
make && make install
如图所示
你可以再安装的时候指定目录
make install PREFIX=/usr/local/redis
查看/user/local
目录和,会发现多了一个redis目录。
这里我们没有安装gcc
,再编译中好像会自动安装(可能是新版的原因)。你可以使用命令查看gcc
版本,确认是否安装。
gcc -v
如图所示
如果没有,再执行命令安装
yum install gcc
- 查看redis版本
使用命令查看redis版本,说明安装成功
redis-cli -v
如图所示
- 进入安装目录
默认安装路径是在 /usr/local/bin
(安装时没指定目录,所有文件都在bin目录下)
- 启动
执行命令启动
./redis-server
如图所示
端口号为6379,前台方式启动,当客户端关闭时,redis也会关闭。
- 修改配置文件
我们找到redis安装目录下的redis.conf
文件。
如图所示
修改之前先备份一份
cp redis.conf redis.conf.bak
如图所示
然后修改文件
vim redis.conf
(1)修改后台运行
找到daemonize no
修改为daemonize yes
。
daemonize yes
(2)修改端口号
如果不准备用默认端口,可以修改。将port 6379
改为你需要的端口,比如:port 6389
。
port 6389
如果修改端口的话,建议将文件也修改为对应端口文件,比如:/redis/6379.conf
(3)修改监听地址
默认是 127.0.0.1,会导致只能在本地访问。修改成 0.0.0.0 则可以在任意 IP 访问,生产环境不要设置 0.0.0.0。
bind 0.0.0.0
(4)其他配置
# 设置 redis 能够使用的最大内存
maxmemory 256mb
# 日志文件,默认为空,不记录日志,可以指定日志文件名
logfile "redis.log"
修改完后,保存退出。
- 指定配置文件启动
# 指定配置文件启动 redis,默认
./redis-server /home/lighthouse/redis-7.0.15/redis.conf
- 查看redis是否启动
使用命令查看进程
ps -aux|grep redis
如图所示
也可以使用端口监听方式查看
netstat -lanp |grep 6379
如图所示
- 打开redis连接
./redis-cli
如图所示
返回pong,连接成功(redis默认是没有设置密码的,如果有需要自己去了解)。
- 关闭redis
(1)使用kill -9 进程id
强制关闭
先使用ps -aux|grep redis
查看进程
然后执行命令
kill -9 28984
如图所示
这种方式关闭redis,容易丢失数据。
(2)使用redis-cli shutdown
关闭
- 开机自启动
如果你的服务器关闭了的话,redis 也会关闭,你可以通过systemctl
进行自启动
创建Redis服务文件:
vi /etc/systemd/system/redis.service
配置内容如下
[Unit]
Description=Redis
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /home/lighthouse/redis-7.0.15/redis.conf
ExecReload=/usr/local/bin/redis-server -s reload
ExecStop=/user/local/bin/redis-server -s stop
PrivateTmp=true
[Install]
WantedBy=multi-user.target
配置完成保存退出。
执行systemctl daemon-reload
重新加载系统服务。
执行systemctl start redis
启动redis。
执行systemctl status redis
查看服务状态。
执行systemctl enable redis
设置开机自启动,然后重启服务器看看效果(貌似不需要配置文件也能设置)。
执行systemctl stop redis
停止运行。
执行systemctl disable redis
取消开机自启动。
执行systemctl restart redis
重启redis。
远程连接
使用RedisInsight工具进行远程连接
官网下载点击进入,最下面填写好信息就行(随便填)
点击.exe
下一步到最后一步就行,打开工具
点击Add connection details manually,添加连接。
输入URL和密码后,连接成功如下
如果连接失败,看三个地方:
- 检查服务器防火墙是否开启。
- 将
bind 127.0.0.1
注释(127.0.0.1
只能本地连接)。
- 找到
requirepass
设置你的密码,当然你也可以忽略这一步。
都配好后重启服务,即可。
- 使用
点击连接进入,可以查看的创建的内容,点击Key按钮,创建一个示例
假如创建一个字符串,点击Add Key
保存后,如图
基本操作
我们先来了解一下redis的一些常用命令,先登入到redis:
./redis-cli
语法:
redis-cli -h ip -p port -a password
如果在命令行上'-a'
或者 '-u'
,会被认为不安全
可以在登入进去以后,输入auth 密码
,进行验证
常用命令
keys
:查看所有键
keys *
dbsize
:键总数
dbsize
exists key
:检查键是否存在
exists test
del key [key ...]
:删除键
del username
expire key seconds
:键过期
expire username 100
ttl key
: 通过 ttl 命令观察键键的剩余过期时间
ttl username
结果等于-1:有效期永久
结果等于-2 :该数据不存在
结果大于0 :该数据有效期为当前数字秒
type key
:键的数据结构类型
type username
string
字符串 string 是 Redis 最简单的数据结构。字符串结构使用非常广泛,一个常见的用途就是缓存用户信息。我们将用户信息结构体 使用 JSON 序列化成字符串,然后将序列化后的字符串塞进 Redis 来缓存。
set key value
:设置字符串值,键为 key 的值为 value。
set school xiaotiancai
get key
:获取key 的值。
get school
append key value
:将值追加到 key 的字符串值的末尾。
append school 2
strlen key
:获取字符串值的长度。
strlen school
setex key seconds value
:设置带有过期时间的键值对。
setex school 1000 xiaojiahuo
mset key value [key value ...]
:设置多个值
mset user abc user2 def
mget key [key ...]
:获取多个值
mget user user2
incr key
:自加
incr num
如果不是数值类型就会报错
incrby key num
:指定累加数值。
incrby num 100
decr key
:自减
decr num
decrby key num
:指定递减小数数值。
decrby num 10
hash
HASH 类型在 Redis 中用于存储键值对的集合。每个 HASH 可以包含多个字段(field)和对应的值(value)。
hset key field value
:设置值
hset maps name zhangsan age 18
hget key field
:获取值
hget maps name
hgetall key
:获取所有字段和值
hgetall maps
hexists key field
:检查字段是否存在
hexists maps sex
hdel key field [field ...]
:删除 HASH 中一个或多个字段。
hdel maps age
hlen key
:计算 field 个数
hlen maps
hmget key field [field ...]
:批量获取 field-value
hmget maps name age
hkeys key
:获取所有 field
hkeys maps
hvals key
:获取所有 value
hvals maps
list
rpush key value [value ...]
:从右边插入元素
rpush classlist one two three
lpush key value [value ...]
:从左边插入元素
lpush classlist2 one two three
linsert key before|after pivot value
:向某个元素前或者后插入元素
#向two元素前面添加four
linsert list before two four
#向four后面添加five
linsert list after four five
lset key index value
:设置指定索引的元素值
lset list 0 six
lrange key start end
:获取指定范围内的元素列表,lrange key 0 -1可以从左到右获取列表的所有元素
lrange list 0 -1
lindex key index
:获取列表指定索引下标的元素
lindex list 0
llen key
:获取列表长度
llen list
lpop key
:从列表左侧弹出元素
lpop list 2
rpop key
:从列表右侧弹出元素
rpop list 2
set
sadd key element [element ...]
:添加元素,返回结果为添加成功的元素个数
sadd set1 a b c d e f
srem key element [element ...]
:删除元素,返回结果为成功删除元素个数
srem set1 a b c d e f
smembers key
:获取所有元素
smembers set2
sismember key element
:判断元素是否在集合中,如果给定元素 element 在集合内返回 1,反之返回 0
sismember set1 a
scard key
:计算元素个数,scard 的时间复杂度为 O(1),它不会遍历集合所有元素
scard set2
spop key
:从集合随机弹出元素,从 3.2 版本开始,spop 也支持[count]参数。
spop set2 6
srandmember key [count]
:随机从集合返回指定个数元素,[count]是可选参数,如果不写默认为 1
srandmember set1 3
sinter key [key ...]
:求多个集合的交集
sinter set1 set2
sunion key [key ...]
:求多个集合的并集
sunion set1 set2
sdiff key [key ...]
:求多个集合的差集
sdiff set1 set2
zset
zadd key score member [score member ...]
:添加成员,返回结果代表成功添加成员的个数。
zadd sortset 1 one 2 two 4 three 5 four
zrem key member [member ...]
:删除成员
zrem sortset three four
zcard key
:计算成员个数
zcard sortset
zrange key start end [withscores]
和zrevrange key start end [withscores]
:返回指定排名范围的成员,zrange 是从低到高返回,zrevrange 反之。
zrange sortset 0 -1 withscores
zrevrange sortset 0 -1 withscores
zremrangebyrank key start end
:删除指定排名内的升序元素
zremrangebyrank sortset 0 2
zremrangebyscore key min max
:删除指定分数范围的成员
zremrangebyscore sortset 0 5
SpringBoot整合Redis
- 引入pom依赖:
这是Redis官方提供的依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.2.0-alpha3</version>
</dependency>
并在配置文件中切换 Redis 客户端为 jedis
spring.redis.client-type=jedis
这是SpringBoot提供的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.4.0</version>
</dependency>
配置连接
#redis地址
spring.redis.host=127.0.0.1
#redis端口
spring.redis.port=6379
#redis密码(非必须)
spring.redis.password=123456
除此之外,还有一些常见的配置(yaml
文件改变格式就行)
# 读取超时
spring.redis.timeout=5000
# 连接超时
spring.redis.connect-timeout=10000
#数据库编号
spring.redis.database= 0
#是否开启ssl
spring.redis.ssl=false
# 自动开启连接池
spring.redis.jedis.pool.enabled=true
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=8
# 连接池最大阻塞等待时间(毫秒)
spring.redis.jedis.pool.max-wait=10000
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=8
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
#客户端名称
spring.redis.client-name=myClientName
#集群部署 ip:port,...
spring.redis.cluster.nodes=192.168.1.1:6379,192.168.1.2:6379,192.168.1.3:6379
#哨兵模式 主节点名称
spring.redis.sentinel.master=mymaster
#哨兵模式 节点列表 ip:port,...
spring.redis.sentinel.nodes=192.168.1.1:7001,192.168.1.1:7002,192.168.1.1:7003
# 用于使用哨兵进行身份验证的密码
spring.redis.sentinel.password=123456
注册RedisTemplate(序列化可以帮助查询的数据显示方式,如果不配置可能显示不出来)
@Configuration
public class MyConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//设置默认序列化方式
redisTemplate.setDefaultSerializer(stringRedisSerializer);
//启动默认序列化
redisTemplate.setEnableDefaultSerializer(true);
return redisTemplate;
}
}
你也可以单独设置String的序列化方式:
redisTemplate.setStringSerializer(stringRedisSerializer);
你也可以设置通用序列化方式:
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
还可以单独设置Hash的序列化方式:
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
无论设置哪一种都会覆盖相对应默认的序列化方法。
你不需要显式调用,RedisTemplate 实现了 InitializingBean 接口,Spring 在处理 bean 初始化过程时,会自动检测并调用
redisTemplate.afterPropertiesSet();
完整配置如下:
@Configuration
public class MyConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
//设置了 ObjectMapper 的可见性规则。通过该设置,所有字段(包括 private、protected 和 package-visible 等)都将被序列化和反序列化,无论它们的可见性如何。
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//启用了默认的类型信息 NON_FINAL 参数表示只有非 final 类型的对象才包含类型信息,这可以帮助在反序列化时正确地将 JSON 字符串转换回对象。
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//key采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
//value的序列化方式采用jackson
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
//hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
//hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
return redisTemplate;
}
}
整合完毕。
基本使用
下面再来讲讲在代码中如何操作redis。
(1)查看所有key
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//查看所有key
Set<String> keys = redisTemplate.keys("*");
System.out.println(keys);
/** Output
* [name, identity, age, amount]
*/
}
}
(2)判断key是否存在
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//判断key是否存在
Boolean b = redisTemplate.hasKey("name");
System.out.println(b);
/** Output
* true
*/
}
}
(3)删除key
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//删除key
redisTemplate.delete("identity");
}
}
(4)设置key有效期
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//设置key有效期
redisTemplate.expire("age",100,TimeUnit.SECONDS);
}
}
(5)获取key有效期
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//获取key有效期
Long age = redisTemplate.getExpire("age");
System.out.println(age);
/** Output
* 100
*/
}
}
(6)查询key类型
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//查询key类型
DataType name = redisTemplate.type("name");
System.out.println(name);
/** Output
* STRING
*/
}
}
(7)重命名key
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//重命名key
redisTemplate.rename("name","newname");
}
}
(8)移除key存活时间
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//移除存活时间
redisTemplate.persist("age");
}
}
- string
(1)设置字符串值,键为 key 的值为 value。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ValueOperations<String, Object> str = redisTemplate.opsForValue();
//赋值
str.set("name","zhangsan");
str.set("age",18);
}
}
(2)获取值。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ValueOperations<String, Object> str = redisTemplate.opsForValue();
Object name = str.get("name");
System.out.println(name);
Object age = str.get("age");
System.out.println(age);
/** Output
* zhangsan
* 18
*/
}
}
(3)将值追加到 key 的字符串值的末尾。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ValueOperations<String, Object> str = redisTemplate.opsForValue();
Object name = str.append("name","2");
System.out.println(name);
}
}
(4)获取字符串值的长度。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ValueOperations<String, Object> str = redisTemplate.opsForValue();
Long name = str.size("name");
System.out.println(name);
/** Output
* 11
*/
}
}
(5)设置带有过期时间的键值对。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ValueOperations<String, Object> str = redisTemplate.opsForValue();
str.set("sex","男",100, TimeUnit.SECONDS);
}
}
TimeUnit:TimeUnit.DAYS
天、TimeUnit.HOURS
小时、TimeUnit.MINUTES
分钟、TimeUnit.SECONDS
秒、TimeUnit.MILLISECONDS
毫秒
(6)设置多个值
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ValueOperations<String, Object> str = redisTemplate.opsForValue();
Map<String,Object> map = new HashMap<>();
map.put("identity","100000000");
map.put("amount",10000);
str.multiSet(map);
}
}
(7)获取多个值
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ValueOperations<String, Object> str = redisTemplate.opsForValue();
List<Object> objects = str.multiGet(Arrays.asList("name", "age"));
objects.forEach(item->{
System.out.println(item);
});
/** Output
* zhangsan
* 18
*/
}
}
(8)自加、自减
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ValueOperations<String, Object> str = redisTemplate.opsForValue();
//正数为加
str.increment("age",2);
//负数为减
Long age = str.increment("age", -1);
System.out.println(age);
/** Output
* 1
*/
}
}
- hash
(1)设置值
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
HashOperations<String, Object, Object> keys = redisTemplate.opsForHash();
//设置值
keys.put("maps","name","zhang");
Map<String,Object> map = new HashMap<>();
map.put("name","san");
map.put("age",13);
keys.putAll("maps2",map);
}
}
(2)获取值
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//获取值
Object name = keys.get("maps", "name");
System.out.println(name);
/** Output
* san
*/
}
}
(3)获取所有keys
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//获取所有keys
Set<Object> maps = keys.keys("maps");
System.out.println(maps);
/** Output
* [name, age]
*/
}
}
(4)获取所有values
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//获取所有values
List<Object> maps = keys.values("maps");
System.out.println(maps);
/** Output
* [san, 13]
*/
}
}
(5)判断key是否存在
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//判断key是否存在
Boolean b = keys.hasKey("maps", "name");
System.out.println(b);
/** Output
* true
*/
}
}
(6)删除key
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//删除key
keys.delete("map","name","age");
}
}
(7)计算key数量
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//计算key数量
Long map = keys.size("maps");
System.out.println(map);
/** Output
* 2
*/
}
}
(8)获取多个key
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
//获取多个keys
keys.multiGet("maps",Arrays.asList("name","age"));
/** Output
* [san, 13]
*/
}
}
(9)JSON形式显示
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
HashOperations<String, Object, Object> keys = redisTemplate.opsForHash();
//以键值对形式获取
Map<Object, Object> map = keys.entries("maps");
System.out.println(map);
/** Output
* {name=san, age=13}
*/
}
}
- list
(1)插入元素
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ListOperations<String, Object> keys = redisTemplate.opsForList();
//从左边插入元素
keys.leftPush("lists","one","two");
keys.leftPushAll("lists","one","two");
//从右边插入元素
keys.rightPush("lists2","one","two");
keys.rightPushAll("lists2","one","two");
}
}
(2)获取元素
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ListOperations<String, Object> keys = redisTemplate.opsForList();
//获取元素
List<Object> list = keys.range("lists", 0, -1);
list.forEach(item->{
System.out.println(item);
});
/** Output
* two
* three
*/
}
}
(3)移除元素
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ListOperations<String, Object> keys = redisTemplate.opsForList();
//从左边移除元素
keys.leftPop("lists");
//从右边移除元素
keys.rightPop("lists2");
}
}
(4)获取元素长度
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ListOperations<String, Object> keys = redisTemplate.opsForList();
//获取元素长度
Long sets = keys.size("lists");
System.out.println(sets);
/** Output
* 2
*/
}
}
(5)指定位置替换元素
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ListOperations<String, Object> keys = redisTemplate.opsForList();
//指定位置替换
keys.set("lists",1,"three");
}
}
(6)删除相同元素
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ListOperations<String, Object> keys = redisTemplate.opsForList();
Long remove = keys.remove("lists", 1, "three");
System.out.println(remove);
/** Output
* 1
*/
}
}
index=0
,删除所有值等于value
的元素。
index>0
,从头部开始删除第一个值等于value
的元素。
index<0
,从尾部开始删除第一个值等于value
的元素。
(7)获取下标元素
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ListOperations<String, Object> keys = redisTemplate.opsForList();
Object sets = keys.index("lists", 0);
System.out.println(sets);
/** Output
* two
*/
}
}
- set
(1)插入值
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
SetOperations<String, Object> keys = redisTemplate.opsForSet();
keys.add("sets","one","two","three");
keys.add("sets2","three","four","five");
}
}
(2)获取值
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
SetOperations<String, Object> keys = redisTemplate.opsForSet();
//获取值
Set<Object> sets = keys.members("sets");
System.out.println(sets);
/** Output
* [two, three, one]
*/
}
}
(3)删除值
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
SetOperations<String, Object> keys = redisTemplate.opsForSet();
//删除值
keys.remove("sets","three");
}
}
(4)判断值是否存在
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
SetOperations<String, Object> keys = redisTemplate.opsForSet();
//判断值是否存在
Boolean member = keys.isMember("sets", "four");
System.out.println(member);
/** Output
* false
*/
}
}
(5)并集、交集、差集
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
SetOperations<String, Object> keys = redisTemplate.opsForSet();
//并集
Set<Object> intersect = keys.intersect("sets", "sets2");
System.out.println(intersect);
//交集
Set<Object> union = keys.union("sets", "set2");
System.out.println(union);
//差集
Set<Object> difference = keys.difference("sets", "set2");
System.out.println(difference);
/** Output
* [three]
* [two, one]
* [two, one]
*/
}
}
- zset
(1)添加元素
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ZSetOperations<String, Object> keys = redisTemplate.opsForZSet();
keys.add("zset","one",0);
keys.add("zset","two",1);
keys.add("zset","three",2);
}
}
(2)获取元素
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ZSetOperations<String, Object> keys = redisTemplate.opsForZSet();
Set<Object> zset = keys.range("zset", 0, -1);
System.out.println(zset);
Set<Object> zset1 = keys.reverseRange("zset", 0, -1);
System.out.println(zset1);
/** Output
* [one, two, three]
* [three, two, one]
*/
}
}
(3)删除元素
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ZSetOperations<String, Object> keys = redisTemplate.opsForZSet();
keys.remove("zset","one","two");
}
}
(4)获取元素数量
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/get")
public void get(HttpServletResponse response){
ZSetOperations<String, Object> keys = redisTemplate.opsForZSet();
Long zset = keys.size("zset");
System.out.println(zset);
/** Output
* 3
*/
}
}
你可以封装一个工具类,方便再日常开发中共同使用
@Component
public class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
*
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
/**
* string缓存获取
*
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* string缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* string缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
项目案例
排行榜应用,取浏览量最高TOP N数据的操作
现有表结构如下:
示例代码如下:
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@Autowired
private BillboardService billboardService;
@GetMapping("/get")
public void get(HttpServletResponse response){
//查询排行榜数据
List<Billboard> list = billboardService.list();
//项list添加数据
ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
list.forEach(item->{
zset.add("billboard",item,item.getPageView());
});
//倒序,获取前5条
Set<Object> billboard = zset.reverseRange("billboard", 0, 4);
billboard.forEach(item->{
Billboard b = (Billboard) item;
System.out.println(b.getId()+","+b.getTitile()+","+b.getPageView());
});
/** Output
* 1, 二十届三中全会7月召开,999999
* 4,神十七载人飞行任务圆满成功,400053
* 5,天涯社区将恢复访问,299931
* 3,2套房京籍家庭可在五环外新购1套房,238222
* 2,低估了五一堵车的程度,213412
*/
//正序
billboard = zset.range("billboard", 0, -1);
billboard.forEach(item->{
Billboard b = (Billboard) item;
System.out.println(b.getId()+","+b.getTitile()+","+b.getPageView());
});
}
}
需要精确设定过期时间的应用
比如:在用户登录、注册或重置密码等场景下,生成验证码或临时凭证,并设置过期时间,以确保安全性和及时清理过期数据。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/login")
public void login(){
//用户id
String userId = "2024050600001";
//用户token
String token = "eyJhbGciOiJIUzUxMiJ9" +
".eyJzdWIiOiJhZG1pbiIsImNyZWF0ZWQiOjE3MTI0Nzc1Nzk3MDcsImV4cCI6MTcxMzA4MjM3OX0" +
".BTkKN8GyW37g2nWhTOGd4fV71KyzaNNr3EpoS1qQ-t3dp-DrQ8NTy_RQk21X75VuiFQL0tSzNOySBT8oozeM9Q";
//设置token有效期为30分钟
redisTemplate.opsForValue().set(userId,token,30,TimeUnit.MINUTES);
}
}
计数器应用
比如:计数器可以用于实现诸如统计网站访问量、计算用户点击次数、记录商品销量或者防止重复提交等功能。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private BillboardService billboardService;
@GetMapping("/counter")
public void counter() {
//1.网站访问次数
Long visitTimes = redisTemplate.opsForValue().increment("visitTimes");
System.out.println(visitTimes);//第一次输出1
//2.防止重复提交
String key = userId + "接口地址";
Long increment = redisTemplate.opsForValue().increment(key);
//设置过期时间,几秒内禁止重复提交
if(increment == 1){
redisTemplate.expire(key,5,TimeUnit.SECONDS);
}
if (increment > 1) {
System.out.println("请勿重复提交");
}
}
}
Uniq操作,获取某段时间所有数据排重值
比如:某段时间内数据的去重后的数量。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private BillboardService billboardService;
@GetMapping("/uniq")
public void uniq() {
// 示例数据
List<String> dataList = Arrays.asList("dataId1", "dataId2", "dataId3", "dataId3");
String timeSegment = "2024-05-07"; // 某段时间的标识符
// 添加数据到集合(某段时间)
for (String dataId : dataList) {
redisTemplate.opsForSet().add("data_set:" + timeSegment, dataId);
}
// 获取某段时间内所有数据的排重值
long uniqueCount = redisTemplate.opsForSet().size("data_set:" + timeSegment);
System.out.println("某段时间内所有数据的排重值:" + uniqueCount);
/** Output:
* 某段时间内所有数据的排重值:3
*/
}
}
实时系统,反垃圾系统
比如:进行简单的反垃圾处理。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/kbas")
public void kbas(){
processIncomingData("这是一个敏感数据");
System.out.println(isSpamData("这是一个敏感数据"));
/** Output
* true
*/
}
public boolean isSpam(String data) {
// 假设这里使用简单的规则进行判断
if (data.contains("敏感")) {
return true;
}
return false;
}
public void processIncomingData(String data) {
if (isSpam(data)) {
// 如果是垃圾数据,可以进行处理,比如删除或标记
redisTemplate.opsForValue().set("spam:" + data, "true");
} else {
// 不是垃圾数据,进行其他处理
redisTemplate.opsForValue().set("spam:" + data, "false");
}
}
public boolean isSpamData(String data) {
String key = "spam:" + data;
return "true".equals(redisTemplate.opsForValue().get(key));
}
}
Pub/Sub构建实时消息系统
比如:实现一个消息监听队列。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/send")
public void send(){
redisTemplate.convertAndSend("messageQueue","发送一个新消息");
}
}
@Component
public class MessageQueueListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println("收到消息:"+message.toString());
}
}
@Configuration
public class MyConfig {
/**
* 初始化监听器
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//设置监听类和监听通道绑定
container.addMessageListener(listenerAdapter(), new ChannelTopic("messageQueue"));
return container;
}
/**
* 利用反射来创建监听到消息之后的执行方法
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter() {
return new MessageListenerAdapter(new MessageQueueListener());
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
//参考前面示例的序列化配置
return redisTemplate;
}
}
如图所示
除此之外你还可以监听key过期消息,实现延迟任务的效果
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/send")
public void send(){
redisTemplate.opsForValue().set("listenerKey","test",5, TimeUnit.SECONDS);
}
}
@Configuration
public class MyConfig {
/**
* 初始化监听器
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
//参考前面示例的序列化配置
return redisTemplate;
}
}
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String patternStr = new String(pattern);
System.out.println("---channel---: " + patternStr);
System.out.println("---message---: " + message.toString());
}
}
如图所示
缓存
比如:针对一些数据量大,修改操作较少的数据,我们可以放入缓存中减少数据库压力。
@Controller
public class MyController {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@GetMapping("/fetch")
public String fetch(){
//假设从数据获取到数据
String data ="[CityDTO{id=1, name='北京市', code='100000', child=[CityDTO{id=2, name='东城区', code='101010100', child=null}, CityDTO{id=3, name='西城区', code='100032', child=null}]}]";
//存入redis中
if (redisTemplate.opsForValue().get("citysData") == null) {
redisTemplate.opsForValue().set("citysData", data);
}
return (String) redisTemplate.opsForValue().get("citysData");
}
}
限流
我们可以使用redis进行限流,防止大量恶意请求,保证系统稳定性。
- 固定时间窗口算法
比如:发短信接口限制每个用户每分钟只能请求一次,防止公司财产恶意流失。
最简单的方式就是计数器(固定时间窗口算法),这里只讲方法,你可以把代码以切面的形式进行使用。
@RestController
public class MyController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@GetMapping("/sendSms")
public void sendSms() {
String key = userId;
Long increment = redisTemplate.opsForValue().increment(key);
//设置过期时间,几秒内禁止重复提交
if(increment == 1){
redisTemplate.expire(key,60,TimeUnit.SECONDS);
}
if (increment > 1) {
System.out.println("请勿重复提交");
}
}
}
在固定的时间内出现流量溢出可以立即做出限流。每个时间窗口不会相互影响;在一个单元时间窗内前期如果很快的消耗完请求阈值。那么剩下的时间将会无法请求。这样就会因为一瞬间的流量导致一段时间内系统不可用。
- 滑动窗口算法
滑动窗口算法在固定窗口的基础上,将一个窗口分为若干个等份的小窗口,每个小窗口对应不同的时间点,拥有独立的计数器,当请求的时间点大于当前窗口的最大时间点时,则将窗口向前平移一个小窗口(将第一个小窗口的数据舍弃,第二个小窗口变成第一个小窗口,当前请求放在最后一个小窗口),整个窗口的所有请求数相加不能大于阀值。
@RestController
public class MyController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductMapper productMapper;
@GetMapping("/test")
public void test() throws InterruptedException {
//10秒钟
int intervalTime = 10000;
Long currentTime = System.currentTimeMillis();
//查询当前时间前10秒-当前时间的数据
Long count = redisTemplate.opsForZSet().count("limit", currentTime - intervalTime, currentTime);
if (count > 3) {//设置请求次数不超过4次
System.out.println("请求太频繁:"+new Date(currentTime - intervalTime)+"到"+new Date(currentTime));
return;
}
Date date = new Date(currentTime);
System.out.println("请求成功,插入时间:"+date);
redisTemplate.opsForZSet().add("limit", UUID.randomUUID().toString(), currentTime);
}
public static void main(String[] args) {
Long currentTime = System.currentTimeMillis();
Date date = new Date(currentTime);
System.out.println(date);
}
}
执行结果如图
除了上面介绍的两种限流方案,还有漏斗、令牌桶算法(redisson讲解)、lua脚本实现,另外,还有Nginx限流,网关限流等方案。
分布式锁
我们先来看一个案例,在不加锁的情况下去秒杀一个商品
@Controller
public class MyController {
@Autowired
private ProductMapper productMapper;
@GetMapping("/test")
public void test(){
Product product = productMapper.selectById(1);
System.out.println("当前线程数:"+Thread.currentThread().getName()+",商品数量:"+product.getNumber());
if(product == null){
System.out.println("商品不存在");
return;
}
if(product != null && product.getNumber().equals(0)){
System.out.println("商品库存不足");
return;
}
//修改库存
Integer number = product.getNumber()-1;
product.setNumber(number);
productMapper.updateById(product);
}
}
执行结果如下
我们可以看到,未加锁的情况下,多个线程访问会出现获取的数量相同的情况,再单机部署的情况下,我们可以使用synchronized锁或者Lock锁。
@Controller
public class MyController {
@Autowired
private ProductMapper productMapper;
private ReentrantLock lock = new ReentrantLock();
@GetMapping("/test")
public void test(){
synchronized (this) {
Product product = productMapper.selectById(1);
System.out.println("当前线程数:" + Thread.currentThread().getName() + ",商品数量:" + product.getNumber());
if (product == null) {
System.out.println("商品不存在");
return;
}
if (product != null && product.getNumber().equals(0)) {
System.out.println("商品库存不足");
return;
}
//修改库存
Integer number = product.getNumber() - 1;
product.setNumber(number);
productMapper.updateById(product);
}
}
}
执行结果如下
随着互联网的发展会将项目部署到多台服务器上,这种情况下使用synchronized锁或者Lock锁,无法保证一致性。我们可以使用redis中setnx
进行分布式锁
SETEX key seconds value
:设置指定的到期时间
SETNX key value
:仅在键不存在时设置键
示例代码如下:
@RestController
public class MyController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductMapper productMapper;
@GetMapping("/test")
public void test() {
String lockKey = "request_key";
String clientId = "pod_1";
try {
if (!tryLock(lockKey,clientId)){
System.out.println("访问人数太多,稍后重试");
return;
}
Product product = productMapper.selectById(1);
System.out.println("当前线程数:" + Thread.currentThread().getName() + ",商品数量:" + product.getNumber());
if (product == null) {
System.out.println("商品不存在");
return;
}
if (product != null && product.getNumber().equals(0)) {
System.out.println("商品库存不足");
return;
}
//修改库存
Integer number = product.getNumber() - 1;
product.setNumber(number);
productMapper.updateById(product);
} finally {
unLock(lockKey);
}
}
public boolean tryLock(String key, Object value) {
return redisTemplate.opsForValue().setIfAbsent(key, value);
}
public Object unLock(String key) {
return redisTemplate.delete(key);
}
}
执行结果如图
现在有三个问题:
- 问题一:加锁后,系统宕机,导致锁无法释放,此时系统将死锁,无法进行秒杀。【解决方法:给锁设置超时时间】
- 问题二:当前锁不唯一,导致当前锁可能被其他线程给释放。【解决办法:判断只有自己可以释放自己的锁】
- 问题三:设置超时时间后,业务执行时间大于超时时间,导致未执行完毕,锁被释放。【解决办法:开辟子线程,定期检测锁是否被释放,未释放,则重置锁时间(使用redisson更简单,后续介绍)】
@RestController
public class MyController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductMapper productMapper;
@GetMapping("/test")
public void test() throws InterruptedException {
String lockKey = "request_key";
//唯一value
String clientId = UUID.randomUUID().toString();
try {
if (!tryLock(lockKey,clientId)){
System.out.println("访问人数太多,稍后重试:"+Thread.currentThread().getName());
return;
}
Product product = productMapper.selectById(1);
System.out.println("当前线程数:" + Thread.currentThread().getName() + ",商品数量:" + product.getNumber());
if (product == null) {
System.out.println("商品不存在");
return;
}
if (product != null && product.getNumber().equals(0)) {
System.out.println("商品库存不足");
return;
}
//模拟业务操作
Thread.sleep(3000);
//修改库存
Integer number = product.getNumber() - 1;
product.setNumber(number);
productMapper.updateById(product);
} finally {
unLock(lockKey,clientId);
}
}
public boolean tryLock(String key, Object value) {
//给锁加上超时时间
return redisTemplate.opsForValue().setIfAbsent(key, value,10, TimeUnit.SECONDS);
}
public void unLock(String key,String value) {
Object o = redisTemplate.opsForValue().get(key);
//判断只有自己可以释放自己的锁
if(o != null && o.toString().equals(value)) {
System.out.println("释放锁:"+Thread.currentThread().getName());
redisTemplate.delete(key);
}
}
}
执行结果如图
为了方便在多个场景中使用,可以将redis锁操作进行封装
@Component
public class RedisLockUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 给锁设置指定过期时间
* @param key
* @param value
* @param timeout
* @param timeUnit
* @return
*/
public boolean tryLock(String key, Object value, long timeout, TimeUnit timeUnit) {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, timeUnit);
}
/**
* 使用默认过期时间
* @param key
* @param value
* @return
*/
public boolean tryLock(String key, Object value) {
//给锁加上超时时间
return redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);
}
/**
* 解锁
* @param key
* @param value
*/
public void unLock(String key, String value) {
Object o = redisTemplate.opsForValue().get(key);
//判断只有自己可以释放自己的锁
if (o != null && o.toString().equals(value)) {
System.out.println("释放锁:" + Thread.currentThread().getName());
redisTemplate.delete(key);
}
}
}
Lua脚本实现分布式锁
Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。下面我们介绍使用Lua脚本进行分布式锁操作。
- 方法一:再
resources
目录下创建.lua
脚本
可以再idea中下载lua插件
lock.lua(加锁脚本)
local key = KEYS[1]; --KEYS[1]是 lockKey 表示获取的锁资源,比如 lock:168。
local threadId = ARGV[1];--ARGV[1] 唯一标识
local releaseTime = ARGV[2];--ARGV[2] 表示表示锁的有效时间(单位毫秒)。
if redis.call('setNx', key, threadId) then -- 判断锁释放是否存在
if redis.call('get', key) == threadId then -- 判断当前锁是不是自己的
return redis.call('expire', key, releaseTime) -- 设置有效期
else
return 0
end
end
-- 方式二:
---- 键值
--local key = KEYS[1]
---- 值
--local value = ARGV[1]
---- 过期时间
--local expire = ARGV[2]
---- SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
---- NX 仅当key不存在时才设置key
---- XX 仅当key已经存在时才设置key
--local success = redis.call('SET', key, value, 'NX', 'EX', expire)
--if success then
-- return 1
--else
-- return 0
--end
unlock.lua(解锁脚本)
local key = KEYS[1]; --KEYS[1]是 lockKey 表示获取的锁资源,比如 lock:168。
local threadId = ARGV[1];--ARGV[1] 唯一标识
if redis.call('get', key) == threadId then -- 判断当前锁是不是自己的
return redis.call('del', key) -- 释放锁
else
return 0
end
-- 方式二:
---- 键
--local key = KEYS[1]
---- 删除已存在的键,不存在的 key 会被忽略
--local success = redis.call('DEL', key)
--if success then
-- return 1
--else
-- return 0
--end
示例代码如下
@Component
public class RedisLockUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private DefaultRedisScript<Long> lockScript;
private DefaultRedisScript<Long> unlockScript;
//初始化加载文件
@PostConstruct
public void init() {
// 加载加锁的脚本
lockScript = new DefaultRedisScript<>();
this.lockScript.setLocation(new ClassPathResource("lock.lua"));
this.lockScript.setResultType(Long.class);
// 加载释放锁的脚本
unlockScript = new DefaultRedisScript<>();
this.unlockScript.setLocation(new ClassPathResource("unlock.lua"));
this.unlockScript.setResultType(Long.class);
}
/**
* 获取锁
*/
public boolean tryLock(String lockKey,String value, long releaseTime) {
// 执行脚本
Long result = (Long) redisTemplate.execute(
lockScript,
Collections.singletonList(lockKey),
value,
releaseTime);
if (result != null && result.intValue() == 1) {
return true;
} else {
return false;
}
}
/**
* 解锁
*/
public void unlock(String lockKey, String value) {
try {
redisTemplate.execute(unlockScript,
Collections.singletonList(lockKey),
value);
} catch (Exception e){
e.printStackTrace();
}
}
}
@RestController
public class MyController {
@Autowired
private ProductMapper productMapper;
@Autowired
private RedisLockUtil redisLockUtil;
@GetMapping("/test")
public void test() {
String lockKey = "request_key";
//唯一value
String clientId = UUID.randomUUID().toString();
try {
if (!redisLockUtil.tryLock(lockKey,clientId,1000)){
System.out.println("访问人数太多,稍后重试2:"+Thread.currentThread().getName());
return;
}
Product product = productMapper.selectById(1);
System.out.println("当前线程数:" + Thread.currentThread().getName() + ",商品数量:" + product.getNumber());
if (product == null) {
System.out.println("商品不存在");
return;
}
if (product != null && product.getNumber().equals(0)) {
System.out.println("商品库存不足");
return;
}
//修改库存
Integer number = product.getNumber() - 1;
product.setNumber(number);
productMapper.updateById(product);
} finally {
redisLockUtil.unlock(lockKey,clientId);
}
}
}
执行结果如图
- 方法二:在代码里编写lua脚本
@Component
public class RedisLockUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
//加锁脚本
private static final String LOCK_SCRIPT =
"-- 键值\n" +
"local key = KEYS[1]\n" +
"-- 值\n" +
"local value = ARGV[1]\n" +
"-- 过期时间\n" +
"local expire = ARGV[2]\n" +
"-- SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]\n" +
"-- NX 仅当key不存在时才设置key\n" +
"-- XX 仅当key已经存在时才设置key\n" +
"local success = redis.call('SET', key, value, 'NX', 'EX', expire)\n" +
"if success then\n" +
" return 1\n" +
"else\n" +
" return 0\n" +
"end";
//解锁脚本
private static final String UNLOCK_SCRIPT = "-- 键\n" +
"local key = KEYS[1]\n" +
"-- 删除已存在的键,不存在的 key 会被忽略\n" +
"local success = redis.call('DEL', key)\n" +
"if success then\n" +
" return 1\n" +
"else\n" +
" return 0\n" +
"end";
/**
* 获取锁
*
* @param lockKey redis的key
* @param expireTime redis的key 的过期时间 防止死锁,导致其他请求无法正常执行业务
* @return
*/
public boolean lock(String lockKey,String value, int expireTime) {
try {
RedisScript<Long> redisScript = new DefaultRedisScript<>(LOCK_SCRIPT, Long.class);
// 对非string类型的序列化
Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), value, expireTime);
return result == 1;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 释放锁
*
* @param lockKey redis的key
* @return
*/
public boolean unlock(String lockKey,String value) {
try {
RedisScript<Long> redisScript = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), value);
return result == 1;
} catch (Exception e) {
return false;
}
}
}