一.前言
在之前的学习中,我们已经介绍了Redis
中常见的五种基本的数据结构,而今天我们就要开始介绍Redis
的四种特殊的数据结构,它们分别是bitmap(位图)
, HyperLogLog(基数统计)
,Geospatial(地理信息)
,Stream
。
二.位图(Bitmap)
2.1 什么是位图
Bitmap(位图)
,是一串连续的二进制数组(0和1),可以通过偏移量(offset)定位元素,通过对最小单位bit
进行0|1
的设置来表示某个元素的值或者状态,由于 bit 是计算机中最小的单位,使用它进行储存将非常节省空间,特别适合一些数据量大且使用二值统计的场景。
2.2 位图的内部实现
Bitmap
本身是用 String
类型作为底层数据结构实现的一种统计二值状态的数据类型。String 类型是会保存为二进制的字节数组,所以,Redis 就把字节数组的每个 bit 位利用起来,用来表示一个元素的二值状态,我们可以把 Bitmap 看作是一个 bit 数组
拓展:BitMap的内部占用
# 首先将偏移量是0的位置设为1
# SETBIT key offset value
127.0.0.1:6379> SETBIT sid10t 0 1
(integer) 0
# 通过 STRLEN 命令,我们可以看到字符串的长度是1
# STRLEN key
127.0.0.1:6379> # STRLEN sid10t
(integer) 1
# 将偏移量是1的位置设置为1
127.0.0.1:6379> SETBIT sid10t 1 1
(integer) 0
# 此时字符串的长度还是为1,以为一个字符串有8个比特位,不需要再开辟新的内存空间
127.0.0.1:6379> STRLEN sid10t
(integer) 1
# 将偏移量是8的位置设置成1
127.0.0.1:6379> setbit sid10t 8 1
(integer) 0
# 此时字符串的长度变成2,因为一个字节存不下9个比特位,需要再开辟一个字节的空间
127.0.0.1:6379> STRLEN sid10t
(integer) 2
上面的例子可以说明,BitMap
所占用的空间和底层字符串占用的空间一致,假如 BitMap 偏移量的最大值是 OFFSET
,那么它底层所占用的空间为:
(
O
F
F
S
E
T
/
8
)
+
1
(OFFSET/8)+1
(OFFSET/8)+1
单位为字节,不过由于Redis中String
的最大长度为512M,所以BitMap
的offset的值也有其上限,它的最大值为
8
∗
1024
∗
1024
∗
512
=
2
3
2
8 * 1024 * 1024 * 512 = 2^32
8∗1024∗1024∗512=232
由于 C语言中字符串的末尾都要存储一位分隔符,所以实际上 BitMap 的 offset 值上限是:
(
8
∗
1024
∗
1024
∗
512
)
−
1
=
2
3
2
−
1
(8 * 1024 * 1024 * 512) -1 = 2^32 - 1
(8∗1024∗1024∗512)−1=232−1
2.3常用命令
SETBit key offset value # 设置值,其中 value 只能是 0 和 1
GETBit key offset # 获取值
BITCOUNT key [start end [BYTE|BIT]] #计算指定键(key)中位图的 1 的个数
# 参数说明
# key:要操作的键名。
# start 和 end(可选):指定计算范围的起始和结束偏移量(以字节为单位)。如果省略,则计算整个键的 1 的个数。
# BYTE 或 BIT(可选):指定计算的单位。BYTE 表示按字节计算,BIT 表示按位计算。如果不指定,则默认为按位计算。
# 返回指定 key 中第一次出现指定 value(0/1) 的位置
BITPOS [key] [value]
go-redis操作位图
位图非常适合用来统计一些而知状态统计的场景,在这种场景下,位图的使用能够最大限度的实现对内存空间的节省,而在我们使用redis
的Bitmap
时一般会有以下几个比较常见的场景:
签到统计
判断用户的登录态
统计连续签到用户总数
首先我们来看一下签到如何去实现它
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
type BitMap struct {
}
var rdb *redis.Client
var ctx context.Context
func main() {
InitRedis()
//以下测试为测试案例
user1 := BitMap{}
//假设是统计其六月的签到情况(这里实际实现我们可以考虑获取服务器时间并且利用正则表达式将其进行提取)
time := fmt.Sprintf("user1:%s", "2024-6")
//6-1签到
user1.AddBitMap(time, 0)
//6-2签到
user1.AddBitMap(time, 1)
//6-3签到
user1.AddBitMap(time, 2)
//6-4签到
user1.AddBitMap(time, 3)
//删除6-1签到
user1.DeleteBit(time, 0)
//统计签到次数
count, err := user1.CountBitMap(time)
if err != nil {
fmt.Println("统计签到次数失败:err", err)
} else {
fmt.Println("签到次数为:", count)
}
}
// AddBitMap 签到成功
func (bitmap *BitMap) AddBitMap(key string, offset int64) error {
err := rdb.SetBit(ctx, key, offset, 1).Err()
return err
}
// GetBitMap 获取签到情况
func (bitmap *BitMap) GetBitMap(key string, offset int64) (int64, error) {
return rdb.GetBit(ctx, key, offset).Result()
}
// DeleteBitMap 删除用户的签到记录
func (bitmap *BitMap) DeleteBitMap(key string) error {
return rdb.Del(ctx, key).Err()
}
// DeleteBit 删除用户的某一天签到记录
func (bitmap *BitMap) DeleteBit(key string, offset int64) error {
return rdb.SetBit(ctx, key, offset, 0).Err()
}
// CountBitMap 统计签到次数
func (bitmap *BitMap) CountBitMap(key string) (int64, error) {
return rdb.BitCount(ctx, key, &redis.BitCount{
Start: 0,
End: -1,
}).Result()
}
func InitRedis() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
ctx = context.Background()
}
运行结果:
统计用户登录态就比较简单了,我们只需要获取用户对应位图的位置获取其状态进行比较就可以了,比较麻烦的主要还是统计连续签到用户总数,代码如下:
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
type BitMap struct {
}
var rdb *redis.Client
var ctx context.Context
func main() {
InitRedis()
//模拟当天签到的时间
time1 := "2024-06-01"
time2 := "2024-06-02"
time3 := "2024-06-03"
//模拟用户
user1 := 1
user2 := 2
user3 := 3
user4 := 4
bitmap := BitMap{}
bitmap.AddBitMap(time1, int64(user1))
bitmap.AddBitMap(time1, int64(user2))
bitmap.AddBitMap(time1, int64(user3))
bitmap.AddBitMap(time1, int64(user4))
bitmap.AddBitMap(time2, int64(user3))
bitmap.AddBitMap(time2, int64(user1))
bitmap.AddBitMap(time3, int64(user4))
bitmap.AddBitMap(time3, int64(user1))
res := "2024-06-01-2024-06-03"
bitmap.TopBitMap(2, res, time1, time2, time3)
count, err := bitmap.CountBitMap(res)
if err != nil {
fmt.Println(err)
}
fmt.Println("连续三天签到的人数为:", count)
//获取该用户的id
fmt.Println(bitmap.GetOffset(0, 100, res))
}
// AddBitMap 签到成功
func (bitmap *BitMap) AddBitMap(key string, offset int64) error {
err := rdb.SetBit(ctx, key, offset, 1).Err()
return err
}
// GetBitMap 获取签到情况
func (bitmap *BitMap) GetBitMap(key string, offset int64) (int64, error) {
return rdb.GetBit(ctx, key, offset).Result()
}
// DeleteBitMap 删除用户的签到记录
func (bitmap *BitMap) DeleteBitMap(key string) error {
return rdb.Del(ctx, key).Err()
}
// DeleteBit 删除用户的某一天签到记录
func (bitmap *BitMap) DeleteBit(key string, offset int64) error {
return rdb.SetBit(ctx, key, offset, 0).Err()
}
// CountBitMap 统计签到次数
func (bitmap *BitMap) CountBitMap(key string) (int64, error) {
return rdb.BitCount(ctx, key, &redis.BitCount{
Start: 0,
End: -1,
}).Result()
}
func (bitmap *BitMap) TopBitMap(option int, resultkey string, key ...string) error {
switch {
case option == 1: // 或
return rdb.BitOpOr(ctx, resultkey, key...).Err()
case option == 2: // 与
return rdb.BitOpAnd(ctx, resultkey, key...).Err()
case option == 3: // 非
return rdb.BitOpNot(ctx, resultkey, key[0]).Err()
case option == 4: // 异或
return rdb.BitOpXor(ctx, resultkey, key...).Err()
default:
return rdb.BitOpOr(ctx, resultkey, key...).Err()
}
}
// GetOffset 获取连续签到的offset
func (bitmap *BitMap) GetOffset(start, end int, key string) []int {
var offset []int
for i := start; i <= end; i++ {
res, _ := bitmap.GetBitMap(key, int64(i))
if res == 1 {
offset = append(offset, i)
}
}
return offset
}
func InitRedis() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
ctx = context.Background()
}
这个代码的实现思路主要是我们将每一天的签到情况作为一个bitmap
,然后我们将一定天数内的bitmap
进行&
操作并且将最终结果放在res
中,这时候我们只需要统计bitmap
中1的个数即可。
三.基数统计
3.1 什么是基数统计
Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 264 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
3.2 基数统计的常用命令
PFADD key element [element ...] #添加指定元素到 HyperLogLog 中。
PFCOUNT key [key ...] #返回给定 HyperLogLog 的基数估算值。
PFMERGE destkey sourcekey [sourcekey ...] #将多个 HyperLogLog 合并为一个 HyperLogLog ,合并后的 HyperLogLog 的基数估算值是通过对所有 给定 HyperLogLog 进行并集计算得出的
四.Geospatial(地理信息)
4.1 Geospatial的常用命令
Redis GEO 主要用于存储地理位置信息,并对存储的信息进行操作,该功能在 Redis 3.2 版本新增,它常见的命令主要有以下几种:
GEOADD key longitude latitude member [longitude latitude member ...]# 用于存储指定的地理空间位置,可以将一个或多个经度(longitude)、纬度(latitude)、位置名称(member)添加到指定的 key 中。
GEOPOS key member [member ...] #用于从给定的 key 里返回所有指定名称(member)的位置(经度和纬度),不存在的返回 nil。
GEODIST key member1 member2 [m|km|ft|mi] # 用于返回两个给定位置之间的距离。
GEOHASH key member [member ...] # 用于获取一个或多个位置元素的 geohash 值
4.2 Geospatial的内部实现
GEO 本身并没有设计新的底层数据结构,而是直接使用了 Sorted Set 集合类型。
GEO 类型使用 GeoHash 编码方法实现了经纬度到 Sorted Set 中元素权重分数的转换,这其中的两个关键机制就是「对二维地图做区间划分」和「对区间进行编码」。一组经纬度落在某个区间后,就用区间的编码值来表示,并把编码值作为 Sorted Set 元素的权重分数。
这样一来,我们就可以把经纬度保存到 Sorted Set 中,利用 Sorted Set 提供的“按权重进行有序范围查找”的特性,实现 LBS 服务中频繁使用的“搜索附近”的需求。多用于像附近朋友
,滴滴打车
等相关业务
五.Stream
5.1 什么是Stream
Redis Stream 是 Redis 5.0 版本新增加的数据类型,Redis 专门为消息队列设计的数据类型。
在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:
发布订阅模式
,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;List
实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。
基于以上问题,Redis 5.0 便推出了 Stream 类型也是此版本最重要的功能,用于完美地实现消息队列,它支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。
Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容,每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
上图参数说明:
Consumer Group
:消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。lastdeliveredid
:游标,每个消费组会有个游标 lastdeliveredid,任意一个消费者读取了消息都会使游标 lastdeliveredid 往前移动。pendingids
:消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pendingids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。
5.2 Stream的常用命令
消息队列相关命令
XADD
添加消息到末尾XTRIM
对流进行修剪,限制长度XDEL
删除消息XLEN
获取流包含的元素数量,即消息长度XRANGE
获取消息列表,会自动过滤已经删除的消息XREVRANGE
反向获取消息列表,ID 从大到小XREAD
以阻塞或非阻塞方式获取消息列表
消费者组织的相关命令
XGROUP CREATE
创建消费者组XREADGROUP GROUP
读取消费者组中的消息XACK
将消息标记为"已处理"XGROUP SETID
为消费者组设置新的最后递送消息IDXGROUP DELCONSUMER
删除消费者XGROUP DESTROY
删除消费者组XPENDING
显示待处理消息的相关信息XCLAIM
转移消息的归属权XINFO
查看流和消费者组的相关信息;XINFO GROUPS
打印消费者组的信息;XINFO STREAM
打印流信息
6.go-redis操作Stream实现消息队列
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"time"
)
// CustomGroup 消费者组
type CustomGroup struct {
Name string //消费者组名称
Queues []MessageQueue //一个消息队列下有多个消费者组
Custom []Custom //消费者组下有多个消费者
}
// MessageQueue 消息队列
type MessageQueue struct {
Name string //消息队列名称
Groups []CustomGroup //一个消息队列下有多个消费者组
}
type Custom struct {
Name string
}
var (
rdb *redis.Client
ctx context.Context
)
func main() {
InitRedis()
// 初始化消息队列和消费者组
queue := MessageQueue{Name: "my-queue"}
group := CustomGroup{Name: "my-group", Queues: []MessageQueue{queue}}
custom := Custom{Name: "my-consumer"}
if err := queue.AddMessage(queue, "", ""); err != nil { //创建消息队列
fmt.Println("Failed to create message queue:", err)
return
}
// 创建消费者组
if err := group.NewCustomGroup(group, queue); err != nil {
fmt.Println("Failed to create custom group:", err)
return
}
// 生产消息
go func() {
i := 0
for {
err := queue.AddMessage(queue, "field1", fmt.Sprintf("message %d", i))
if err != nil {
fmt.Println("Failed to add message:", err)
return
}
i++
time.Sleep(3 * time.Second)
}
}()
// 设置消息队列最大长度
if err := queue.SetMaxLen(queue, 100); err != nil {
fmt.Println("Failed to set max length:", err)
return
}
// 启动消费者协程
messageChan := make(chan []redis.XMessage)
go custom.StartConsumer(messageChan, group, queue, custom)
// 主线程消费消息
for {
select {
case messages := <-messageChan:
for _, msg := range messages {
fmt.Printf("Received message: %s\n", msg.Values["field1"])
}
case <-time.After(10 * time.Second):
fmt.Println("Timeout: No messages received")
}
}
}
// AddMessage 添加消息
func (q *MessageQueue) AddMessage(queue MessageQueue, filed, value string) error {
_, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: queue.Name,
Values: map[string]interface{}{
filed: value,
},
}).Result()
return err
}
// SetMaxLen 设置消息队列最大长度
func (q *MessageQueue) SetMaxLen(queue MessageQueue, maxlen int64) error {
err := rdb.XTrimMaxLen(ctx, queue.Name, maxlen).Err()
return err
}
// DeleteMessage 删除消息
func (q *MessageQueue) DeleteMessage(queue MessageQueue, id string) error {
err := rdb.XDel(ctx, queue.Name, id).Err()
return err
}
// GetMessageNum 获取消息队列长度
func (q *MessageQueue) GetMessageNum(queue MessageQueue) (int64, error) {
num, err := rdb.XLen(ctx, queue.Name).Result()
return num, err
}
// GetMessageList 获取消息队列列表
func (q *MessageQueue) GetMessageList(queue MessageQueue) ([]redis.XMessage, error) {
msgs, err := rdb.XRange(ctx, queue.Name, "-", "+").Result()
return msgs, err
}
// ReadMessage 读取消息
func (q *MessageQueue) ReadMessage(queue MessageQueue, count int, time time.Duration, idlist []string) {
for _, id := range idlist {
rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{queue.Name},
Count: int64(count),
Block: time,
ID: id,
})
}
}
// Info 获取消息队列信息
func (q *MessageQueue) Info(queue MessageQueue) (info *redis.XInfoStream, err error) {
return rdb.XInfoStream(ctx, queue.Name).Result()
}
// NewCustomGroup 创建消费者组
func (g *CustomGroup) NewCustomGroup(group CustomGroup, queue MessageQueue) error {
err := rdb.XGroupCreate(ctx, queue.Name, group.Name, "$").Err()
return err
}
// ReadGroupMessage 读取消费者组消息
func (g *CustomGroup) ReadGroupMessage(group CustomGroup, queue []MessageQueue, count int, time time.Duration, Ack bool) ([]redis.XStream, error) {
streams := []string{}
for _, q := range queue {
streams = append(streams, q.Name)
}
streams = append(streams, ">")
return rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group.Name,
//Consumer: custom.Name, //这里我们还可以指定读取消息的消费者,具体实现思路我们可以开多个协程运行多个消费者,给某个协程
//起一个协程名来作为消费者的名称来用于指定消费者组中的消费者
Streams: streams,
Count: int64(count),
Block: time, //阻塞时间,如果为0,则表示不阻塞,直接返回,如果为-1,则表示一直阻塞,直到有消息为止
NoAck: Ack,
}).Result()
}
// AckMessage 确认消息
func (g *CustomGroup) AckMessage(id string) error {
err := rdb.XAck(ctx, "stream", "group", id).Err()
return err
}
// DeleteConsumer 删除消费者
func (g *CustomGroup) DeleteConsumer(group CustomGroup, queue MessageQueue, custom Custom) error {
return rdb.XGroupDelConsumer(ctx, queue.Name, group.Name, custom.Name).Err()
}
// DeleteGroup 删除消费者组
func (g *CustomGroup) DeleteGroup(group CustomGroup, queue MessageQueue) error {
return rdb.XGroupDestroy(ctx, queue.Name, group.Name).Err()
}
// PendingMessage 获取待确认消息
func (g *CustomGroup) PendingMessage(group CustomGroup, queue MessageQueue) ([]redis.XPendingExt, error) {
return rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: queue.Name,
Group: group.Name,
Start: "-",
End: "+",
Count: 100,
}).Result()
}
// ClaimMessage 转移消息的归属权
func (g *CustomGroup) ClaimMessage(group CustomGroup, queue MessageQueue, custom Custom, id []string) error {
return rdb.XClaim(ctx, &redis.XClaimArgs{
Stream: queue.Name,
Group: group.Name,
Consumer: custom.Name,
Messages: id,
MinIdle: time.Second, //最小空闲时间,只有那些在此时间内未被处理的消息才会被重新分配
}).Err()
}
// InfoGroup 获取消费者组信息
func (g *CustomGroup) InfoGroup(group CustomGroup, queue MessageQueue) ([]redis.XInfoGroup, error) {
return rdb.XInfoGroups(ctx, queue.Name).Result()
}
func (c *Custom) StartConsumer(messageChan chan<- []redis.XMessage, group CustomGroup, queue MessageQueue, custom Custom) {
for {
message, err := group.ReadGroupMessage(group, []MessageQueue{queue}, 1, time.Second, true)
if err != nil {
fmt.Println("custom message failed,err:", err)
}
if len(message) != 0 {
messageChan <- message[0].Messages
}
//确认消息
err = group.AckMessage(message[0].Messages[0].ID)
if err != nil {
fmt.Println("ack message failed,err:", err)
}
time.Sleep(5 * time.Second)
}
}
func InitRedis() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
ctx = context.Background()
}
如上,我创建了三个结构体分别为Custom
,MessageQueue
和CustomGroup
,由于参考了面向对象的写法,所以整体我们来看这个代码主要就分成三个部分:
Custom
:它这里主要是StartCustom
,考虑到在实际生产时一般会有多个消费者所以我们可以用协程的方式来运行该函数MessageQueue
:封装了Redis
中有关信息队列的相关命令CustomGroup
: 封装了Redis
中有关消费者组的相关命令
最后我们在main函数中实现了一个比较简单的测试案例,主函数中执行了两个子协程,一个负责生产消息,一个负责消息的消费,同时通过channel
进行通信,让主函数可以接受到消费者消费的消息。
最后补充一下Stream
来做消息队列的优势:
持久化存储
:Stream中的消息可以被持久化存储,确保数据不会丢失,即使在Redis服务器重启后也能恢复消息。有序性
:消息按照产生顺序生成消息ID, 被添加到Stream中,并且可以按照指定的条件检索消息,保证了消息的有序性。多播与分组消费
:支持多个消费者同时消费同一流中的消息,并且可以将消费者组织成消费组,实现消息的分组消费。消息确认机制
:消费者可以通过XACK
命令确认是否成功消费消息,保证消息至少背消费一次
,确保消息不会被重复处理
。阻塞读取
:消费者可以选择阻塞读取模式,当没有新消息时,消费者会等待直至新消息到达。消息可回溯
: 方便补数、特殊数据处理, 以及问题回溯查询