项目收获总结--Redis的知识收获

news2024/11/15 17:30:05
一、概述

最近几天公司项目开发上线完成,做个收获总结吧~ 今天记录Redis的收获和提升。
在这里插入图片描述

二、Redis异步队列

Redis做异步队列一般使用 list 结构作为队列,rpush 生产消息,lpop 消费消息。当 lpop 没有消息的时候,要适当sleep再重试。若不用 sleep,list 还有个指令叫 blpop,在没有消息的时候,它会阻塞队列直到消息到来。

若要生产一次消费多次的需求,可以使用 pub/sub 主题订阅者模式,可以实现 1:N 的消息队列。但是缺点在消费者下线的情况下,生产的消息会丢失,还得使用专业的消息队列如 RabbitMQ等。

若要redis实现延时队列,使用 sortedset,拿时间戳作为score,消息内容作为 key 调用 zadd 来生产消息,消费者用 zrangebyscore 指令获取 N 秒之前的数据轮询进行处理。

三、Redis分布式锁

核心思路是:先用setnx来争抢锁,抢到之后,再用 expire 给锁加一个过期时间防止锁忘记释放。若在 setnx 之后执行expire之前进程意外崩溃或者要重启维护,导致锁永远得不到释放,就要采用 set 指令配合非常复杂的参数,可以同时把 setnx 和 expire 合成一条指令来用:

SET key value NX EX 10
3.1 Redis分布式锁的实现

pom.xml引入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.4.7.RELEASE</version>
</dependency>

配置Redis:

 
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0
spring.redis.password=123456
spring.redis.timeout=10000
 
# 设置jedis连接池
spring.redis.jedis.pool.max-active=50
spring.redis.jedis.pool.min-idle=20

配置RedisConfig属性:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
 
@Configuration
public class RedisConfig {
 
    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) throws Exception {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 3.创建 序列化类
        GenericToStringSerializer genericToStringSerializer = new GenericToStringSerializer(Object.class);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(genericToStringSerializer);
//        redisTemplate.setKeySerializer(new StringRedisSerializer());
//        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setDefaultSerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

RedisLock工具类:

import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
@Slf4j
@Component
public class RedisLock {
    @Resource
    private RedisTemplate redisTemplate;
    
    private static Map<String, LockInfo> lockInfoMap = new ConcurrentHashMap<>();
    private static final Long SUCCESS = 1L;
    
    @Data
    public static class LockInfo {
        private String key;
        private String value;
        private int expireTime;
        //更新时间
        private long renewalTime;
        //更新间隔
        private long renewalInterval;
        public static LockInfo getLockInfo(String key, String value, int expireTime) {
            LockInfo lockInfo = new LockInfo();
            lockInfo.setKey(key);
            lockInfo.setValue(value);
            lockInfo.setExpireTime(expireTime);
            lockInfo.setRenewalTime(System.currentTimeMillis());
            lockInfo.setRenewalInterval(expireTime * 2000 / 3);
            return lockInfo;
        }
    }
    
    /**
     * Lua脚本
     * // 加锁
     * if
     *     redis.call('setNx',KEYS[1],ARGV[1])
     *   then
     *     if redis.call('get',KEYS[1])==ARGV[1]
     *     return redis.call('expire',KEYS[1],ARGV[2])
     *   else
     *     return 0
     *   end
     * end
     *
     * // 解锁
     *   redis.call('get', KEYS[1]) == ARGV[1]
     * then
     *   return redis.call('del', KEYS[1])
     * else
     *   return 0
     *
     *   //更新时间
     *   if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end
     */
     
    /**
     * 使用lua脚本加锁
     *
     * @param lockKey    锁
     * @param value      身份标识(保证锁不会被其他人释放)
     * @param expireTime 锁的过期时间(单位:秒)
     * @Desc 注意事项,redisConfig配置里面必须使用 genericToStringSerializer序列化,否则获取不了返回值
     */
    public boolean tryLock(String lockKey, String value, int expireTime) {
 
        String luaScript = "if redis.call('setNx',KEYS[1],ARGV[1]) then if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end end";
 
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        //Object result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey),value,expireTime + "");
        // Object result = redisTemplate.execute(redisScript, new StringRedisSerializer(), new StringRedisSerializer(), Collections.singletonList(lockKey), identity, expireTime);
 
        Object result = redisTemplate.execute(redisScript, keys, value, expireTime);
        log.info("已获取到{}对应的锁!", lockKey);
 
        if (expireTime >= 10) {
            lockInfoMap.put(lockKey + value, LockInfo.getLockInfo(lockKey, value, expireTime));
        }
 
        return (boolean) result;
    }
 
    /**
     * 使用lua脚本释放锁
     *
     * @param lockKey
     * @param value
     * @return 成功返回true, 失败返回false
     */
    public boolean unlock(String lockKey, String value) {
 
        lockInfoMap.remove(lockKey + value);
 
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        Object result = redisTemplate.execute(redisScript, keys, value);
        log.info("解锁成功:{}", result);
        return (boolean) result;
    }
 
 
    /**
     * 使用lua脚本更新redis锁的过期时间
     *
     * @param lockKey
     * @param value
     * @return 成功返回true, 失败返回false
     */
    public boolean renewal(String lockKey, String value, int expireTime) {
 
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
 
        Object result = redisTemplate.execute(redisScript, keys, value, expireTime);
        log.info("更新redis锁的过期时间:{}", result);
        return (boolean) result;
    }
 
 
    /**
     * redisTemplate加锁
     *
     * @param lockKey    锁
     * @param value      身份标识(保证锁不会被其他人释放)
     * @param expireTime 锁的过期时间(单位:秒)
     * @return 成功返回true, 失败返回false
     */
    public boolean lock(String lockKey, String value, long expireTime) {
        return redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS);
    }
 
    /**
     * redisTemplate解锁
     *
     * @param key
     * @param value
     * @return 成功返回true, 失败返回false
     */
    public boolean unlock2(String key, String value) {
        Object currentValue = redisTemplate.opsForValue().get(key);
        boolean result = false;
        if (StringUtils.isNotEmpty(String.valueOf(currentValue)) && currentValue.equals(value)) {
            result = redisTemplate.opsForValue().getOperations().delete(key);
        }
        return result;
    }
 
 
    /**
     * 定时去检查redis锁的过期时间
     * @Param
     * @Return
     */
    @Scheduled(fixedRate = 5000L)
    @Async("redisExecutor")
    public void renewal() {
        long now = System.currentTimeMillis();
        for (Map.Entry<String, LockInfo> lockInfoEntry : lockInfoMap.entrySet()) {
            LockInfo lockInfo = lockInfoEntry.getValue();
            if (lockInfo.getRenewalTime() + lockInfo.getRenewalInterval() < now) {
                renewal(lockInfo.getKey(), lockInfo.getValue(), lockInfo.getExpireTime());
                lockInfo.setRenewalTime(now);
                log.info("lockInfo {}", JSON.toJSONString(lockInfo));
            }
        }
    }
 
    /**
     * 分布式锁设置单独线程池
     * @Param
     * @Return
     */
    @Bean("redisExecutor")
    public Executor redisExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("redis-renewal-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        return executor;
    }
}

开启定时任务:在启动类上加 @EnableScheduling 注解

@SpringBootApplication
@MapperScan(value = "com.example.recordlog.mapper")
@EnableAspectJAutoProxy(proxyTargetClass = true)
//开启定时任务
@EnableScheduling
public class RecordLogApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(RecordLogApplication.class, args);
    }
}

Controller测试接口:

 
 
import com.example.recordlog.tools.RedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import java.util.List;
import java.util.UUID;
 
@RestController
@RequestMapping("/api")
public class OperateController {

@Autowired
private RedisLock redisLock;

@RequestMapping(value = "/locked", method = {RequestMethod.GET})
public void lockedTest() {
        String key = String.format("data-mining:task_statistic:%d", System.currentTimeMillis());
        String requestId = UUID.randomUUID().toString();
 
        boolean locked = false;
        try {
            locked = redisLock.tryLock(key, requestId, 30);
            if (!locked) {
                return;
            }
            //执行业务逻辑
            System.out.println("---------------->>>执行业务逻辑");
        } finally {
            if (locked) {
                redisLock.unlock(key, requestId);
            }
        }
    }
}
3.2 锁过期问题

上述代码提到使用lua脚本处理锁更新,就顺带记录当Redis 分布式锁过期但任务未完成时的三种处理方案。

锁过期问题是:在使用Redis分布式锁时锁恰好到过期时间,但业务逻辑还没有处理完毕,这可能导致多个进程同时进入临界区,造成数据不一致或业务逻辑冲突、资源浪费等其他问题。
假设分布式锁使用的简单逻辑如下:

获取锁:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class RedisLock {
    private Jedis jedis;
    private String lockKey;
    private int lockExpire;

    public RedisLock(Jedis jedis, String lockKey, int lockExpire) {
        this.jedis = jedis;
        this.lockKey = lockKey;
        this.lockExpire = lockExpire;
    }

    public boolean tryLock(String requestId) {
        SetParams params = new SetParams();
        params.nx().px(lockExpire);
        String result = jedis.set(lockKey, requestId, params);
        return "OK".equals(result);
    }

    public void unlock(String requestId) {
        String script =
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                "return redis.call('del', KEYS[1]) " +
                "else return 0 end";
        jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
    }
}

使用锁:


Jedis jedis = new Jedis("localhost");
RedisLock redisLock = new RedisLock(jedis, "my_lock", 5000);

String requestId = UUID.randomUUID().toString();
if (redisLock.tryLock(requestId)) {
    try {
        // 业务逻辑
        Thread.sleep(6000); // 模拟业务逻辑处理时间超过锁过期时间
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        redisLock.unlock(requestId);
    }
} else {
    System.out.println("获取锁失败");
}

解决方案:
(1)自动续期:当锁接近过期时,自动延长锁的过期时间,确保业务逻辑在锁持有期间不会被其他进程获取。使用 ScheduledExecutorService 定期检查并延长锁的过期时间,确保锁在业务逻辑处理完之前不会过期。提供简单代码思路:

public class RedisLockWithRenewal extends RedisLock {
    private ScheduledExecutorService scheduler;

    public RedisLockWithRenewal(Jedis jedis, String lockKey, int lockExpire) {
        super(jedis, lockKey, lockExpire);
        scheduler = Executors.newScheduledThreadPool(1);
    }

    @Override
    public boolean tryLock(String requestId) {
        boolean locked = super.tryLock(requestId);
        if (locked) {
            startAutoRenewal(requestId);
        }
        return locked;
    }

    private void startAutoRenewal(String requestId) {
        scheduler.scheduleAtFixedRate(() -> {
            String script =
                    "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "return redis.call('pexpire', KEYS[1], ARGV[2]) " +
                    "else return 0 end";
            jedis.eval(script, Collections.singletonList(lockKey), Arrays.asList(requestId, String.valueOf(lockExpire)));
        }, lockExpire / 3, lockExpire / 3, TimeUnit.MILLISECONDS);
    }

    @Override
    public void unlock(String requestId) {
        scheduler.shutdown();
        super.unlock(requestId);
    }
}

(2)锁重入机制:锁重入机制允许持有锁的线程可以再次获取锁而不被阻塞,这可以通过记录锁持有者的信息来实现。同一线程可以多次获取锁,直到所有业务逻辑执行完毕,最后一次调用 unlock 时才真正释放锁。

import java.util.concurrent.ConcurrentHashMap;

public class ReentrantRedisLock extends RedisLock {
    private ConcurrentHashMap<String, Integer> lockHolderMap = new ConcurrentHashMap<>();

    public ReentrantRedisLock(Jedis jedis, String lockKey, int lockExpire) {
        super(jedis, lockKey, lockExpire);
    }

    @Override
    public synchronized boolean tryLock(String requestId) {
        if (lockHolderMap.containsKey(requestId)) {
            lockHolderMap.put(requestId, lockHolderMap.get(requestId) + 1);
            return true;
        } else {
            boolean locked = super.tryLock(requestId);
            if (locked) {
                lockHolderMap.put(requestId, 1);
            }
            return locked;
        }
    }

    @Override
    public synchronized void unlock(String requestId) {
        if (lockHolderMap.containsKey(requestId)) {
            int count = lockHolderMap.get(requestId);
            if (count > 1) {
                lockHolderMap.put(requestId, count - 1);
            } else {
                lockHolderMap.remove(requestId);
                super.unlock(requestId);
            }
        }
    }
}

(3)使用 Redisson 库:Redisson是Redis客户端,提供分布式锁的实现,支持自动续期、锁重入等高级特性,简化分布式锁的使用。

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class RedissonLockExample {
    public static void main(String[] args) {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redissonClient = Redisson.create(config);

        RLock lock = redissonClient.getLock("my_lock");
        try {
            if (lock.tryLock(0, 10, TimeUnit.SECONDS)) {
                try {
                    // 业务逻辑
                    Thread.sleep(6000); // 模拟业务逻辑处理时间
                } finally {
                    lock.unlock();
                }
            } else {
                System.out.println("获取锁失败");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            redissonClient.shutdown();
        }
    }
}
四、Redis不立刻删除已过期数据的原因

比较有印象的经历,曾经尝试监听redis key过期来实现延迟消息发送,后来发现这个延迟非常不稳定,明明key已经过期,但还是没有触发,后来了解到过期key不会立马删除…这个问题需要对Redis 内存管理有了解,下面通过四个问题来考虑吧!

4.1 Redis 给缓存数据设置过期时间有什么用?

内存是有限且珍贵的,如果不对缓存数据设置过期时间,那内存占用就会一直增长,最终可能会导致 OOM 问题。通过设置合理的过期时间,Redis 会自动删除暂时不需要的数据,为新的缓存数据腾出空间。

Redis 自带给缓存数据设置过期时间的功能,比如:

127.0.0.1:6379> expire key 60 # 数据在 60s 后过期
(integer) 1
127.0.0.1:6379> setex key 60 value # 数据在 60s 后过期 (setex:[set] + [ex]pire)
OK
127.0.0.1:6379> ttl key # 查看数据还有多久过期
(integer) 56

Redis 中除字符串类型是独有设置过期时间的命令 setex 外,其他类型都需要依靠 expire 命令来设置过期时间 。另外, persist 命令可以移除一个键的过期时间。

当然,除缓解内存的消耗,还有其他用途,很多时候,业务场景就是需要某个数据只在某一时间段内存在,比如短信验证码可能只在 1 分钟内有效,用户登录的 Token 可能只在 1 天内有效,秒杀系统等。

若用传统的数据库来处理的话,基本是SQL判断过期,更麻烦且性能很差。

4.2 Redis 如何判断数据是否过期?

Redis 通过过期字典(可看作是 hash 表)来保存数据过期的时间。过期字典的键指向 Redis 数据库中的某个 key(键),过期字典的值是一个 long long 类型的整数,这个整数保存 key 所指向的数据库键的过期时间(毫秒精度的 UNIX 时间戳)。
在这里插入图片描述
过期字典是存储在 redisDb 这个结构里的:

typedef struct redisDb {
    ...
    dict *dict;     //数据库键空间,保存着数据库中所有键值对
    dict *expires   // 过期字典,保存着键的过期时间
    ...
} redisDb;

在查询一个 key 的时候,Redis 首先检查该 key 是否存在于过期字典中(时间复杂度为 O(1)),如果不在就直接返回,在的话需要判断一下这个 key 是否过期,过期直接删除 key 然后返回 null。

4.3 Redis 过期 key 删除策略?

如果设置一批 key 只能存活 1 分钟,那1分钟后,Redis 对这批 key 进行删除,就需要遵循删除策略。常用的过期数据的删除策略有四种:

1)惰性删除:只会在取出/查询 key 的时候才对数据进行过期检查。这种方式对CPU最友好,但是可能会造成太多过期key没有被删除。
(2)定期删除:周期性地随机从设置过期时间的 key 中抽查一批,然后逐个检查这些 key 是否过期,过期就删除 key。相比于惰性删除,定期删除对内存更友好,对 CPU 不太友好。
(3)延迟队列:把设置过期时间的 key 放到一个延迟队列里,到期之后就删除 key。这种方式可以保证每个过期 key 都能被删除,但维护延迟队列太麻烦,队列本身也要占用资源。
(4)定时删除:每个设置过期时间的 key 都会在设置的时间到达时立即被删除。这种方法可以确保内存中不会有过期的键,但是它对 CPU 的压力最大,因为它需要为每个键都设置一个定时器。

Redis 采用的是 定期删除+惰性/懒汉式删除 结合的策略,这也是大部分缓存框架的选择。定期删除对内存更加友好,惰性删除对 CPU 更加友好。两者各有优势,结合起来使用既能兼顾 CPU 友好,又能兼顾内存友好。

Redis 的定期删除过程是随机的(周期性地随机从设置过期时间的 key 中抽查一批),也因此并不保证所有过期键都会被立即删除。这也就解释为什么有的 key 已经过期,但并没有被删除。并且,Redis 底层会通过限制删除操作执行的时长和频率来减少删除操作对 CPU 时间的影响。
另外,定期删除还受到执行时间和过期 key 的比例的影响:

  • 执行时间已经超过阈值,那就中断本次定期删除循环,避免使用过多占用CPU 。
  • 如果这批过期 key比例超过一个比例,就会重复执行此删除流程,更积极地清理过期 key。相应地,若过期的 key 比例低于这个比例,就会中断这次定期删除循环,避免做过多的工作而获得很少的内存回收。

查看源码所知:Redis7.2版本的执行时间阈值是25ms,过期key比例设定值是 10%

#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which we do extra efforts. */

每次随机抽查数量在expire.c中定义,Redis 7.2 版本为 20 ,即每次会随机选择 20 个已设置过期时间的 key 判断是否过期。

#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */

那如何控制定期删除的执行频率?

在 Redis 中,定期删除的频率是由 hz 参数控制的。hz 默认为 10,代表每秒执行 10 次,也就是每秒钟进行 10 次尝试来查找并删除过期的 key

hz 的取值范围为 1~500。增大 hz 参数的值会提升定期删除的频率。如果想要更频繁地执行定期删除任务,可以适当增加 hz 的值,但这会加 CPU 的使用率。根据 Redis 官方建议,hz 的值不建议超过100,其实使用默认的 10 就已足够。

下面是 hz 参数的官方注释(Redis 7.2 版本redis.conf中)的重要信息。
在这里插入图片描述
类似的参数还有 “dynamic-hz”(redis.conf中),这个参数开启之后 Redis 就会在 hz 的基础上动态计算一个值。Redis 提供并默认启用使用自适应 hz 值的能力:

# 默认为 10
hz 10
# 默认开启
dynamic-hz yes

另外:hz 参数控制着定期删除过期 key 的定期任务之外,还控制一些其他定期任务例如关闭超时的客户端连接、更新统计信息等。

最后,定期删除不是把所有过期 key 都删除, 若key 数量非常庞大的话,挨个遍历检查是非常耗时的,会严重影响性能。Redis 设计这种策略的目的就是平衡内存和性能。
而 key 过期之后不立马删除是因为成本太高不太好办到,就算使用延迟队列作为删除策略,这样存在问题:

1)队列本身的开销可能很大:key 多的情况下,一个延迟队列可能无法容纳。
(2)维护延迟队列太麻烦:修改 key 的过期时间就需要调整期在延迟队列中的位置,并且,还需要引入并发控制。
4.4 如何防止大量 key 集中过期?

如果存在大量 key 集中过期的问题,可能会使 Redis 的请求延迟变高。可选方案:

1)尽量避免 key 集中过期,在设置键的过期时间时尽量随机一点。
(2)对过期的 key 开启 lazyfree 机制(修改 redis.conf 中的 lazyfree-lazy-expire参数即可),这样会在后台异步删除过期的 key,不会阻塞主线程的运行。
五、Redis bigkey处理

若key 对应的 value 所占用的内存比较大,那这个 key 就可以看作是 bigkey。
在这里插入图片描述

String 类型的 value 超过 1MB
复合类型(List、Hash、Set、Sorted Set 等)的 value 包含的元素超过 5000

bigkey会引发:客户端超时阻塞、网络阻塞、工作线程阻塞

1.使用 Redis 自带的 --bigkeys 参数来查找

# redis-cli -p 6379 --bigkeys

# Scanning the entire keyspace to find biggest keys as well as
# average sizes per key type.  You can use -i 0.1 to sleep 0.1 sec
# per 100 SCAN commands (not usually needed).

[00.00%] Biggest string found so far '"ballcat:oauth:refresh_auth:f6cdb384-9a9d-4f2f-af01-dc3f28057c20"' with 4437 bytes
[00.00%] Biggest list   found so far '"my-list"' with 17 items

-------- summary -------

Sampled 5 keys in the keyspace!
Total key length in bytes is 264 (avg len 52.80)

Biggest   list found '"my-list"' has 17 items
Biggest string found '"ballcat:oauth:refresh_auth:f6cdb384-9a9d-4f2f-af01-dc3f28057c20"' has 4437 bytes

1 lists with 17 items (20.00% of keys, avg size 17.00)
0 hashs with 0 fields (00.00% of keys, avg size 0.00)
4 strings with 4831 bytes (80.00% of keys, avg size 1207.75)
0 streams with 0 entries (00.00% of keys, avg size 0.00)
0 sets with 0 members (00.00% of keys, avg size 0.00)
0 zsets with 0 members (00.00% of keys, avg size 0.00

2.使用 Redis 自带的 SCAN 命令
3.借助开源工具分析 RDB 文件
4.借助公有云的 Redis 分析服务

处理方案:

(1)分割 bigkey:将一个 bigkey 分割为多个小 key。例如,将一个含有上万字段数量的 Hash 按照一定策略(比如二次哈希)拆分为多个 Hash。
(2)手动清理:Redis 4.0+ 可以使用 UNLINK 命令来异步删除一个或多个指定的 key。Redis 4.0 以下可以考虑使用 SCAN 命令结合 DEL 命令来分批次删除。
(3)采用合适的数据结构:例如,文件二进制数据不使用 String 保存、使用 HyperLogLog 统计页面 UV、Bitmap 保存状态信息(0/1)。
(4)开启 lazy-free(惰性删除/延迟释放) :lazy-free 特性是 Redis 4.0 开始引入的,指的是让 Redis 采用异步方式延迟释放 key 使用的内存,将该操作交给单独的子线程处理,避免阻塞主线程。
六、Redis持久化机制

使用缓存的时候,需要对内存中的数据进行持久化,也就是将内存中的数据写入到硬盘中。主要是为之后重用数据(比如重启机器、机器故障之后恢复数据),或者是为做数据同步(比如 Redis 集群的主从节点通过 RDB 文件同步数据)。
Redis 支持持久化方式:

快照(snapshotting,RDB)
只追加文件(append-only file, AOF)
RDB和AOF混合持久化(Redis4.0新增,通过配置项 aof-use-rdb-preamble 开启,AOF 重写的时候就直接把 RDB 的内容写到 AOF 文件开头) 
6.1 RDB持久化

通过创建快照来获得存储在内存里面的数据在 某个时间点 上的副本,然后将快照复制到其他服务器从而创建具有相同数据的服务器副本(Redis 主从结构,主要用来提高 Redis 性能),也可将快照留存本地以便重启服务器时使用。

快照持久化是 Redis 默认采用的持久化方式,在 redis.conf 配置文件中配置:

save 900 1           #在900(15分钟)之后,如果至少有1个key发生变化,Redis就会自动触发bgsave命令创建快照。
save 300 10          #在300(5分钟)之后,如果至少有10个key发生变化,Redis就会自动触发bgsave命令创建快照。
save 60 10000        #在60(1分钟)之后,如果至少有10000个key发生变化,Redis就会自动触发bgsave命令创建快照。

Redis 提供两个命令来生成 RDB 快照文件:

save : 同步保存操作,会阻塞 Redis 主线程(Redis启动后是通过单线程的方式工作);
bgsave : fork 出一个子进程,子进程执行,不会阻塞 Redis 主线程,默认选项。
6.2 AOF持久化

AOF持久化的实时性比RDB好,通过 appendonly 参数开启:

appendonly yes

开启 AOF 持久化后每执行一条会更改 Redis 中的数据的命令,Redis 就会将该命令写入到 AOF 缓冲区 server.aof_buf 中,然后再写入到 AOF 文件中(此时还在系统内核缓存区未同步到磁盘,依然存在数据丢失的风险),最后再根据持久化方式( fsync策略)的配置来决定何时将系统内核缓存区的数据同步到硬盘中完成持久化保存。

AOF 文件的保存位置和 RDB 文件的位置相同,都通过 dir 参数设置,默认的文件名是 appendonly.aof。

6.3 AOF工作流程

AOF 持久化功能的实现可以简单分为 5 步:

1)命令追加(append):所有的写命令会追加到 AOF 缓冲区中。
(2)文件写入(write):将 AOF 缓冲区的数据写入到 AOF 文件中。这一步需要调用write函数(系统调用),write将数据写入系统内核缓冲区之后直接返回(延迟写)。注意此时并没有同步到磁盘。
(3)文件同步(fsync):AOF 缓冲区根据对应的持久化方式( fsync 策略)向硬盘做同步操作。这一步需要调用 fsync 函数(系统调用), fsync 针对单个文件操作,对其进行强制硬盘同步,fsync 将阻塞直到写入磁盘完成后返回,保证了数据持久化。
(4)文件重写(rewrite):随着 AOF 文件越来越大,需要定期对 AOF 文件进行重写,达到压缩的目的。
(5)重启加载(load):当 Redis 重启时,可以加载 AOF 文件进行数据恢复。

名词解释:

1)系统调用(syscall):Linux系统直接提供用于对文件和设备进行访问和控制的函数;
(2)write:写入系统内核缓冲区之后直接返回(仅是写到缓冲区),不会立即同步到硬盘。虽然提高效率,但也有数据丢失的风险。同步硬盘操作通常依赖于系统调度机制,Linux 内核通常为 30s 同步一次,具体值取决于写出的数据量和 I/O 缓冲区的状态。
(3)fsync:fsync用于强制刷新系统内核缓冲区(同步到到磁盘),确保写磁盘操作结束才会返回。

AOF 工作流程表示为:
在这里插入图片描述

6.4 AOF持久化方式

Redis的配置文件中存在三种不同的 AOF 持久化方式( fsync策略),主要区别在于 fsync 同步 AOF 文件的时机(刷盘)。

1)appendfsync always:主线程调用 write 执行写操作后,后台线程(aof_fsync线程)立即
    调用fsyn 函数同步AOF文件(刷盘),fsync完成后线程返回,这样会严重降低Redis的性能(write+fsync).2)appendfsync everysec:主线程调用write执行写操作后立即返回,由后台线程(aof_fsync线程)每秒钟
    调用fsync函数(系统调用)同步一次 AOF文件(write+fsync,fsync间隔为1秒)
    
(3)appendfsync no:主线程调用write执行写操作后立即返回,让操作系统决定何时进行同步,
    Linux下一般为30秒一次(write但不fsync,fsync的时机由操作系统决定)

因此要兼顾数据和写入性能,可以考虑 appendfsync everysec 策略 ,让Redis每秒同步一次 AOF 文件,Redis 性能受到的影响较小。而且这样即使出现系统崩溃,最多只会丢失一秒之内产生的数据。当硬盘忙于执行写入操作时,Redis 还会适当放慢速度以适应硬盘的最大写入速度。

ps:有个了解知识点,详情参考:《Redis 7.0 Multi Part AOF 的设计和实现》

从Redis7.0开始,Redis使用Multi Part AOF机制,将原来的单个AOF文件拆分成多个AOF文件。在Multi Part AOF中,AOF文件被分为三种类型,分别为:
(1)BASE:表示基础AOF文件,一般由子进程通过重写产生,该文件最多只有一个。
(2)INCR:表示增量AOF文件,一般会在AOFRW开始执行时被创建,该文件可能存在多个。
(3)HISTORY:表示历史AOF文件,它由BASE和INCR AOF整合,每次AOFRW成功完成时,本次 AOFRW 前对应的BASE和INCR AOF 将变为HISTORY,HISTORY类型的AOF会被Redis自动删除。

相关 issue:Redis 的 AOF 方式 #783

6.5 AOF日志记录

关系型数据库如 MySQL,通常都是执行命令之前记录日志,方便故障恢复,而 Redis AOF 持久化机制是在执行完命令之后再记录日志。
在这里插入图片描述
后记录日志的原因:

避免额外的检查开销,AOF记录日志不会对命令进行语法检查;
在命令执行完之后再记录,不会阻塞当前的命令执行。

当然,前面提到过风险:

如果刚执行完命令 Redis 就宕机会导致对应的修改丢失;
可能会阻塞后续其他命令的执行(AOF 记录日志是在 Redis 主线程中进行的)。
6.6 AOF重写

当AOF存储量太大时,Redis能够在后台自动重写AOF产生一个新AOF文件,这个新AOF文件和原本AOF文件所保存的数据库状态一样,但体积更小。

Redis 将AOF重写程序放到子进程里执行,避免大量的写入操作对Redis正常处理命令请求造成影响,这个重写操作是redis通过读取数据库中的键值对来实现的,与Java描述的重写不同。
在这里插入图片描述
AOF文件重写期间,Redis还会维护一个AO 重写缓冲区,该缓冲区会在子进程创建新 AOF文件期间,记录服务器执行的所有写命令。当子进程完成创建新AOF文件的工作之后,服务器会将重写缓冲区中的所有内容追加到新AOF文件末尾,使得新AOF件保存的数据库状态与现有的数据库状态一致。最后,服务器用新AOF文件替换旧的AOF文件,来完成AOF文件重写操作。

开启AOF重写功能,可以调用BGREWRITEAOF命令手动执行,也可设置两个配置项,让程序自动决定触发时机:

auto-aof-rewrite-min-size:如果 AOF 文件大小小于该值,则不会触发 AOF 重写。默认值为 64 MB;
auto-aof-rewrite-percentage:执行 AOF 重写时,当前 AOF 大小(aof_current_size)和上一次重写时 AOF 大小(aof_base_size)的比值。如果当前 AOF 文件大小增加了这个百分比值,将触发 AOF 重写。
                            将此值设置为 0 将禁用自动 AOF 重写。默认值为 100

AOF重写机制优化改进可参考:《从 Redis7.0 发布看 Redis 的过去与未来》

6.7 AOF校验机制

AOF校验机制是Redis在启动时对AOF文件进行检查,判断文件是否完整,是否有损坏或者丢失的数据。通过使用校验和(checksum) 对整个 AOF 文件内容进行 CRC64 算法计算得出的数字来验证 AOF 文件。如果文件内容发生变化,那么校验和也会随之改变。因此,Redis 在启动时会比较计算出的校验和与文件末尾保存的校验和(计算的时候会把最后一行保存校验和的内容忽略),从而判断 AOF 文件是否完整。若发现文件有问题,Redis就会拒绝启动并提供相应的错误信息。AOF校验机制简单有效提高 Redis 数据的可靠性。

类似地,RDB 文件也有类似的校验机制来保证 RDB 文件的正确性。

6.8 如何选择 RDB 和 AOF
1)Redis保存的数据丢失一些也没什么影响的话,可以选择使用RDB。
(2)不建议单独使用AOF,因为随时创建一个RDB快照可以进行数据库备份、更快的重启以及解决AOF引擎错误。
(3)如果保存的数据要求安全性比较高的话,建议同时开启RDB和AOF两种持久化或者开启RDB和AOF混合持久化。

其实,了解计算机底层原理的都知道宕机数据丢失无法避免,要么是以性能很差的方式来解决。没有完美的方案只有均衡的方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1910174.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

土壤检测仪器:精确地检测土壤元素

在农业生产的广阔天地中&#xff0c;土壤检测仪器如同一把钥匙&#xff0c;打开了我们认识土壤、了解土壤元素的大门。这些看似平凡却功能强大的设备&#xff0c;能够精确地检测出土壤中的各种元素&#xff0c;为农业生产提供科学、准确的数据支持。 一、土壤检测仪器的重要性 …

大气热力学(5)——绝热过程

本篇文章源自我在 2021 年暑假自学大气物理相关知识时手写的笔记&#xff0c;现转化为电子版本以作存档。相较于手写笔记&#xff0c;电子版的部分内容有补充和修改。笔记内容大部分为公式的推导过程。 文章目录 5.1 气块的概念5.2 热力学第一定律的几种微分形式5.3 干绝热过程…

为什么要进行学术会议投稿?

为什么要进行学术会议投稿&#xff1f; 学术会议投稿有以下几个重要的用途&#xff1a; 学术会议投稿有什么用 1. 学术交流与分享&#xff1a;学术会议是学者们交流和分享最新研究成果、观点和发现的平台。通过投稿并获得口头或海报展示的机会&#xff0c;您可以向其他学者介…

网络祭祀人物微信小程序模板源码

模板介绍 手机端网络祭祀&#xff0c;在线祭祀&#xff0c;创建纪念历史人物小程序前端模板下载。包含&#xff1a;人物列表、详情、创建人物、个人中心等等页面。 图片演示 网络祭祀人物微信小程序模板源码

【Kubernetes安装】从零开始使用kubeadm命令工具部署K8S v1.28.2 集群

文章目录 一、虚拟机配置参数说明二、kubernetes v1.28.2版本介绍三、CentOS 7.9 系统初始化配置3.1 配置CentOS系统基础环境3.1.1 配置hosts3.1.2 永久关闭selinux3.1.3 关闭swap分区3.1.4 所有节点全部关闭防火墙3.1.5 配置ntp server同步时间3.1.6 添加kubernetes镜像源 3.2…

代码随想录算法训练营第二天|【数组】59.螺旋矩阵II

这两天工作的事情有点多&#xff0c;周末又比较懒&#xff0c;所以没有跟上进度。这两天开始补进度。 题目 给你一个正整数 n &#xff0c;生成一个包含 1 到 n2 所有元素&#xff0c;且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1&#xff1a; 输入&a…

centos7升级到欧拉openeule

centos7升级到欧拉openeule 一、准备工作 1、安装迁移工具&#xff08;安装迁移工具的机器不能给自己升级&#xff0c;请用其他机器作为迁移母机&#xff09; wget https://repo.oepkgs.net/openEuler/rpm/openEuler-20.03-LTS-SP1/contrib/x2openEuler/x86_64/Packages/x2…

使用webrtc-streamer查看rtsp实时视频

1.下载webrtc-streamer 2.解压运行webrtc-streamer.exe 在浏览器访问127.0.0.1:8000&#xff0c;点击窗口可以看到本机上各窗口实时状态&#xff0c;点击摄像头可以显示摄像头画面。 5.安装phpstudy&#xff0c;并建立网站。&#xff08;具体过程自己网上搜&#xff09; 6.打开…

护眼灯什么价位的好?好用又实惠的护眼灯推荐

护眼灯&#xff0c;简单来说就是保护视力的台灯&#xff0c;专业的护眼台灯的光线与自然光光线相似&#xff0c;有亮度稳定、不闪烁&#xff0c;发光面积大等这些特点。那么&#xff0c;护眼灯什么价位的好&#xff1f;市面上所出现的护眼台灯良莠不齐&#xff0c;价格低的质量…

NB!小哥竟然绕过了安全启动,Dump了SoC的BootROM。

原文&#xff1a;Amlogic S905 SoC: bypassing the (not so) Secure Boot to dump the BootROM译者&#xff1a;TrustZone 推荐语&#xff1a; 这是一篇关于如何绕过安全启动&#xff0c;然后实现破解BootRom的文章。通过这篇文章&#xff0c;可以让你对于ATF、安全启动等有个…

什么牌子的灯具性价比高?五款必入的灯具品牌推荐

随着科技的发展&#xff0c;生活质量水平的不断提升&#xff0c;大家对于生活的要求也在不断拔高。护眼台灯进入众多家庭里面&#xff0c;成为不可或缺的产品。然而&#xff0c;灯具在市面上&#xff0c;种类颇多&#xff0c;其质量也是参差不齐。那么&#xff0c;我们该如何选…

4.1 操作系统

大纲 进程管理重点&#xff0c;占本章历年考试一半分数&#xff0c; 前趋图、信号量和PV操作、死锁和银行家算法 出计算题 作业管理历年考试从来没有考过 操作系统概述 进程管理 进程的组成和状态 前趋图 进程资源图 真题 1

【概念介绍】Signed Distance Function(SDF)

三维空间的表示形式可以分为显式和隐式 显式&#xff1a; 体素Voxel&#xff0c;点云Point Cloud&#xff0c;三角面片Mesh隐式&#xff1a;符号距离函数Signed Distance Funciton(SDF)&#xff0c;占用场Occupancy Field&#xff0c;神经辐射场Neural Radiance Field&#xff…

简单仿写MVC

代码地址&#xff08;需要自取&#xff09;&#xff1a;mvc_Imitation: 简单仿写实现MVC (gitee.com) 项目目录 先把架子搭好 Controller注解 Documented Retention(RetentionPolicy.RUNTIME) Target(ElementType.TYPE) public interface Controller { }RequestMapping Do…

走拼箱货必看海运拼箱的实用技巧

在国际海运运输中&#xff0c;海运拼箱适用于货物数量较少或体积不足以填满整个集装箱的情况。 海运拼箱货物通常由物流公司或货代进行组织和管理。多个货主的货物通过合理拼装&#xff0c;使集装箱空间得到充分利用。 那么&#xff0c;在海运拼箱和整柜有哪些不同&#xff0c…

Xilinx Vitis 2020工程源目录修改

目录 1 背景2 分析3 解决4 使用4.1 修改路径4.2 编译工程4.2.1 清理工程4.2.2 编译工程 1 背景 Xilinx Vitis可以做standalone程序开发,不过其工程中使用的路径为绝对路径。工程更换位置后编译将会显示错误。例如&#xff1a;源目录为D:/work,复制到同事电脑上放到C:/work(同事…

无忧易售:在线刊登配备定价计算器,精准定价赢市场

在跨境电商的浩瀚宇宙里&#xff0c;精准定价是通往成功的关键钥匙。无忧易售&#xff0c;推出在线产品刊登页面的定价计算器功能&#xff0c;精准定价&#xff0c;轻松管理&#xff0c;让您的商品在海外市场脱颖而出&#xff0c;让每一次销售都精准高效&#xff0c;利润满满。…

Selenium 的基本操作你知道哪些?

1. 前言 今天的推文&#xff0c;我们就来说说看&#xff0c;怎么实现模拟真人去打开微信读书网站。 2.需求分析和准备 整体的需求大致可以分为以下步骤&#xff1a; 打开chrome浏览器 打开百度网页 搜索“微信读书” 点击进入“微信读书”官网 搜索关键词“长安的荔枝” 点…

浮动刹车盘和固定刹车盘有什么区别

在讨论刹车系统的设计理念时&#xff0c;浮动和固定刹车盘无疑是两个重要的分支。 它们各自拥有独特的设计哲学、工作原理以及适用场景&#xff0c;这些差异直接影响到了制动系统的性能和耐久性。 浮动刹车盘和固定刹车盘在设计和工作原理上有显著的区别&#xff0c;主要体现在…

现代化木工装备建设新颖校园木工创客室

校园木工创客室是一个集木工制作、创意设计、科技融合与教育实践于一体的多功能空间。它为学生提供了一个动手实践、创新创造的平台&#xff0c;旨在培养学生的动手能力、创新思维、解决问题的能力以及团队协作能力。 木工创客室的设备选择应综合考虑需求、预算、品牌、质量、安…