【Redis】初探Redis
前言
很早之前写的文章,最近考虑到面试可能涉及到Redis,所以拿出来再看一遍
Redis概述
Redis是啥?
Redis是Remote Dicitionary Server的缩写,翻译过来就叫做远程字典服务
是开源的、使用C完成的、支持网路、基于内存、可持久化的日志型,键值对的数据库(NoSql)
目前github上有开源的由go语言实现的goredis
Redis的作用
- 内存存储、持久化(内存中的数据如果遇到断电就直接丢失了,所以持久化很重要)
- 效率高,可以用于高速缓存
- 发布订阅系统、地图信息分析…
特性
- 多样的数据类型
- 持久化
- 集群
- 事务
Redis安装
我直接在vps上用docker安装的
用的docker-compose
yml文件如下:
version: '2.1'
services:
redis:
image: redis
hostname: redis
container_name: wbs_redis
environment:
TZ: Asia/Shanghai
command: ["redis-server","/etc/redis/redis.conf"]
restart: always
ports:
- 6379:6379
volumes:
- "/root/workspace/docker/redis/data:/data"
- "/root/workspace/docker/redis/redis.conf:/etc/redis/redis.conf"
- "/root/workspace/docker/redis/logs:/logs"
这样启动直接开启了redis-server,并且绑定了配置文件redis.conf
这里的配置文件时在github上下载对应版本的,我这里把bind 127.0.0.1
给注释掉了,因为如果不注释掉就只能本地连接redis
redis的默认端口是6379
,docker文件需要配置一个端口映射
Redis配置文件
在上面docker-compose.yml中的启动cmd中,可以看到指定了redis.conf
,同时挂载volumes也映射了redis.conf
到/etc/redis/redis.conf
那么这个redis.conf
到底是何方神圣?
在搞清楚这个之前,我先提议嘴,使用docker拉取的Redis是不会有指定的配置文件的,需要去github上下载相对应版本的redis.conf
下面就对主要的Redis配置文件进行分析!
基础配置
首先找到有关bind
的配置,将其注释掉,bind 127.0.0.1
表示只能从本地访问redis,这让我从远程访问还有啥意义嘞
redis可以包含其他的配置文件,也就是说配置文件可以分开写,你这个文件写一部分,我再写一部分,我从这个主的redis.conf
中include
你这个文件就可以了
redis默认开启保护模式,咱们就不要动他了,让他保护就完事了
redis的默认端口是6379
,需要修改可以在配置文件里修改
redis默认是关闭守护进程的,也就是退出直接关闭redis了,但是我是在docker里运行的,docker一直运行redis与我宿主机没啥太大关系,所以我这里也就没动了,如果是本地安装的,建议改成yes,使用守护进程
开启
redis默认的日志级别是notice
,可以自己配置
redis日志输出的文件名,未空就直接在控制台输出
redis默认有16
个数据库,可以自行修改
快照配置
快照配置涉及到了持久化,有关rdb aof的内容会在后面的章节中进行补充
save命令在注释中说的很清楚了,在1小时如果出现了一次修改就会进行快照,5分钟出现了100次修改也会进行快照,1分钟进行10000次修改同样会进行快照
持久化如果出错,redis是否还继续工作,默认式开启的
是否压缩rdb文件,会消耗cpu资源文件,默认开启
校验rdb文件,默认开启
安全配置
redis默认没有密码,使用requirepass
设置密码,进入redis-server
使用auth 密码
进行权限校验
客户端配置
maxclients
设置最大的客户端连接数量
内存配置
配置redis的最大内存使用maxmemory
内存策略,默认是maxmemory-policy noeviction
线程配置
redis在6.0版本之前是单线程的,在6.0开始是多线程的(默认关闭),使用io-threads-do-reads
来设置是否开启多线程,用io-threads
来设置线程数量
AOF配置
默认aof是不打开的
appendfsync always
表示每次修改都会进行同步,非常消耗性能
appendfsync everysec
表示每秒执行一次同步,可能会丢失这1s的数据
appendfsync no
表示不执行铜鼓吧,这个时候操作系统会自动同步数据,这样速度最快
默认是everysec
Redis常用命令
redis默认使用第一个数据库,使用select x
切换数据库,x表示某个数据库(select 3,表示使用3号数据库)
keys *
可以查看数据库中所有的键
flushdb
清空当前数据库
flushall
清空所有数据库
set key value
表示将key这个键设置值为value
get key
表示获取key这个键对应的值
del key
表示删除key,可以携带多个key
exists key
表示判断当前的key是否存在,存在返回1,不存在返回0
move key 1
表示将key这个键值对移动到1号数据库中
expire key 10
表示将key这个键设置10秒后失效
tll key
表示查看key这个键的即将失效的时间,单位是秒
五大基础数据类型
String
大部分使用场景都是操作string,例如最常用的set,set了一个string之后,可以使用append
进行字符串的追加
如果append
的键是不存在的,那么会创建一个新的键
同理,字符串还有查看长度的strlen
,还有字符串截取getrange
有了get
当然有对应的setrange offset val
还有一个比较关键的自增操作incr
同理还有自减操作decr
那么如果想控制增长量和减少量呢,使用incrby key 10
或decrby key 10
就可以设置自增和自减的值了
下面是setex
和setnx
,setex
表示存在某个键就设置,setnx
表示不存在这个键才设置
setex key seconds value
需要设置一个seconds来表示过期时间
setnx key value
只能设置不存在的键
然后是设置多个keys,使用mset
指令
mset k1 v1 k2 v2 k3 v3
同理可以使用mget
去请求多个键
List
可以在redis中,将list想象成一个双向队列
使用lpush
可以向队头存放数据,再使用lrange list start end
来指定查看键为list的数据的范围,这里的0 -1
表示查看所有
同理,使用rpush
向队列尾存放数据
push相对应的就是pop了,使用lpop
和rpop
可以将队头或队尾的元素弹出
使用lindex key index
获取列表中对应的下标,下标从0
开始
使用llen
查看列表的长度
使用lrem key count val
来删除指定count个数的val精确值,我这里是删除了两个“2”字符串
使用ltrim key start end
来进行截取列表中指定下标的元素,这里截取1坐标到2坐标,只剩下b和c了
使用rpoplpush source target
将source列表中的队尾弹出,将这个元素插入到target列表的队头
使用lset key index value
将列表中index的值更新为value,如果这个key不存在就会报错!
使用linsert key before/after val newval
来插入一元素,在某个元素之前或者之后,如果key不存在或者val不存在,就插入无效
Set
无法重复的list
sadd key value
插入键值对
smembers key
查看所有的值
sismember key value
查看这个value是否存在于key中
scard key
查看键所具有的值的长度
srem key value
删除键值
srandmember key [x]
随机抽选出x个元素
spop key
弹出集合第一个元素
smove key1 key2 value
将key1中的value移动到key2中
sdiff key1 key1
以key1为参照物,比较key1与key2的不同元素(差集)
sinter key1 key2
以key1为参照物,获得key1与key2的交集
sunion key1 key2
获得key1与key2的并集
HashMap
在Redis这个键值对数据库里存放K-V是否有点…
hset key field value
设置一个键,它的值是一个键值对
hget key field
获取一个键中的键值对
hgetall key
获取所有的键值对
hdel key field
删除某个键,这个键所指向的键值对被删除
hlen key
获取某个key的长度
hexists key field
判断key中的某个字段是否存在
hkeys key
获取key中的所有键
hvals key
获取key中的所有值
hincrby key field count
给某个属性自增count
hsetnx key field value
如果不存在则可以设置
Zset
在set的基础上,可以进行排序,即有序集合
zadd key score member
添加一个值,价值为score
zrange key start end
查看start到end范围内的所有值,0 -1 可以列出所有,默认是sroce从小到大的排序方式
zrangebyscore key (startscore (endscroe [withscores]
通过score排序,查看startscore到endscore区间内的值,左括号表示闭区间,不带左括号就是开区间,inf表示无限,withscores是一个可选参数,表示携带score输出
与顺序排序相反的就是逆序,使用zrevrange
就可以了
zrem key value
删除zset中的某个元素
zcard key
查看key集合的长度
zcount key (min (max
查看在min和max区间内的集合长度,括号表示闭区间
三大特殊数据类型
Geospatial
和地理位置挂钩的一个数据类型,底层是基于zset的,可以用zset的指令来操作相关的key
geoadd key 经度 纬度 地点名称
添加一个地点的经纬度
geopos key 地点
获取某个地点的经纬度
geodist key 地点1 地点2 单位
查看两个地点置间的直线距离,单位如下
-
m 米
-
km 千米
-
mi 英里
-
ft 英尺
georadius key 经度 纬度 半径 单位 [withdist / withcoord] [count x]
以某个经纬度为圆心,找在半径面积区域内圆的地点,withdist 表示显示直线距离,withcoord表示显示经纬度, count x表示限制限制x个地点
georadiusbymember key 地点 半径 单位
以某个地点为圆心,在半径面积区域内找点
Hyperloglog
Redis中的Hyperloglog是基于统计基数的算法实现的
Hyperloglog相对于set的优势是:在对很大的数据量进行计数处理的时候,内存占用小,缺点是:因为hyperloglog是近似计算,在数据量小的时候,误差会较大
Hyperloglog可以用于文章阅读量或者网站点击量等情景,目的是计数而非统计每个用户的ip,只注重最后的点击量或浏览量
Hyperloglog占用的内存固定,2^64的不同元素只需要12kb的内存就可以存储
Hyperloglog的错误率为0.81%
pfadd key values
给key添加元素,可以多个
pfmerge newkey key1 key2
将key1和key2进行set去重后合并到newkey中
pfcount key
查询key中的数量(不重复的元素)
Bigmaps
对位进行操作,可以非常的节约空间,但是一般用来处理0、1这种两种情况的变量
例如,如果需要统计一年365天打卡的次数,可以创建一个365位的空间,每一天对一位进行赋值处理,如果打卡了就给那一天的那一位置位1,反之就是0
setbit key 第几位 0或1
给key的某一位置为0/1
getbit key 第几位
获取key的某一位
bitcount key [start end]
获取start位到end位上1的数量
Redis事务
Redis中的单条命令是保证原子性的,但是Redis中的事务是不保证原子性的
Redis事务没有隔离(独立)的概念,所有的命令只有在最后发起执行的时候才会统一进行执行
Redis中事务的本质就是一组命令的集合,一个事务中的命令都会被序列化,在事务执行的过程中严格按照顺序执行
- 开启事务(multi)
- 命令入队(…)
- 执行事务(exec) / 放弃事务(discard)
编译时
异常:(使用了错误的语法,所有的命令都不会被执行)
运行时
异常:(运行中,对某些操作进行了错误的使用,例如自增一个字符串,这样的异常不会导致事务的失败,仅仅是那一句话的失败,所以说redis的事务不保证原子性)
Redis监视模式
先引入两个锁的概念:
- 悲观锁:
- 认为什么时候都会出问题,无论做什么都会加锁(非常影响性能)
- 乐观锁:
- 认为什么时候都不会出现问题,所以不会上锁。更新数据的时候判断一下,在此期间是否有人修改过这个数据
正常情况下的操作,使用watch money
没有发现money有其他的变动,可以正常执行事务中的命令,让money 减少60, out增加60
异常情况下的操作(多个IO对Redis进行同时处理),注意看下图的时间,首先设置原始的money 为 100, out为0
,再在右边给watch money
对其监视,开启事务,输入两条指令但不输入exec
,这个时候到左边输入set money 1000
,这样就对money这个键进行了更改,再去右边exec
事务,发现返回nil
,其实就是事务执行失败,原因就是添加了watch
,相当于给money增加了一个乐观锁
unwatch
命令可以将所有的watch
给取消
Java操作Redis
在springboot2之前,springboot内置的操作redis的工具类是jedis
在springboot2之后,springboot内置的操作redis的工具类是lettuce
两者的区别就是
jedis
使用的是BIO
,在高并发的情况下即使有线程池也可能造成崩溃的情况lettuce
使用的是NIO
,可以让单个线程去轮流干事情,更高效
这里就不提使用最基础的jedis,直接说说springboot整合redis吧
首先pom.xml
文件导入依赖
<!--redis数据库模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
阅读源码可以发现,springboot中的redis默认使用jdk的序列化操作,这样会让在redis-server中看到的key和value存在编码问题,无法直接显示字符
解决的方式就是咱们自己使用@Configuration
注解写一个RedisTemplate
的Bean来使用StringRedisSerializer
序列化
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String,Object> template = new RedisTemplate<>();
//关联
template.setConnectionFactory(factory);
//设置key的序列化器
template.setKeySerializer(new StringRedisSerializer());
//设置value的序列化器
template.setValueSerializer(new StringRedisSerializer());
return template;
}
}
同时,为了更好的使用redis的操作,在网上搜索而来使用非常广的RedisUtils工具类
/**
* Java版本RedisUtil
*/
@Component
public class RedisUtils {
@Resource
private RedisTemplate<String,Object> redisTemplate;
public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间(秒)
* @return boolean
*/
private boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
*
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
/**
* 普通缓存获取
*
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
*
* @param key 键
* @param delta 要增加几(大于0)
* @return long
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
*
* @param key 键
* @param delta 要减少几(小于0)
* @return long
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
/**
* HashGet
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return 值
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
private boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
* @return double
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
* @return
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
/**
* 根据key获取Set中的所有值
*
* @param key 键
* @return Set
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
* @return long
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
* @return
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
* @return long
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
* @return Object
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return boolean
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return boolean
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0) expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return boolean
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return boolean
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0) expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return boolean
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
return redisTemplate.opsForList().remove(key, count, value);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public void expire(int i, int i1, int i2) {
}
}
这样使用springboot开发有关redis读写的操作就非常简单了
最后是springboot的application的配置文件,我这里是参考了网络上配置lettuce的一些操作,如果不需要就直接使用port、host、password进行连接就行了
spring:
redis:
port: your port
host: your ip
password: your passwd
lettuce:
pool:
min-idle: 0
max-wait: 5s
max-active: 10
max-idle: 10
time-between-eviction-runs: 1s
Redis持久化
Redis属于内存型数据库,一旦服务器断电,如何保证内存中的数据不会损失呢?这就涉及到了Redis的持久化
了!
Redis的持久化有两种模式
- RDB
- AOF
下面分别来对两种模式进行简单分析
RDB(Redis Database)
在之前的redis.conf
中,有对于rdb的设置,包括什么情况进行rdb文件的生成,还有rdb文件的名称等等
网上大部分都说默认的redis的save策略是
save 900 1 -- 900秒内至少有一个改动(就触发
save 300 10 -- 300秒内至少有10个改动
save 60 10000 -- 60秒内至少有10000个改动
但是我下载的最新版本7.0.3的redis.conf其中的配置都被注释了,我也不懂是不是新版本与旧版本配置文件不同的原因
# save 3600 1 300 100 60 10000
默认写的文件配置是,也就是当前redis-server目录下保存的rdb文件的名称
dbfilename dump.rdb
Redis使用了linux系统的CopyOnWrite
技术(写时复制),在生成快照文件的时候,仍然可以接收命令来处理数据。
简单来说就是,在开始进行持久化的时候,会从主线程fork出一个子线程,同时这个子线程会共享主线程的所有内存数据,同时我们也知道redis是基于内存的数据库,共享内存数据就等于共享了数据库中的所有的信息,这样这个子线程从主线程读取内存数据,将内存数据写入到dump.rdb
中。
此时,如果主线程处理的命令都是read
,那么子线程不会受影响
如果,主线程是处理了write
,那么会对该命令操作的数据copy
一份,生成副本,子线程会把这个副本写入到dump.rdb
中,这个过程中,主线程仍然可以执行命令
那么如何出发redis的rdb持久化?
- 触发了配置文件中save的情况
- 执行flushall或save命令
- 退出redis,使用shutdown会自动触发save
如何导入rdb文件?
- 直接将redis-server所在的目录下导入
dump.rdb
,redis开启会自动读取
优点:
- dump.rdb是二进制文件,能快速导入
缺点:
- rdb每次持久化需要将所有内存数据写入文件,并替换源文件,当内存数据率很大,频繁生产快照非常消耗性能
- save策略如果设置的时间间隔较大,在没备份时候宕机会导致数据丢失
弄了张网图过来,逻辑一看就懂:
AOF(Append Only File)
redis默认不开启AOF,需要在配置项appendonly
改成yes
AOF是redis提供的另一种数据持久化方式,它会记录客户端对redis服务端的每一次write
操作,并将这些写操作以redis协议追加保存到后缀为aof的文件末尾。在redis服务器重启时,会读取并加载aof文件,达到恢复数据的目的。
也就是对比rbd文件来说,rdb文件是一个二进制文件,而aof是一个记录文件,类似history一样,是可以看见命令的
如果我们人为在aof文件中写入了redis不可识别的命令,可以使用redis-check-aof
文件来进行修复,但是可能会丢失一部分数据
redis-check-aof --fix appendonly.aof
同时可以在redis.conf
中设置aof文件名和文件目录
appendfilename "appendonly.aof"
appenddirname "appendonlydir"
接下来就是aof的三种写入策略
,默认使用的是everysec
# appendfsync always
appendfsync everysec
# appendfsync no
-
always
客户端对每次的redis写入都会被记录到aof文件中,这种操作非常影响redis的性能,相当于每一次
write
的操作都进行了一次磁盘的io
,好处就是可以记录所有的写数据,基本不会造成数据的丢失,但是代价太大了 -
everysec
每秒刷新一次buf中的数据到aof中,是aof的默认策略,理论上这种方式最多丢失1s的数据
-
no
redis服务器不会将数据写入到aof文件中,而是交给操作系统来判断什么时候写入,这种方式是最快的一种策略,但是丢失数据的可能性最大
aof既然是通过append(追加)的方式来存储redis的写记录的,如果对同一个key进行多次的write
操作,会产生大量的对同一个key的操作记录,这就会使aof文件非常大,为了避免这个问题,配置文件中可以通过设置auto-aof-rewrite-percentage
和auto-aof-rewrite-min-size
,默认参数如下
auto-aof-rewrite-percentage 100
表示当aof文件大小到达原先文件(上次启动redis时候记录的aof文件大小)的两倍,就会进行文件重写
auto-aof-rewrite-min-size 64mb
:aof文件低于64mb不会被重写
这两个指标同时满足,就会触发aof的重写机制,我们也可以通过命令bgrewriteaof
手动触发重写。
重写的原理就是重写最新的键,之前设置的键都不记录了,当作垃圾数据。同时重写是主线程fork出一条子线程来讲文件进行重写,遍历这个子线程中的共享的内存数据去将内存转为操作指令,再序列化成一个新的aof文件。
注意:重写的aof文件不会去读取旧的aof文件,而是将当前redis的内存序列化成指令而已,类似于快照
但是考虑到一个问题,就是重写的时候,如果主线程被执行了write
的操作,这样新生成的指令如何被写入新的aof中呢?
所以为了保证主线程与子线程的一致性,使用了两个buf
完成
AOF文件的重写流程如下:
(1)bgrewriteaof触发重写,判断是否存在bgsave或者bgrewriteaof正在执行,存在则等待其执行结束再执行;
(2)主进程fork子进程,防止主进程阻塞无法提供服务;
(3)子进程遍历Redis内存快照中数据写入临时AOF文件,同时会将新的写指令写入aof_buf和aof_rewrite_buf两个重写缓冲区,前者是为了写回旧的AOF文件,后者是为了后续刷新到临时AOF文件中,防止快照内存遍历时新的写入操作丢失;
(4)子进程结束临时AOF文件写入后,通知主进程;
(5)主进程会将上面的aof_rewirte_buf缓冲区中的数据写入到子进程生成的临时AOF文件中;
(6)主进程使用临时AOF文件替换旧AOF文件,完成整个重写过程。
Redis发布订阅
redis的发布订阅是一种消息通信模式
- 发布者发送消息(pub)
- 订阅者接收消息(sub)
- 消息通道(channel)
虽然redis可以做这种发布订阅,但是主流市场根本没人用(汗
基本上都是使用消息队列的框架,Kafka、rabbitmq之类的
有关redis发布订阅的相关操作语句直接去菜鸟教程
中看就行了,我估计也不咋用,这里就不写了
Redis主从复制
TODO