我是如何用 redis 分布式锁来解决线上历史业务问题的

news2025/1/10 20:46:08

近期发现,开发功能的时候发现了一个 mq 消费顺序错乱(历史遗留问题),导致业务异常的问题,看看我是如何解决的

问题抛出

首先,简单介绍一下情况:

线上 k8s 有多个 pod 会去消费 mq 中的消息,可是生产者发送的消息是期望一定要有序去消费,此时要表达的是,例如 生产者如果发送了 3 个通知消息,分别是

  • 1 系统已经在 / 组下面添加 a 组,你记得绑定策略 (例如 / 组绑定的是策略是:允许看视频类型的网站)
  • 2 系统已经在 /a 组下添加了 b 组, 你记得绑定策略(期望绑定的策略和他的父组策略一样)
  • 3 系统已经在 b 组下面添加 小 d 用户,你的绑定策略(期望绑定的策略和他的所在组一样)

此处,若有 3 个 pod 的分别拿到了上述 3 条消息,但是自身实际消费完毕的顺序可能是 先完成了 3 消息对应的业务逻辑,再是 2 消息 的业务逻辑,最后是 1 消息的业务逻辑

那么这个时候,小 d 用户就没有绑定上 允许看视频类型的网站 这一条策略,自然 b组 和 a 组也没有绑定上这条策略,这就和我们预期的完全不一致了

当然,实际情况对于单条单条的消息处理基本不会出现这种偏差,但是在批量处理的时候,就会出现实际业务处理顺序与期望不一致的情况,那么就是妥妥的线上问题了(小 d 上网的时候想看视频,可是一直看不了,于是就疯狂投诉。。。)

思考解决

对于这个问题如何解决呢?

我们知道,咱们使用 mq 的目的是为了做到去处理我们的异步逻辑,还能对流量进行削峰,服务间解耦

对于咱们的 A 服务,已经处理了关于添加用户的,添加组的逻辑,发送通知消息给到 B 服务的时候,B 服务自身的处理顺序,未按照既定的顺序真实按照顺序消费完毕,导致出现了业务问题

想法一

我们是期望 B 服务团队去添加批量接口,A 服务将需要通知的信息,排序好给到 B 服务,一个整包, B 服务的单个 pod 接收到这个大包,然后按照顺序处理消息即可,但是这个方式弊端比较明显

  • 当发送了多个批量大包消息的时候,B 服务如果自身处理不过来,也会导致类似的问题,无法根治
  • 需要 B 服务新增和修改的代码较多,肯定谈不下来
  • 而且对于绑定策略的服务来说,不仅仅是 B 服务,还有 C 服务,D 服务呢,他们都要改造… 这个想法就。。。

想法二

对于这一个业务,也不能去对整个架构大改,对于这些历史遗留问题,能少动就少动,兄弟们你们都懂的

于是便想出了使用 redis 分布式锁来处理,对于一个部署在 k8s 中服务的多个 pod 去抢占,谁先抢到锁,那么就谁消费 mq 中的消息,没有抢到锁的 pod ,那就过一会再抢

当然,对于其他类型的业务是没有影响的

如何去实现这个想法呢,我们可以模拟一下

  • 1 首先,我们设置一个 redis 的 key,例如 [服务名]_lockmq, 值的话咱们就任意设置,默认就用 服务名 做 value 吧,过期时间暂定 30 秒,有需求的可以调大
  • 2 如果设置成功,则处理成功之后的事情

    • 2.1 初始化 mq 消费者,并开启协程进行消费

      • 2.1.1 如果初始化失败,则直接返回,退到第 1 步
    • 2.2 对 redis 锁进行续期,此处咱们 10 秒续期一次

      • 如果续期失败,则直接返回,退到第 1 步
  • 3 若拿锁失败,则休息 10 秒再去拿锁

这样来处理的话,我们就可以应对多个 pod 来消费同一类消息的时候,保证同时只有一个 pod 在处理 mq 中的消息了,当然如果正在处理消息的 pod 出现了异常,对于其他 pod ,最晚会在 40 秒之后拿到锁,对于大量的消息来说,这个还是可以容忍的

对应的代码逻辑如下:

  • 简单连接 redis, redis 分布式锁的主逻辑如下

    • 连接 redis ,DB 默认为 0 号
var rdb = redis.NewClient(&redis.Options{
   Addr:     "localhost:6379",
   Password: "123456",
   DB:       0,
})

func LockMq(svrName string) {
   key := fmt.Sprintf("%s_lockmq", svrName)
   // 尝试加锁
   var set bool
   for {
      set = redisLock(key, svrName, time.Second*30)
      if set {
         log.Println("redisLock success ")
         if err := afterLockSuccess(key); err != nil {
            // 如果此处有err ,自然是 mq 初始化失败
            log.Println("mq init error: ", err)
         }else{
            log.Println("redisLock expire failed ")
         }
         time.Sleep(time.Second * 10)
         continue
      }
      // afterLockFailed()
      log.Println("redisLock failed ")
      time.Sleep(time.Second * 10)
   }
}
  • 基本的加锁实现

    • 设置 key , value , 过期时间为 30 秒
func redisLock(key, value string, duration time.Duration) bool {
   set, err := rdb.SetNX(context.TODO(), key, value, duration).Result()
   if err != nil {
      log.Println("setnx failed, error: ", err)
      return false
   }
   return set
}
  • 加锁成功之后,初始化 mq 客户端并进行消费,续期 redis 分布式锁
func afterLockSuccess(key string) error {
   // 初始化需要做的内容或者句柄
   // xxx
   // 对于此处的初始化 mq 句柄失败才返回 err
   ch := make(chan struct{}, 1)
   go func() {
      // 模拟消费消息
      for {
         select {
         case <-ch:
            log.Println("expire failed,mq close")
            return
         default:
            log.Println("is consuming msg")
            time.Sleep(time.Second * 2)
         }
      }
   }()

   for {
      time.Sleep(time.Second*10)
      // 续期
      set, err := rdb.PExpire(context.TODO(), key, time.Second*30).Result()
      if err != nil {
         log.Println("PExpire error!! ", err)
         return nil
      }
      if !set {
         ch <- struct{}{}
         log.Println("PExpire failed!!")
         return nil
      }
      log.Println("PExpire success!! ")
   }

}

具体的测试直接调用 LockMq 函数即可

func main(){
   go redislock.LockMq("helloworld")
   select{}
}

模拟启动多个 pod 去抢锁,抢到锁的执行业务,继续续期,抢不到锁的休息一会再接着抢

程序 a 先启动,程序 b 后启动

程序 a 日志如下:

程序 a 起来之后,启动一段时间之后,kill 掉 程序 a

程序 b 日志如下:

程序 b 先是获取锁失败,过 30s 左右,程序 b 能正常获取到锁

关于源码可以查看地址:https://github.com/qingconglaixueit/my_redis_demo

感谢阅读,欢迎交流,点个赞,关注一波 再走吧

欢迎点赞,关注,收藏

朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力

好了,本次就到这里

技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。

我是阿兵云原生,欢迎点赞关注收藏,下次见~

可以进入地址进行体验和学习:https://xxetb.xet.tech/s/3lucCI

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1001626.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

编写更嵌入式软件代码的10个技巧

代码维护是应用程序开发的重要方面&#xff0c;而为了缩短上市时间&#xff0c;通常会忽略代码维护。对于某些应用程序&#xff0c;这可能不会造成重大问题&#xff0c;因为这些应用程序的寿命很短&#xff0c;或者已部署该应用程序&#xff0c;并且再也不会碰它。 但是&#x…

UIScrollView setContentOffset: animated:

项目中遇到感觉一切都设置对了&#xff0c;但是看到的效果和预想的不一样。 后来查询了一番&#xff0c;才知道问题所在&#xff0c;现在记录一下&#xff0c;担心过后又忘了。 最初的问题是这样的&#xff0c;这个热度只有在评论里有&#xff0c;点击赞的时候&#xff0c;热度…

视频号的视频怎么下载,有什么下载工具推荐

视频下载助手去水印小工具是一款方便实用的工具&#xff0c;可以帮助用户在下载视频的时候可以一键去除视频中的水印。 该工具支持多种视频平台的去水印功能&#xff0c;如抖音、快手、小红书、视频号、公众号文字视频、西瓜视频、哔哩哔哩、微博视频、多多视频等。 经过亲自测…

为什么女程序员那么稀缺?女程序员吃不吃香?

程序员脱单一直是个难题&#xff0c;这里的一个客观原因就是程序员群体的男女比例严重失衡&#xff08;比如我司达到了2:8&#xff09;&#xff0c;身边的工作环境缺少异性&#xff0c;大老爷们天天混在一起&#xff0c;脱单自然也就更加困难了。 女程序员那么稀缺&#xff0c…

《Python深度学习-Keras》精华笔记3:解决深度学习多分类问题

公众号&#xff1a;机器学习杂货店作者&#xff1a;Peter编辑&#xff1a;Peter 持续更新《Python深度学习》一书的精华内容&#xff0c;仅作为学习笔记分享。 本文是第三篇&#xff1a;介绍如何使用Keras解决Python深度学习中的多分类问题。 多分类问题和二分类问题的区别注意…

180页的Python完全版电子书

大家好&#xff0c;我是涛哥。 Python学习有很多方式&#xff0c;可以从基础一步步看语法&#xff0c; 可以从案例一步步学习&#xff0c;本篇内容就是通过案例进行讲解&#xff0c;方便大家一步一步进行学习实战。 整个内容经过几个月总结《Python之路2.0.pdf》&#xff0c…

基于 Python 的音乐流派分类

音乐就像一面镜子&#xff0c;它可以告诉人们很多关于你是谁&#xff0c;你关心什么&#xff0c;不管你喜欢与否。我们喜欢说“you are what you stream” - Spotify Spotify 拥有 260 亿美元的净资产&#xff0c;是如今很受欢迎的音乐流媒体平台。它目前在其数据库中拥有数百…

Java拓展--空间复杂度和时间复杂度

空间复杂度和时间复杂度 文章目录 空间复杂度和时间复杂度空间复杂度时间复杂度**评价排序算法****时间频度****什么是时间频度****忽略常数项****忽略低次项****忽略系数** **时间复杂度****什么是时间复杂度****计算时间复杂度的方法****常见的时间复杂度** **常见的时间复杂…

正中优配:证券融资融券是什么意思?

证券融资融券&#xff08;简称“融资融券”&#xff09;是一种股票出资办法&#xff0c;是指出资者经过融券生意和融资生意来进行股票出资。它在出资商场上具有重要的作用&#xff0c;因为经过这种办法&#xff0c;出资者能够使用假贷资金进行股票生意&#xff0c;能够进步出资…

腾讯云4核8G服务器选CVM还是轻量比较好?价格对比

腾讯云4核8G云服务器可以选择轻量应用服务器或CVM云服务器标准型S5实例&#xff0c;轻量4核8G12M服务器446元一年&#xff0c;CVM S5云服务器935元一年&#xff0c;相对于云服务器CVM&#xff0c;轻量应用服务器性价比更高&#xff0c;轻量服务器CPU和CVM有区别吗&#xff1f;性…

23062C++QTday4

仿照string类&#xff0c;完成myString 类 代码&#xff1a; #include <iostream> #include <cstring> using namespace std; class myString {private:char *str; //记录c风格的字符串int size; //记录字符串的实际长度public://无参构造my…

华为云云耀云服务器L实例评测 | 由于自己原因导致MySQL数据库被攻击 【更新中。。。】

目录 引出起因&#xff08;si因&#xff09;解决报错诶嘿&#xff0c;连上了 不出意外&#xff0c;就出意外了打开数据库what&#xff1f;&#xff1f;&#xff1f; 找华为云求助教训&#xff1a;备份教训&#xff1a;密码 解决1.改密码2.新建一个MySQL&#xff0c;密码设置复杂…

Android T 窗口层级其三 —— 层级结构树添加窗口

文章目录 序节点添加Task以DefaultTaskDisplayArea为父节点以Task为父节点 ActivityRecordWindowTokenWindowState以WindowToken为父节点以ActivityRecord为父节点 小结调用场景添加差异 流程分析添加log堆栈打印流程LauncherStatusBar 序 尚未添加窗口的层级结构树&#xff0…

前端该了解的网络知识

网络 前端开发需要了解的网络知识 URL URL(uniform resource locator,统一资源定位符)用于定位网络服务. URL是一个固定格式的字符串 它表达了: 从网络中哪台计算机(domain)中的哪个服务(port),获取服务器上资源的路径(path),以及要用什么样的协议通信(schema). 注意: 当…

探索Netty的FuturePromise

Netty—Future&Promise 一、JDK原生 Future二、Netty包下的 Future三、Promise1、使用Promise同步获取结果2、使用Promise异步获取结果.3、使用Promise同步获取异常 - sync & get4、使用Promise同步获取异常 - await5、使用Promise异步获取异常 在异步处理时&#xff0…

颜色代码对照表

颜色代码对照表 各颜色代码: 1 白色 #FFFFFF 2 红色 #FF0000 3 绿色 #00FF00 4 蓝色 #0000FF 5 牡丹红 #FF00FF 6 青色 #00FFFF 7 黄色 #FFFF00 8 黑色 #000000 9 海蓝 #70DB93 …

出行类APP商业化路径解决方案

当下市场主流的商业化路径和方法相比于之前区别不大&#xff0c;开发者们都是在现有商业化体系下&#xff0c;制定更加详细、优质的策略&#xff0c;以期获得更高利益。 出行类App用户结构分析 年龄层次&#xff1a;出行类App用户的年龄分布比较广泛&#xff0c;主要集中在20…

商品防伪查询溯源小程序开发源码

企业商家在销售商品的过程中被人仿冒产品以次充好损害公司名声和影响产品销量是很多企业的痛点。我们基于次帮助企业开发了一款商品防伪溯源查询小程序&#xff0c;解决企业痛点&#xff0c;酿造更好的经商坏境。 核心功能&#xff1a; 我们采用一物一码的逻辑&#xff0c;可…

【网络基础】——HTTPS

目录 HTTPS背景知识 HTTPS是什么&#xff1f; 加密解密 为什么要加密 常见的加密方式 对称加密 非对称加密 数据摘要&&数据指纹 数字签名 HTTPS工作过程探究 方案1&#xff1a;只使用对称加密 方案2&#xff1a;只使用非对称加密 方案3&#xff1a;双方…

day37 线程

一、线程安全 二、多线程并发的安全问题 当多个线程并发操作同一临界资源 由于线程切换实际不确定 导致操作顺序出现混乱 产生的程序bug 严重时出现系统瘫痪 临界资源 &#xff1a;操作该资源的完整流程同一时间只能被单一线程操作的资源 多线程并发会出现的各种问题、 如…