使用分布式锁解决IM聊天数据重复插入的问题

news2025/1/16 18:06:15

导航

  • 业务背景
  • 问题分析与定位
  • 探索可行的解决方案
    • 数据库层面处理——唯一索引
    • 应用程序层面处理——分布式锁
  • 分布式锁概述
    • 分布式锁需要具备哪些特性?
    • 分布式锁有哪些实现方式?
      • 基于数据库的实现方式
      • 基于Redisson实现方式
  • Redission介绍
    • 概述
    • 可重入锁
  • 基于Redisson解决方案
    • 方案梳理
    • Springboot集成Redisson
  • 结语
  • 参考

本文首发《使用分布式锁解决IM聊天数据重复插入的问题》

业务背景和问题

在IM聊天业务中,除了自建聊天服务器,构架闭环的咨询聊天,往往还需要接入三方的平台的IM流量。

这个就不得不去适配各种平台的推流方式。

在我们自建的IM聊天服务解决方案中,IM会话创建和消息的接收是两个独立模块(接口)。
这种设计方式从客户端层面就将两个流程分开且保证了顺序性,有效避免了一些不可预知的问题。

但是,三方流量平台的是通过消息推流的方式将流量投递给我们,我们必须在接收流量的过程中完成客户、会话、消息的创建。



如果所有消息是排队,一个一个执行,那么这个流程是没有问题的。

但是,我们发现三方推送消息的时候偶尔会发生推送同一客户的多条消息的情况,这种并发写入,导致数据重复写入。

这种情况下,就可能会导致新客户创建多次,对应的会话也会创建多个。



而且还会带来数据查询中偶尔出现selectOne的异常。

desc":"org.mybatis.spring.MyBatisSystemException: nested exception is org.apache.ibatis.exceptions.TooManyResultsException: Expected one result (or null) to be returned by selectOne(), but found: 2

在没有查明具体问题之前,我们在特定查询的时候增加了limit 1限制,原则上取最新的那一条。

问题分析与定位

对于聊天场景来说,这种脏数据的产生是不能容忍的。

为了找到问题的根本解决办法,我们开始专项排查。

我把代码走读了一遍,发现代码层面没有明显的bug。但是,从数据上来看大概率是消息并发投递导致。

为了证明这种猜想,我编写了一个测试用例来验证。

具体做法,就是写一个python脚本程序,模拟10个线程,每个线程都会调用消息接收的业务接口,并且每个消息的fromUsertoUser都是一样的。

核心思想就是同时推送给一个人多条消息。

经过验证,数据重复写入的问题复现了。并发请求原因已经实锤。

这里给个简单示意图,解释一下并发请求的流程。



可行性方案探索

我们自己也思考了一下,大致的解决方案有两种:

  • 数据层面解决
  • 应用程序层面解决
数据层面解决

这个很好理解,利用Mysql字段唯一索引阻止重复插入,这是数据库自己的机制。
但是,因为user表中tenantUserId字段最初就为设计唯一索引。

ALTER TABLE user ADD UNIQUE uk_tenant_user_id( tenantUserId );

一旦为tenantUserId列加上唯一索引后,当上述并发情况发生时,请求1和请求2中必然有一者会优先完成数据的插入操作,而另一者则会得到类似错误。因此,最终保证user表中只有一条tenantUserId=xxx的记录存在。

 Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'xxx' for key 'tenantUserId'\n##

经过评估,目前单表已经仅仅2000w数据。短时间内升级不太现实。
而且历史数据的修复也不是一个小工程。

应用程序层面解决

另一种解决的思路是我们不依赖底层的数据库来为我们提供唯一性的保障,而是靠应用程序自身的代码逻辑来避免并发冲突。

之所以我们会遇到重复插入数据的问题,是因为“检测数据是否已经存在”和“插入数据”两个动作被分割开来。由于这两个步骤不具备原子性,才导致两个不同的请求可以同时通过第一步的检测。如果我们能够把这两个动作合并为一个原子操作,就可以避免数据冲突了。这时候我们就需要通过加锁,来实现这个代码块的原子性。


考虑到我们的应用程序API是多机部署的,我们决定采用业界比较成熟的分布式锁方案。

分布式锁概述

分布式锁需要具备哪些特性?
  • 在分布式系统环境下,同一时间只有一台机器的一个线程可以获取到锁
  • 高可用的获取锁与释放锁
  • 高性能的获取锁与释放锁
  • 具备可重入特性
  • 具备锁失效机制,防止死锁
  • 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
分布式锁实现主要有如下三种:
  • 基于数据库实现分布式锁
  • 基于Zookeeper实现分布式锁
  • 基于Redis实现分布式锁

每种的具体实现可以参考《什么是分布式锁?实现分布式锁的三种方式》。

除了以上三种分布式锁实现以外,还有一种是基于Redission实现方式。
因为我们业务接口是基于Springboot框架,所以查阅了相关资料我们选择一种Redission实现。

Redission介绍

概述

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

以下是Redisson的结构:

Redisson作为独立节点 可以用于独立执行其他节点发布到分布式执行服务 和 分布式调度任务服务 里的远程任务。



可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

Redisson同时还为分布式锁提供了异步执行的相关方法:

RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。但是如果遇到需要其他进程也能解锁的情况,请使用分布式信号量Semaphore 对象.

关于Redisson的更多介绍请移步Redisson 中文文档

基于Redisson解决方案

在本案例中,我们采用了基于Redisson实现分布式锁的方式。

方案梳理

技术方案确定了,但是还是需要结合实际场景合理应用。
那么,我们在哪些环节加锁呢?



我们再次对消息接收处理流程进行梳理,在原来的基础上增加了分布式锁。

Springboot集成Redisson

pom.xml中引入redisson

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.34.1</version>
</dependency>  

yml文件中redis配置

  redis:
    enabled: true
    host: xxxx
    port: 6371
    password: xxx
    database: 2
    timeout: 10000
    connectionPoolSize: 15
    connectionMinimumIdleSize: 5

redissonConfig.java

@Configuration
@ConditionalOnExpression("${spring.redis.enabled}")
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.timeout}")
    private String timeout;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.database}")
    private int database;

    @Value("${spring.redis.connectionPoolSize}")
    private int connectionPoolSize;

    @Value("${spring.redis.connectionMinimumIdleSize}")
    private int connectionMinimumIdleSize;

    @Bean(name = "redissonClient")
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.setCodec(new StringCodec());
        SingleServerConfig singleServerConfig =
                config.useSingleServer()
                        .setAddress("redis://" + host + ":" + port)
                        .setDatabase(database)
                        .setConnectionPoolSize(connectionPoolSize)
                        .setConnectionMinimumIdleSize(connectionMinimumIdleSize)
                        .setTimeout(Integer.parseInt(timeout));
        if (StringUtils.isNotBlank(password)) {
            singleServerConfig.setPassword(password);
        }
        return Redisson.create(config);
    }
}

上面准备好之后,就可以在使用了。

核心代码实现
        //新创建增加分布式锁
        String mutex = StrUtil.format("im:lock:user:{}", createUserDto.getTenantUserId());
        RLock lock = redissonClient.getLock(mutex);
        boolean successLock = lock.tryLock();
        if (!successLock) {
            // 获取分布式锁失败
            log.info(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "【getOrCreateUser】", JsonUtil.toJson(createUserDto)));
            throw new BizException("该顾客已经在创建中了", ResponseCodeEnum.GET_R_LOCK_FAIL.getCode());
        }
        //创建用户
        User visitor = new User();
        visitor.setUserName(createUserDto.getTenantUserId());
        //...
        //消息创建过程中,首次创建顾客、会话,
        //在获取锁失败的情况下,增加重试机制
        try {
            receiveMessage(inputDto);
        }catch (BizException ex)
        {
            log.error(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "【receiveMessage】", ex));
            if(ex.getCode().equals(ResponseCodeEnum.GET_R_LOCK_FAIL.getCode())) {
                //重试一次
                Thread.sleep(1000);
                log.info(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "【receiveMessage.retry】", JsonUtil.toJson(inputDto)));
                receiveMessage(inputDto);
            }
        }

Notes: 关于Springboot中如何使用Redisson,更加具体实现代码请移步《Spring Boot 实战纪实》,项目源码中可以查阅。

测试用例

确保写的代码是可调式的。-《对几次通宵加班发版的复盘和思考》

在这么多年的职业生涯中,我逐渐摸索出一个确保代码质量的笨方法——单步调试。

这里我们也写一个测试用例。具体是思路前面也提过,这里不再赘述。

import json
import requests
import time
import uuid
import threading

def receive_xhs_msg():

    try:
          #请求url
          url = """http://localhost:7071/api/message/receive"""

          # 增加请求头
          headers = {
            "Content-Type": "application/json; charset=UTF-8"
          }
          message_id=str(uuid.uuid4())
          print('message_id:'+message_id)
          userInfo={
                "header_image":"xxx.jpg",
                "nickname":"- ",
                "user_id":"63038d28000000001200d311"
          }
   
          payload={
                    "content":"6ED5KduMqTDJZ1ztw+ZPgw==~split~OMo7DD2gqsJqBafx9WKsZlnNNkcEYD4hLLPczczIFmr+YMtTB9Wz4ZI0MYCM4cF28kG7rfqnXdR9cRmamEJzHmKLfTmVxv5jzGUFVQOU00iimtunMAEJ4x76oJDrdAVUc4bJfV5zFLotz/Bm0WM9TADvD2cLhpHsVmaZRXaiJ96wMQgqx+K727l5S15jmMa5PiLqZqBO2q/G+WEkJSbfLQ==",
                    "from_user_id":"63038d28000000001200d311",
                    "intentCommentId":"",
                    "message_id":message_id,
                    "message_source":2,
                    "message_type":"HINT",
                    "timestamp":"1723268668573",
                    "to_user_id":"575d2c135e87e733f0162b88",
                    "user_info":[userInfo]
            }

          #转换成json
          getJson=json.dumps(payload)
          #构造发送请求
          response=requests.post(url=url,data=getJson,headers=headers)
          #打印响应数据
          print(response.text)
          time.sleep(1)
    except Exception as e:
          print('Error:',e)
    finally:
          print('执行完成')

if __name__ == '__main__':
      threads = []
      for _ in range(10):  # 循环创建10个线程
            t = threading.Thread(target=receive_xhs_msg)
            threads.append(t)
      for t in threads:  # 循环启动10个线程
            t.start()
            t.join()

结语

分布式锁在日常工作中应用广泛,比如接口防抖(防重复提交),并发处理等。

在近期的IM消息处理中,正好有了一次生动的实践。

一点浅浅的经验,分享给大家,希望能起到抛砖引玉的作用。

参考

  • Redisson 中文文档
  • 《什么是分布式锁?实现分布式锁的三种方式》
  • 《灵活运用分布式锁解决数据重复插入问题》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2062446.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32G474的HRTIM用作时基定时器

STM32G474的HRTIM由7个定时器组成&#xff0c;分别是主定时器&#xff0c;定时器A&#xff0c;定时器B&#xff0c;定时器C&#xff0c;定时器D&#xff0c;定时器E和定时器F&#xff0c;每个定时器均可单独配置&#xff0c;独立工作。每个定时器都有1个独立的计数器和4个独立的…

九泰智库 | 医械周刊- Vol.51

⚖️ 法规动态 白内障人工晶体类医用耗材集采落地&#xff0c;平均降幅60% 湖北省自7月10日起实施人工晶体类医用耗材集中带量采购政策&#xff0c;中选产品平均降幅达60%&#xff0c;显著减轻了患者经济负担。此举是国家组织医用耗材采购的一部分&#xff0c;旨在通过集中采购…

MES软件实施:解锁企业数字化转型的无限潜能

MES&#xff08;制造执行系统&#xff09;软件实施是企业数字化转型的重要一环&#xff0c;它确实能够解锁企业数字化转型的无限潜能。以下是对这一主题的详细分析&#xff1a; 一、MES软件的核心价值 集成与协同&#xff1a; MES软件能够集成企业内的各种生产数据和设备&…

企业市值排名3D可视化,重塑商业版图新维度

在这个数据驱动的时代&#xff0c;每一个数字背后都蕴藏着无限的可能与机遇。企业市值&#xff0c;作为衡量企业综合实力与市场认可度的关键指标&#xff0c;其动态变化不仅是投资者关注的焦点&#xff0c;也是全球商业竞争格局的晴雨表。 当枯燥的数据表格被转化为生动的3D场景…

如何选择需求跟踪管理软件?8款优质推荐

本文将介绍8款需求跟踪管理软件&#xff1a;PingCode、Worktile、得帆、协作云、火山引擎、Jira、VersionOne、YouTrack。 在面对众多需求跟踪管理软件时&#xff0c;每款软件都声称能提升效率、简化流程&#xff0c;但到底哪一款才真正适合你的团队&#xff1f;为了选择合适的…

Hadoop的概念

1.什么是大数据 数据体量巨大&#xff1a;数据量规模庞大&#xff0c;通常以PB&#xff08;拍字节&#xff09;或EB&#xff08;艾字节&#xff09;来衡量&#xff0c;远远超出了传统数据库和数据处理工具的处理能力。数据类型多样&#xff1a;大数据包括结构化数据、半结构化…

在控件graphicsView中实现绘图功能(二)

目录 前言&#xff1a;基础夯实&#xff1a;1.创建 QGraphicsScene 和 QGraphicsView2. 在 QGraphicsScene 中添加椭圆3. 渲染和显示4. 推荐学习本文之前查看的链接&#xff1a; 效果展示&#xff1a;实现功能&#xff1a;遇到问题&#xff1a;核心代码&#xff1a;仓库源码&am…

windows环境基于python 实现微信公众号文章推送

材料&#xff1a; 1、python 2.7 或者 python3.x 2、windows 可以通过 “python -m pip --version” 查看当前的pip 版本 E:\Downloads\newsInfo>python -m pip --version pip 20.3.4 from C:\Python27\lib\site-packages\pip (python 2.7) 3、windows 系统 制作&#xf…

基于STM32开发的智能家居安防系统

目录 引言环境准备工作 硬件准备软件安装与配置系统设计 系统架构硬件连接代码实现 系统初始化传感器数据采集与处理安防控制与报警机制Wi-Fi通信与远程监控应用场景 家庭安防系统办公室与商铺的安全监控常见问题及解决方案 常见问题解决方案结论 1. 引言 随着智能家居技术的…

【Hot100】LeetCode—226. 翻转二叉树

目录 1- 思路Queue 队列实现层序遍历 交换左右 2- 实现⭐226. 翻转二叉树——题解思路 3- ACM 实现 原题连接&#xff1a;226. 翻转二叉树 1- 思路 Queue 队列实现层序遍历 交换左右 1- 借助 Queue 实现层序遍历2- 实现左右交换方式 2- 实现 ⭐226. 翻转二叉树——题解思…

带你玩转小程序推广,实现短链接一键跳转

不知道各位有没有想过&#xff0c;短链接直接跳转到微信小程序到底该怎么操作呢&#xff1f;掌握这个小技能&#xff0c;能让你的推广效率大幅提升哦。今天就给大家分享一个全新方法&#xff0c;教你如何从短链接直接跳转到微信小程序&#xff0c;实现高效的一键式跨越。 一、…

源代码怎么进行加密?2024年10款源代码加密软件推荐

在软件开发中&#xff0c;源代码是企业的核心资产&#xff0c;其安全性直接关系到企业的竞争力和商业机密。为了防止源代码被未授权访问、复制或篡改&#xff0c;源代码加密成为了一种常见的安全措施。2024年&#xff0c;随着技术的发展&#xff0c;市场上出现了多种源代码加密…

笔记-系统规划与管理师-案例题-2023年-服务运营管理

【说明】 小李是跨国公司新任命的IT服务经理&#xff0c;帮助提升中国区总部的IT服务管理水平。中国区总部的运维管理体系运营了近三年&#xff0c;内外部环境发生了很多变化&#xff0c;其中: &#xff08;1&#xff09;内部变化包括团队组织结构调整、部分团队精简改为外包支…

纯前端导出excel插件pikaz-excel-js使用小结

最近项目有多个报表开发并前端导出为excel的需求&#xff0c;第一张报表用的是pikaz-excel-js插件&#xff0c;git地址为https://github.com/pikaz-18/pikaz-excel-js&#xff0c;网上文档虽然多&#xff0c;但很多都很基础&#xff0c;官方文档介绍也很简单&#xff0c;没有很…

搜维尔科技:‌Manus VR手套通过触觉反馈技术与机器人进行互动

‌Manus VR手套通过触觉反馈技术与机器人进行互动。‌这种技术允许用户通过手套与机器人进行复杂的动作遥操作和训练&#xff0c;使得用户能够通过手套的动作来控制机器人的运动&#xff0c;同时机器人执行的动作也可以通过手套的触觉反馈功能传达给用户&#xff0c;使用户能够…

线索分析2个要点分析:营销归因与市场ROI转化效果评估

1、营销归因 在复杂的大数据时代&#xff0c;消费者能接触的渠道、设备越来越多&#xff0c;营销活动分析也变得越来越复杂。 在当前的营销环境中&#xff0c;了解并优化营销策略是至关重要的。一个关键的部分是通过分析不同营销活动如何促成商机并赢得订单。而要实现这一目标…

秸秆焚烧自动监测摄像机

秸秆焚烧是一种常见的农业废弃物处理方式&#xff0c;但同时也会产生大量的空气污染物&#xff0c;对环境和人类健康造成威胁。为了监测和控制秸秆焚烧的情况&#xff0c;可以使用秸秆焚烧自动监测摄像机。秸秆焚烧自动监测摄像机 是一种结合了人工智能和机器视觉技术的智能设备…

Linux入门——04 gbd git

gbd命令行调试 默认情况下&#xff0c;GDB无法对现在发布的程序进行调试 debug&#xff08;能调试&#xff09;&&release&#xff08;不能调试&#xff09; linux下GCC或G生成软件默认是release的&#xff01; 1.debug模式 gcc -o mytest mytest.c -g 文件的体积不…

STM32学习记录-04-EXTI外部中断

1 中断系统 &#xff08;1&#xff09;中断&#xff1a;在主程序运行过程中&#xff0c;出现了特定的中断触发条件&#xff08;中断源&#xff09;&#xff0c;使得CPU暂停当前正在运行的程序&#xff0c;转而去处理中断程序&#xff0c;处理完成后又返回原来被暂停的位置继续…

在尝试了市面上90%的报表工具后,终于找到了这款免费万能的报表工具!

经常有朋友私信问我有“哪个报表工具好用易上手&#xff1f;”或者是“有哪些适合绝大多数普通职场人的万能报表工具&#xff1f;”等问题。 在这里我总结出大家在报表选择时最在意的三个要点。 一、挑选报表工具的重点 1&#xff09;低门槛上手难度&#xff1a;理想中的报表…