ETCD实现分布式锁

news2025/4/8 5:21:00

分布式锁具备特点

  • 互斥性:在同一时刻,只有一个客户端能持有锁

  • 安全性:避免死锁,如果某个客户端获得锁之后处理时间超过最大约定时间,或者持锁期间发生了故障导致无法主动释放锁,其持有的锁也能够被其他机制正确释放,并保证后续其它客户端也能加锁,整个处理流程继续正常执行

  • 可用性:也被称作容错性,分布式锁需要有高可用能力,避免单点故障,当提供锁的服务节点故障(宕机)时不影响服务运行,这里有两种模式:一种是分布式锁服务自身具备集群模式,遇到故障能自动切换恢复工作;另一种是客户端向多个独立的锁服务发起请求,当某个锁服务故障时仍然可以从其他锁服务读取到锁信息(Redlock)

  • 可重入性:对同一个锁,加锁和解锁必须是同一个线程,即不能把其他线程持有的锁给释放了

  • 高效灵活:加锁、解锁的速度要快;支持阻塞和非阻塞;支持公平锁和非公平锁

Redis实现分布式锁的缺点

  1. 客户端长时间阻塞导致锁失效问题
    客户端A获取锁后在处理业务时长时间阻塞,导致锁过期释放。当阻塞恢复时,就会出现多客户端同时持有锁的情况。
    redis的解决方案:
    – 延长锁的过期时间。
    – java redission 提供了看门狗机制,在业务处理完之前不断给锁续期。

  2. 单点实例安全性问题
    为了保证分布式锁的高可用,需要部署redis的主从节点,在数据同步之前发生主从切换,可能就会丢失原先master上的锁信息,导致同一时间两个客户端同时持有锁。
    redis的解决方案:
    参考官方文档redlock

ETCD实现分布式锁的思路

prefix

etcd支持前缀查找,所以可以用一个前缀表示锁资源,前缀 + 唯一id的方式表示锁资源的持有者。

lease机制

租约机制可以保证锁的活性,持有锁的客户端宕机,key自动过期,避免宕机。etcd客户端提供的lease续租机制解决客户端长时间阻塞导致锁失效问题。

watch机制

redis采用忙轮询的方式来获取锁,etcd可以使用watch机制监听锁的删除事件,更加高效。

实现策略

etcd实现分布式锁的方案有很多种,可以通过判断是否存在一个固定的key来实现分布式锁,但是这种实现策略有很大的问题。当多客户端同时获取锁时,只有一个成功获得,其余多个客户端监听key的删除事件,一旦锁被释放,多个客户端同时收到锁删除事件(无论尝试加锁的顺序)进行加锁,这就是 “惊群问题” ,所以etcd官网提供了另外一种实现策略。

不再将一个固定的key当作锁资源,而是将一个前缀当作锁资源,每一个客户端尝试加锁的时候都会以该前缀创建一个key,并且监听前一个创建该前缀key的版本号。

输入图片说明

具体的实现方式会在源码解析时讲解。

etcd的强一致性

etcd是基于raft实现的,对外提供的是强一致性的kv存储,不会存在类似于redis主从切换导致的不一致问题。

etcd客户端concurrency包提供的分布式锁

锁的使用

func NewLock() sync.Locker {  
	//创建客户端
   cli, err := clientv3.New(clientv3.Config{Endpoints: ip1,ip2...})  
   if err != nil {  
      log.Fatal(err)  
   }  
   //授权租约
   resp, err := cli.Grant(context.TODO(), 5)  
   if err != nil {  
      log.Fatal(err)  
   }  
   //创建会话
   //会话会创建一个租约,并在客户端生存期内保证租约的活性
   session, err := concurrency.NewSession(cli, concurrency.WithLease(resp.ID))  
   if err != nil {  
      log.Fatal(err)  
   }  
   //利用会话,指定一个前缀创建锁
   return concurrency.NewLocker(session, "/myLock/")  
}

通过以上方式就可以创建一个分布式锁,通过锁的Lock()和UnLock()方法加锁解锁。

源码解析

type Mutex struct {  
   s *Session  //会话 
 
   pfx   string  //锁名称,key的前缀
   myKey string  //key
   myRev int64  //创建key的版本号
   hdr   *pb.ResponseHeader  
}

加锁

func (m *Mutex) Lock(ctx context.Context) error {  
   //尝试获取锁
   resp, err := m.tryAcquire(ctx)  
   if err != nil {  
      return err  
   }
   ......
 }
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {  
   s := m.s  
   client := m.s.Client()  
  
   //拼接完整的key,prefix + leaseId
   m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())  
   // 比较操作,判断当前key的创建版本号是否为0,版本号为0表示key还未创建
   cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)  
   // 创建key操作(将当前key存储到etcd)
   p**加锁**ut := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))  
   // 获取当前key的版本号操作  
   get := v3.OpGet(m.myKey)  
   // 获取第一个创建该前缀key的版本号(不会获取到已经删除的key的版本号),这个key就是当前持有锁的key  
   getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)  
   //通过一个事务执行操作
   //若当前key不存在,创建key,并获取持有锁的key的版本号
   //若当前key存在,获取key的版本号,并获取持有锁的key
   resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()  
   if err != nil {  
      return nil, err  
   }  
   //将当前key的把版本号赋值给myRev字段
   m.myRev = resp.Header.Revision  
   if !resp.Succeeded {  
      m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision  
   }  
   return resp, nil  
}
func (m *Mutex) Lock(ctx context.Context) error {  
   resp, err := m.tryAcquire(ctx)  
   if err != nil {  
      return err  
   }  
   // 将当前持有锁的key赋值给ownerKey
   ownerKey := resp.Responses[1].GetResponseRange().Kvs
   //ownerKey不存在,或者版本号等于自己创建key的版本号,表示当前key正持有锁,可直接返回  
   if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {  
      m.hdr = resp.Header  
      return nil  
   }  
   client := m.s.Client()  
   //等待锁的释放
   _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)  
   // release lock key if wait failed  
   if werr != nil {  
      m.Unlock(client.Ctx())  
      return werr  
   }  
  
   // make sure the session is not expired, and the owner key still exists.  
   gresp, werr := client.Get(ctx, m.myKey)  
   if werr != nil {  
      m.Unlock(client.Ctx())  
      return werr  
   }  
  
   if len(gresp.Kvs) == 0 { // is the session key lost?  
      return ErrSessionExpired  
   }  
   m.hdr = gresp.Header  
  
   return nil  
}
//等待锁的释放
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {  
   //获取最新的该前缀的key的操作,WithMaxCreateRev(maxCreateRev)对返回值进行了限制,返回值版本号必须是小于等于maxCreateRev的,通过这个操作就可以获取仅小于自己key版本号的key
   getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))  
   for {  
      resp, err := client.Get(ctx, pfx, getOpts...)  
      if err != nil {  
         return nil, err  
      }  
      //不存在大于自己版本号的key,可以获取锁了
      if len(resp.Kvs) == 0 {  
         return resp.Header, nil  
      }  
      lastKey := string(resp.Kvs[0].Key)  
      //等待 lastKey 被删除
      if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {  
         return nil, err  
      }  
   }  
}
//通过watch机制监听指定version 的key的删除。
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {  
   cctx, cancel := context.WithCancel(ctx)  
   defer cancel()  
  
   var wr v3.WatchResponse  
   wch := client.Watch(cctx, key, v3.WithRev(rev))  
   for wr = range wch {  
      for _, ev := range wr.Events {  
	     //监听Delete事件
         if ev.Type == mvccpb.DELETE {  
            return nil  
         }  
      }  
   }  
   if err := wr.Err(); err != nil {  
      return err  
   }  
   if err := ctx.Err(); err != nil {  
      return err  
   }  
   return fmt.Errorf("lost watcher waiting for delete")  
}

释放锁

func (m *Mutex) Unlock(ctx context.Context) error {  
   client := m.s.Client()  
   //直接删除key
   if _, err := client.Delete(ctx, m.myKey); err != nil {  
      return err  
   }  
   m.myKey = "\x00"  
   m.myRev = -1  
   return nil  
}

总结

基于etcd实现的分布式锁基本上使用到了etcd的全部性质,并且保证了分布式锁的互斥性,安全性和可用性。官方实现的分布式锁并不支持可重入性,但是要实现可重入性锁也很简单,对这个锁在封装一层,并增加一个计数器。

参考资料
极客时间- etcd实战课
etcd分布式锁的实现原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/529751.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ANR实战案例 - FCM拉活启动优化

系列文章目录 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 例如:第一章 Python 机器学习入门之pandas的使用 文章目录 系列文章目录前言一、Trace日志分析二、业务分析1.Firebase源码分析2.Firebase官方查看官方文档Dem…

数据压缩新利器!小精灵ELF助你高效存储与传输

存储空间不够用?网络传输太慢?想必每个人在生活中都会遇到这些问题。看着爆满的硬盘、焦急的等待数据的接受,更新设备?不是每个人都能承担这个成本。那不如尝试一下无损压缩? 为了减少存储空间的占用,提高…

《Netty》从零开始学netty源码(五十七)之ServerBootstrap.bind()

目录 ServerBootstrap.bind()initAndRegister()init()register()doBind0() ServerBootstrap.bind() 在第一篇的HelloWorld中通过ServerBootstrap.bind()方法绑定端口号并最终启动Netty的服务,服务端的bind过程如下: 上面的代码主要分成两部分&#xff0…

【P20】JMeter XPath提取器(XPath Extractor)

文章目录 一、准备工作二、测试计划 一、准备工作 百度:https://www.w3school.com.cn/example/xmle/cd_catalog.xml 进入网页后,右键检查或按F12,打开调试工具 如图,使用XPath提取器(XPath Extractor)获取…

typescript学习笔记(下)

1、类型拓宽 所有通过 let 或 var 定义的变量、函数的形参、对象的非只读属性,如果满足指定了初始值且未显式添加类型注解的条件,那么它们推断出来的类型就是指定的初始值字面量类型拓宽后的类型,这就是字面量类型拓宽。 下面我们通过字符串…

数据结构-排序-(选择、堆排序、归并排序、基数排序)

目录 一、选择排序 二、堆排序 排序 效率分析 三、归并排序 排序 分析 四、基数排序 一、选择排序 思想:每趟在待排序元素中选取关键字最小的元素加入有序子列 不稳定性 空间复杂度:O(1) 时间复杂度: void swap(int &a,int &…

[Linux] 动态 / 静态库的生成与使用

文章目录 简要概念 静态库生成使用 动态库生成使用 简要概念 库一般分为两种: 静态库动态库 在 Linux 中: 如果是动态库,库文件是以 .so 作后缀的如果是静态库,库文件是以 .a 作后缀的 库文件的命名: libXXX.so …

RBTree

目录 红黑树的概念 红黑树性质 红黑树节点设计 红黑树的插入 红黑树的验证 红黑树和AVL树的比较 红黑树的概念 红黑树,是一种二叉搜索树,但在每个结点上增加一个存储位表示结点的颜色,可以是Red或 Black。 通过对任何一条从根到叶子的…

Point-SLAM: Dense Neural Point Cloud-based SLAM阅读记录

前言 只读了前半部分就感慨文章结构真的好清晰,从Introduction到related work完完全全都在体现它的motivation——他做了一件什么事情?以及为什么要这么做?解决了什么问题。 第一遍阅读 keywords: 以RGBD作为输入 使用点云表示场景的 dens…

【P21】JMeter XPath2 提取器(XPath2 Extractor)

文章目录 一、准备工作二、测试计划 一、准备工作 百度:https://www.w3school.com.cn/example/xmle/cd_catalog.xml 进入网页后,右键检查或按F12,打开调试工具 如图,使用XPath2 提取器(XPath2 Extractor)…

python 使用pandas或xlrd、xlwt实现对Excel的读取、添加、追加等一系列封装

不说了,又是造轮子的一天。在此我要严重批评CSDN或百度一堆浑水摸鱼的,某些人明明代码明显报错也来上传发博客,要么就是标题党,代码没报错但压根就不是实现那个功能的,简直是浪费时间。 废话不多说直接贴代码&#xff…

Linux—网络基础

目录 计算机网络背景 网络发展 认识 "协议" 网络协议初识 协议分层 OSI七层模型 TCP/IP五层(或四层)模型 网络传输基本流程 协议报头 局域网通信 网络传输流程图 局域网通信图 跨网络通信图 数据包封装和分用 网络中的地址管理 认识IP地址 认识MAC地址…

8款主流产品原型设计软件分享

在产品设计中,你知道如何选择合适的产品设计软件吗?每个产品设计软件的功能实际上是不同的,不同的产品设计软件应用领域是不同的。 只有深入了解每个产品设计软件的功能和主要适合该软件的行业,我们才能在设计相应的产品时找到合…

linux内核篇-进程及其调度

介绍一个程序从源文件到进程执行的过程 1、编译链接(源文件到二进制文件) Linux 下面二进制的程序也要有严格的格式,称为ELF(Executeable and Linkable Format,可执行与可链接格式) ,这个格式可…

Simulink 和 Gazebo联合仿真控制机械臂【Matlab R2022a】

逛 B 站,偶然发现一个 up 主上传的视频,可以实现 Simulink 中搭建机器人的控制器设计,对运行在虚拟机中 Gazebo 中的机械臂进行控制,链接:三关节机械臂Gazebo-Simulink联合仿真,这让我很感兴趣,…

Web基础 ( 一 ) HTML

1.HTML <input /><input typebutton value按钮 />1.1.概念 1.1.1.HTML文件是什么 HTML表示超文本标记语言&#xff08;Hyper Text Markup Language&#xff09;, HTML文件是一个包含标记的文本文件, 必须有htm标记或者html扩展名。 可以通过浏览器(Browser)直接…

如何用自己公司的知识、流程等来训练Chat GPT?

在玩过 ChatGPT 并向它询问有关世界、金融和初创公司的一般问题后&#xff0c;我开始思考&#xff1a;“如果我可以用我自己的初创公司甚至大型公司的所有流程、知识和商业经验来训练 AI 模型会怎样&#xff1f;企业&#xff1f;” 使用您自己公司的知识、流程等培训 ChatGPT …

华为OD机试 - 计算网络信号、信号强度( Python)

题目描述 网络信号经过传递会逐层衰减,且遇到阻隔物无法直接穿透,在此情况下需要计算某个位置的网络信号值。 注意:网络信号可以绕过阻隔物。 array[m][n] 的二维数组代表网格地图, array[i][j] = 0代表i行j列是空旷位置; array[i][j] = x(x为正整数)代表i行j列是信号源,…

Python实现哈里斯鹰优化算法(HHO)优化XGBoost回归模型(XGBRegressor算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 2019年Heidari等人提出哈里斯鹰优化算法(Harris Hawk Optimization, HHO)&#xff0c;该算法有较强的全…

【A*算法——清晰解析 算法逻辑——算法可以应用到哪些题目】例题1.第K短路

A*算法 A*算法是什么例题1. 第K短路题意解析 欢迎观看我的博客&#xff0c;如有问题交流&#xff0c;欢迎评论区留言&#xff0c;一定尽快回复&#xff01;&#xff08;大家可以去看我的专栏&#xff0c;是所有文章的目录&#xff09;   文章字体风格&#xff1a; 红色文字表示…