这个redis和上篇rabbitMQ一样,在之前我用Java从原理上进行了剖析,这里呢,我做项目的时候,也需要用到redis,所以这里也将去从怎么用的角度去写这篇文章。
文章目录
- 安装redis以及原理
- redis概念
- redis的应用场景有很多
- redis常用的数据类型
- 使用redis
- 连接
- 普通连接模式
- TLS连接模式
- Redis Sentinel模式
- Redis Cluster模式
- 操作数据
- 特殊
- 数据结构的处理
- Pipeline(缓冲读写)
- 使用`Pipelined` 方法,它会在函数退出时调用` Exec`
- 事务
- Watch
- GET 、SET和WATCH 的示例
安装redis以及原理
安装redis其实很简单,我这里用的是阿里云,所以给出阿里云的安装文件文档,但是在没有用阿里云的时候,其实也有其他的方式。
这里献上,安装redis及其原理的一篇文章,这篇文章是以Java来讲解的,但是本质是没有区别的。想看原理的朋友可以看这里。
redis概念
Redis是一个开源的内存数据库,Redis提供了多种不同类型的数据结构,很多业务场景下的问题都可以很自然地映射到这些数据结构上。除此之外,通过复制、持久化和客户端分片等特性,我们可以很方便地将Redis扩展成一个能够包含数百GB数据、每秒处理上百万次请求的系统。
redis的应用场景有很多
缓存系统
,减轻主数据库(MySQL)的压力。计数场景
,比如微博、抖音中的关注数和粉丝数。热门排行榜
,需要排序的场景特别适合使用ZSET。- 利用
LIST
可以实现队列的功能。 - 利用
HyperLogLog
统计UV、PV等数据。 - 使用
geospatial index
进行地理位置相关查询。 会话存储
:保存用户的登录信息,原本的session里存储会话,只支持一次,诺是在分布式下,会使得,用户登录需要将这些分布式服务器都登录才可以,而redis就能很好的解决这个问题。存储普通缓存
:列如详情页等数据的缓存信息存储。实现分布式锁
:redis 可以非常方便的实现微服务选的分布式锁,redis 天然支持分布式服务。也可以使用zookeeper实现分布式锁
。简单的消息队列
:redis 自身提供的发布订阅模式,可以用来实现简单的消息队列
redis常用的数据类型
String
:(字符串):
常见的使用场景是存储session信息,存储缓存信息如详情页缓存,存储整数信息可使用incr实现整数 + 1和使用decr实现整数 - 1。list
:(列表类型):
常见使用场景式简单的消息队列,存储某项列表数据。Hash
:(哈希表):
常见使用场景是存储session
信息存储商品的购物车,购物车非常适合用哈希字典表示,使用人员唯一编号作为字典的key value
值,可以存储商品的ID和数量等信息,存储详情页信息。Set
:(集合):
一个无序并唯一的兼职集合,它的常见使用场景是实现关注功能,比如关注我的人,我关注的人使用集合存储可以保证人员不重复。Sorted Set
:(有序集合):
相比于set集合类型多了一个排序属性的 score (分值),它的常见使用场景是可以用来存储排名信息,关注列表功能,这样就可以根据关注实现排序展示了。
带范围查询的排序集合(sorted set)、bitmap、hyperloglog、带半径查询的地理空间索引(geospatial index)和流(stream)等数据结构。
使用redis
注意:
redis 7
对应 v9
,redis 6
对应 v8
安装v8版本:
go get github.com/redis/go-redis/v8
导入:
import "github.com/redis/go-redis/v9"
连接
你敢信这东西的链接手法多种多样
普通连接模式
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // 密码
DB: 0, // 数据库
PoolSize: 20, // 连接池大小
})
使用 redis.ParseURL
函数从表示数据源的字符串
中解析得到 Redis 服务器的配置信息
opt, err := redis.ParseURL("redis://<user>:<pass>@localhost:6379/<db>")
if err != nil {
panic(err)
}
rdb := redis.NewClient(opt)
TLS连接模式
使用的是 TLS
连接方式,则需要使用 tls.Config
配置
rdb := redis.NewClient(&redis.Options{
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
// Certificates: []tls.Certificate{cert},
// ServerName: "your.domain.com",
},
})
Redis Sentinel模式
搭建模式:哨兵模式
下面的命令连接到由Redis Sentinel
管理的 Redis
服务器
rdb := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: "master-name",
SentinelAddrs: []string{":9126", ":9127", ":9128"},
})
Redis Cluster模式
搭建模式:集群模式
命令连接到 Redis Cluster
,go-redis
支持按延迟或随机路由命令。
rdb := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
// 若要根据延迟或随机路由命令,请启用以下命令之一
// RouteByLatency: true,
// RouteRandomly: true,
})
例子:
package main
import (
"fmt"
"github.com/go-redis/redis"
)
// 声明一个全局的redisDb变量
var redisDb *redis.Client
// 根据redis配置初始化一个客户端
func initClient() (err error) {
redisDb = redis.NewClient(&redis.Options{
Addr: "localhost:6379", // redis地址
Password: "", // redis密码,没有则留空
DB: 0, // 默认数据库,默认是0
})
//通过 *redis.Client.Ping() 来检查是否成功连接到了redis服务器
_, err = redisDb.Ping().Result()
if err != nil {
return err
}
return nil
}
func main() {
err := initClient()
if err != nil {
//redis连接错误
panic(err)
}
fmt.Println("Redis连接成功")
}
Options参数详解
Network
:网络类型 tcp 或者 unix, 默认是 tcp。Addr
:redis地址,格式 host:portOnConnect
:新建一个redis连接的时候,会回调这个函数Password
: redis密码,redis server没有设置可以为空DB
: redis数据库,序号从0开始,默认是0,可以不用设置MaxRetries
:redis操作失败最大重试次数,默认不重试。MinRetryBackoff
:最小重试时间间隔,默认是 8ms ; -1 表示关闭MaxRetryBackoff
:最大重试时间间隔,默认是 512ms; -1 表示关闭.DialTimeout
:redis连接超时时间,默认是 5 秒.ReadTimeout
:socket读取超时时间,默认 3 秒.WriteTimeout
:socket写超时时间PoolSize
:redis连接池的最大连接数,默认连接池大小等于 cpu个数 * 10MinIdleConns
:redis连接池最小空闲连接数。MaxConnAge
:redis连接最大的存活时间,默认不会关闭过时的连接.PoolTimeout
:从redis连接池获取一个连接之后,连接池最多等待这个拿出去的连接多长时间,默认是等待 ReadTimeout + 1 秒.IdleTimeout
:edis连接池多久会关闭一个空闲连接,默认是 5 分钟. -1 则表示关闭这个配置项IdleCheckFrequency
:多长时间检测一下,空闲连接,默认是 1 分钟. -1 表示关闭空闲连接检测readOnly
:只读设置,如果设置为true, redis只能查询缓存不能更新。
操作数据
redis
基本的key/value
操作,指的是针对value值的类型为字符串或者数字类型的读写操作
Set
:给数据库中名称为key的string赋予值value,并设置失效时间,0为永久有效Get
:查询数据库中名称为key的value值GetSet
:设置一个key的值,并返回这个key的旧值SetNX
:如果key不存在,则设置这个key的值,并设置key的失效时间。如果key存在,则设置不生效MGet
:批量查询key的值。比如redisDb.MGet(“name1”,“name2”,“name3”)MSet
:批量设置key的值。redisDb.MSet(“key1”, “value1”,“key2”, “value2”,“key3”, “value3”)Incr
:Incr函数每次加一,key对应的值必须是整数或nil,否则会报错incr key1: ERR value is not an integer or out of rangencrBy
:IncrBy函数,可以指定每次递增多少,key对应的值必须是整数或nilIncrByFloat
:IncrByFloat函数,可以指定每次递增多少,跟IncrBy的区别是累加的是浮点Decr
:Decr函数每次减一,key对应的值必须是整数或nil,否则会报错DecrBy
:DecrBy,可以指定每次递减多少,key对应的值必须是整数或nilDel
:删除key操作,支持批量删除 redisDb.Del(“key1”,“key2”,“key3”)Expire
:设置key的过期时间,单位秒Append
:给数据库中名称为key的string值追加value
以上是主要的方法。
例子:
可以尝试先填写数据然后读取。
// doCommand go-redis基本使用示例
func doCommand() {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// 执行命令获取结果
val, err := rdb.Get(ctx, "key").Result()
fmt.Println(val, err)
// 先获取到命令对象
cmder := rdb.Get(ctx, "key")
fmt.Println(cmder.Val()) // 获取值
fmt.Println(cmder.Err()) // 获取错误
// 直接执行命令获取错误
err = rdb.Set(ctx, "key", 10, time.Hour).Err()
// 直接执行命令获取值
value := rdb.Get(ctx, "key").Val()
fmt.Println(value)
}
特殊
任意方法:Do方法
go-redis 还提供了一个执行任意命令或自定义命令的 Do 方法,特别是一些 go-redis 库暂时不支持的命令都可以使用该方法执行。
这个方法的作用是向Redis服务器发送一个命令并返回执行结果。
源码接口
:
func (c *Client) Do(ctx context.Context, cmd string, args ...interface{}) *Cmd
// doDemo rdb.Do 方法使用示例
func doDemo() {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// 直接执行命令获取错误
err := rdb.Do(ctx, "set", "key", 10, "EX", 3600).Err()
fmt.Println(err)
// 执行命令获取结果
val, err := rdb.Do(ctx, "get", "key").Result()
fmt.Println(val, err)
}
其中:
ctx
是上下文,用于控制请求的生命周期。cmd
是要执行的Redis命令,比如"GET"、"SET"等。args
是传递给Redis命令的参数,比如键名、数值等。
Do方法的返回值是一个*Cmd
类型,它代表了一个异步执行
的命令,并且可以用来获取执行结果。
使用Do方法,你可以向Redis发送各种命令,并通过返回的*Cmd
对象获取执行结果,比如获取值、处理错误等。这使得在Golang中使用Redis变得非常方便和灵活。
上面这个例子我是用的官方文档中的。如果有朋友试了,就会发现,这东西只能指向第一个key:value,第二个看存不存在,存在就不报错,不存在就报错,然后第二个直接无法写入。这个主打的就是一个扯淡。真不知这玩意出来时干啥的。
redis.Nil
go-redis 库提供了一个 redis.Nil
错误来表示 Key 不存在的错误。因此在使用go-redis
时需要注意对返回错误的判断。在某些场景下我们应该区别处理redis.Nil
和其他不为 nil 的错误。
// getValueFromRedis redis.Nil判断
func getValueFromRedis(key, defaultValue string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
val, err := rdb.Get(ctx, key).Result()
if err != nil {
// 如果返回的错误是key不存在
if errors.Is(err, redis.Nil) {
return defaultValue, nil
}
// 出其他错了
return "", err
}
return val, nil
}
数据结构的处理
剩下的其实也就是对整这个数据结构的处理,包括string,hash,等集合类的操作。需要详细的说明的可以看这篇:非常详细的方法使用文章
Pipeline(缓冲读写)
Redis Pipeline
允许通过使用单个 client-server-client
往返执行多个命令来提高性能。区别于一个接一个地执行100个命令,你可以将这些命令放入 pipeline
中,然后使用1次读写操作像执行单个命令一样执行它们。这样做的好处是节省了执行命令的网络往返时间(RTT)。
pipe := rdb.Pipeline()
//添加计数器
incr := pipe.Incr(cxt, "pipeline_counter")
//设置过期时常
pipe.Expire(cxt, "pipeline_counter", time.Hour)
//Exec是将管道中缓冲的所有命令发送到redis-server。
cmds, err := pipe.Exec(cxt)
if err != nil {
panic(err)
}
fmt.Println(cmds)
fmt.Println(incr.Val())
Exec
:Exec是将管道中缓冲的所有命令发送到redis-server。
Expire
:设置过期时常
Incr
:计数器
Discard
:表示丢弃缓存中所有尚未执行的命令
Process
:把要执行的命令放入流水线缓冲区中
Do
:如果某个Redis命令还不支持,你可以使用Do来执行它。执行任何命令的API
Len
:获取管道中尚未执行的命令的数量
使用Pipelined
方法,它会在函数退出时调用 Exec
var incr *redis.IntCmd
cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
incr = pipe.Incr(ctx, "pipelined_counter")
pipe.Expire(ctx, "pipelined_counter", time.Hour)
return nil
})
if err != nil {
panic(err)
}
// 在pipeline执行后获取到结果
fmt.Println(incr.Val())
我们可以遍历pipeline
命令的返回值依次获取每个命令的结果,示例代码中使用pipiline
一次执行了100个 Get 命令,在pipeline
执行后遍历取出100个命令的执行结果。
cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
for i := 0; i < 100; i++ {
pipe.Get(ctx, fmt.Sprintf("key%d", i))
}
return nil
})
if err != nil {
panic(err)
}
for _, cmd := range cmds {
fmt.Println(cmd.(*redis.StringCmd).Val())
}
一次性执行多个命令的场景下,就可以考虑使用 pipeline 来优化。
事务
Redis 是单线程执行命令的,因此单个命令始终是原子的,但是来自不同客户端的两个给定命令可以依次执行,例如在它们之间交替执行。但是,Multi/exec
能够确保在multi/exec
两个语句之间的命令之间没有其他客户端正在执行命令。
在这种场景我们需要使用 TxPipeline
或 TxPipelined
方法将 pipeline
命令使用 MULTI
和EXEC
包裹起来。
// TxPipeline demo
pipe := rdb.TxPipeline()
incr := pipe.Incr(ctx, "tx_pipeline_counter")
pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
_, err := pipe.Exec(ctx)
fmt.Println(incr.Val(), err)
// TxPipelined demo
var incr2 *redis.IntCmd
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
incr2 = pipe.Incr(ctx, "tx_pipeline_counter")
pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
return nil
})
fmt.Println(incr2.Val(), err)
相当于
MULTI
INCR pipeline_counter
EXPIRE pipeline_counts 3600
EXEC
Watch
搭配 WATCH命令来执行事务操作
从使用WATCH命令监视某个 key 开始,直到执行EXEC命令的这段时间里,如果有其他用户抢先对被监视的 key 进行了替换、更新、删除等操作,那么当用户尝试执行EXEC的时候,事务将失败并返回一个错误,用户可以根据这个错误选择重试事务
或者放弃事务
源码接口
Watch(fn func(*Tx) error, keys ...string) error
Watch
方法搭配 TxPipelined
的使用示例:
// watchDemo 在key值不变的情况下将其值+1
func watchDemo(ctx context.Context, key string) error {
return rdb.Watch(ctx, func(tx *redis.Tx) error {
n, err := tx.Get(ctx, key).Int()
if err != nil && err != redis.Nil {
return err
}
// 假设操作耗时5秒
// 5秒内我们通过其他的客户端修改key,当前事务就会失败
time.Sleep(5 * time.Second)
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, key, n+1, time.Hour)
return nil
})
return err
}, key)
}
将上面的函数执行并打印其返回值,如果我们在程序运行后的5秒内修改了被 watch 的 key 的值,那么该事务操作失败,返回redis: transaction failed
错误。
GET 、SET和WATCH 的示例
go-redis
官方文档中使用 GET
、SET
和WATCH
命令实现一个 INCR
命令的完整示例
// 此处rdb为初始化的redis连接客户端
const routineCount = 100
// 设置5秒超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// increment 是一个自定义对key进行递增(+1)的函数
// 使用 GET + SET + WATCH 实现,类似 INCR
increment := func(key string) error {
txf := func(tx *redis.Tx) error {
// 获得当前值或零值
n, err := tx.Get(ctx, key).Int()
if err != nil && err != redis.Nil {
return err
}
// 实际操作(乐观锁定中的本地操作)
n++
// 仅在监视的Key保持不变的情况下运行
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
// pipe 处理错误情况
pipe.Set(ctx, key, n, 0)
return nil
})
return err
}
// 最多重试100次
for retries := routineCount; retries > 0; retries-- {
err := rdb.Watch(ctx, txf, key)
if err != redis.TxFailedErr {
return err
}
// 乐观锁丢失
}
return errors.New("increment reached maximum number of retries")
}
// 开启100个goroutine并发调用increment
// 相当于对key执行100次递增
var wg sync.WaitGroup
wg.Add(routineCount)
for i := 0; i < routineCount; i++ {
go func() {
defer wg.Done()
if err := increment("counter3"); err != nil {
fmt.Println("increment error:", err)
}
}()
}
wg.Wait()
n, err := rdb.Get(ctx, "counter3").Int()
fmt.Println("最终结果:", n, err)