Go微服务: redis分布式锁保证数据原子操作的一致性

news2024/10/6 18:34:05

概述

  • 随着云计算和大数据技术的飞速发展,分布式系统已经成为现代IT架构的重要组成部分
  • 在分布式系统中,数据的一致性是一个至关重要的挑战,特别是在并发访问和修改共享资源的场景下
  • 分布式锁是一种跨进程、跨机器节点的互斥锁,用于控制多个节点对共享资源的访问
  • 其核心目标是确保在分布式系统中,同一时刻只有一个节点能够访问特定的共享资源,从而实现数据的一致性
  • 分布式锁的实现方式多种多样,包括基于数据库、缓存(如Redis)、分布式协调服务(如Zookeeper)等

场景示例

  • 在多携程或者多线程的情况下,这个数据竞争是避免不了的, 参考下图
  • 一开始我们有一个Redis图标代表着之前用到的分布式的锁
  • 两边有两个grorouting,在实际工作当中有很多grorouting
  • 让左边的 grorouting 进行 getKey 和 setKey,就是说对这个 redis 进行读写
  • 让右边的 grorouting 也进行读写,这里面就会产生一个问题
  • 多个gorouting在同时写的时候会不会产生数据一致性的问题
  • 这个场景就是我们经常说的中断,举个例子
    • 写代码,听歌上网写文档,感觉这个计算机同时在做几件事情一样
    • 但是要在计算机看来,它在不停的切换,只是说计算机执行的足够快
    • 人类是无法感觉到它在极短的时间内进行时间的这个切换
  • 这里有两个命令,一个是上面的 getKey和下面这一行的 setKey
  • 在高并发的时候就会导致切换出现数据竞争, 也就是数据不一致的这种情况发生
  • REDIS 里提供了一个命令就是这个 Setnx,全称就是 Set if Not Exists
  • 中文意思就是说设置它,如果它不存在,如果设置成功,就是1,设置失败就返回0
  • 那就是说不存在这个key的时候,我就可以设置成功,如果存在了,那就设置是失败的
  • 那这就保证了第一个gorouting是可以设置成功的,在这个后面的gorouting,它就是设置不成功的

源码示例

  • 分布式锁 redSync 是如何结合这个setNX解决这个数据竞争的问题的
  • 我们来看下之前的业务代码
    pool := goredis.NewPool(client)
    rs := redsync.New(pool)
    mutexname := "my-global-mutex"
    mutex := rs.NewMutex(mutexname, redsync.WithExpiry(30*time.Second))
    fmt.Println("Lock()....")
    if err := mutex.Lock(); err != nil {
        panic(err)
    }
    
  • 上面这里 mutex.Lock() 进入源码
    // Lock locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
    func (m *Mutex) Lock() error {
        return m.LockContext(context.Background())
    }
    
  • 这里看到就一个 LockContext 方法,再次进入,其实这里有2个重载函数
    // LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
    func (m *Mutex) LockContext(ctx context.Context) error {
        return m.lockContext(ctx, m.tries)
    }
    
    
    // lockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
    func (m *Mutex) lockContext(ctx context.Context, tries int) error {
        // 如果 ctx 不存在则新建
        if ctx == nil {
            ctx = context.Background()
        }
    
        // 获取 value
        value, err := m.genValueFunc()
        if err != nil {
            return err
        }
    
        var timer *time.Timer
        // 循环 tries 次
        for i := 0; i < tries; i++ {
            if i != 0 {
                if timer == nil {
                    timer = time.NewTimer(m.delayFunc(i))
                } else {
                    timer.Reset(m.delayFunc(i))
                }
    
                select {
                case <-ctx.Done():
                    timer.Stop()
                    // Exit early if the context is done.
                    return ErrFailed
                case <-timer.C:
                    // Fall-through when the delay timer completes.
                }
            }
    
            start := time.Now()
    
            n, err := func() (int, error) {
                ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
                defer cancel()
                return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
                    // 在分配锁的时候会加 nx, 进入这里
                    return m.acquire(ctx, pool, value)
                })
            }()
    
            now := time.Now()
            until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
            if n >= m.quorum && now.Before(until) {
                m.value = value
                m.until = until
                return nil
            }
            _, _ = func() (int, error) {
                ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
                defer cancel()
                return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
                    return m.release(ctx, pool, value)
                })
            }()
            if i == tries-1 && err != nil {
                return err
            }
        }
    
        return ErrFailed
    }
    
  • 再次进入 m.acquire
    func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
        conn, err := pool.Get(ctx)
        if err != nil {
            return false, err
        }
        defer conn.Close()
        // 这里 SetNX 就是 redSync 封装起来的 nx 特性
        reply, err := conn.SetNX(m.name, value, m.expiry)
        if err != nil {
            return false, err
        }
        return reply, nil
    }
    
  • 进入 conn.SetNX,可见 key, value 和 过期时间 expiry (默认 8s)
    // Conn is a single Redis connection.
    type Conn interface {
        Get(name string) (string, error)
        Set(name string, value string) (bool, error)
        SetNX(name string, value string, expiry time.Duration) (bool, error) // 注意这里
        Eval(script *Script, keysAndArgs ...interface{}) (interface{}, error)
        PTTL(name string) (time.Duration, error)
        Close() error
    }
    
    • 不传过期时间,它会有死锁的风险
    • 因为一旦处于某种异常状况,那你这个锁就永远不会释放了,就会出现死锁
    • 过期时间比如说五秒之后自动释放,那我无论是宕机还是崩溃等其他问题,它都会五秒之后,释放出资源
    • 释放之后其他 gorouting 就可以继续抢锁和业务逻辑操作

关于redis锁过期问题


  • 在上述业务代码中
    mutex := rs.NewMutex(mutexname, redsync.WithExpiry(30*time.Second))
    
  • 这里有对过期时间做初始化的代码,其实在new的时候,有一个默认值,进入 NewMutex
    // NewMutex returns a new distributed mutex with given name.
    func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
    	m := &Mutex{
    		name:   name,
    		expiry: 8 * time.Second,
    		tries:  32,
    		delayFunc: func(tries int) time.Duration {
    			return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
    		},
    		genValueFunc:  genValue,
    		driftFactor:   0.01,
    		timeoutFactor: 0.05,
    		quorum:        len(r.pools)/2 + 1,
    		pools:         r.pools,
    	}
    	for _, o := range options {
    		o.Apply(m)
    	}
    	if m.shuffle {
    		randomPools(m.pools)
    	}
    	return m
    }
    
  • 这里可见,默认是8s的过期时间,如果业务特别复杂,这个时间可以调整,因为如果8s的时间hold不住业务,其他gorouting就可能抢到锁干一些事情导致数据不一致
  • redSync 中有一种机制是对锁进行续命,在 mutux.go 中有个 touch 方法
    // 里面不是go脚本,是 Lua 脚本
    var touchScript = redis.NewScript(1, `
    	// 核对你是否是key和值的拥有者,不能让别人随便设置
    	// 只有自己才能对自己进行续期
    	if redis.call("GET", KEYS[1]) == ARGV[1] then
    		return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    	else
    		return 0
    	end
    `)
    
    func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
    	conn, err := pool.Get(ctx)
    	if err != nil {
    		return false, err
    	}
    	defer conn.Close()
    
    	touchScript := touchScript
    	if m.setNXOnExtend {
    		touchScript = touchWithSetNXScript
    	}
    
    	status, err := conn.Eval(touchScript, m.name, value, expiry)
    	if err != nil {
    		return false, err
    	}
    	return status != int64(0), nil
    }
    
  • 也就是说默认8s后,如果不够,通过 touch 方法续期,如果一直续期则会导致锁会一直被占用,结果带来的是不公平和可能得死锁问题需要谨慎
  • 一般在8s内已经足够业务使用了,如果不够,可以考虑使用,但一定要慎用

分布式锁如何防止被篡改和删除

  • redSync 已经对防篡改和删除做了一些防范处理
  • 我们可以从 Unlock 的源码中获得一些答案
    // Unlock unlocks m and returns the status of unlock.
    func (m *Mutex) Unlock() (bool, error) {
    	return m.UnlockContext(context.Background())
    }
    
    // UnlockContext unlocks m and returns the status of unlock.
    func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
    	n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
    		return m.release(ctx, pool, m.value)
    	})
    	if n < m.quorum {
    		return false, err
    	}
    	return true, nil
    }
    
  • 进入 release 函数
    var deleteScript = redis.NewScript(1, `
    	local val = redis.call("GET", KEYS[1])
    	if val == ARGV[1] then
    		return redis.call("DEL", KEYS[1])
    	elseif val == false then
    		return -1
    	else
    		return 0
    	end
    `)
    
    func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
    	conn, err := pool.Get(ctx)
    	if err != nil {
    		return false, err
    	}
    	defer conn.Close()
    	status, err := conn.Eval(deleteScript, m.name, value)
    	if err != nil {
    		return false, err
    	}
    	if status == int64(-1) {
    		return false, ErrLockAlreadyExpired
    	}
    	return status != int64(0), nil
    }
    
  • 这里的核心是 conn.Eval(deleteScript, m.name, value) 直接把值删除,在删除的逻辑又是一段 Lua
  • Lua可以保证原子性,性能更高,现在再看这个 value 是怎么来的,回到 NewMutex 中,value 就在这里拿到的,在其源码内部有一个 genValueFunc: genValue 的配置项
    func genValue() (string, error) {
    	b := make([]byte, 16)
    	_, err := rand.Read(b)
    	if err != nil {
    		return "", err
    	}
    	return base64.StdEncoding.EncodeToString(b), nil
    }
    
  • 这里有一个处理 base64 的程序,其实在内部每次调用一次 LockContext 都会执行这个函数生成一次这个 base64并存放到 m.value 上,这个base64 就达到了防篡改的目的
  • 回到 Unlock 删除时的 deleteScript 中可以看到,删除时也要保证是自己的,这样就保证了数据的安全性
  • 这样就不会随随便便被别人给干掉

总结

  • 为了保持分布式锁的数据原子操作的一致性,我们可以得出以下结论

1 )互斥性

  • 分布式锁的最基本特性是互斥性,即同一时刻只有一个节点能够持有锁,访问共享资源。这一特性确保了并发操作下的数据一致性
  • 当多个节点尝试同时访问共享资源时,只有获得锁的节点能够执行操作,其他节点则被阻塞或等待,直到锁被释放

2 )锁的获取与释放

  • 在分布式系统中,锁的获取和释放过程需要特别小心
  • 首先,锁的获取必须是一个原子操作,以确保在多个节点同时尝试获取锁时,只有一个节点能够成功
  • 其次,锁的释放也需要在操作完成后立即进行,以避免死锁和资源泄露
  • 此外,为了应对可能的异常情况(如节点宕机、网络故障等),还需要实现锁的自动释放机制,如设置超时时间

3 )超时机制

  • 超时机制是分布式锁中非常重要的一个特性。当节点在持有锁的过程中出现异常或处理时间过长时,可能导致其他节点无法获取锁,形成死锁
  • 为了避免这种情况的发生,分布式锁通常会设置超时时间
  • 当节点持有锁的时间超过设定的超时时间时,锁会自动释放,其他节点可以重新尝试获取锁

4 )锁的续期

  • 在某些场景下,节点可能需要长时间持有锁以完成某些复杂的操作
  • 为了避免因超时导致锁被释放而中断操作,分布式锁通常支持锁的续期功能
  • 节点可以在持有锁的过程中定期发送续期请求,以延长锁的持有时间
  • 这样可以在保证数据一致性的同时,提高系统的可用性和性能。

5 )分布式锁的实现方式

  • 分布式锁的实现方式多种多样,包括基于数据库的分布式锁、基于缓存的分布式锁(如Redis)以及基于分布式协调服务的分布式锁(如Zookeeper)
  • 这些实现方式各有优缺点,需要根据具体的业务场景和需求来选择合适的实现方式

6 )所以

  • 分布式锁作为保证分布式系统中数据一致性的一种重要机制,具有广泛的应用场景。通过实现互斥性、锁的获取与释放、超时机制、锁的续期等特性,分布式锁能够有效地保证数据原子操作的一致性
  • 在实际应用中,我们需要根据具体的业务场景和需求来选择合适的分布式锁实现方式,并合理配置相关参数,以确保系统的稳定性和性能

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1847189.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python web 开发 flask 实践

1、前言 前文已经介绍了很多关于 python 的算法和脚本的写法&#xff0c;在本文将开启python的 web 的开发&#xff0c;和java 类似的&#xff0c;对于 web 开发也需要引入框架&#xff0c;对于 python 的 web 开发来说常见的有 flask 和 django 两种&#xff0c;在本文中将要…

Comparison method violates its general contract! 神奇的报错

发生情况 定位到问题代码如下&#xff08;脱敏处理过后&#xff09;&#xff0c;意思是集合排序&#xff0c;如果第一个元素大于第二个元素&#xff0c;比较结果返回1&#xff0c;否则返回-1&#xff0c;这里粗略的认为小于和等于是一样的结果 List<Integer> list Arr…

【Android14 ShellTransitions】(六)SyncGroup完成

这一节的内容在WMCore中&#xff0c;回想我们的场景&#xff0c;是在Launcher启动某一个App&#xff0c;那么参与动画的就是该App对应Task&#xff08;OPEN&#xff09;&#xff0c;以及Launcher App对应的Task&#xff08;TO_BACK&#xff09;。在确定了动画的参与者后&#x…

C#.net6.0语言+B/S架构+前后端分离 手术麻醉信息管理系统源码

C#.net6.0语言&#xff0b;B/S架构前后端分离 手术麻醉信息管理系统源码 什么是手术麻醉信息管理系统 满足医院等级评级需求 满足电子病历评级需求 满足科室需求 术前 1、患者术前评估/诊断 2、术前讨论制定手术方案 3、手术准备 4、术前准备 术中 1、送手术室 2、麻…

openlayers 轨迹回放(历史轨迹),实时轨迹

本篇介绍一下使用openlayers轨迹回放&#xff08;历史轨迹&#xff09;&#xff0c;实时轨迹 1 需求 轨迹回放&#xff08;历史轨迹&#xff09;实时轨迹 2 分析 主要是利用定时器&#xff0c;不断添加feature 轨迹回放&#xff08;历史轨迹&#xff09;&#xff0c;一般是…

Ubuntu安装qemu-guest-agent

系列文章目录 Ubuntu-24.04-live-server-amd64安装界面中文版 Ubuntu-24.04-live-server-amd64启用ssh Ubuntu乌班图安装VIM文本编辑器工具 文章目录 系列文章目录前言一、安装二、启用服务三、效果总结 前言 QEMU Guest Agent&#xff08;简称QEMU GA或QGA&#xff09;在虚拟…

什么是NLP-自然语言处理

什么是NLP-自然语言处理 什么是NLP开通NLP新建项目创建模型 什么是NLP NPL是面向算法小白用户的行业自适应标注、训练和服务平台。该产品支持文本实体抽取、文本分类、关键短语抽取、情感分析、关系抽取、短文本匹配、商品评价解析等 NLP 定制化算法能力&#xff0c;用户无需拥…

御龙掘宝挂机零撸修仙类游戏定制开发源码部署

随着移动游戏的普及&#xff0c;御龙掘宝挂机零撸修仙类游戏定制开发源码部署应运而生。这款游戏结合了传统的修仙元素、挂机游戏的核心玩法以及零撸掘金的商业模式&#xff0c;为玩家提供了一个全新的游戏体验。本文将探讨御龙掘宝挂机零撸修仙类游戏定制开发源码部署的核心技…

linux 安装sftp及使用sftp工具类上传和下载

一、centos7 安装sftp 1.安装 OpenSSH 服务&#xff1a; sudo yum install openssh-server2.启动 SSH 服务&#xff0c;并设置为开机启动&#xff1a; sudo systemctl start sshd sudo systemctl enable sshd3.创建一个新用户&#xff0c;用于SFTP连接&#xff08;替换your_…

Linux:多线程中的互斥与同步

多线程 线程互斥互斥锁互斥锁实现的原理封装原生线程库封装互斥锁 死锁避免死锁的四种方法 线程同步条件变量 线程互斥 在多线程中&#xff0c;如果存在有一个全局变量&#xff0c;那么这个全局变量会被所有执行流所共享。但是&#xff0c;资源共享就会存在一种问题&#xff1…

Ilya出走记:SSI的超级安全革命

图片&#xff5c;OpenAI官网 ©自象限原创 作者丨罗辑、程心 和OpenAI分道扬镳以后&#xff0c;Ilya“神秘而伟大”的事业终于揭开了面纱。 6月20日&#xff0c;前OpenAI核心创始人 Ilya Stuskever&#xff0c;在官宣离职一个月后&#xff0c;Ilya在社交媒体平台公开了…

opencascade AIS_InteractiveContext源码学习2

AIS_InteractiveContext 前言 交互上下文&#xff08;Interactive Context&#xff09;允许您在一个或多个视图器中管理交互对象的图形行为和选择。类方法使这一操作非常透明。需要记住的是&#xff0c;对于已经被交互上下文识别的交互对象&#xff0c;必须使用上下文方法进行…

神经网络学习5-非线性激活

非线性激活&#xff0c;即 这是最常用的 inplaceTrue 原位操作 改变变量本身的值&#xff0c;就是是否输入时若原本有值&#xff0c;是否更换 该函数就是表示&#xff1a;输入小于零时输出0&#xff0c;大于零时保持不变 代码如下&#xff1a; import torch from torch imp…

芋道源码 yudao-cloud 、Boot 文档,开发指南 看全部,破解[芋道快速开发平台 Boot + Cloud]

1、文档全部保存本地部署查看&#xff0c;真香 文档已抓取最新版本&#xff0c;2024.06.21。【唯一遗憾&#xff0c;表结构到2024.04月&#xff0c;已被限制放到知识星球】会员中心&#xff0c;支付中心&#xff0c;CRM&#xff0c;ERP&#xff0c;商城&#xff0c;公众号运行…

利氪科技拿下C轮超级融资,国产智能底盘黑马奔向黄金时代

“智能驾驶遗珠&#xff0c;国产替代富矿。” 这是海通证券在最近一期研报中&#xff0c;描述线控底盘产业的用语。它很巧妙地点明了&#xff0c;这个藏在车身之下的部分&#xff0c;拥有何种特征——稳坐技术体系的核心点位&#xff0c;拥有前景广阔的市场。 事实上&#xf…

生成式AI与开发者:威胁还是机遇?

近期&#xff0c;围绕生成式人工智能&#xff08;AI&#xff09;是否能取代程序员的讨论达到了前所未有的高度。百度的创始人李彦宏甚至预言&#xff0c;未来可能不再需要程序员这一职业。这个话题让很多开发者&#xff0c;包括有几年开发经验的我&#xff0c;感到不安。我记得…

【ArcGIS微课1000例】0120:ArcGIS批量修改符号的样式(轮廓)

ArcGIS可以批量修改符号的样式,如样式、填充颜色、轮廓等等。 文章目录 一、加载实验数据二、土地利用符号化三、批量修改符号样式四、注意事项一、加载实验数据 订阅专栏后,从私信查收专栏配套的完整实验数据包,打开0120.rar中的土地利用数据,如下图所示: 查看属性表: …

Python Web实战:Python+Django+MySQL实现基于Web版的增删改查

项目实战 1.创建项目(sms) File->New Project->Django 稍等片刻&#xff0c;项目的目录结构如下图 项目创建后确认是否已安装Django和mysqlclient解释器&#xff0c;如何确认&#xff1f;file->Settings 如果没有请在Terminal终端输入以下命令完成安装 pip instal…

德璞资本:科技股波动解析,三巫日与日元效应下的市场走向

摘要 近期&#xff0c;美国科技股的表现令人担忧&#xff0c;标普500指数在科技股的拖累下出现下跌。亚洲股市也受到影响&#xff0c;特别是日本和韩国股市。随着期权到期日的临近&#xff0c;市场面临更大的波动风险。本文将详细分析科技股失去动能的原因、三巫日的影响及未来…

elementui组件库实现电影选座面板demo

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Cinema Seat Selection</title><!-- 引入E…