文章目录
- 思维导图
- 为什么需要分布式锁?
- go语言分布式锁的实现
- Redis
- 自己的实现
- 单元测试
- 红锁是什么
- 别人的带红锁的实现
- etcd
- zk的实现
- 面试问题
- 什么是分布式锁?你用过分布式锁吗?
- 你使用的分布式锁性能如何,可以优化吗?
- 怎么用Redis来实现一个分布式锁?
- 怎么确定分布式锁的过期时间?
- 如果分布式锁过期了,但是业务还没有执行完毕,怎么办?
- 加锁的时候得到了超时响应,怎么办?
- 加锁的时候如果锁被人持有了,这时候怎么办?
- 分布式锁为什么要续约?续约失败了怎么办?如果重试一直都失败,怎么办?
- 怎么减少分布式锁竞争?
- 你知道redlock是什么吗?
思维导图
为什么需要分布式锁?
保证分布式系统并发请求或不同服务实例操作共享资源的安全性,确保在同一时间内,仅有一个进程能够修改共享资源,例如数据库记录或文件,主要用于解决分布式环境中的数据一致性和并发控制问题。 应用场景:用户下单,库存扣减,余额扣减。我们的场景:防止用户信息多写,积分扣减(扣减时几个请求都满足积分余额可扣的情况,只允许一个去扣),防止重复设置定时任务。
- 使用分布式锁可能会对性能产生一定的影响,但这是为了确保数据的一致性和正确性所必需的;如果操作是操作是幂等的(即使多次执行也会产生相同的结果),可能不需要分布式锁;
- 如果共享资源是MySQL数据,可以用MySQL 的乐观锁 SELECT FOR UPDATE 来实现分布式锁的。
go语言分布式锁的实现
Redis
https://github.com/zeromicro/go-zero/blob/master/core/stores/redis/redislock.go
go-zero里已经实现了redislock,但没有续约机制
自己的实现
- 为什么需要自己封装? 用的框架里没有,去参考了go-zero,里面的分布式锁有基础的加锁和解锁,缺少续约
// 需要实现的能力
// 1.排他性、原子性
// 2.主动释放/自动释放
// 3.可重入
// 4.可续约
package r_lc
import (
"context"
"errors"
"github.com/go-redis/redis/v8"
"time"
"github.com/google/uuid"
)
// 实现以下4个方法
type Lock interface {
// TryLock 尝试锁
TryLock(ctx context.Context) (lcNum int, res bool)
// LockWait 尝试锁并等待
LockWait(ctx context.Context, wait time.Duration) (lcNum int, res bool)
// Renew 续约
Renew(ctx context.Context)
// Unlock 解锁
Unlock(ctx context.Context) (leftLcNum int, err error)
}
// RLc 基于redis的分布式锁
type RLc struct {
rdb *redis.Client
// key 锁标识
key string
// lcTag 唯一标识,防止串锁
lcTag string
// expiresIn 过期时间
expiresIn time.Duration
// releaseCh 锁释放信号 (看门狗)
releaseCh chan struct{}
// RetryInterval LockWait重试锁的间隔。默认100ms
RetryInterval time.Duration
// RenewInterval 续约锁间隔,默认为expiresIn/2
RenewInterval time.Duration
// MaxRenewDur 自动续约最长时间。默认1小时,当expiresIn大于1小时,为expiresIn
MaxRenewDur time.Duration
}
type RlcOpt func(lc *RLc)
const (
retryIntervalDefault = 100 * time.Millisecond
maxRenewDurDefault = time.Hour
)
// LUA脚本
var (
// tryLockLua
// return 0. 加锁失败
// return >0. 加锁成功,当前锁的数量
tryLockLua = `
local key = KEYS[1]
local val = ARGV[1]
local expiresIn = ARGV[2]
-- 锁不存在,加锁
if redis.call('EXISTS', key) == 0 then
redis.call('HINCRBY', key, val, 1)
redis.call('PEXPIRE', key, expiresIn)
return 1
end
-- 锁存在,判断持有锁,增加加锁次数 (可重入)
if redis.call('HEXISTS', key, val) == 1 then
return redis.call('HINCRBY', key, val, 1)
end
-- 锁被其他进程占用
return 0
`
// unlockLua
// return > 0. 剩余待解锁次数
// return = 0. 解锁成功
// return = -1. 锁不存在 | 未持有锁
unlockLua = `
local key = KEYS[1]
local val = ARGV[1]
-- 锁不存在或未持有锁
if redis.call('HEXISTS', key, val) == 0 then
return -1
end
-- 按次数解锁
local count = redis.call('HINCRBY', key, val, -1)
if count <= 0 then
-- 全部解锁
redis.call("DEL",key)
return 0
end
-- 剩余待解锁次数
return count
`
// renewLua
// return 0. 续约失败
// return 1. 续约成功
renewLua = `
local key = KEYS[1]
local val = ARGV[1]
local expiresIn = ARGV[2]
-- 锁不存在或未持有锁
if redis.call('HEXISTS', key, val) == 0 then
return 0
end
-- 设置过期时间
return redis.call('PEXPIRE', key, expiresIn)
`
)
var (
ErrLostKey = errors.New("lost key") // 锁不存在或被其他进程占用
)
func NewRLc(rdb *redis.Client, key string, expiresIn time.Duration, opts ...RlcOpt) *RLc {
lc := &RLc{
rdb: rdb,
key: key,
lcTag: uuid.New().String(),
expiresIn: expiresIn,
releaseCh: make(chan struct{}),
}
for _, opt := range opts {
opt(lc)
}
if lc.RetryInterval == 0 {
lc.RetryInterval = retryIntervalDefault
}
if lc.RenewInterval == 0 {
lc.RenewInterval = lc.expiresIn / 2
}
if lc.MaxRenewDur == 0 {
lc.MaxRenewDur = maxRenewDurDefault
if lc.MaxRenewDur < lc.expiresIn {
lc.MaxRenewDur = lc.expiresIn
}
}
return lc
}
// TryLock 尝试锁
func (lc *RLc) TryLock(ctx context.Context) (lcNum int, res bool) {
lua := redis.NewScript(tryLockLua)
lcNum, _ = lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag, lc.expiresIn.Milliseconds()).Int()
if lcNum == 0 {
return 0, false
}
return lcNum, true
}
// LockWait 尝试锁并等待
func (lc *RLc) LockWait(ctx context.Context, wait time.Duration) (lcNum int, res bool) {
ctx, cancel := context.WithTimeout(ctx, wait)
defer cancel()
jumpEnd:
for {
select {
case <-ctx.Done():
break jumpEnd
case <-lc.releaseCh:
break jumpEnd
default:
lcNum, res = lc.TryLock(ctx)
if res {
return
}
time.Sleep(lc.RetryInterval)
}
}
return 0, false
}
// Unlock 解锁
func (lc *RLc) Unlock(ctx context.Context) (leftLcNum int, err error) {
lua := redis.NewScript(unlockLua)
leftLcNum, err = lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag).Int()
if err != nil {
return 0, err
}
if leftLcNum < 0 {
return 0, ErrLostKey
}
if leftLcNum == 0 {
close(lc.releaseCh)
}
return
}
// Renew 续约
// expiresIn=0时,会使用初始化时设定的expiresIn
func (lc *RLc) Renew(ctx context.Context) {
// 限制续约最大持续时间,减少协程泄露影响
ctx, cancel := context.WithTimeout(ctx, lc.MaxRenewDur)
// 续约
go func(lc *RLc) {
for {
select {
case <-ctx.Done():
return
case <-lc.releaseCh:
cancel()
return
default:
lua := redis.NewScript(renewLua)
res, _ := lua.Run(ctx, lc.rdb, []string{lc.key}, lc.lcTag, lc.expiresIn.Milliseconds()).Int()
if res == 0 {
cancel()
}
time.Sleep(lc.RenewInterval)
}
}
}(lc)
return
}
单元测试
两个测试函数主要是针对分布式锁的获取、释放、续约操作进行单元测试,以确保实现的正确性。
package r_lc
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"runtime"
"testing"
"time"
)
func getRdb() (rdb *redis.Client, err error) {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 1,
})
_, err = rdb.Ping(context.TODO()).Result()
if err != nil {
return
}
return
}
func TestLockWait(t *testing.T) {
rdb, err := getRdb()
if err != nil {
t.Fatal(err)
}
// lock1 和 lock2 子测试分别调用 lockWaitFunc 函数,模拟两个协程(goroutine)尝试获取同一个分布式锁的情况。
t.Run("lock1", func(t1 *testing.T) {
go func() {
lockWaitFunc(t1, rdb, 10*time.Second)
}()
})
time.Sleep(10 * time.Millisecond)
t.Run("lock2", func(t2 *testing.T) {
go func() {
lockWaitFunc(t2, rdb, 5*time.Second)
}()
})
// - 通过 time.Sleep 和 runtime.NumGoroutine 观察各个时间点的协程数量变化,判断是否正确地进行了锁的获取、释放和续约。
fmt.Println("0s NumGoroutine", runtime.NumGoroutine())
time.Sleep(3 * time.Second)
fmt.Println("3s NumGoroutine", runtime.NumGoroutine()) // 续约协程启动
time.Sleep(2 * time.Second)
fmt.Println("5s NumGoroutine", runtime.NumGoroutine()) // lock2 续约协程释放
time.Sleep(5 * time.Second)
fmt.Println("10s NumGoroutine", runtime.NumGoroutine()) // lock1 续约协程保持
time.Sleep(5 * time.Second)
fmt.Println("15s NumGoroutine", runtime.NumGoroutine()) // lock1 续约协程释放
time.Sleep(2 * time.Second)
}
func TestRetry(t *testing.T) {
rdb, err := getRdb()
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
// 初始化锁信息
lc := NewRLc(rdb, "test-lock", 5*time.Second, func(lc *RLc) {
lc.MaxRenewDur = time.Second * 10
lc.RenewInterval = time.Second * 5
lc.RetryInterval = 10 * time.Millisecond
})
lc2 := NewRLc(rdb, "test-lock", 5*time.Second, func(lc *RLc) {
lc.MaxRenewDur = time.Second * 10
lc.RenewInterval = time.Second * 5
lc.RetryInterval = 10 * time.Millisecond
})
// 启动续约
lc.Renew(ctx)
// 调用 lc.TryLock 尝试获取锁,观察获取锁的结果。等待 5 秒后,尝试使用 lc2 实例获取锁并观察结果。调用 lc.Unlock 多次尝试解锁,观察解锁的结果。
fmt.Println(lc.TryLock(ctx))
fmt.Println(lc.TryLock(ctx))
fmt.Println(lc.TryLock(ctx))
time.Sleep(5 * time.Second)
fmt.Println("wwwww")
fmt.Println(lc2.LockWait(ctx, 10*time.Second))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
fmt.Println(lc.Unlock(ctx))
return
}
func lockWaitFunc(t *testing.T, rdb *redis.Client, wait time.Duration) {
ctx := context.Background()
// 初始化锁信息
lc := NewRLc(rdb, "test-lock", 15*time.Second)
// 阻塞式获取锁
_, getLock := lc.LockWait(ctx, wait)
if getLock == false {
fmt.Println("获取锁超时")
t.Log("获取锁超时")
return
}
defer lc.Unlock(ctx)
// 启动续约
lc.Renew(ctx)
// 处理业务代码
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
}
return
}
使用分布式锁
// 加分布式锁
contxt := ctx.GetSpanCtx()
lc := r_lc.NewRLc(server.GetRedisClient(), config.GetDistributedLockKey(fmt.Sprintf("user_info:%s", wsId)), 30*time.Second)
_, lcRes := lc.LockWait(contxt, 30*time.Second)
if lcRes == false {
err = my_err.WrapF(err, Err.ErrCodeSysRequestTimeout, "user_info lc.LockWait wsId:%s", wsId)
return
}
defer lc.Unlock(contxt)
lc.Renew(contxt)
测试结果,可以使用压测工具
jmeter安装方法:https://blog.csdn.net/u012375924/article/details/112613202
压测资料:
https://blog.csdn.net/vxzhg/article/details/118548270
https://blog.csdn.net/IT_LanTian/article/details/134267994
红锁是什么
红锁算法(Redlock)是一种分布式锁的实现算法,由 Redis 的作者 Antirez 发布。它主要用于解决分布式环境下的资源争用问题,同时保证锁的可靠性和安全性。红锁算法通过在多个 Redis 节点上创建锁,要求获得锁的客户端必须在大多数节点上成功创建锁,从而确保在分布式环境中只有一个客户端可以获得锁。
红锁算法的基本步骤如下:
- 客户端获取当前系统时间。
- 客户端尝试在 N 个 Redis 节点上创建锁,设置锁的过期时间为过期时间加上一个小的时延。
- 如果客户端在大多数节点上成功创建了锁(N/2+1),则认为客户端获得了锁。客户端应将锁的有效期设置为从步骤1开始计算的实际过期时间。
- 如果客户端未能在大多数节点上创建锁,那么客户端需要删除在其他节点上创建的锁,并等待一段随机时间后重新尝试。
别人的带红锁的实现
https://juejin.cn/post/7148391514966589477
etcd
todo 待研究
库:https://github.com/etcd-io/etcd
https://juejin.cn/post/7148391514966589477
https://www.liwenzhou.com/posts/Go/etcd/
zk的实现
todo 待研究
库:https://github.com/samuel/go-zookeeper
zookeeper简称zk,zk是通过生成临时有序节点来实现分布式锁的,首先会在/lock目录下一个临时有序节点,后续请求会在节点后面继续创建临时节点。新的子节点后面,会添加一个次序编号,这个生成的编号,会在上一次的编号进行 +1 操作。
zk节点监听机制:每个线程抢占锁之前,先尝试创建自己的ZNode。同样,释放锁的时候,就需要删除创建的Znode。创建成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode的通知就可以了。前一个Znode删除的时候,会触发Znode事件,当前节点能监听到删除事件,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个依次向后。
zk临时节点自动删除:当我们客户端断开连接之后,我们出创建的临时节点会进行自动删除操作,所以我们在使用分布式锁的时候,一般都是会去创建临时节点,这样可以避免因为网络异常等原因,造成的死锁。
面试问题
什么是分布式锁?你用过分布式锁吗?
你使用的分布式锁性能如何,可以优化吗?
锁续期设置了最大持续时间,如果没设置的话续约的 Goroutine 会一直进行,占用系统资源
怎么用Redis来实现一个分布式锁?
// 1.排他性、原子性
// 2.主动释放/自动释放
// 3.可重入
// 4.可续约
第一个特性就是互斥,即保证不同节点、不同线程的互斥访问,这部分知识我们在上面已经讨论过,就不再赘述了。
第二个特性是超时机制,即超时设置,防止死锁,分布式锁才有这个特性。在概述篇的第二节课“新的挑战”中,我们讨论过部分失败和异步网络的问题,而这个问题在分布式锁的场景下就会出现。因为锁服务和请求锁的服务分散在不同的机器上面,它们之间是通过网络来通信的,所以我们需要用超时机制,来避免获得锁的节点故障或者网络异常,导致它持有的锁不能归还,出现死锁的情况。同时,我们还要考虑,持有锁的节点需要处理的临界区代码非常耗时这种问题,我们可以通过另一个线程或者协程不断延长超时时间,避免出现锁操作还没有处理完,锁就被释放,之后其他的节点再获得锁,导致锁的互斥失败这种情况。对于超时机制,我们可以在每一次成功获得锁的时候,为锁设置一个超时时间,获得锁的节点与锁服务保持心跳,锁服务每一次收到心跳,就延长锁的超时时间,这样就可以解决上面的两个问题了。
第三个特性是完备的锁接口,即阻塞接口 Lock 和非阻塞接口 tryLock。通过阻塞 Lock 接口获取锁,如果当前锁已经被其他节点获得了,锁服务将获取锁的请求挂起,直到获得锁为止,才响应获取锁的请求;通过 tryLock 接口获取锁,如果当前锁已经被其他节点获得了,锁服务直接返回失败,不会挂起当前锁的请求。
第四个特性是可重入性,即一个节点的一个线程已经获取了锁,那么该节点持有锁的这个线程可以再次成功获取锁。我们只需在锁服务处理加锁请求的时候,记录好当前获取锁的节点 + 线程组合的唯一标识,然后在后续的加锁请求时,如果当前请求的节点 + 线程的唯一标识和当前持有锁的相同,那么就直接返回加锁成功,如果不相同,则按正常加锁流程处理。
公平性(未实现),即对于 Lock 接口获取锁失败被阻塞等待的加锁请求,在锁被释放后,如果按先来后到的顺序,将锁颁发给等待时间最长的一个加锁请求,那么就是公平锁,否则就是非公平锁。锁的公平性的实现也非常简单,对于被阻塞的加锁请求,我们只要先记录好它们的顺序,在锁被释放后,按顺序颁发就可以了。
——参考极客时间《深入浅出分布式技术原理》,不算写的很好吧,有一点价值
怎么确定分布式锁的过期时间?
这个超时时间需要根据业务场景进行压测然后根据压测结果进行评估,在压测结果上进行稍微放大1~2倍。
如果分布式锁过期了,但是业务还没有执行完毕,怎么办?
锁续期