文章目录
- 一、NoSQL的基本介绍
- 二、为什么要使用NoSQL,难道SQL不够你用吗?
- 三、Redis的基本概念
- 四、Redis基本操作命令
- 五、Redis五大数据类型及其操作命令
- 六、三种特殊的数据类型及其操作命令
- 七、 Redis事务
- 八、Redis对key的监控
- 九、Redis数据库密码
- 十、Jedis = 使用Java来操作Redis
- 十一、SpringBoot整合Redis
- 十二、Redis在SpringBoot中的操作函数大全
- 十三、自定义Redis工具类
- 十四、Redis实现持久化
- 十五、Redis发布与订阅
- 十六、缓存雪崩、缓存穿透、缓存击穿
一、NoSQL的基本介绍
四大 非关系型数据库NoSQL 与 关系型数据库SQL 的对比
目前对于非关系型数据库主要有四种数据存储类型:键值对存储(key-value),文档存储(document store),基于列的数据库(column-oriented),还有就是图形数据库(graph database)。每一种都会解决相应的问题,这些问题是关系型数据库所不能解决的。而在实际应用中都会将这几种情况结合起来实现相应的功能。
关系型数据库
确保每一条数据都只被存储一次。标准化是其结构设置的规范。例如:如果你想存储一个人的信息和这个人的爱好这样的数据,你可以创建两个表:一个用来存储这个人的信息,另一个表用来存储这个人的爱好。正如下图中看到的,你必须有一张额外的映射表,这张表将人的信息表和爱好表建立其对应的关系。这是因为他们的关系是多对多的关系,一个人可以有多个爱好,并且多个人可能会有相同的爱好。
一个完整的关系型数据库会由很多的实体表和关系映射表构成,现在你已经有了和NoSQL数据库进行比较的东西了,下面让我们看看这些不同的存储类型。
基于列的数据库
基于列的数据库出现的原因
实操中我们会发现,行式数据库在读取数据的时候,会存在一个固有的“缺陷”比如,所选择查询的目标即使只涉及少数几项属性,但由于这些目标数据埋藏在各行数据单元中,而行单元往往又特别大,应用程序必须读取每一条完整的行记录,从而使得读取效率大大降低**,对此,行式数据库给出的优化方案是加“索引”,在OLTP类型的应用中,通过索引机制或给表分区等手段,可以简化查询操作步骤,并提升查询效率。
但针对海量数据背景的OLAP应用(例如分布式数据库、数据仓库等等),行式存储的数据库就有些“力不从心”了,行式数据库建立索引和物化视图,需要花费大量时间和资源,因此还是得不偿失,无法从根本上解决查询性能和维护成本等问题,也不适用于数据仓库等应用场景,所以后来出现了基于列式存储的数据库。
对于数据仓库和分布式数据库来说,大部分情况下它会从各个数据源汇总数据然后进行分析和反馈,其操作大多是围绕同一列属性的数据进行的,而当查询某属性的数据记录时,列式数据库只需返回与列属性相关的值,在大数据量查询场景中,列式数据库可在内存中高效组装各列的值,最终形成关系记录集,因此可以显著减少IO消耗,并降低查询响应时间,非常适合数据仓库和分布式的应用。
基于列的数据库的概念模型
基于列的数据库会将每一列分开单独存放,当查找一个数量较小的列的时候其查找速度是很快的。
这种设计看起来很像基于行的数据库在每一列上都加了索引一样。数据库索引这种数据结构以牺牲存储空间和更多的写文件(索引更新)为代价使查找速度得到提升。索引是将行号映射到数据上,而基于列数据库是将数据映射到行号上面,这样的方式用于计算是很简单的。例如上面的例子,查找有多少人的爱好包含archery(箭术)是很容易计算出来的。除此之外将每一列单独存放可以优化压缩因为每张表中只存一类数据。
应用场景
基于行的数据库
- 适合随机的CRUD操作
- 需要在行中选取所有属性的查询操作
- 需要频繁插入或更新,其操作与索引和行的大小更为相关
基于列的数据库
- 在数据量巨大的数据表中,大部分操作都是查询操作
键值对存储
键值对中存储的数据的类型是不受限制的,可以是一个字符串,也可以是一个数字,甚至是由一系列的键值对封装成的对象等。下图向我们展示了一个比较复杂的键值对结构。使用价值对存储的数据库有Redis,Voldemort,Riak,Amazon’s Dynamo。
文档存储
文档存储是基于键值对存储的,其结构较之于键值对存储更为复杂,可以说在键值对的基础上更深入了一步。文档存储是假定一个特定文档的结构可以使用一种特定的模式来说明,它的出现较之于其他的NoSQL数据库类型来说是最自然的,因为设计这种方式的最初的目的就是用来存储日常文档的,并且这种方式支持对于那些通常已经聚合的数据进行复杂的查询和计算。使用关系型数据库存储数据的方式在标准化的角度看是很有意义的:每条数据只被存储一次并且通过外键来进行联系。文档存储不会去关心那些所谓的标准化,只要数据在该结构下是有意义的就可以。所以说关系型数据库不能很好的适应特定企业的案例,只能用来做那些比较通用的案例。
例如:报纸和杂志包含有文章,如果想在关系型数据库中存储这些文章,首先你需要将这些文章给拆分开来,文章的内容在一个表中,文章的作者以及关于作者的信息要存在另一张表中,对于发布在网络上的文章的评论也需要额外的一张表来存储。正如图七所展示的那样,报纸上的一篇文章可以被存储为一个实例,这样在处理那些总是被查看的数据时可以减少查找的时间。使用文档存储的NoSQL数据库包含MongoDB和CouchDB。
图形数据库
现在剩下的是最后一个NoSQL数据库存储类型,也是最复杂的一个,主要使用一种高效的方式来存储各个实体之间的关系。当数据之间是紧密联系的,例如社会关系、科学论文的引文抑或是资本资产定价模型等等,使用图形数据库时最好的选择。图形或者网络数据有两部分组成:
Node-:实体本身,在一个社会关系中可以认为是一个人。
Edge-:实体之间的关系。这个关系可以用一条线来表示,这条线有它自己的属性。这条线可以有方向,箭头可以表明谁是谁的上级。
如果给予足够的关系和实体类型,图形会变得非常的复杂,其复杂程度简直难以置信。图八已经展示了仅有有限几个实体的复杂图形。像Neo4j图形数据库声称支持ACID,然而文档存储数据库和键值对数据库坚持BASE。
二、为什么要使用NoSQL,难道SQL不够你用吗?
有兴趣的话,可以去了解一下NoSQL的发展过程,这里就总结一句话,传统的SQL无法应付现在爆炸式增长的数据,而NoSQL能够很好地处理这个问题。
上面提到了以键值对存储的非关系型数据库主要有Redis
,我们接下来就要好好学习Redis
三、Redis的基本概念
1. Redis是什么
Redis:作为Key-Value数据库,为了保证效率,数据都是缓存在内存中的,并且Redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,同时也会实现master-slave主从同步
2. 安装Redis(Windows)
安装地址:https://github.com/dmajkic/redis
解压安装包:
开启redis-server.exe
启动redis-cli.exe测试
3. Redis有16个数据库
Redis一共有16个数据库(DB 0~ DB 15),默认使用DB 0,可以使用select n
切换到DB n,可以通过dbsize
查看当前数据库的大小,与key
数量有关。并且不同数据库之前的数据不能相互访问。
127.0.0.1:6379> config get databases # 命令行查看数据库数量databases
1) "databases"
2) "16"
127.0.0.1:6379> select 8 # 切换数据库 DB 8
OK
127.0.0.1:6379[8]> dbsize # 查看数据库大小
(integer) 0
# 不同数据库之间 数据是不能互通的,并且dbsize 是根据库中key的个数。
127.0.0.1:6379> set name sakura
OK
127.0.0.1:6379> SELECT 8
OK
127.0.0.1:6379[8]> get name # db8中并不能获取db0中的键值对。
(nil)
127.0.0.1:6379[8]> DBSIZE
(integer) 0
127.0.0.1:6379[8]> SELECT 0
OK
127.0.0.1:6379> keys *
1) "counter:__rand_int__"
2) "mylist"
3) "name"
4) "key:__rand_int__"
5) "myset:__rand_int__"
127.0.0.1:6379> DBSIZE # size和key个数相关
(integer) 5
4. 单线程Redis也能高速
Redis是单线程的,但是为什么单线程还这么快呢?其实这里我们存在严重误区:多线程一定比单线程效率高
其实并不是这样的,Redis是将所有的数据放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!),对于内存系统来说,如果没有上下文切换效率就是最高的,多次读写都是在一个CPU上的,在内存存储数据情况下,单线程就是最佳的方案。
四、Redis基本操作命令
exists key
del key
move key db
expire key second
type key
127.0.0.1:6379> keys * # 查看当前数据库所有key
(empty list or set)
127.0.0.1:6379> set name qinjiang # 设置指定 key 的值
OK
127.0.0.1:6379> set age 20
OK
127.0.0.1:6379> keys *
1) "age"
2) "name"
127.0.0.1:6379> move age 1 # 将键值对移动到指定数据库move key db
(integer) 1
127.0.0.1:6379> EXISTS age # 判断键是否存在exists key
(integer) 0 # 不存在
127.0.0.1:6379> EXISTS name
(integer) 1 # 存在
127.0.0.1:6379> SELECT 1
OK
127.0.0.1:6379[1]> keys *
1) "age"
127.0.0.1:6379[1]> del age # 删除键值对del key
(integer) 1 # 删除个数
127.0.0.1:6379> set age 20
OK
127.0.0.1:6379> EXPIRE age 15 # 设置键值对的过期时间expire key second
(integer) 1 # 设置成功 开始计数
127.0.0.1:6379> ttl age # 查看key的过期剩余时间
(integer) 13
127.0.0.1:6379> ttl age
(integer) 11
127.0.0.1:6379> ttl age
(integer) 9
127.0.0.1:6379> ttl age
(integer) -2 # -2 表示key过期,-1表示key未设置过期时间
127.0.0.1:6379> get age # 过期的key 会被自动delete
(nil)
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> type name # 查看value的数据类型type key
string
TTL
:Redis的key,通过TTL命令返回key的过期时间,一般来说有3种:
- 当前key没有设置过期时间,所以会返回-1.
- 当前key有设置过期时间,而且key已经过期,所以会返回-2.
- 当前key有设置过期时间,且key还没有过期,故会返回key的正常剩余时间.
RENAME key newKey
:修改key的名称
RENAMENX Key newKey
:仅当 newkey 不存在时,将 key 改名为 newkey
五、Redis五大数据类型及其操作命令
Redis支持字符串、哈希表、列表、集合、有序集合、位图、hyperloglogs等数据类型
String数据类型
# APPEND key value 向指定的key的value后追加字符串
127.0.0.1:6379> set msg hello
OK
127.0.0.1:6379> append msg " world"
(integer) 11
127.0.0.1:6379> get msg “hello world”
#DECR/INCR key 将指定key的value数值进行+1/-1(仅对于数字)
127.0.0.1:6379> set age 20
OK
127.0.0.1:6379> incr age
(integer) 21
127.0.0.1:6379> decr age
(integer) 20
# set key/ get key
127.0.0.1:6379> SET msg "hello world"
OK
127.0.0.1:6379> GET msg
hello world
127.0.0.1:6379> SET number 10086
OK
127.0.0.1:6379> GET number
10086
# INCRBY/DECRBY key n 按指定的步长对数值进行加减
127.0.0.1:6379> INCRBY age 5
(integer) 25
127.0.0.1:6379> DECRBY age 10
(integer) 15
# INCRBYFLOAT key n 为数值加上浮点型数值
127.0.0.1:6379> INCRBYFLOAT age 5.2
“20.2”
# STRLEN key 获取key保存值的字符串长度
127.0.0.1:6379> get msg “hello world”
127.0.0.1:6379> STRLEN msg
(integer) 11
# GETRANGE key start end 按起止位置获取字符串(闭区间,起止位置都取)
127.0.0.1:6379> get msg “hello world”
127.0.0.1:6379> GETRANGE msg 3 9
“lo worl”
# SETRANGE key offset value 用指定的value 替换key中 offset开始的值
127.0.0.1:6379> SETRANGE msg 2 hello
(integer) 7
127.0.0.1:6379> get msg “tehello”
# GETSET key value 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。
127.0.0.1:6379> GETSET msg test
“hello world”
# SETNX key value 仅当key不存在时进行set
127.0.0.1:6379> SETNX msg test
(integer) 0
127.0.0.1:6379> SETNX name sakura
(integer) 1
# SETEX key seconds value set 键值对并设置过期时间
127.0.0.1:6379> setex name 10 root
OK 127.0.0.1:6379> get name
(nil)
# MSET key1 value1 [key2 value2..] 批量set键值对
127.0.0.1:6379> MSET k1 v1 k2 v2 k3 v3
OK
# MSETNX key1 value1 [key2 value2..] 批量设置键值对,仅当参数中所有的key都不存在时执行
127.0.0.1:6379> MSETNX k1 v1 k4 v4
(integer) 0
# MGET key1 [key2..] 批量获取多个key保存的值
127.0.0.1:6379> MGET k1 k2 k3
1) “v1” 2) “v2” 3) “v3”
List列表
Redis列表是简单的字符串列表,你可以将元素插入列表左边,也可以插入列表右边,所以我们可以经过规则定义将其变为队列、栈、双端队列等。
另外,一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。
---------------------------LPUSH---RPUSH---LRANGE--------------------------------
127.0.0.1:6379> LPUSH mylist k1 # LPUSH mylist=>{1}
(integer) 1
127.0.0.1:6379> LPUSH mylist k2 # LPUSH mylist=>{2,1}
(integer) 2
127.0.0.1:6379> RPUSH mylist k3 # RPUSH mylist=>{2,1,3}
(integer) 3
127.0.0.1:6379> get mylist # 普通的get是无法获取list值的
(error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> LRANGE mylist 0 4 # LRANGE 获取起止位置范围内的元素
1) "k2"
2) "k1"
3) "k3"
127.0.0.1:6379> LRANGE mylist 0 2
1) "k2"
2) "k1"
3) "k3"
127.0.0.1:6379> LRANGE mylist 0 1
1) "k2"
2) "k1"
127.0.0.1:6379> LRANGE mylist 0 -1 # 获取全部元素
1) "k2"
2) "k1"
3) "k3"
---------------------------LPUSHX---RPUSHX-----------------------------------
127.0.0.1:6379> LPUSHX list v1 # list不存在 LPUSHX失败
(integer) 0
127.0.0.1:6379> LPUSHX list v1 v2
(integer) 0
127.0.0.1:6379> LPUSHX mylist k4 k5 # 向mylist中 左边 PUSH k4 k5
(integer) 5
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k5"
2) "k4"
3) "k2"
4) "k1"
5) "k3"
---------------------------LINSERT--LLEN--LINDEX--LSET----------------------------
127.0.0.1:6379> LINSERT mylist after k2 ins_key1 # 在k2元素后 插入ins_key1
(integer) 6
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k5"
2) "k4"
3) "k2"
4) "ins_key1"
5) "k1"
6) "k3"
127.0.0.1:6379> LLEN mylist # 查看mylist的长度
(integer) 6
127.0.0.1:6379> LINDEX mylist 3 # 获取下标为3的元素
"ins_key1"
127.0.0.1:6379> LINDEX mylist 0
"k5"
127.0.0.1:6379> LSET mylist 3 k6 # 将下标3的元素 set值为k6
OK
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k5"
2) "k4"
3) "k2"
4) "k6"
5) "k1"
6) "k3"
---------------------------LPOP--RPOP--------------------------
127.0.0.1:6379> LPOP mylist # 左侧(头部)弹出
"k5"
127.0.0.1:6379> RPOP mylist # 右侧(尾部)弹出
"k3"
---------------------------RPOPLPUSH--------------------------
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k4"
2) "k2"
3) "k6"
4) "k1"
127.0.0.1:6379> RPOPLPUSH mylist newlist # 将mylist的最后一个值(k1)弹出,加入到newlist的头部
"k1"
127.0.0.1:6379> LRANGE newlist 0 -1
1) "k1"
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k4"
2) "k2"
3) "k6"
---------------------------LTRIM--------------------------
127.0.0.1:6379> LTRIM mylist 0 1 # 截取mylist中的 0~1部分
OK
127.0.0.1:6379> LRANGE mylist 0 -1
1) "k4"
2) "k2"
# 初始 mylist: k2,k2,k2,k2,k2,k2,k4,k2,k2,k2,k2
---------------------------LREM--------------------------
127.0.0.1:6379> LREM mylist 3 k2 # 从头部开始搜索 至多删除3个 k2
(integer) 3
# 删除后:mylist: k2,k2,k2,k4,k2,k2,k2,k2
127.0.0.1:6379> LREM mylist -2 k2 #从尾部开始搜索 至多删除2个 k2
(integer) 2
# 删除后:mylist: k2,k2,k2,k4,k2,k2
---------------------------BLPOP--BRPOP--------------------------
mylist: k2,k2,k2,k4,k2,k2
newlist: k1
127.0.0.1:6379> BLPOP newlist mylist 30 # 从newlist中弹出第一个值,mylist作为候选
1) "newlist" # 弹出
2) "k1"
127.0.0.1:6379> BLPOP newlist mylist 30
1) "mylist" # 由于newlist空了 从mylist中弹出
2) "k2"
127.0.0.1:6379> BLPOP newlist 30
(30.10s) # 超时了
127.0.0.1:6379> BLPOP newlist 30 # 我们连接另一个客户端向newlist中push了test, 阻塞被解决。
1) "newlist"
2) "test"
(12.54s)
Set集合
Redis 中 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。
集合中最大的成员数为 232 - 1 (4294967295, 每个集合可存储40多亿个成员)。
---------------SADD--SCARD--SMEMBERS--SISMEMBER--------------------
127.0.0.1:6379> SADD myset m1 m2 m3 m4 # 向myset中增加成员 m1~m4
(integer) 4
127.0.0.1:6379> SCARD myset # 获取集合的成员数目
(integer) 4
127.0.0.1:6379> smembers myset # 获取集合中所有成员
1) "m4"
2) "m3"
3) "m2"
4) "m1"
127.0.0.1:6379> SISMEMBER myset m5 # 查询m5是否是myset的成员
(integer) 0 # 不是,返回0
127.0.0.1:6379> SISMEMBER myset m2
(integer) 1 # 是,返回1
127.0.0.1:6379> SISMEMBER myset m3
(integer) 1
---------------------SRANDMEMBER--SPOP----------------------------------
127.0.0.1:6379> SRANDMEMBER myset 3 # 随机返回3个成员
1) "m2"
2) "m3"
3) "m4"
127.0.0.1:6379> SRANDMEMBER myset # 随机返回1个成员
"m3"
127.0.0.1:6379> SPOP myset 2 # 随机移除并返回2个成员
1) "m1"
2) "m4"
# 将set还原到{m1,m2,m3,m4}
---------------------SMOVE--SREM----------------------------------------
127.0.0.1:6379> SMOVE myset newset m3 # 将myset中m3成员移动到newset集合
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "m4"
2) "m2"
3) "m1"
127.0.0.1:6379> SMEMBERS newset
1) "m3"
127.0.0.1:6379> SREM newset m3 # 从newset中移除m3元素
(integer) 1
127.0.0.1:6379> SMEMBERS newset
(empty list or set)
# 下面开始是多集合操作,多集合操作中若只有一个参数默认和自身进行运算
# setx=>{m1,m2,m4,m6}, sety=>{m2,m5,m6}, setz=>{m1,m3,m6}
-----------------------------SDIFF------------------------------------
127.0.0.1:6379> SDIFF setx sety setz # 等价于setx-sety-setz
1) "m4"
127.0.0.1:6379> SDIFF setx sety # setx - sety
1) "m4"
2) "m1"
127.0.0.1:6379> SDIFF sety setx # sety - setx
1) "m5"
-------------------------SINTER---------------------------------------
# 共同关注(交集)
127.0.0.1:6379> SINTER setx sety setz # 求 setx、sety、setx的交集
1) "m6"
127.0.0.1:6379> SINTER setx sety # 求setx sety的交集
1) "m2"
2) "m6"
-------------------------SUNION---------------------------------------
127.0.0.1:6379> SUNION setx sety setz # setx sety setz的并集
1) "m4"
2) "m6"
3) "m3"
4) "m2"
5) "m1"
6) "m5"
127.0.0.1:6379> SUNION setx sety # setx sety 并集
1) "m4"
2) "m6"
3) "m2"
4) "m1"
5) "m5"
Hash哈希
Redis hash 是一个string类型的field和value的映射表,hash特别适合用于存储对象。
Set就是一种简化的Hash,只变动key,而value使用默认值填充。可以将一个Hash表作为一个对象进行存储,表中存放对象的信息。
------------------------HSET--HMSET--HSETNX----------------
127.0.0.1:6379> HSET studentx name sakura # 将studentx哈希表作为一个对象,设置name为sakura
(integer) 1
127.0.0.1:6379> HSET studentx name gyc # 重复设置field进行覆盖,并返回0
(integer) 0
127.0.0.1:6379> HSET studentx age 20 # 设置studentx的age为20
(integer) 1
127.0.0.1:6379> HMSET studentx sex 1 tel 15623667886 # 设置sex为1,tel为15623667886
OK
127.0.0.1:6379> HSETNX studentx name gyc # HSETNX 设置已存在的field
(integer) 0 # 失败
127.0.0.1:6379> HSETNX studentx email 12345@qq.com
(integer) 1 # 成功
----------------------HEXISTS--------------------------------
127.0.0.1:6379> HEXISTS studentx name # name字段在studentx中是否存在
(integer) 1 # 存在
127.0.0.1:6379> HEXISTS studentx addr
(integer) 0 # 不存在
-------------------HGET--HMGET--HGETALL-----------
127.0.0.1:6379> HGET studentx name # 获取studentx中name字段的value
"gyc"
127.0.0.1:6379> HMGET studentx name age tel # 获取studentx中name、age、tel字段的value
1) "gyc"
2) "20"
3) "15623667886"
127.0.0.1:6379> HGETALL studentx # 获取studentx中所有的field及其value
1) "name"
2) "gyc"
3) "age"
4) "20"
5) "sex"
6) "1"
7) "tel"
8) "15623667886"
9) "email"
10) "12345@qq.com"
--------------------HKEYS--HLEN--HVALS--------------
127.0.0.1:6379> HKEYS studentx # 查看studentx中所有的field
1) "name"
2) "age"
3) "sex"
4) "tel"
5) "email"
127.0.0.1:6379> HLEN studentx # 查看studentx中的字段数量
(integer) 5
127.0.0.1:6379> HVALS studentx # 查看studentx中所有的value
1) "gyc"
2) "20"
3) "1"
4) "15623667886"
5) "12345@qq.com"
-------------------------HDEL--------------------------
127.0.0.1:6379> HDEL studentx sex tel # 删除studentx 中的sex、tel字段
(integer) 2
127.0.0.1:6379> HKEYS studentx
1) "name"
2) "age"
3) "email"
-------------HINCRBY--HINCRBYFLOAT------------------------
127.0.0.1:6379> HINCRBY studentx age 1 # studentx的age字段数值+1
(integer) 21
127.0.0.1:6379> HINCRBY studentx name 1 # 非整数字型字段不可用
(error) ERR hash value is not an integer
127.0.0.1:6379> HINCRBYFLOAT studentx weight 0.6 # weight字段增加0.6
"90.8"
Zset(有序集合)
不同的是每个元素都会关联一个double类型的分数(score)。redis正是通过分数来为集合中的成员进行从小到大的排序。
score相同:按字典顺序排序
有序集合的成员是唯一的,但分数(score)却可以重复。
-------------------ZADD--ZCARD--ZCOUNT--------------
127.0.0.1:6379> ZADD myzset 1 m1 2 m2 3 m3 # 向有序集合myzset中添加成员m1 score=1 以及成员m2 score=2..
(integer) 2
127.0.0.1:6379> ZCARD myzset # 获取有序集合的成员数
(integer) 2
127.0.0.1:6379> ZCOUNT myzset 0 1 # 获取score在 [0,1]区间的成员数量
(integer) 1
127.0.0.1:6379> ZCOUNT myzset 0 2
(integer) 2
----------------ZINCRBY--ZSCORE--------------------------
127.0.0.1:6379> ZINCRBY myzset 5 m2 # 将成员m2的score +5
"7"
127.0.0.1:6379> ZSCORE myzset m1 # 获取成员m1的score
"1"
127.0.0.1:6379> ZSCORE myzset m2
"7"
--------------ZRANK--ZRANGE-----------------------------------
127.0.0.1:6379> ZRANK myzset m1 # 获取成员m1的索引,索引按照score排序,score相同索引值按字典顺序顺序增加
(integer) 0
127.0.0.1:6379> ZRANK myzset m2
(integer) 2
127.0.0.1:6379> ZRANGE myzset 0 1 # 获取索引在 0~1的成员
1) "m1"
2) "m3"
127.0.0.1:6379> ZRANGE myzset 0 -1 # 获取全部成员
1) "m1"
2) "m3"
3) "m2"
#testset=>{abc,add,amaze,apple,back,java,redis} score均为0
------------------ZRANGEBYLEX---------------------------------
127.0.0.1:6379> ZRANGEBYLEX testset - + # 返回所有成员
1) "abc"
2) "add"
3) "amaze"
4) "apple"
5) "back"
6) "java"
7) "redis"
127.0.0.1:6379> ZRANGEBYLEX testset - + LIMIT 0 3 # 分页 按索引显示查询结果的 0,1,2条记录
1) "abc"
2) "add"
3) "amaze"
127.0.0.1:6379> ZRANGEBYLEX testset - + LIMIT 3 3 # 显示 3,4,5条记录
1) "apple"
2) "back"
3) "java"
127.0.0.1:6379> ZRANGEBYLEX testset (- [apple # 显示 (-,apple] 区间内的成员
1) "abc"
2) "add"
3) "amaze"
4) "apple"
127.0.0.1:6379> ZRANGEBYLEX testset [apple [java # 显示 [apple,java]字典区间的成员
1) "apple"
2) "back"
3) "java"
-----------------------ZRANGEBYSCORE---------------------
127.0.0.1:6379> ZRANGEBYSCORE myzset 1 10 # 返回score在 [1,10]之间的的成员
1) "m1"
2) "m3"
3) "m2"
127.0.0.1:6379> ZRANGEBYSCORE myzset 1 5
1) "m1"
2) "m3"
--------------------ZLEXCOUNT-----------------------------
127.0.0.1:6379> ZLEXCOUNT testset - +
(integer) 7
127.0.0.1:6379> ZLEXCOUNT testset [apple [java
(integer) 3
------------------ZREM--ZREMRANGEBYLEX--ZREMRANGBYRANK--ZREMRANGEBYSCORE--------------------------------
127.0.0.1:6379> ZREM testset abc # 移除成员abc
(integer) 1
127.0.0.1:6379> ZREMRANGEBYLEX testset [apple [java # 移除字典区间[apple,java]中的所有成员
(integer) 3
127.0.0.1:6379> ZREMRANGEBYRANK testset 0 1 # 移除排名0~1的所有成员
(integer) 2
127.0.0.1:6379> ZREMRANGEBYSCORE myzset 0 3 # 移除score在 [0,3]的成员
(integer) 2
# testset=> {abc,add,apple,amaze,back,java,redis} score均为0
# myzset=> {(m1,1),(m2,2),(m3,3),(m4,4),(m7,7),(m9,9)}
----------------ZREVRANGE--ZREVRANGEBYSCORE--ZREVRANGEBYLEX-----------
127.0.0.1:6379> ZREVRANGE myzset 0 3 # 按score递减排序,然后按索引,返回结果的 0~3
1) "m9"
2) "m7"
3) "m4"
4) "m3"
127.0.0.1:6379> ZREVRANGE myzset 2 4 # 返回排序结果的 索引的2~4
1) "m4"
2) "m3"
3) "m2"
127.0.0.1:6379> ZREVRANGEBYSCORE myzset 6 2 # 按score递减顺序 返回集合中分数在[2,6]之间的成员
1) "m4"
2) "m3"
3) "m2"
127.0.0.1:6379> ZREVRANGEBYLEX testset [java (add # 按字典倒序 返回集合中(add,java]字典区间的成员
1) "java"
2) "back"
3) "apple"
4) "amaze"
-------------------------ZREVRANK------------------------------
127.0.0.1:6379> ZREVRANK myzset m7 # 按score递减顺序,返回成员m7索引
(integer) 1
127.0.0.1:6379> ZREVRANK myzset m2
(integer) 4
# mathscore=>{(xm,90),(xh,95),(xg,87)} 小明、小红、小刚的数学成绩
# enscore=>{(xm,70),(xh,93),(xg,90)} 小明、小红、小刚的英语成绩
-------------------ZINTERSTORE--ZUNIONSTORE-----------------------------------
127.0.0.1:6379> ZINTERSTORE sumscore 2 mathscore enscore # 将mathscore enscore进行合并 结果存放到sumscore
(integer) 3
127.0.0.1:6379> ZRANGE sumscore 0 -1 withscores # 合并后的score是之前集合中所有score的和
1) "xm"
2) "160"
3) "xg"
4) "177"
5) "xh"
6) "188"
127.0.0.1:6379> ZUNIONSTORE lowestscore 2 mathscore enscore AGGREGATE MIN # 取两个集合的成员score最小值作为结果的
(integer) 3
127.0.0.1:6379> ZRANGE lowestscore 0 -1 withscores
1) "xm"
2) "70"
3) "xg"
4) "87"
5) "xh"
6) "93"
六、三种特殊的数据类型及其操作命令
1. Geospatial(地理位置)
使用经纬度定位地理坐标并用一个有序集合zset保存,所以zset命令也可以使用
相关数据
有效经纬度
- 有效的经度从-180度到180度。
- 有效的纬度从-85.05112878度到85.05112878度。
指定单位的参数 unit
- m 表示单位为米。
- km 表示单位为千米。
- mi 表示单位为英里。
- ft 表示单位为英尺。
关于GEORADIUS的参数
通过georadius就可以完成 附近的人功能
- withcoord:带上坐标
- withdist:带上距离,单位与半径单位相同
- COUNT n : 只显示前n个(按距离递增排序)
----------------georadius---------------------
127.0.0.1:6379> GEORADIUS china:city 120 30 500 km withcoord withdist # 查询经纬度(120,30)坐标500km半径内的成员
1) 1) "hangzhou"
2) "29.4151"
3) 1) "120.20000249147415"
2) "30.199999888333501"
2) 1) "shanghai"
2) "205.3611"
3) 1) "121.40000134706497"
2) "31.400000253193539"
------------geohash---------------------------
127.0.0.1:6379> geohash china:city yichang shanghai # 获取成员经纬坐标的geohash表示
1) "wmrjwbr5250"
2) "wtw6ds0y300"
2. Hyperloglog(基数统计)
传统实现基数统计
存储用户的id,然后每次进行比较。当用户变多之后这种方式及其浪费空间,而我们的目的只是计数,Hyperloglog就能帮助我们利用最小的空间完成。
Hyperloglog实现基数统计
Redis HyperLogLog 是用来做基数统计的算法。(基数就是数据集合中不重复的元素的个数)
HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。
因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
其底层使用string数据类型
----------PFADD--PFCOUNT---------------------
127.0.0.1:6379> PFADD myelemx a b c d e f g h i j k # 添加元素
(integer) 1
127.0.0.1:6379> type myelemx # hyperloglog底层使用String
string
127.0.0.1:6379> PFCOUNT myelemx # 估算myelemx的基数
(integer) 11
127.0.0.1:6379> PFADD myelemy i j k z m c b v p q s
(integer) 1
127.0.0.1:6379> PFCOUNT myelemy
(integer) 11
----------------PFMERGE-----------------------
127.0.0.1:6379> PFMERGE myelemz myelemx myelemy # 合并myelemx和myelemy 成为myelemz
OK
127.0.0.1:6379> PFCOUNT myelemz # 估算基数
(integer) 17
3. BitMaps(位图)
使用位存储,信息状态只有 0 和 1。Bitmap是一串连续的2进制数字(0或1),每一位所在的位置为偏移(offset),在bitmap上可执行AND,OR,XOR,NOT以及其它位操作。
------------setbit--getbit--------------
127.0.0.1:6379> setbit sign 0 1 # 设置sign的第0位为 1
(integer) 0
127.0.0.1:6379> setbit sign 2 1 # 设置sign的第2位为 1 不设置默认 是0
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 5 1
(integer) 0
127.0.0.1:6379> type sign
string
127.0.0.1:6379> getbit sign 2 # 获取第2位的数值
(integer) 1
127.0.0.1:6379> getbit sign 3
(integer) 1
127.0.0.1:6379> getbit sign 4 # 未设置默认是0
(integer) 0
-----------bitcount----------------------------
127.0.0.1:6379> BITCOUNT sign # 统计sign中为1的位数
(integer) 4
七、 Redis事务
参考文章:Redis的事务满足原子性吗?
Redis的单条命令是保证原子性的,但是redis事务不能保证原子性
和数据库事务类似,redis事务也是用来一次性地执行多条命令。使用起来也很简单:
#开启事务(multi)
#命令入队
#执行事务(exec)
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set name Hydra
QUEUED
127.0.0.1:6379> set age 18
QUEUED
127.0.0.1:6379> incr age
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) (integer) 19
Redis事务满足原子性吗?(原子性:要求一个事务中的所有操作,要么全部完成,要么全部不完成)
如果要验证redis事务是否满足原子性,那么需要在redis事务执行发生异常的情况下进行,下面我们分两种不同类型的错误分别测试。
省流
存在语法错误的情况下,所有命令都不会执行存在运行错误的情况下,除执行中出现错误的命令外,其他命令都能正常执行
语法错误
首先测试命令中有语法错误的情况,这种情况多为命令的参数个数不正确或输入的命令本身存在错误。下面我们在事务中输入一个存在格式错误的命令,开启事务并依次输入下面的命令:
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set name Hydra
QUEUED
127.0.0.1:6379> incr
(error) ERR wrong number of arguments for 'incr' command
127.0.0.1:6379> set age 18
QUEUED
输入的命令incr
后面没有添加参数,属于命令格式不对的语法错误,这时在命令入队时就会立刻检测出错误并提示error
。使用exec执行事务,查看结果输出:
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.
在这种情况下,只要事务中的一条命令有语法错误,在执行exec后就会直接返回错误,包括语法正确的命令在内的所有命令都不会被执行。
127.0.0.1:6379> get name
(nil)
127.0.0.1:6379> get age
(nil)
运行错误
运行错误是指输入的指令格式正确,但是在命令执行期间出现的错误,典型场景是当输入参数的数据类型不符合命令的参数要求时,就会发生运行错误。例如下面的例子中,对一个string类型的值执行列表的操作,报错如下:
127.0.0.1:6379> set key1 value1
OK
127.0.0.1:6379> lpush key1 value2
(error) WRONGTYPE Operation against a key holding the wrong kind of value
这种错误在redis实际执行指令前是无法被发现的,只能当真正执行才能够被发现,因此这样的命令是可以被事务队列接收的,不会和上面的语法错误一样立即报错。
具体看一下当事务中存在运行错误的情况,在下面的事务中,尝试对string类型数据进行incr自增操作:
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set name Hydra
QUEUED
127.0.0.1:6379> set age eighteen
QUEUED
127.0.0.1:6379> incr age
QUEUED
127.0.0.1:6379> del name
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) (error) ERR value is not an integer or out of range
4) (integer) 1
运行结果可以看到,虽然incr age这条命令出现了错误,但是它前后的命令都正常执行了,再看一下这些key对应的值,确实证明了其余指令都执行成功:
127.0.0.1:6379> get name
(nil)
127.0.0.1:6379> get age
"eighteen"
八、Redis对key的监控
使用watch key监控指定数据,相当于乐观锁加锁。
1. 单线程情况下,观察watch key
127.0.0.1:6379> set money 100 # 设置余额:100
OK
127.0.0.1:6379> set use 0 # 支出使用:0
OK
127.0.0.1:6379> watch money # 监视money (上锁)
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> DECRBY money 20
QUEUED
127.0.0.1:6379> INCRBY use 20
QUEUED
127.0.0.1:6379> exec # 监视值没有被中途修改,事务正常执行
1) (integer) 80
2) (integer) 20
2. 多线程情况下,使用watch key
线程1:
127.0.0.1:6379> watch money # money上锁
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> DECRBY money 20
QUEUED
127.0.0.1:6379> INCRBY use 20
QUEUED
127.0.0.1:6379> # 此时事务并没有执行
线程2,模拟线程插队:
127.0.0.1:6379> INCRBY money 500 # 修改了线程一中监视的money
(integer) 600
线程1,执行事务:
127.0.0.1:6379> EXEC # 执行之前,另一个线程修改了我们的值,这个时候就会导致事务执行失败
(nil) # 没有结果,说明事务执行失败
127.0.0.1:6379> get money # 线程2 修改生效
"600"
127.0.0.1:6379> get use # 线程1事务执行失败,数值没有被修改
"0"
解决:unwatch解锁获取最新值,然后再加锁进行事务。
每次提交执行exec后都会自动释放锁,不管是否成功
九、Redis数据库密码
默认情况下redis数据库是无口令直接登录的:直接运行redis-cli 即可登录。
当然我知道有人还是有设置Redis数据库密码的需求,它来了
在redis.windows.conf文件中设置密码的命令中添加requirepass 123456一行,将Redis数据库的密码设置为123456,设置完成。
测试
运行Redis-server.exe程序
运行Redis-cli.exe程序
输入 key * 命令
需要输入密码获取权限
进行远程登录:redis-cli -h host -p port -a password # 在远程登录redis
[root@localhost src]# redis-cli -p 6379 -a root123
Warning: Using a password with '-a' option on the command line interface may not be safe.
127.0.0.1:6379>
(不建议使用)因为这样明文密码会留存到 history 里面,不安全。
所以远程登录还是不要输入密码,使用AUTH命令
redis-cli -p 6379
redis 127.0.0.1:6379> auth test123456
OK
虽然AUTH命令跟其他redis命令一样,是没有加密的;阻止不了攻击者在网络上窃取你的密码;认证层的目标是提供多一层的保护。如果防火墙或者用来保护redis的系统防御外部攻击失败的话,外部用户如果没有通过密码认证还是无法访问redis的。
十、Jedis = 使用Java来操作Redis
在Java中,Redis对应于Jedis就相当于关系数据库对应于JDBC。
常规操作:要使用一个工具就要导入依赖,需要时还要配置相关数据
public class TestTX {
public static void main(String[] args) {
Jedis jedis = new Jedis("39.99.xxx.xx", 6379);
JSONObject jsonObject = new JSONObject();
jsonObject.put("hello", "world");
jsonObject.put("name", "kuangshen");
// 开启事务
Transaction multi = jedis.multi();
String result = jsonObject.toJSONString();
// jedis.watch(result)
try {
multi.set("user1", result);
multi.set("user2", result);
// 执行事务
multi.exec();
}catch (Exception e){
// 放弃事务
multi.discard();
} finally {
// 关闭连接
System.out.println(jedis.get("user1"));
System.out.println(jedis.get("user2"));
jedis.close();
}
}
}
十一、SpringBoot整合Redis
在用户只需要在配置文件application.*
中配置少量参数就可以使用官方默认提供RedisTemplate
和StringRedisTemplate
来操作redis。由于官方提供的RedisTemplate
提供的功能有限,难以针对java的复杂数据类型进行序列化,且采用直连的方式以及没有对连接数进行限制等诸多因素在现代引用中制约较大,所以项目中一般需要提供一个RedisConfig
类来针对redisTemplate做进一步配置。
- pom.xml导入依赖
<!--Redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- 编辑配置文件application.*,具体格式看你文件类型(
这里使用application.properties
)
springboot 2.x后 ,原来使用的 Jedis 被 lettuce 替换。还有一些连接池相关的配置。注意使用时一定使用Lettuce的连接池。
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=@redis.host@
# Redis服务器连接端口
spring.redis.port=@redis.port@
# Redis服务器连接密码(默认为空)
spring.redis.password=@redis.password@
# 连接超时时间(毫秒)
spring.redis.timeout=20000
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=500
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
- 乱码问题
@SpringBootTest
class SpringBoot06JdbcApplicationTests {
@Autowired
private RedisTemplate redisTemplate; //操作k-v都是对象的
@Autowired
private RedisTemplate MyRedisTemplate;
@Test
void test2(){
Role role = new Role(1,"1515","小明",1,new Date(),2,new Date());
ValueOperations valueOperations = redisTemplate.opsForValue();
valueOperations.set("role", role);
}
}
出现了乱码问题
解决问题
改变RedisTemplate类的默认序列化格式,这就需要通过RedisConfig来针redisTemplate做进一步配置。
更改相关序列化,只是为了使得存储的key和value不出现乱码
我们来看一下RedisAutoConfiguration:通过源码可以看出,SpringBoot自动帮我们在容器中生成了一个RedisTemplate和一个StringRedisTemplate。
看到这个@ConditionalOnMissingBean注解后,就知道如果Spring容器中有了RedisTemplate对象了,这个自动配置的RedisTemplate不会实例化。因此我们可以直接自己写个配置类,配置RedisTemplate。
public class RedisAutoConfiguration {
@Bean
@ConditionalOnMissingBean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(
RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(
RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
创建一个Bean加入容器,就会触发RedisTemplate上的条件注解,使默认的RedisTemplate失效。
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
// 将template 泛型设置为 <String, Object>
RedisTemplate<String, Object> template = new RedisTemplate();
// 连接工厂,不必修改
template.setConnectionFactory(redisConnectionFactory);
/*
* 序列化设置
*/
// key、hash的key 采用 String序列化方式
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
// value、hash的value 采用 Jackson 序列化方式
template.setValueSerializer(RedisSerializer.json());
template.setHashValueSerializer(RedisSerializer.json());
template.afterPropertiesSet();
return template;
}
}
重新测试,得到结果
十二、Redis在SpringBoot中的操作函数大全
参考文章:RedisTemplate操作函数大全
十三、自定义Redis工具类
使用RedisTemplate需要频繁调用.opForxxx然后才能进行对应的操作,这样使用起来代码效率低下,工作中一般不会这样使用,而是将这些常用的公共API抽取出来封装成为一个工具类,然后直接使用工具类来间接操作Redis,不但效率高并且易用。
package com.zxy.demo.redis;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
/**
* Redis工具类
* @author ZENG.XIAO.YAN
* @date 2018年6月7日
*/
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
* @return
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
* @return
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
* @return
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
* @return 值
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
* @return
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
* @return
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
* @param key 键
* @return
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
* @param key 键
* @return
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
* @return
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
* @param key 键
* @return
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
* @return
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
十四、Redis实现持久化
省流:通过RDB直接把数据持久化到数据库,通过AOF把执行过的命令记录下来,然后在执行一遍
1. RDB
在指定时间间隔后,将内存中的数据集快照写入数据库RDB文件 ;在恢复时候,直接读取快照文件RDB文件,进行数据的恢复 ;
在进行 RDB 的时候,redis 的主线程是不会做 io 操作的,主线程会 fork 一个子线程来完成该操作;
- Redis 调用forks。同时拥有父进程和子进程。
- 子进程将数据集写入到一个临时 RDB 文件中。
- 当子进程完成对新 RDB 文件的写入时,Redis 用新 RDB 文件替换原来的 RDB 文件,并删除旧的 RDB 文件。
这种工作方式使得 Redis 可以从写时复制(copy-on-write)机制中获益,因为是使用子进程进行写操作,而父进程依然可以接收来自客户端的请求。
触发机制
save 命令
,会立刻对当前内存中的数据进行持久化 ,但是会阻塞,也就是不接受其他操作
flushall命令
:可以通过配置文件对 Redis 进行设置, 让它在“ N 秒内数据集至少有 M 个改动”这一条件被满足时, 自动进行数据集保存操作。
bgsave命令
是异步进行,进行持久化的时候,redis 还可以将继续响应客户端请求 ;
- 退出redis,也会自动产生rdb文件
2. AOF
将我们所有的命令都记录下来,history,恢复的时候就把这个文件全部再执行一遍
快照功能(RDB)并不是非常耐久(durable): 如果 Redis 因为某些原因而造成故障停机, 那么服务器将丢失最近写入、以及未保存到快照中的那些数据。 从 1.1 版本开始, Redis 增加了一种完全耐久的持久化方式: AOF 持久化。
如果要使用AOF,需要修改配置文件:
appendonly no yes
则表示启用AOF
默认是不开启的,我们需要手动配置,然后重启redis,就可以生效了!
3. RDB与AOF两者的选择
AOF文件远远大于RDB,修复比RDB慢,运行效率也慢,基本上都比RDB差,唯一的优点是比RDB安全。所以如果你非常关心你的数据运行效率, 但仍然可以承受数分钟以内的数据丢失, 那么你可以只使用 RDB 持久化。
十五、Redis发布与订阅
每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer 结构,该结构的 pubsub_channels 属性是一个字典, 这个字典就用于保存订阅频道的信息,其中,字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端。
客户端订阅,就被链接到对应频道的链表的尾部,退订则就是将客户端节点从链表中移除。
------------订阅端----------------------
127.0.0.1:6379> SUBSCRIBE sakura # 订阅sakura频道
Reading messages... (press Ctrl-C to quit) # 等待接收消息
1) "subscribe" # 订阅成功的消息
2) "sakura"
3) (integer) 1
1) "message" # 接收到来自sakura频道的消息 "hello world"
2) "sakura"
3) "hello world"
1) "message" # 接收到来自sakura频道的消息 "hello i am sakura"
2) "sakura"
3) "hello i am sakura"
--------------消息发布端-------------------
127.0.0.1:6379> PUBLISH sakura "hello world" # 发布消息到sakura频道
(integer) 1
127.0.0.1:6379> PUBLISH sakura "hello i am sakura" # 发布消息
(integer) 1
-----------------查看活跃的频道------------
127.0.0.1:6379> PUBSUB channels
1) "sakura"
缺点
- 如果一个客户端订阅了频道,但自己读取消息的速度却不够快的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
- 这和数据传输可靠性有关,如果在订阅方断线,那么他将会丢失所有在短线期间发布者发布的消息。
十六、缓存雪崩、缓存穿透、缓存击穿
1. 内存放生的三种问题
缓存雪崩
:在短时间内,有大量缓存同时过期,导致大量的请求直接查询数据库(一大片)缓存穿透
:查询数据库和缓存都无数据,因为数据库查询无数据,出于容错考虑,不会将结果保存到缓存中,因此每次请求都会去查询数据库缓存击穿
:某个热点缓存,在某一时刻恰好失效了,然后此时刚好有大量的并发请求,此时这些请求将会给数据库造成巨大的压力,
2. 缓存雪崩的解决办法
- 加锁排队:先排队查询数据库,再放入缓存中。防止大量的请求同时操作数据库,但是会降低系统性能
- 随机化过期时间:顾名思义
- 设置二级缓存:二级缓存指的是除了 Redis 本身的缓存,再设置一层缓存,当 Redis 失效之后,先去查询二级缓存。
- 及时更新缓存:给每一个缓存数据增加相应的缓存标记,记录缓存的是否失效,如果缓存标记失效,则更新数据缓存。
3. 缓存穿透
- 设置缓存有效时间:从缓存取不到的数据,在数据库中也没有取到,这时也可以将key-value对写为key-null,缓存有效时间可以设置短点,如30秒(设置太长会导致正常情况也没法使用)
- 布隆过滤器:将所有可能存在的数据哈希到一个足够大的 bitmap 中,一个一定不存在的数据会被这个 bitmap 拦截掉,从而避免了对底层存储系统的查询压力。
4. 缓存击穿
- 设置热点数据永远不过期
- 加互斥锁,互斥锁:此处理方式和缓存雪崩加锁排队的方法类似,都是在查询数据库时加锁排队,缓冲操作请求以此来减少服务器的运行压力