高并发缓存实战RedisSon、性能优化
分布式锁性能提升
1.数据冷热分离
对于经常访问的数据保留在redis缓存当中,不用带数据设置超时时间定期删除控制redis的大小
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)) {
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey, 30000, TimeUnit.SECONDS); //读延期
}
2.缓存击穿(失效)
缓存击穿数据库没有被击穿
redisUtil.expire(productCacheKey, 30000, TimeUnit.SECONDS);
如果商家是批量导入的数据,呢么就会同时存到redis中,设置固定的时间就会导致缓存在一瞬间失效,用户访问不到就会将流量打到数据库上造成数据库段时间内抖动。
解决办法:
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
private Integer genProductCacheTimeout() {
//过期时间是不一样的
return PRODUCT_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}
3.缓存穿透
缓存和数据库都没有结果
缓存穿透是指查询一个一定不存在的数据,由于缓存不命中,导致请求直接访问数据库或后端服务,从而影响系统性能甚至崩溃。其发生的原因可能是恶意攻击、不合理的业务逻辑和数据分布等。
例如,一个ID为负数或者非法字符的请求,即使被缓存,也是无效的,每次都需要访问数据库,造成了服务器资源的浪费。如果遇到大量的这种请求,就有可能导致缓存失效,直接访问数据库,甚至造成服务器瘫痪。
为了防止缓存穿透,我们可以采取以下措施:
- 在业务层面对恶意攻击进行限制,如合法字符过滤、请求频率限制等。
- 对于查询结果为空的情况,在缓存中设置一个空对象。
- 使用布隆过滤器(Bloom Filter)等技术来预先过滤掉不合法的请求,减轻数据库的压力。
- 将热点数据(频繁访问的数据)预先加载到缓存中,以提高缓存命中率。
- 设置合理的缓存过期时间,防止缓存中一直存在无效数据。
通过以上措施,可以有效地减少缓存穿透问题的发生,提高系统的性能和稳定性。
3.1对于查询结果为空的情况,在缓存中设置一个空对象
对于数据库也无法访问到数据的,首次访问后设置“{}”到缓存中防止每次都访问数据库增加数据库的io压力,第二次直接可以命中缓存,针对黑客使用多个商品id对redis进行攻击的状况,我们可以设置一个超时时间并延期,时间设置60-90s,防止大量的id撑大redis的存储
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
4.DCL解决突发热点的新增缓存
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//DCL
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
try {
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
} finally {
hotCacheLock.unlock();
}
不能使用synchronized,原因:
1.只在单节点的jvm中生效,每个节点都会新增一次(非主要问题)
2.this会锁住这个类,当product0001需要加锁新建缓存的时候,product0001的所有进程都必须要等待,这没有问题,但是product0003、product0004。。。也都需要等待,这就导致了业务被阻塞了,效率低下
所以需要分布式锁
5.缓存数据库双些不一致
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//DCL
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
try {
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
//synchronized (this) 不是分布式的只能锁住当前jvm进程
//synchronized (this){
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
} finally {
hotCacheLock.unlock();
}
加锁限制
控制查和写的中间不能插入执行其他的逻辑,在查询和修改的代码中都需要加锁
6.代码的复杂度
其实大部分情况下只有很小一块会被执行,大部分代码块是在完善各种逻辑,但是也是在取舍寻找一种最合适的方案
7.锁优化
1.分段锁
库存1000 则可分为 produc_1 - produc_10 10个线程可以同时执行这段逻辑
2.读写锁
@Transactional
public Product update(Product product) {
Product productResult = null;
//RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RLock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), product);
} finally {
writeLock.unlock();
}
return productResult;
}
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//DCL 加分布式锁解决热点缓存并发重建问题
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
try {
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RLock rLock = readWriteLock.readLock();
rLock.lock();
//synchronized (this) 不是分布式的只能锁住当前jvm进程
//synchronized (this){
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
} finally {
hotCacheLock.unlock();
}
return product;
}
对同一个id的商品加上同一把锁,读操作加读锁 ,写操作加鞋锁,读锁遇到读锁不会阻断
读写锁是一种用于并发编程中的同步机制。与普通锁相比,读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。这样就可以提高并发性和系统吞吐量。
读写锁通常由读锁和写锁两部分组成。当一个线程想要读取共享资源时,它必须获取读锁,并且当没有写线程占用锁时,可以同时有多个读线程同时获取读锁。当一个线程想要修改共享资源时,它必须获取写锁,而不允许其他任何读或写线程获取锁,直到它释放写锁。
使用读写锁可以有效地减少锁竞争,提高程序性能,特别是在多读少写的情况下。但是,过度使用读写锁也会导致一些问题,例如读线程饥饿、写线程优先级过低等。因此,在使用时需要谨慎考虑锁的使用场景和锁的粒度。
- 读操作时,每个线程可以直接获取锁,不需要等待其他线程释放读锁,因为读操作是并发执行的,不会破坏数据的完整性。
- 写操作时,只有一个线程可以获取写锁,其他线程需要等待写锁释放后才能继续执行。Redisson中通过分布式锁的方式来实现写锁的互斥执行。当一个线程请求获取写锁时,Redisson会在Redis服务器上创建一个对应的分布式锁,只有该线程能够成功获取该分布式锁,其他线程则需要等待。
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
-
首先获取指定键(KEYS[1])的值,并获取该键对应哈希表中mode字段的值,判断是否存在。如果不存在,则设置mode为read,表明当前锁状态为读锁;并将当前线程加入到指定键对应的哈希表中,并创建用于记录当前线程的计数器的哈希表,并设置过期时间(ARGV[1])。最后返回nil,表示当前线程成功获取读锁。
-
如果mode字段的值为read,或者为write并且当前线程已经获取了该锁,则将哈希表中该线程的计数器加1,并在单独的哈希表中创建一个记录当前线程持有读锁的键值对,同样设置过期时间。最后返回nil,表示当前线程成功续订读锁。否则,等待其他线程释放读锁。
-
如果mode字段的值为write但是当前线程没有获取该锁,则表明当前锁状态为写锁,不能获取读锁。此时返回当前锁的剩余过期时间(即当前线程等待获取锁的时间),以便客户端等待。
@Override <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'write'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'write') then " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " + "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " + "return nil; " + "end; " + "end;" + "return redis.call('pttl', KEYS[1]);", Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId)); }
-
首先获取指定键(KEYS[1])的值,并获取该键对应哈希表中mode字段的值,判断是否存在。如果不存在,则设置mode为write,表明当前锁状态为写锁;并将当前线程加入到指定键对应的哈希表中,并设置过期时间(ARGV[1])。最后返回nil,表示当前线程成功获取写锁。
-
如果mode字段的值为write,则说明当前锁状态为写锁,需要判断当前线程是否已经获取了该锁。如果已经获取了该锁,则将哈希表中该线程的计数器加1,并更新锁的过期时间;最后返回nil,表示当前线程成功续订写锁。否则,等待其他线程释放写锁。
-
如果mode字段的值为非write,则说明当前锁状态为读锁,不能获取写锁。此时返回当前锁的剩余过期时间(即当前线程等待获取锁的时间),以便客户端等待。
后续需要研究读写锁的性能问题
3.串行转并发
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
等待1s之后,锁返回false,如果缓存此时已经将需要的数据加载到缓存当中则可以直接命中缓存返回
但是如果1s内没有完成数据的加载会导致缓存击穿
8.redis多级缓存
一个key会存在redis的一个单节点上,一个redis节点优化后可抗10万并发,会导致各个微服务,功能出现问题,导致大规模的缓存问题,
可以进行限流、降级控制,如果限流出现问题,代码也可以做响应的处理,—多级缓存
JVM的抗并发100万级别,需要不同节点同步(队列) 会出现段时间的数据不一致
Redis多级缓存指的是将Redis和其他缓存系统(如本地内存缓存、Memcached等)结合使用,实现分层缓存的机制。多级缓存通常由两个或多个层次组成:
- 一级缓存:本地内存缓存,主要用于缓存热点数据,可以快速响应客户端请求,减轻Redis服务器的压力。
- 二级缓存:Redis缓存,主要用于持久化缓存数据,保证数据的可靠性,并将数据分布在多个节点上,提高缓存的并发访问能力。
使用多级缓存的好处在于可以充分利用各个缓存的优势,快速响应客户端请求,提高访问效率。具体操作步骤如下:
- 对于读取数据的请求,先从本地内存缓存中读取数据,如果没有则从Redis缓存中读取,并更新本地内存缓存,以便下一次请求时直接从本地内存缓存中获取。
- 对于写入数据的请求,先更新本地内存缓存,再同步更新Redis缓存中的数据。
需要注意的是,在使用多级缓存的过程中,需要保证缓存数据的一致性和可靠性。当二级缓存中的数据发生变化时,需要及时更新一级缓存中的数据,否则会导致数据不一致。此外,需要考虑数据的失效策略和缓存节点选择的问题,避免缓存雪崩或缓存穿透等问题的发生。
对于这种热点中的热点商品,需要一个专门的系统进行实时的维护
9.完整代码
@Service
public class ProductService {
@Autowired
private ProductDao productDao;
@Autowired
private RedisUtil redisUtil;
@Autowired
private Redisson redisson;
public static final Integer PRODUCT_CACHE_TIMEOUT = 60 * 60 * 24;
public static final String EMPTY_CACHE = "{}";
public static final String LOCK_PRODUCT_HOT_CACHE_PREFIX = "lock:product:hot_cache:";
public static final String LOCK_PRODUCT_UPDATE_PREFIX = "lock:product:update:";
//Map 伪代码 图一乐 真实使用多级缓存框架 热点系统
public static Map<String, Product> productMap = new ConcurrentHashMap<>();
@Transactional
public Product create(Product product) {
Product productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
genProductCacheTimeout(), TimeUnit.SECONDS);
return productResult;
}
@Transactional
public Product update(Product product) {
Product productResult = null;
//RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RLock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), product);
} finally {
writeLock.unlock();
}
return productResult;
}
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//DCL 加分布式锁解决热点缓存并发重建问题
RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
hotCacheLock.lock();
//boolean result = hotCacheLock.tryLock(3, TimeUnit.SECONDS);
try {
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//RLock updateProductLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RLock rLock = readWriteLock.readLock();
rLock.lock();
//synchronized (this) 不是分布式的只能锁住当前jvm进程
//synchronized (this){
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
productMap.put(productCacheKey, product);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
} finally {
hotCacheLock.unlock();
}
return product;
}
private Integer genProductCacheTimeout() {
return PRODUCT_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}
public static void main(String[] args) {
System.out.println( new Random().nextInt(5) * 60 * 60);
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
private Product getProductFromCache(String productCacheKey) {
Product product = productMap.get(productCacheKey);
if (product != null) {
return product;
}
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)) {
if (EMPTY_CACHE.equals(productStr)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return new Product();
}
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey, genProductCacheTimeout(), TimeUnit.SECONDS); //读延期
}
return product;
}
}