redis与分布式锁浅谈
1.高并发下缓存失效问题
1.1 缓存穿透:
缓存穿透:
指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义
风险:
利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃
解决:
null结果缓存,并加入短暂过期时间
1.2 缓存雪崩
缓存雪崩:
缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩
风险:
DB瞬时压力过重雪崩
解决:
原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件
1.3缓存穿透
缓存穿透:
对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。
风险:
数据库瞬时压力增大,最终导致崩溃
解决:
加锁大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db
2.分布式锁
分布式锁需要解决的问题:
问题1:
setnx占好了位,业务代码异常或者程序在页面过程中宕机,没有执行删除锁逻辑,这就造成了死锁
解决:
设置锁的自动过期,即使没有删除,会自动删除
问题2:
setnx设置好,正要去设置过期时间,宕机。又死锁了。
解决:
设置过期时间和占位必须是原子的。redis支持使用setnx ex命令
问题3:
删除锁直接删除??? 如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。
解决:
占锁的时候,值指定为uuid,每个人匹配是自己的锁才删除
问题4:
如果正好判断是当前值,正要删除锁的时候,锁已经过期, 别人已经设置到了新的值。那么我们删除的是别人的锁
解决:
删除锁必须保证原子性。使用redis+Lua脚本完成。 String script = “if redis.call(‘get’,
KEYS[1]) == ARGV[1] then return redis.call(‘del’, KEYS[1]) else return 0 end”;
保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。 更难的事情,锁的自动续期
背景:最近遇到一个生产问题,分布式部署了十几条服务器,有个业务过期的定时任务会每天发邮件提醒用户,然而用户最近反馈,每天收到好几封提醒邮件,于是排查多发的原因。这个分布式锁很重要,大概率是没锁住
2.1 RedisTemplate(或stringRedisTemplate) 实现
public void doSendEmail() {
//1.生成随机数
String uuid = UUID.randomUUID().toString();
//2.设置分布式锁,并设置过期时间120秒
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock_grant_expire_notice", uuid, 120, TimeUnit.SECONDS);
//3.获取到锁
try {
if (lock) {
//4.抢到锁把计数归零 RECURSIVE_CALL_TIMES 是定义在类下面的常量 private int RECURSIVE_CALL_TIMES = 0;
RECURSIVE_CALL_TIMES = NumberUtils.INTEGER_ZERO;
//5.获取邮件是否已发送标识
Object isSendFlag = redisTemplate.opsForValue().get("mail_is_send_flag");
//6.没有值,就是未发送邮件
if (isSendFlag == null || StringUtils.isEmpty(isSendFlag)) {
//加锁成功...执行业务
//7.发送邮件的真正业务
//sendMail();
redisTemplate.opsForValue().set("mail_is_send_flag", UUID.randomUUID().toString(), 23 * 60 * 60, TimeUnit.SECONDS);
} else {
//8.邮件已发送,无需重复发送
log.info("mail already send,there's no need to send it twice");
}
//9.释放分布式锁,对比uuid值是为了只删除自己的锁,且对比值和删锁是原子操作
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList("lock_grant_expire_notice"), uuid);
} else {
//10.未获取到分布式锁,尝试自璇,每10秒递归调用一次,尝试获取分布式锁,最多尝试5次
Thread.sleep(10000);
if (RECURSIVE_CALL_TIMES <= 5) {
RECURSIVE_CALL_TIMES += NumberUtils.INTEGER_ONE;
doSendEmail();
}
}
} catch (Exception e) {
log.error("execute send mail fail,message:" + e);
}
}
2.2 JedisCluster 实现
Jedis初始化类,连接redis,以及一些常用的方法
@Component("redisClusterConfig")
public class RedisClusterConfig {
private static Log log = LogFactory.getLog(RedisClusterConfig.class);
private volatile JedisCluster jedisCluster;
public RedisClusterConfig() {
initCluster();
}
public JedisCluster getClusterResource() {
initCluster();
return jedisCluster;
}
private void initCluster() {
if (null != jedisCluster) {
return;
}
synchronized (this) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//redis的相关配置文件(把redis的相关信息配置在一个文件中,读取文件)
Properties properties = PropertyUtils.loadProperty("redis-context.properties");
jedisPoolConfig.setMaxTotal(PropertyUtils.getIntegerProperty(properties, "redis.pool.maxTotal", 50));
jedisPoolConfig.setMaxIdle(PropertyUtils.getIntegerProperty(properties, "redis.pool.maxIdle", 20));
jedisPoolConfig.setMinIdle(PropertyUtils.getIntegerProperty(properties, "redis.pool.minIdle", 10));
jedisPoolConfig.setMaxWaitMillis(PropertyUtils.getIntegerProperty(properties, "redis.pool.maxWaitMillis", 10000));
jedisPoolConfig.setTimeBetweenEvictionRunsMillis(PropertyUtils.getIntegerProperty(properties, "redis.pool.timeBetweenEvictionRunsMills", 60000));
jedisPoolConfig.setTestOnBorrow(false);
jedisPoolConfig.setTestOnReturn(false);
jedisPoolConfig.setTestWhileIdle(true);
String clusterHost = properties.getProperty("redis.pool.cluster.host");
int timeout = PropertyUtils.getIntegerProperty(properties, "redis.timeout", 2000);
int sockettime = PropertyUtils.getIntegerProperty(properties, "redis.pool.cluster.sockettimeout", 2000);
int maxAttempts = PropertyUtils.getIntegerProperty(properties, "redis.pool.cluster.maxAttempts", 2000);
String password = properties.getProperty("redis.pool.cluster.password");
Set<HostAndPort> nodes = new HashSet<>();
String[] hosts = clusterHost.split(",");
for (String ipPort : hosts) {
String[] ipPortArr = ipPort.split(":");
String ip = ipPortArr[0];
int port = Integer.parseInt(ipPortArr[1]);
nodes.add(new HostAndPort(ip, port));
}
jedisCluster = new JedisCluster(nodes, timeout, sockettime, maxAttempts, password, jedisPoolConfig);
}
}
@Override
protected void finalize() throws Throwable {
super.finalize();
if (jedisCluster != null) {
jedisCluster.close();
}
}
/**
* 获取分布式锁
*
* @param lockKey
* @param value
* @param expireTime
* @return
*/
public boolean getLock(String lockKey, String value, int expireTime) {
String LOCK_SUCCESS = "OK";
boolean clusterRtnValue = false;
lockKey = replace4set(lockKey);
JedisCluster clusterResource = getClusterResource();
try {
String result = clusterResource.set(lockKey, value, "NX", "EX", expireTime);
if (LOCK_SUCCESS.equalsIgnoreCase(result)) {
clusterRtnValue = true;
}
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtnValue;
}
public boolean releaseLock(String lockKey, String value) {
Long RELEASE_SUCCESS = 1L;
boolean clusterRtnValue = false;
lockKey = replace4set(lockKey);
JedisCluster clusterResource = getClusterResource();
try {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = clusterResource.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value));
if (RELEASE_SUCCESS.equals(result)) {
clusterRtnValue = true;
}
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtnValue;
}
public String set(String key, String value) {
String clusterRtnValue = null;
JedisCluster clusterResource = getClusterResource();
try {
clusterRtnValue = clusterResource.set(key, value);
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtnValue;
}
public String setEx(String key, String value, int seconds) {
String clusterRtValue = null;
key = replace4set(key);
JedisCluster clusterResource = getClusterResource();
try {
clusterRtValue = clusterResource.setex(key, seconds, value);
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtValue;
}
public String get(String key) {
String clusterRtValue = null;
key = replace4set(key);
JedisCluster clusterResource = getClusterResource();
try {
clusterRtValue = clusterResource.get(key);
} catch (Exception e) {
log.error("exception:" + e);
}
return clusterRtValue;
}
private String replace4set(String str) {
return str.replaceAll("\\{", "[").replaceAll("}", "]");
}
}
redis-context.properties配置内容
redis.pool.maxTotal=50
redis.pool.maxIdle=20
redis.pool.minIdle=10
redis.pool.maxWaitMillis=10000
redis.pool.timeBetweenEvictionRunsMills=60000
redis.pool.cluster.host=192.168.10.128:6379,192.168.10.131:6379
redis.timeout=2000
redis.pool.cluster.sockettimeout=2000
redis.pool.cluster.maxAttempts=2000
redis.pool.cluster.password=420188
分布式锁发送邮件
public void doSendEmail() {
//1.生成随机数
String redisLockValue = UUID.randomUUID().toString();
//2.设置分布式锁,并设置过期时间120秒
boolean lock = redisClusterConfig.getLock("lock_grant_expire_notice", redisLockValue, 120);
try {
if (lock) {
//抢到锁把计数归零
RECURSIVE_CALL_TIMES = NumberUtils.INTEGER_ZERO;
//获取邮件是否已发送标识
String isSendFlag = redisClusterConfig.get("mail_is_send_flag");
//未发送
if (StringUtils.isBlank(isSendFlag)) {
//发送邮件的真正业务
//sendMail();
redisClusterConfig.setEx("mail_is_send_flag", UUID.randomUUID().toString(), 23 * 60 * 60);
} else {
//已发送
log.info("mail already send,there's no need to send it twice");
}
//释放分布式锁
redisClusterConfig.releaseLock("lock_grant_expire_notice", redisLockValue);
return;
} else {
//未获取到分布式锁
Thread.sleep(10000);
if (RECURSIVE_CALL_TIMES <= 5) {
RECURSIVE_CALL_TIMES += NumberUtils.INTEGER_ONE;
doSendEmail();
}
}
} catch (Exception e) {
log.error("execute send mail fail,message:" + e);
}
}
2.3 Redisson 实现
以上两种方法都差不多,但无法解决redis续期问题,如果业务执行时间超过了分布式锁的过期时间,会有问题。当然 把分布式锁时间设置稍长一点一般也没什么大问题。redisson在业务未执行完会自动续期
public void doSendEmail() {
//创建分布式锁
RLock lock = redisson.getLock("lock_grant_expire_notice");
try {
//获取分布式锁(参数1:等待时间,参数2:过期时间 参数3:时间单位)
if (lock.tryLock(0, 120000, TimeUnit.MILLISECONDS)) {
//抢到锁把计数归零
RECURSIVE_CALL_TIMES = NumberUtils.INTEGER_ZERO;
//获取邮件是否已发送标识
String isSendFlag = redisClusterConfig.get("mail_is_send_flag");
//未发送
if (StringUtils.isBlank(isSendFlag)) {
//发送邮件的真正业务
//sendMail();
redisClusterConfig.setEx("mail_is_send_flag", UUID.randomUUID().toString(), 23 * 60 * 60);
} else {
//已发送
log.info("mail already send,there's no need to send it twice");
}
//释放分布式锁
lock.unlock();
return;
} else {
//未获取到分布式锁
Thread.sleep(10000);
if (RECURSIVE_CALL_TIMES <= 5) {
RECURSIVE_CALL_TIMES += NumberUtils.INTEGER_ONE;
doSendEmail();
}
}
} catch (Exception e) {
log.error("execute send mail fail,message:" + e);
}
}