BitMap位图
Redis提供了Bitmaps这个“数据类型”可以实现对位的操作:
(1) Bitmaps本身不是一种数据类型, 实际上它就是字符串(key-value) , 但是它可以对字符串的位进行操作。
(2) Bitmaps单独提供了一套命令, 所以在Redis中使用Bitmaps和使用字符串的方法不太相同。 可以把Bitmaps想象成一个以位为单位的数组, 数组的每个单元只能存储0和1, 数组的下标在Bitmaps中叫做偏移量。
常用命令
1.setbit 设置位图
setbit设置Bitmaps中某个偏移量的值(0或1)
- offset:偏移量
2.getbit 获取位图
getbit获取Bitmaps中某个偏移量的值
3.bitcount 获取指定范围内值为1的个数
bitcount key start end
4、bitop
bitop [ operations ] [ result ] [key1] [keyn…]
- bitop是一个复合操作, 它可以做多个Bitmaps的and(交集) 、 or(并集) 、 not(非) 、 xor(异或) 操作并将结果保存在destkey中。
应用场景
Bitmap 类型非常适合二值状态统计的场景,这里的二值状态就是指集合元素的取值就只有 0 和 1 两种,在记录海量数据时,Bitmap 能够有效地节省内存空间。
统计打卡情况
对于一天,如果签到了,就使用1表示,否则就用0表示。
比如下面,我们统计uid为1000的用户在2022年12月份的签到情况:
用setbit设置一天是否签到;用getbit获取一天是否签到。
获取12月份一共签到了几天
HyperLogLog
在实际需求中,我们经常会遇到与统计相关的功能需求,比如统计网站PV(PageView页面访问量),可以使用Redis的incr、incrby实现。
但像UV(UniqueVisitor,独立访客)、独立IP数、搜索记录数等需要去重和计数的问题如何解决?这种求集合中不重复元素个数的问题称为基数问题。
- 在MySQL中,使用distinct count计算不重复个数
- 在Redis中,可以使用hash、set、bitmaps等数据结构来处理
以上的方案结果精确,但随着数据不断增加,导致占用空间越来越大,对于海量数据是不切实际的。
HyperLogLog概念
Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。
HyperLogLog 是统计规则是基于概率完成的,不是非常准确,标准误算率是 0.81%。
在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。这和计算基数时。
但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
什么是基数?
比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数。
命令
1、pfadd
pfadd < element> [element …] 添加指定元素到 HyperLogLog 中
2、pfcount
pfcount [key …] 计算HyperLogLog的近似基数,可以计算多个HLL。
3、pfmerge
pfmerge [sourcekey …] 将一个或多个HLL合并后的结果存储在另一个HLL中,比如每月活跃用户可以使用每天的活跃用户来合并计算可得
应用场景
网页UV计算
使用pfadd把访问网页的用户添加到HLL中。
统计第一天的访问基数
合并前三天的访问基数
Geospatial
Redis 3.2 中增加了对GEO类型的支持。GEO,Geographic,地理信息的缩写。该类型,就是元素的2维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作。
GEO 本身并没有设计新的底层数据结构,而是直接使用了 Sorted Set 集合类型。GEO 类型使用 GeoHash 编码方法实现了经纬度到 Sorted Set 中元素权重分数的转换。
常用命令
1、geoadd
geoadd < longitude> [longitude latitude member…] 添加地理位置(经度,纬度,名称)
实例:
geoadd china:city 106.50 29.53 chongqing 114.05 22.52 shenzhen 116.38 39.90 beijing
2、geopos
geopos [member…] 获得指定地区的坐标值
实例:
geopos china:city chongqing
3、geodist
geodist [m|km|ft|mi ] 获取两个位置之间的直线距离
实例:
geodist china:city chongqing beijing km
4、georadius
georadius< longitude>radius m|km|ft|mi 以给定的经纬度为中心,找出某一半径内的元素
实例:
georadius china:city 110 30 1000 km
应用场景
比如我们在地图上计算两个城市之间的地理关系:
Stream
Redis 专门为消息队列设计的数据类型。
在Stream没有出来以前。消息队列的实现方式或多或少都有一些缺陷:
- 发布订阅模式,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;
- List 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。
Stream类型,用于完美的实现消息队列;
支持消息的持久化,支持自动生成的全局唯一ID,支持ack确认消息模式,支持消费组模式等。提供了稳定和可靠的消息队列实现。
常用命令
# 插入消息,保证有序,可以自动生成的全局唯一ID
XADD
# 查询消息长度
XLEN
#用于读取信息,可以按照ID读取数据
XREAD:用于读取消息,可以按 ID 读取数据;
#根据消息 ID 删除消息
XDEL
#删除整个Stream
DEL
#读取区间消息
XRANGE
#按消费组形式读取消息
XREADGROUP
# 用来查询每个消费组内所有消费者「已读取、但尚未确认」的消息;
XPENDING
#用于向消息队列确认消息处理已完成;
XACK
应用场景
生产者插入一条消息
# * 表示让Redis为插入数据自动生成一个全局唯一的ID
# 往命令为mymq的消息队列中插入一条消息。消息键是name 值是west
>XADD mymq * name west
"1670987339049-0"
消息ID的组成
- 第一部分“1670987339049”是数据插入时,以毫秒为单位计算的当前服务器时间
- 第二部分,“-0”表示当前毫秒内,插入的第一条消息,序号从0开始。
消费者通过 XREAD 命令从消息队列中读取消息
读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取(注意是输入消息 ID 的下一条信息开始读取,不是查询输入ID的消息)。
# 因为我们上面只插入了一条消息,所以消息ID一定要小于想要读取的ID
XREAD STREAMS mymq 1670987339048-0
- 如果想要实现阻塞读取,可以调用 XRAED 时设定 BLOCK 配置项,单位是毫秒
- 当消息队列中没有数据时,XREAD也会发生阻塞
XREAD BLOCK 10000 STREAMS mymq $
XREAD和XADD的通信模型
使用 xadd 存入消息和 xread 循环阻塞读取消息的方式可以实现简易版的消息队列
Stream特性
消费组
Stream 可以以使用 XGROUP 创建消费组,创建消费组之后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。
# 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取
127.0.0.1:6379> XGROUP CREATE mymq group1 0-0
OK
# 创建一个名为 group2 的消费组
127.0.0.1:6379> XGROUP CREATE mymq group2 0-0
OK
消费组 group1 内的消费者 consumer1 从 mymq 消息队列中读取所有消息:
# 命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
127.0.0.1:6379> XREADGROUP Group group1 consumer1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1670987339049-0"
2) 1) "name"
2) "west"
- 消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息。
比如我们再执行上面的命令:
- 不同消费组的消费者可以消费同一条消息(前提条件:创建消息组的时候,不同消费组指定了相同位置开始读取消息)。
127.0.0.1:6379> XREADGROUP Group group2 consumer1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1670987339049-0"
2) 1) "name"
2) "west"
- 使用消费组的目的是实现负载均衡,因此往往同一个消费组的不同消费者读取不同的消息,从而实现消息读取负载在多个消费者间是均衡分布的:
# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
127.0.0.1:6379> XREADGROUP Group group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1670989392669-0"
2) 1) "field1"
2) "value1"
# 让 group2 中的 consumer2 从 mymq 消息队列中消费2条消息
127.0.0.1:6379> XREADGROUP Group group2 consumer2 COUNT 2 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1670989404047-0"
2) 1) "field2"
2) "value2"
2) 1) "1670989416512-0"
2) 1) "field3"
2) "value3"
消息的可靠性
基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?
Streams会使用PENDING List留存消费组每个消费者读取的消息,直到消费者使用XACK命令通知Streams消息已经被处理。
- 如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。
- 此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
查看某个消费者具体读取了哪些数据,可以执行下面的命令:
XPENDING mymq group2 - + 10 consumer1
消息处理完后,使用XACK通知消息队列,这条消息就会被删除
127.0.0.1:6379> XACK mymq group2 1670987339049-0 1670989200775-0 1670989392669-0
(integer) 3
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer1
(empty array)