分布式锁的基本概念
在 Redis 中实现分布式锁的常用方式是通过 SETNX
命令(SET
with NX
option)来设置一个键(key
),这个键代表锁。如果 key
不存在,SETNX
会设置成功,并返回 1
,表示成功获取锁。如果 key
已经存在,SETNX
会返回 0
,表示获取锁失败。
线程号在 Redis 锁中的作用
通常,为了确保在释放锁时只有锁的持有者能够删除这个锁,会在锁的 key
或 value
中保存线程标识符或节点标识符(例如,线程 ID 或唯一的随机值)。这样,在解锁时,可以验证当前操作的线程是否为持有该锁的线程。
你提到的问题分析
1. 关于 key
和线程号
如果你理解的是在锁的 key
中添加线程号,那确实会导致每个线程设置的 key
不一样。例如,假设线程 A 设置的 key
为 lock:threadA
,线程 B 设置的 key
为 lock:threadB
,那么它们各自的 key
是独立的,互不影响。这种情况下,确实每个线程都会成功设置自己的 key
,但这实际上并不是我们想要的分布式锁效果,因为每个线程都能创建自己的锁,而不会有竞争关系。
2. 正确的做法:在 value
中保存线程标识符
通常情况下,锁的 key
是固定的,比如 lock:resource_id
,表示对某个资源的锁,而不同线程或节点通过在 value
中存储唯一的标识符来区分持有锁的线程。
示例:
go复制代码// 线程 A 尝试获取锁
SET lock:resource_id "threadA" NX PX 30000
// 线程 B 尝试获取锁
SET lock:resource_id "threadB" NX PX 30000
- 如果线程 A 成功获取锁,那么
lock:resource_id
的value
就是"threadA"
。 - 如果线程 B 尝试获取锁,因为
key
已经存在,操作会失败。
3. 释放锁
当线程 A 需要释放锁时,会先检查 key
的 value
是否等于 "threadA"
,如果是,才能删除锁:
go复制代码if GET lock:resource_id == "threadA" {
DEL lock:resource_id
}
这样,只有持有锁的线程才能释放锁,避免其他线程误删。
总结
- 锁的
key
:应固定,表示要锁定的资源。 - 锁的
value
:保存线程或节点的唯一标识符,用于区分持有锁的线程或节点。
在 Redis 分布式锁中,所有线程都应该尝试设置相同的 key
,而区分不同线程的是 value
中的标识符。你提到的问题可能是在理解时将线程标识符放在了 key
中,这样会导致每个线程创建的 key
不一样,从而失去了锁的作用。
在 Go 中结合 Redis 实现一个简单的分布式锁,通常可以使用 Redis 的 SET
命令和一些关键选项,如 NX
(表示仅在键不存在时才设置键)和 PX
(设置键的过期时间,以毫秒为单位)。以下是一个示例代码,演示如何使用 Redis 实现一个分布式锁。
1. 环境准备
首先,确保你已经安装了 Redis,并在 Go 项目中引入了 Redis 的 Go 客户端库 go-redis
。
bash
复制代码
go get github.com/redis/go-redis/v9
2. 实现分布式锁
以下代码展示了如何在 Go 中使用 Redis 实现一个简单的分布式锁。
go复制代码package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"time"
)
var ctx = context.Background()
// 尝试获取分布式锁
func acquireLock(client *redis.Client, key string, value string, expiration time.Duration) (bool, error) {
// SET key value NX PX expiration
result, err := client.SetNX(ctx, key, value, expiration).Result()
if err != nil {
return false, err
}
return result, nil
}
// 释放分布式锁
func releaseLock(client *redis.Client, key string, value string) (bool, error) {
// Lua脚本保证原子性
luaScript := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
result, err := client.Eval(ctx, luaScript, []string{key}, value).Int()
if err != nil {
return false, err
}
return result == 1, nil
}
func main() {
// 创建Redis客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis服务器地址
})
// 锁的key和value
lockKey := "my_lock"
lockValue := "unique_value" // 每个线程应生成唯一的值,通常是UUID或线程ID
// 尝试获取锁,设置10秒过期
locked, err := acquireLock(client, lockKey, lockValue, 10*time.Second)
if err != nil {
fmt.Println("Error acquiring lock:", err)
return
}
if locked {
fmt.Println("Lock acquired successfully!")
// 执行需要保护的操作...
// 操作完成后释放锁
unlocked, err := releaseLock(client, lockKey, lockValue)
if err != nil {
fmt.Println("Error releasing lock:", err)
return
}
if unlocked {
fmt.Println("Lock released successfully!")
} else {
fmt.Println("Failed to release lock!")
}
} else {
fmt.Println("Failed to acquire lock, another process might hold it.")
}
}
3. 代码说明
- 获取锁 (
acquireLock
):- 使用
SETNX
命令尝试获取锁。如果key
不存在,则设置该key
,同时指定过期时间,确保锁在超时后会自动释放。 - 如果
key
已经存在,SETNX
返回false
,表示获取锁失败。
- 使用
- 释放锁 (
releaseLock
):- 为了防止误释放锁(例如:锁已过期并被其他线程重新获取),我们使用 Lua 脚本保证删除操作的原子性。
- 只有当
key
的value
与当前线程持有的锁的value
一致时,才删除锁。
- 主函数 (
main
):- 创建 Redis 客户端,连接到 Redis 服务器。
- 尝试获取锁并进行保护的操作。
- 完成操作后释放锁。
4. 扩展
在实际应用中,分布式锁可能需要更多功能,例如自动续期、死锁检测等。这些功能可以根据具体需求进行扩展。还可以使用现成的库,如 Redlock 实现更复杂的分布式锁机制。
看门狗机制
分布式锁中的“开门狗机制”是用来解决锁过期时间不足而导致的锁提前释放的问题。开门狗机制可以自动延长锁的有效期,防止在锁持有者还在执行任务时锁被释放,从而避免其他客户端意外获得锁。
为什么需要开门狗机制?
当一个客户端获取分布式锁时,它通常会设置一个锁的过期时间(TTL),以防止锁因为某些意外原因(例如客户端崩溃)而永远不被释放。TTL 确保了即使客户端没有主动释放锁,锁也会在一定时间后自动释放。
然而,任务执行的时间有时可能比预期的要长。如果没有合适的机制,锁的 TTL 到期后,锁会被自动释放,导致其他客户端可能在任务尚未完成时获得锁,进而引发数据一致性问题。 看门狗可以更新所得到期时间
看门狗机制的工作原理
开门狗机制主要包括以下步骤:
- 获取锁并设置初始TTL:客户端获取锁时,设置一个初始的 TTL(例如 10 秒)。
- 定期续约:在锁持有期间,客户端启动一个后台任务(开门狗),定期检查锁的状态。如果客户端依然持有锁,并且任务还在执行,开门狗会延长锁的TTL。例如,每隔一半的TTL时间(例如5秒),将锁的TTL重置为原来的TTL时间(例如10秒)。
- 释放锁:一旦任务完成,客户端主动释放锁,同时停止开门狗。
示例代码
以下是一个简单的示例,演示如何在 Go 中实现带开门狗机制的分布式锁。
go复制代码package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"time"
)
var ctx = context.Background()
// 获取分布式锁,带初始TTL
func acquireLock(client *redis.Client, key string, value string, expiration time.Duration) (bool, error) {
result, err := client.SetNX(ctx, key, value, expiration).Result()
if err != nil {
return false, err
}
return result, nil
}
// 续约锁的TTL
func renewLock(client *redis.Client, key string, value string, expiration time.Duration) error {
luaScript := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`
_, err := client.Eval(ctx, luaScript, []string{key}, value, int(expiration.Milliseconds())).Result()
return err
}
// 释放分布式锁
func releaseLock(client *redis.Client, key string, value string) (bool, error) {
luaScript := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
result, err := client.Eval(ctx, luaScript, []string{key}, value).Int()
if err != nil {
return false, err
}
return result == 1, nil
}
// 开门狗机制,定期续约锁的TTL
func startWatchdog(client *redis.Client, key string, value string, expiration time.Duration, interval time.Duration, stopChan chan bool) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 续约锁的TTL
err := renewLock(client, key, value, expiration)
if err != nil {
fmt.Println("Error renewing lock:", err)
return
}
fmt.Println("Lock renewed for another", expiration)
case <-stopChan:
fmt.Println("Watchdog stopped")
return
}
}
}
func main() {
// 创建Redis客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// 锁的key和value
lockKey := "my_lock"
lockValue := "unique_value"
// 尝试获取锁,设置初始TTL为10秒
locked, err := acquireLock(client, lockKey, lockValue, 10*time.Second)
if err != nil {
fmt.Println("Error acquiring lock:", err)
return
}
if locked {
fmt.Println("Lock acquired successfully!")
// 启动开门狗机制,间隔5秒续约,TTL为10秒
stopChan := make(chan bool)
go startWatchdog(client, lockKey, lockValue, 10*time.Second, 5*time.Second, stopChan)
// 模拟执行任务
time.Sleep(15 * time.Second)
// 任务完成后释放锁
unlocked, err := releaseLock(client, lockKey, lockValue)
if err != nil {
fmt.Println("Error releasing lock:", err)
return
}
if unlocked {
fmt.Println("Lock released successfully!")
} else {
fmt.Println("Failed to release lock!")
}
// 停止开门狗
stopChan <- true
} else {
fmt.Println("Failed to acquire lock, another process might hold it.")
}
}