redsync使用教程
- 前言
- redsync结构
- Pool结构
- Mutex结构
- acquire加锁操作
- release解锁操作
- redsync包的使用
前言
在编程语言中锁可以理解为一个变量,该变量在同一时刻只能有一个线程拥有,以便保护共享数据在同一时刻只有一个线程去操作。对于高可用的分布式锁应该满足以下条件:
1.互斥:在任意时间内,只有一个客户能够获得一把锁,具有排他性。
2.避免死锁:即使客户端宕机或者从集群中分离了,其它客户端仍然可以获取到该锁
3.容错:只要大部分Redis节点存活,客户端就能正确地获取锁和释放锁。即使锁住某个资源的客户端释放锁之前崩溃或者网络分区仍然能够获取锁和释放锁。
对于Redis高可用集群而言,上述三个条件都非常容易满足,所以适合做分布式锁。
redsync结构
redsync 的通用结构定义如下:
- Pool:抽象连接池
- Conn:抽象每个 Redis 连接
- Script:Redis 脚本
Pool结构
redsync结构的Pools是一个redis.pool数组,每个 redis.Pool 都是上面的 Pool 实现,它代表了一个 Redis 实例的连接池:
Mutex结构
Mutex代表了一个分布式锁,其成员多为 redlock 算法所需要的条件:
// A Mutex is a distributed mutual exclusion lock.
type Mutex struct {
name string // 名称
expiry time.Duration // 锁的有效时间
tries int // 尝试次数
delayFunc DelayFunc // 失败尝试设置延迟
factor float64 // 误差系数控制
quorum int // 投票数 一般为节点数 / 2+1,节点数为奇数
genValueFunc func() (string, error) // 加密函数,生成唯一随机串
value string // 默认就是唯一随机串
until time.Time // 过期时间
pools []Pool // 连接池(每个 Pool 指一个 Redis 实例)
}
获取锁的Lock方法实现了redLock的加锁接口,具体实现如下
func (m *Mutex) LockContext(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}
//生成随机串,base64
value, err := m.genValueFunc()
if err != nil {
return err
}
//不超过tries次数进行加锁
for i := 0; i < m.tries; i++ {
if i != 0 {
time.Sleep(m.delayFunc(i))
}
start := time.Now()
n, err := func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
//尝试异步去获取锁
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.acquire(ctx, pool, value)
})
}()
now := time.Now()
// 过期时间 = 有效时间值 - 获取锁消耗的时间值 - 有效时间值 * 误差系数
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
//成功节点数>=节点数/2+1&& 未过期时,判定加锁成功
if n >= m.quorum && now.Before(until) {
m.value = value
m.until = until
return nil
}
func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
//获取锁失败,尝试异步去释放锁
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, value)
})
}()
if i == m.tries-1 && err != nil {
return err
}
}
return ErrFailed
}
time.Sleep(m.delayFunc(i))的失败重试逻辑是当客户端无法获取锁时会设置一个随机值来重试。这个随机值应当和申请锁时间错开,减少脑裂的可能性。此外,还调用了actOnPoolsAsync来实现非阻塞方式同时向多个Redis实例发送set请求。我们来看下actOnPoolsAsync是如何定义的。
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
type result struct {
Node int
Status bool
Err error
}
ch := make(chan result)
for node, pool := range m.pools {
go func(node int, pool redis.Pool) {
r := result{Node: node}
r.Status, r.Err = actFn(pool)
ch <- r
}(node, pool)
}
n := 0
var taken []int
var err error
for range m.pools {
r := <-ch
if r.Status {
n++
} else if r.Err != nil {
err = multierror.Append(err, &RedisError{Node: r.Node, Err: r.Err})
} else {
taken = append(taken, r.Node)
err = multierror.Append(err, &ErrNodeTaken{Node: r.Node})
}
}
if len(taken) >= m.quorum {
return n, &ErrTaken{Nodes: taken}
}
return n, err
}
acquire加锁操作
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
reply, err := conn.SetNX(m.name, value, m.expiry)
if err != nil {
return false, err
}
return reply, nil
}
release解锁操作
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
//调用Eval,以脚本方式释放锁
status, err := conn.Eval(deleteScript, m.name, value)
if err != nil {
return false, err
}
return status != int64(0), nil
}
redsync包的使用
该包的使用很简单,具体步骤如下:
- 首先,创建一个Redis的客户端连接;
- 将该客户端连接加入到Redis的Pool中;
- redsync基于该Redis Pool进行实例化;
- 通过redsync实例的NewMutex就可以基于一个具体的key新建一个分布式锁,
该包进行实例化时有基于Redis的单机模式和集群模式两种使用方式,在使用上主要有两种区别: - Redis的客户端是以集群模式还是单机模式创建;
- 在导入redsync包时,集群模式需要导入goredis/v8的版本
具体例子如下:
func main() {
//创建redis的客户端连接
cli := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(cli)
rs := redsync.New(pool)
mutexname := "test-global-mutex"
mutex := rs.NewMutex(mutexname)
if err := mutex.Lock(); err != nil {
panic(err)
}
if ok, err := mutex.Unlock(); !ok || err == nil {
panic("unlock failed")
}
}