深入理解Redis锁与Backoff重试机制在Go中的实现

news2024/11/25 16:57:20

文章目录

    • 流程图
    • Redis锁的深入实现
    • Backoff重试策略的深入探讨
    • 结合Redis锁与Backoff策略的高级应用
    • 具体实现
    • 结论

在构建分布式系统时,确保数据的一致性和操作的原子性是至关重要的。Redis锁作为一种高效且广泛使用的分布式锁机制,能够帮助我们在多进程或分布式环境中同步访问共享资源。本文将深入探讨如何在Go语言中实现Redis锁,并结合Backoff重试策略来优化锁的获取过程,确保系统的健壮性和可靠性。

流程图

获取成功
获取失败
开始
尝试获取锁
执行操作
操作成功?
释放锁
重试操作
应用Backoff策略
重试成功?
结束

Redis锁的深入实现

在Go语言中,我们使用github.com/gomodule/redigo/redis包来操作Redis。Redis锁的实现依赖于Redis的SET命令,该命令支持设置键值对,并且可以带有过期时间(EX选项)和仅当键不存在时才设置(NX选项)。以下是一个更详细的Redis锁实现示例:

func SetWithContext(ctx context.Context, redisPool *redis.Pool, key string, expireSecond uint32) (bool, string, error) {
    // ...省略部分代码...
    conn, err := redisPool.GetContext(ctx)
    if err != nil {
        return false, "", err
    }
    defer conn.Close()

    randVal := generateRandVal() // 生成随机值
    _, err = conn.Do("SET", key, randVal, "NX", "EX", int(expireSecond))
    if err != nil {
        return false, "", err
    }

    return true, randVal, nil
}

在上述代码中,generateRandVal()函数用于生成一个唯一的随机值,这个值在释放锁时用来验证是否是锁的持有者。expireSecond参数确保了即使客户端崩溃或网络问题发生,锁也会在一定时间后自动释放,避免死锁。

释放锁时,我们使用Lua脚本来确保只有持有锁的客户端才能删除键:

func ReleaseWithContext(ctx context.Context, redisPool *redis.Pool, key string, randVal string) error {
    // ...省略部分代码...
    conn, err := redisPool.GetContext(ctx)
    if err != nil {
        return err
    }
    defer conn.Close()

    script := `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    `
    _, err = conn.Do("EVAL", script, 1, key, randVal)
    return err
}

Backoff重试策略的深入探讨

在分布式系统中,获取锁可能会因为网络延迟、高负载或其他原因而失败。Backoff重试策略通过在重试之间引入等待时间来减轻这些问题的影响。在提供的代码中,我们定义了多种Backoff策略,每种策略都有其特定的使用场景和优势。

例如,指数退避策略ExponentialBackoff的实现如下:

func (b *ExponentialBackoff) Next(retry int) (time.Duration, bool) {
    // ...省略部分代码...
    m := math.Min(r*b.t*math.Pow(b.f, float64(retry)), b.m)
    if m >= b.m {
        return 0, false
    }
    d := time.Duration(int64(m)) * time.Millisecond
    return d, true
}

在这个策略中,重试间隔随重试次数的增加而指数级增长,但有一个最大值限制。这有助于在遇到连续失败时,逐步增加等待时间,避免立即重载系统。

结合Redis锁与Backoff策略的高级应用

将Redis锁与Backoff策略结合起来,可以创建一个健壮的锁获取机制。例如,我们可以定义一个MustSetRetry方法,该方法会不断尝试获取锁,直到成功为止:

func (r *RedisLock) MustSetRetry(ctx context.Context, key string) (string, error) {
    op := func() (string, error) {
        return r.MustSet(ctx, key)
    }

    notifyFunc := func(err error) {
        // ...错误处理逻辑...
    }

    return mustSetRetryNotify(op, r.backoff, notifyFunc)
}

在这个方法中,mustSetRetryNotify函数负责执行重试逻辑,直到MustSet方法成功获取锁或达到最大重试次数。通过这种方式,我们能够确保即使在高竞争环境下,也能以一种可控和安全的方式获取锁。

具体实现

  1. backoff
package lock

import (
	"math"
	"math/rand"
	"sync"
	"time"
)

// BackoffFunc specifies the signature of a function that returns the
// time to wait before the next call to a resource. To stop retrying
// return false in the 2nd return value.
type BackoffFunc func(retry int) (time.Duration, bool)

// Backoff allows callers to implement their own Backoff strategy.
type Backoff interface {
	// Next implements a BackoffFunc.
	Next(retry int) (time.Duration, bool)
}

// -- ZeroBackoff --

// ZeroBackoff is a fixed backoff policy whose backoff time is always zero,
// meaning that the operation is retried immediately without waiting,
// indefinitely.
type ZeroBackoff struct{}

// Next implements BackoffFunc for ZeroBackoff.
func (b ZeroBackoff) Next(retry int) (time.Duration, bool) {
	return 0, true
}

// -- StopBackoff --

// StopBackoff is a fixed backoff policy that always returns false for
// Next(), meaning that the operation should never be retried.
type StopBackoff struct{}

// Next implements BackoffFunc for StopBackoff.
func (b StopBackoff) Next(retry int) (time.Duration, bool) {
	return 0, false
}

// -- ConstantBackoff --

// ConstantBackoff is a backoff policy that always returns the same delay.
type ConstantBackoff struct {
	interval time.Duration
}

// NewConstantBackoff returns a new ConstantBackoff.
func NewConstantBackoff(interval time.Duration) *ConstantBackoff {
	return &ConstantBackoff{interval: interval}
}

// Next implements BackoffFunc for ConstantBackoff.
func (b *ConstantBackoff) Next(retry int) (time.Duration, bool) {
	return b.interval, true
}

// -- Exponential --

// ExponentialBackoff implements the simple exponential backoff described by
// Douglas Thain at http://dthain.blogspot.de/2009/02/exponential-backoff-in-distributed.html.
type ExponentialBackoff struct {
	t float64 // initial timeout (in msec)
	f float64 // exponential factor (e.g. 2)
	m float64 // maximum timeout (in msec)
}

// NewExponentialBackoff returns a ExponentialBackoff backoff policy.
// Use initialTimeout to set the first/minimal interval
// and maxTimeout to set the maximum wait interval.
func NewExponentialBackoff(initialTimeout, maxTimeout time.Duration) *ExponentialBackoff {
	return &ExponentialBackoff{
		t: float64(int64(initialTimeout / time.Millisecond)),
		f: 2.0,
		m: float64(int64(maxTimeout / time.Millisecond)),
	}
}

// Next implements BackoffFunc for ExponentialBackoff.
func (b *ExponentialBackoff) Next(retry int) (time.Duration, bool) {
	r := 1.0 + rand.Float64() // random number in [1..2]
	m := math.Min(r*b.t*math.Pow(b.f, float64(retry)), b.m)
	if m >= b.m {
		return 0, false
	}
	d := time.Duration(int64(m)) * time.Millisecond
	return d, true
}

// -- Simple Backoff --

// SimpleBackoff takes a list of fixed values for backoff intervals.
// Each call to Next returns the next value from that fixed list.
// After each value is returned, subsequent calls to Next will only return
// the last element. The values are optionally "jittered" (off by default).
type SimpleBackoff struct {
	sync.Mutex
	ticks  []int
	jitter bool
}

// NewSimpleBackoff creates a SimpleBackoff algorithm with the specified
// list of fixed intervals in milliseconds.
func NewSimpleBackoff(ticks ...int) *SimpleBackoff {
	return &SimpleBackoff{
		ticks:  ticks,
		jitter: false,
	}
}

// Jitter enables or disables jittering values.
func (b *SimpleBackoff) Jitter(flag bool) *SimpleBackoff {
	b.Lock()
	b.jitter = flag
	b.Unlock()
	return b
}

// jitter randomizes the interval to return a value of [0.5*millis .. 1.5*millis].
func jitter(millis int) int {
	if millis <= 0 {
		return 0
	}
	return millis/2 + rand.Intn(millis)
}

// Next implements BackoffFunc for SimpleBackoff.
func (b *SimpleBackoff) Next(retry int) (time.Duration, bool) {
	b.Lock()
	defer b.Unlock()

	if retry >= len(b.ticks) {
		return 0, false
	}

	ms := b.ticks[retry]
	if b.jitter {
		ms = jitter(ms)
	}
	return time.Duration(ms) * time.Millisecond, true
}

关键Backoff策略:

  • ZeroBackoff: 不等待,立即重试。
  • StopBackoff: 从不重试。
  • ConstantBackoff: 固定等待时间。
  • ExponentialBackoff: 指数增长的等待时间。
  • SimpleBackoff: 提供一组固定的等待时间,可选择是否添加随机抖动。
package lock

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/gomodule/redigo/redis"
)

var (
	// 防止孤儿lock没release
	// 目前expire过期时间的敏感度是考虑为一致的敏感度
	defaultExpireSecond uint32 = 30
)

var (
	ErrLockSet     = errors.New("lock set err")
	ErrLockRelease = errors.New("lock release err")
	ErrLockFail    = errors.New("lock fail")
)

// RedisLockIFace 在common redis上封一层浅封装
// 将redis pool 与expire second作为redis lock已知数据
type RedisLockIFace interface {
	MustSet(ctx context.Context, k string) (string, error)
	MustSetRetry(ctx context.Context, k string) (string, error) // 必须设置成功并有重试机制
	Release(ctx context.Context, k string, randVal string) error
}

// RedisLock nil的实现默认为true
type RedisLock struct {
	redisPool    *redis.Pool
	expireSecond uint32
	backoff      Backoff
}

// An Option configures a RedisLock.
type Option interface {
	apply(*RedisLock)
}

// optionFunc wraps a func so it satisfies the Option interface.
type optionFunc func(*RedisLock)

func (f optionFunc) apply(log *RedisLock) {
	f(log)
}

// WithBackoff backoff set
func WithBackoff(b Backoff) Option {
	return optionFunc(func(r *RedisLock) {
		r.backoff = b
	})
}

func NewRedisLock(redisPool *redis.Pool, opts ...Option) *RedisLock {

	r := &RedisLock{
		redisPool:    redisPool,
		expireSecond: defaultExpireSecond,
		backoff:      NewExponentialBackoff(30*time.Millisecond, 500*time.Millisecond), // default backoff
	}

	for _, opt := range opts {
		opt.apply(r)
	}

	return r
}

func (r *RedisLock) Set(ctx context.Context, key string) (bool, string, error) {

	if r == nil {
		return true, "", nil
	}

	isLock, randVal, err := SetWithContext(ctx, r.redisPool, key, r.expireSecond)
	if err != nil {
		return isLock, randVal, ErrLockSet
	}

	return isLock, randVal, err
}

// MustSetRetry 必须设置成功并带有重试功能
func (r *RedisLock) MustSetRetry(ctx context.Context, key string) (string, error) {

	op := func() (string, error) {
		return r.MustSet(ctx, key)
	}

	notifyFunc := func(err error) {
		if err == ErrLockFail {
			fmt.Printf("RedisLock.MustSetRetry redis must set err: %v", err)
		} else {
			fmt.Printf("RedisLock.MustSetRetry redis must set err: %v", err)
		}
	}

	return mustSetRetryNotify(op, r.backoff, notifyFunc)
}

func (r *RedisLock) MustSet(ctx context.Context, key string) (string, error) {

	isLock, randVal, err := r.Set(ctx, key)
	if err != nil {
		return "", err
	}

	if !isLock {
		return "", ErrLockFail
	}

	return randVal, nil
}

func (r *RedisLock) Release(ctx context.Context, key string, randVal string) error {

	if r == nil {
		fmt.Printf("that the implementation of redis lock is nil")
		return nil
	}

	err := ReleaseWithContext(ctx, r.redisPool, key, randVal)
	if err != nil {
		fmt.Printf("s.RedisLock.ReleaseWithContext fail, err: %v", err)
		return ErrLockRelease
	}

	return nil
}

func SetWithContext(ctx context.Context, redisPool *redis.Pool, key string, expireSecond uint32) (bool, string, error) {
	if expireSecond == 0 {
		return false, "", fmt.Errorf("expireSecond参数必须大于0")
	}

	conn, _ := redisPool.GetContext(ctx)
	defer conn.Close()

	randVal := time.Now().Format("2006-01-02 15:04:05.000")
	reply, err := conn.Do("SET", key, randVal, "NX", "PX", expireSecond*1000)
	if err != nil {
		return false, "", err
	}
	if reply == nil {
		return false, "", nil
	}

	return true, randVal, nil
}

func ReleaseWithContext(ctx context.Context, redisPool *redis.Pool, key string, randVal string) error {
	conn, _ := redisPool.GetContext(ctx)
	defer conn.Close()

	luaScript := `
		if redis.call("get", KEYS[1]) == ARGV[1] then
			return redis.call("del", KEYS[1])
		else
			return 0
		end;
	`
	script := redis.NewScript(1, luaScript)
	_, err := script.Do(conn, key, randVal)

	return err
}

  1. 重试
package lock

import "time"

type mustSetOperation func() (string, error)

type ErrNotify func(error)

func mustSetRetryNotify1(operation mustSetOperation, b Backoff, notify ErrNotify) (string, error) {

	var err error
	var randVal string
	var wait time.Duration
	var retry bool
	var n int

	for {

		if randVal, err = operation(); err == nil {
			return randVal, nil
		}

		if b == nil {
			return "", err
		}

		n++
		wait, retry = b.Next(n)
		if !retry {
			return "", err
		}

		if notify != nil {
			notify(err)
		}

		time.Sleep(wait)
	}

}

  1. 使用

func main() {
	backoff := lock.NewExponentialBackoff(
		time.Duration(20)*time.Millisecond,
		time.Duration(1000)*time.Millisecond,
	)
	redisPool := &redis.Pool{
		MaxIdle:     3,
		IdleTimeout: 240 * time.Second,
		// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp",
				"redis host",
				redis.DialPassword("redis password"),
			)
		},
	}
	redisLock := lock.NewRedisLock(redisPool, lock.WithBackoff(backoff))
	ctx := context.Background()
	s, err := redisLock.MustSetRetry(ctx, "lock_user")
	if err != nil && err == lock.ErrLockFail {
		fmt.Println(err)
		return
	}
	
	time.Sleep(20 * time.Second)
	defer func() {
		_ = redisLock.Release(ctx, "lock_user", s)
	}()
	return
}

结论

通过深入理解Redis锁和Backoff重试策略的实现,我们可以构建出既能够保证资源访问的原子性,又能在面对网络波动或系统负载时保持稳定性的分布式锁机制。这不仅提高了系统的可用性,也增强了系统的容错能力。在实际开发中,合理选择和调整这些策略对于确保系统的高性能和高可靠性至关重要。通过精心设计的锁机制和重试策略,我们可以为分布式系统提供一个坚实的基础,以应对各种挑战和压力。

关注我

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2221291.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue+ECharts+iView实现大数据可视化大屏模板

Vue数据可视化 三个大屏模板 样式还是比较全的 包括世界地图、中国地图、canvas转盘等 项目演示&#xff1a; 视频&#xff1a; vue大数据可视化大屏模板

神经网络模型内部

给大家展示一个三层4*24*24*2神经网络文件的内部&#xff1a; 大小5.06KB 想知道这个模型是怎么训练生成的看我的上一篇文章 用神经网络自动玩游戏

Centos7安装ZLMediaKit

https://github.com/ZLMediaKit/ZLMediaKit 一 获取代码 git clone https://gitee.com/xia-chu/ZLMediaKit cd ZLMediaKit git submodule update --init git submodule update --init 命令用于初始化和更新 Git 仓库中的子模块&#xff08;submodules&#xff09;。这个命令…

vue3 + ts + element-plus 二次封装 el-dialog

实现效果&#xff1a; 组件代码&#xff1a;注意 style 不能为 scoped <template><el-dialog class"my-dialog" v-model"isVisible" :show-close"false" :close-on-click-modal"false" :modal"false"modal-class&…

web网页QQ登录

代码&#xff1a; <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>QQ登录ent</title> </head> <style>ul > li{list-style: none; } a …

U盘数据丢失不用慌,这4个工具可以帮你恢复。

因为将大量的数据存到U盘里面很方便&#xff0c;所以U盘使用也很广泛。但是里面的数据丢失想必很多朋友都碰到过&#xff0c;不过现在有很多方法都可以帮助大家将数据回顾回来。这里我便筛选了几款比较好的数据恢复工具&#xff0c;在这里跟大家分享。 1、福昕U盘恢复软件 直通…

AI练中学,你的 AI 助教又升级啦!

你是否在代码学习过程中遇到过这些问题&#xff1f;理论学习和动手实践割裂&#xff1b;课上的示例代码跑起来很麻烦&#xff1b;需要自己配置开发环境&#xff0c;在服务器上配开发环境要付费&#xff0c;折腾半天之后报错。 在大模型应用开发领域&#xff0c;获取大模型 API…

设计模式:类与类之间关系的表示方式(聚合,组合,依赖,继承,实现)

目录 聚合关系 组合关系 依赖关系 继承关系 实现关系 聚合关系 聚合是一种较弱的“拥有”关系&#xff0c;表示整体与部分的关系&#xff0c;但部分可以独立于整体存在。例如&#xff0c;部门和员工之间的关系&#xff0c;一个部门可以包含多个员工&#xff0c;但员工可以…

MFC工控项目实例二十五多媒体定时计时器

承接专栏《MFC工控项目实例二十四模拟量校正值输入》 用多媒体定时器实现0.1秒计时器 1、在SEAL_PRESSUREDlg.h文件中添加代码 #include<MMSystem.h> #pragma comment(lib,"winmm.lib")class CSEAL_PRESSUREDlg : public CDialog { public:CSEAL_PRESSUREDlg(…

计算机网络基础进阶

三次握手四次挥手 三次握手 1------建立连接----------------------2 ACK1&#xff0c;seq0 2------传输数据&#xff0c;建立连接---------1 1------传输数据&#xff0c;建立连接---------2 三次握手用于建立TCP连接&#xff0c;确保通信双方都准备好进行数据传输。整个…

使用 Git LFS(大文件存储)

Git LFS&#xff08;Large File Storage&#xff09;是一种扩展 Git 的工具&#xff0c;旨在更有效地管理大文件的版本控制。它通过将大文件的内容存储在 Git 之外来解决 Git 在处理大文件时的性能问题。 主要特点 替代存储&#xff1a;Git LFS 不直接将大文件存储在 Git 仓库…

C++类和对象 - 下【匿名对象,友元,static成员】

&#x1f31f;个人主页&#xff1a;落叶 &#x1f31f;当前专栏: C专栏 目录 ​编辑 再探构造函数 类型转换 static成员 友元 内部类 匿名对象 对象拷⻉时的编译器优化 再探构造函数 之前我们实现构造函数时&#xff0c;初始化成员变量主要使⽤函数体内赋值&#xff0c;…

Mac 远程 Windows 等桌面操作系统工具 Microsoft Remote Desktop for Mac 下载安装详细使用教程

最近需要在 Mac 上远程连接控制我的 windows 电脑系统&#xff0c;经过一番尝试对于 win 来说还是微软自家推出的 Microsoft Remote Desktop for Mac 最最好用&#xff0c;没有之一 简介 Microsoft Remote Desktop是一款由微软公司开发的远程桌面连接工具&#xff0c;可以让用…

Redis遇到Hash冲突怎么办?

这是小伙伴之前遇到的一个面试题&#xff0c;感觉也是一个经典八股&#xff0c;和大伙分享下。 一 什么是 Hash 冲突 Hash 冲突&#xff0c;也称为 Hash 碰撞&#xff0c;是指不同的关键字通过 Hash 函数计算得到了相同的 Hash 地址。 Hash 冲突在 Hash 表中是不可避免的&am…

开源图像超分ECBSR项目源码分析

相关介绍 项目GitHub地址&#xff1a;https://github.com/xindongzhang/ECBSR项目相关论文&#xff1a;https://www4.comp.polyu.edu.hk/~cslzhang/paper/MM21_ECBSR.pdf&#xff08;也可以点这里下载&#xff09;论文解读&#xff1a;Edge-oriented Convolution Block for Re…

CLion远程开发Ubuntu,并显示helloworld文字框

1.CLion的介绍以及其在远程开发上的优点 1&#xff09;CLion 是一个由 JetBrains 开发的跨平台 C/C 集成开发环境&#xff08;IDE&#xff09;&#xff0c;功能强大。 2&#xff09;CLion的优点&#xff1a; 远程工具链支持&#xff1a;CLion 支持通过 SSH 连接到远程 Ubuntu…

Unity--AssestBundles--热更新

使用Node.js搭建AssestBundle服务器并验证AB包热更新 一、服务器部分 使用NodeJs作为服务器&#xff0c; 使用Express为基础网页模版。 当然&#xff0c; 使用其他的FTP&#xff0c;http服务器也可以&#xff0c; 基础逻辑是存放资源的位置。 1.下载Node.js 下载地址:https…

【Python】NumPy(二):数组运算、数据统计及切片索引、广播机制

目录 Numpy数组 数组的基本运算 乘法 加法 数组的数据统计 平均值 中位数 最大值和最小值 求和 累积和 标准差 方差 切片和索引 索引 一维数组的索引 二维数组的索引 获取多个元素 布尔索引 切片 一维数组切片 二维数组切片 多维数组切片 广播机制 规则 …

本地生活便民信息服务小程序源码系统 PHP+MySQL组合开发 带完整的安装代码包以及搭建部署教程

系统概述 地方门户分类信息网站源码系统是一个基于PHP和MySQL开发的强大平台&#xff0c;旨在帮助用户轻松搭建地方性的分类信息网站。该系统集成了众多实用功能&#xff0c;支持用户自由发帖、浏览和搜索各类信息&#xff0c;如二手交易、求职招聘、房屋租售、生活服务、商家…

【java】抽象类和接口(了解,进阶,到全部掌握)

各位看官早安午安晚安呀 如果您觉得这篇文章对您有帮助的话 欢迎您一键三连&#xff0c;小编尽全力做到更好 欢迎您分享给更多人哦 大家好我们今天来学习Java面向对象的的抽象类和接口&#xff0c;我们大家庭已经来啦~ 一&#xff1a;抽象类 1.1:抽象类概念 在面向对象的概念中…