IM聊天系统为什么需要做消息幂等?如何使用Redis以及Lua脚本做消息幂等【第12期】

news2025/1/24 18:03:46

0前言

消息收发模型
在这里插入图片描述

在这里插入图片描述
第一张图是一个时序图,第二张图是一个标清楚步骤的流程图,更加清晰。消息的插入环节主要在2步。save部分。主要也是对这个部分就行消息幂等的操作。

前情提要:使用Redis发布 token 以及lua脚本来共同完成消息的幂等

目前已经写的文章有。并且有对应视频版本。
git项目地址 【IM即时通信系统(企聊聊)】点击可跳转
sprinboot单体项目升级成springcloud项目 【第一期】
前端项目技术选型以及页面展示【第二期】
分布式权限 shiro + jwt + redis【第三期】
给为服务添加运维模块 统一管理【第四期】
微服务数据库模块【第五期】
netty与mq在项目中的使用(第六期)】
分布式websocket即时通信(IM)系统构建指南【第七期】
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
分布式websocket IM聊天系统相关问题问答【第九期】
什么?websocket也有权限!这个应该怎么做?【第十期】
分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】

1.我开源项目IM重复的原因

  • IM系统中有三个常见的指标。消息可靠(不丢消息)。就是消息不能重复(不重复)。保证消息的时序性(不乱序)。这三个指标非常重要。
  • 消息可靠主要通过报文协议等操作来完成。前面视频有一期讲过报文协议。目前主要采取上述方式去保证消息的可靠性。然后再保证消息可靠性的过程中,有一些需要重试的操作。可能会导致数据库多次插入。需要我们来保证一下消息的幂等。通俗的讲就是保证消息的不重复。
1.客户端会重复的发送消息

客户端是一个timer的机制。客户端a发送给b消息的时候,在0.5秒没有收到b的ack的时候会重发消息,重发三次还没有收到ack视为重发失败
//使用timer机制 检测队列里面是否存在ack,如果存在,则超时重发以及限制次数

伪代码如下。用户在线并且不是重试消息的时候,添加到队列里面。

 if (res.params.online == true && res.params.isretry == "false") {
            state.queue.offer(state.tempSendMsg);
            //
            //使用timer机制 检测队列里面是否存在ack,如果存在,则超时重发以及限制次数
            const result = await retry(fetchDataFn, 3, 1000, res.params.msgid);
            //三次之后消息还没有发送成功 提示消息发送失败
            if (result == false) {
              Toast("消息发送失败,请重新发送");
            }
          } else {
            console.log("【IM日志】 接受消息者没有登录或者是重试消息 ");
          }

进行重试的js代码

//重试的一个方法
export function retry(fn, maxRetry, timeout,msg) {
  return new Promise(async (resolve, reject) => {
    let retryCount = 0;
    let timer;
 
    const run = async () => {
      try {
        const result = await fn(msg);
        resolve(result);
      } catch (err) {
        if (retryCount < maxRetry) {
          retryCount++;
          clearTimeout(timer);
          timer = setTimeout(run, timeout);
        } else {
          reject(err);
        }
      }
    };
 
    timer = setTimeout(run, timeout);
  });
}
2.mq出现超时等的重试机制

参考上述逻辑图,消息落库的时候异步分发到了mq上面。rocketmq有超时重试机制,会自动重试。导致消息被多次消费。(明天补充个图片例子)

2.如何解决的幂等

为什么要解决幂等,什么情况下出现幂等(明天写);
使用redis做的幂等。redis做幂等其实有两种思路。

一种思路是我目前正在使用的防重 Token 令牌思路。另一种是下游传递唯一请求编号。主要说明防重token令牌的思路。其实差别就是一个redis里面的键被删除了。另一个没有删除。
防重token令牌
在这里插入图片描述
下游传递唯一请求编号如下
在这里插入图片描述

当客户端请求分布式id的时候将其存入redis。也就是获取一个唯一id。当进行消费消息的时候。先判断唯一id在不在。在的话删除redis中的唯一id并且进行业务操作。不再的话就不能进行业务操作来实现的幂等。
流程代码如下所示:

1.获取token以及存储token到redis中;
在loginUser 用户中心服务中

    @RequestMapping(value = "/api/segment/get/{key}")
    public GenericResponse getSegmentId(@PathVariable("key") String key) {
        String leafno = get(key, segmentService.getId(key));
        SetOperations<String, String> opsForSet = stringRedisTemplate.opsForSet();
        Long add = opsForSet.add(RedisPrefix.LEAF_PERFIX, leafno);//往集合添加元素
        /**
         * 设置一个10分钟的有效期
         */
//        stringRedisTemplate.expire(RedisPrefix.LEAF_PERFIX,600, TimeUnit.SECONDS);
        return GenericResponse.response(ServiceError.NORMAL,leafno );
    }

我们使用了美团的分布式id来生成分布式id。
2.前台发送消息的时候携带上唯一id

const sendMsg2 = async () => {
      const { content, toUser } = state;
      const no = await getLeaf();
      let data = {
        // 1代表着私聊的意思
        type: 1,
        params: {
          msgid: no.content,
          toMessageId: toUser.openid,
          message: content,
          fileType: 0,
          isretry: false,
        },
      };
      if (state.current == 2) {
        data = {
          type: 9,
          params: {
            toMessageId: state.groupId,
            message: content,
            fileType: 0,
          },
        };
      }
      console.log(data);
      state.tempSendMsg = data;
      state.socketServe.send(data);
      state.recesiveAllMsg.push({
        type: "self",
        content: content,
      });
      state.content = "";
    };

这个是发送消息的操作
const no = await getLeaf();这行代码请求后端接口。然后构造消息体。
3.聊天服务(Netty)收到前台消息后 mq异步发送消息

 public void sendMessage(String topic ,ChannelHandlerContext ctx, String message, String toUser, String state, Boolean type, String msgid,String token) {
        MqMessage messageMQ = new MqMessage();
        messageMQ.setFromId(SessionUtils.getUser(ctx.channel()).getOpenid());
        messageMQ.setToId(toUser);
        messageMQ.setType(state);
        messageMQ.setInfoContent(message);
        messageMQ.setTime(new DateTime().toString());
        messageMQ.setState(type);
        messageMQ.setMsgid(msgid);
        messageMQ.setToken(token);
        messageDispatchService.sendForSave(topic,messageMQ);
    }

发送给保存的主题
4.业务模块(frist)消费消息

 @Override
    public void onMessage(String o) {
        String mqmsg =o;
        log.info("RocketMqConsumerService=====消费消息:"+mqmsg);
        //消息内容

        MqMessage message1 = JSON.parseObject(mqmsg, MqMessage.class);
        try {
            ChatDto chatDto = new ChatDto();
            chatDto.setContent(message1.getInfoContent());
            chatDto.setToOpenid(message1.getToId());
            chatDto.setGroup(message1.getState());
            //将msgid存储进去,方便后续进行update
            chatDto.setMsgId(message1.getMsgid());
            SetOperations<String, String> opsForSet = stringRedisTemplate.opsForSet();
//            Boolean member = opsForSet.isMember(RedisPrefix.LEAF_PERFIX, message1.getMsgid());
            if( executeOperation(message1.getMsgid())){
//                Long remove = opsForSet.remove(RedisPrefix.LEAF_PERFIX, message1.getMsgid());//删除元
                if (message1.getState() !=null){
                    if(message1.getType().equals("onLine")){
                        /**
                         * 用户在线需要去推送一下
                         */
                        yanUserChatService.saveChat(message1.getFromId(),chatDto,1);
                        SendRequest send = buildSendRequest(message1);
                        //设置过滤应该有的token
                        RoseFeignConfig.token.set(message1.getToken());
                        nettyMqFeign.send(send);
                    }else {
                        /**
                         * 离线消息直接落库就链路就结束了
                         */
                        yanUserChatService.saveChat(message1.getFromId(),chatDto,0);
                    }
                }
            }
        }catch (Exception e){
            //失败的话需要把redis的这个消息还回去.
            SetOperations<String, String> opsForSet = stringRedisTemplate.opsForSet();
            Long add = opsForSet.add(RedisPrefix.LEAF_PERFIX,  message1.getMsgid());//往集合添加元素
            log.error("consumeMsg 消费mq消息失败.",e);
            // 处理失败,抛出异常,消息会根据重试策略稍后重新消费
            throw new RuntimeException("处理消息时发生错误,消息将被重新消费。");
        }

    }

lua表达式
目前使用redis的类型是set,键是yan_leaf

    /**
     * 幂等的方法,判断list存不存在。存在的话直接删除,下次进来就不存在了。
     * @param token
     * @return
     */
    public boolean executeOperation(String token) {
        // Lua脚本
        String script = "if redis.call('sismember', KEYS[1], ARGV[1]) == 1 then return redis.call('srem', KEYS[1], ARGV[1]) else return 0 end";
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);

        // 执行Lua脚本
        Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(RedisPrefix.LEAF_PERFIX), token);

        // 根据Lua脚本执行结果判断操作是否执行
        return result != null && result > 0;
    }

通过这个lua防止并发请求进来导致幂等失败

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1446160.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构(4) 链表(链式存储)

链表&#xff08;链式存储&#xff09; 单链表定义基本操作的实现单链表的插入按位序插入指定节点的前插指定节点的后插 单链表的删除 小结 单链表 定义 顺序表优点:可随机存取&#xff0c;存储密度高&#xff0c;缺点:要求大片连续空间&#xff0c;改变容量不方便。 单链表优…

winprop二次开发

winprop二次开发 前言工具1——整合多个天线结果用途代码实现 工具2——wallman辅助工具 前言 工作需求&#xff0c;对该软件进行简单地二次开发&#xff0c;都是一些挺简单的代码&#xff0c;单纯是为了上传之后将其从本地删除 工具1——整合多个天线结果 用途 winprop最终…

MIT-Missing Semester_Topic 3:Editors (Vim) 练习题

文章目录 练习一练习二练习三练习四练习五练习六练习七练习八 本 Topic 的 MIT 讲解网页&#xff08;练习题未给解答&#xff09; 练习一 自行完成 vimtutor。vimtutor 是 Vim 本身附带的一个入门教程&#xff0c;在 shell 中直接输入 vimtutor 便能运行。注意该教程在 8024 大…

第2讲投票系统后端架构搭建

创建项目时&#xff0c;随机选择一个&#xff0c;后面会生成配置properties文件 生成文件 maven-3.3.3 设置阿里云镜像 <?xml version"1.0" encoding"UTF-8"?><!-- Licensed to the Apache Software Foundation (ASF) under one or more cont…

助力智能化农田作物除草,基于轻量级YOLOv8n开发构建农田作物场景下玉米苗、杂草检测识别分析系统

在我们前面的系列博文中&#xff0c;关于田间作物场景下的作物、杂草检测已经有过相关的开发实践了&#xff0c;结合智能化的设备可以实现只能除草等操作&#xff0c;玉米作物场景下的杂草检测我们则少有涉及&#xff0c;这里本文的主要目的就是想要基于最新的YOLOv8下最轻量级…

【开源】JAVA+Vue.js实现高校学院网站

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 学院院系模块2.2 竞赛报名模块2.3 教育教学模块2.4 招生就业模块2.5 实时信息模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 学院院系表3.2.2 竞赛报名表3.2.3 教育教学表3.2.4 招生就业表3.2.5 实时信息表 四、系…

Kafka 入门介绍

目录 一. 前言 二. 使用场景 三. 分布式的流平台 四. Kafka 的基本术语 4.1. 主题和日志 &#xff08;Topic 和 Log&#xff09; 4.2. 分布式&#xff08;Distribution&#xff09; 4.3. 异地数据同步技术&#xff08;Geo-Replication&#xff09; 4.4. 生产者&#xf…

2.2-学成在线内容管理之课程分类查询+新增课程

文章目录 内容管理模块4 课程分类查询4.1 需求分析4.2 接口定义4.3 接口开发4.3.1 树型表查询4.3.2 开发Mapper 4.4 接口测试4.4.1 接口层代码完善4.4.2 测试接口 5 新增课程5.1 需求分析5.1.1 业务流程4.1.2 数据模型 5.2 接口定义5.3 接口开发5.3.1 保存课程基本信息5.3.2 保…

ZigBee学习——BDB

✨本博客参考了善学坊的教程&#xff0c;并总结了在实现过程中遇到的问题。 善学坊官网 文章目录 一、BDB简介二、BDB Commissioning Modes2.1 Network Steering2.2 Network Formation2.3 Finding and Binding&#xff08;F & B&#xff09;2.4 Touchlink 三、BDB Commissi…

【linux系统体验】-ubuntu简易折腾

ubuntu 一、终端美化二、桌面美化2.1 插件安装2.2 主题和图标2.3 美化配置 三、常用命令 以后看不看不重要&#xff0c;咱就是想记点儿东西。一、终端美化 安装oh my posh&#xff0c;参考链接&#xff1a;Linux 终端美化 1、安装字体 oh my posh美化工具可以使用合适的字体&a…

深入浅出CChart 每日一课——红花当然配绿叶,CChart辅助图形绘制

各位同学&#xff0c;好久不见&#xff0c;我可想死你们了&#xff01;&#xff01;&#xff01;咦&#xff0c;那位不是巩叔吗&#xff1f;不好意思&#xff0c;侵权了&#xff0c;请多担待_。 前面的课程呢&#xff0c;拓展的内容比较多&#xff0c;最近笨笨想聚焦在CChart本…

微服务多级缓存

多级缓存 1.什么是多级缓存 传统的缓存策略一般是请求到达Tomcat后&#xff0c;先查询Redis&#xff0c;如果未命中则查询数据库&#xff0c;如图&#xff1a; 存在下面的问题&#xff1a; •请求要经过Tomcat处理&#xff0c;Tomcat的性能成为整个系统的瓶颈 •Redis缓存…

Structured Streaming

目录 一、概述 &#xff08;一&#xff09;基本概念 &#xff08;二&#xff09;两种处理模型 &#xff08;三&#xff09;Structured Streaming和Spark SQL、Spark Streaming关系 二、编写Structured Streaming程序的基本步骤 &#xff08;一&#xff09;实现步骤 &…

jsp计算机线上教学系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 JSP 计算机线上教学系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5…

WebSocketServer方法里注入不了其他类

请直接看原文: WebSocketServer无法注入其他对象的问题 - 知乎 (zhihu.com) WebSocket服务无法使用自动注入解决方法_websocket sever不可以直接注入吧-CSDN博客 ------------------------------------------------------------------------------------------------------…

2.11日学习打卡----初学RocketMQ(二)

2.11日学习打卡 一. RocketMQ整合springboot 首先配置pom.xml文件 <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>annotationProcessor</scope></dependency><dependency>…

Java图形化界面编程——处理位图 笔记

2.8.3 处理位图 ​ 如果仅仅绘制一些简单的几何图形&#xff0c;程序的图形效果依然比较单调 。 AWT 也允许在组件上绘制位图&#xff0c; Graphics 提供了 drawlmage() 方法用于绘制位图&#xff0c;该方法需要一个Image参数一一代表位图&#xff0c;通过该方法就可 以绘制出…

苹果Mac键盘如何将 F1 到 F12 取消按Fn

苹果电脑安装了Win10操作系统之后&#xff0c;F1到F12用不了怎么办的解决方法。本文将介绍一些解决方法&#xff0c;帮助您解决无法使用F1到F12功能键的问题。 使用 Mac系统的人都知道&#xff0c;Mac系统默认是没有开启 F1-F12 的使用的&#xff0c;平时我们使用的系统都可以使…

【C语言】实现双向链表

目录 &#xff08;一&#xff09;头文件 &#xff08;二&#xff09; 功能实现 &#xff08;1&#xff09;初始化 &#xff08;2&#xff09;打印链表 &#xff08;3&#xff09; 头插与头删 &#xff08;4&#xff09;尾插与尾删 &#xff08;5&#xff09;指定位置之后…