mq消费并发排队及幂等机制实现

news2025/1/15 20:06:18

一,解决什么问题及实现方案

主要为了解决并发访问排队执行及重复下发保证幂等的问题

实现技术方案

如何实现并发访问时,请求排队执行方法。@Redssion

如何实现MQ重复下发时,保证幂等。@redis

先插入一个分布式锁的话题,如下,使用时先想下,有没有共享资源,保护什么?再想一下mq消费,是否存在这样的问题,需要用什么去解决这些问题。不是为了用而用一个技术,应该从问题本身出发,再找技术方案。

当并发去读写一个【共享资源】的时候,我们为了保证数据的正确,需要控制同一时刻只有一个线程访问。

分布式锁就是用来控制同一时刻,只有一个 JVM 进程中的一个线程可以访问被保护的资源

二,redisson实现大概逻辑如下:

RLock lock = redissonClient.getLock(lockKey);

if (lock.tryLock(RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.expired, TimeUnit.SECONDS)) {
业务逻辑
}

} finally {
    lock.unlock();
}

提出以下疑问

1.不同lockKey获取的锁是否相同

2.相同lockKey,多次获取锁,是否相同

3.加锁粒度是什么

相同的key与不同的key,是否都可以实现并发访问,如何保证对线程队排执行

4.什么时候会加锁失败,如何解决问题的

5.定义不同key意义是什么,为什么不是相同key

6.多线程排队执行,变成串行执行,是否会影响执行效率,怎么解决效率问题

7.如果多线程排查,线程池在哪里,可以有多少线程排查,多少可以在队列中

8.与分布式锁概念比较

猜测

1.不同

2.相同

3.相同的key可以实现多线程排队执行,不同key不知道,参考

Redisson分布式锁入门使用(可重入锁(lock))_redissonclient.getlock_Upstream LV@菜哥的博客-CSDN博客

4.加锁失败,只是暂时未加锁成功,在排队,等前面的锁释放,就加锁成功了

5.相同的key,多线程并发访问,会排队执行。不同的key难道会并发访问?

那加锁的意义是什么?它不就是一个分布式锁吗,控制的粒度,如果是MQ key的话,没有出现MQ重复下发的情况下,其实相当于没有控制,就是允许并发访问

是否可以理解为:不同key就是不同锁,既然是不同锁,就可以各自加锁成功,也就不用排队,只有key相同,同一把锁时,才需要排队。那mq下发基本就是并发访问。

6.参考5,如果穿行执行,肯定影响效率,待确认。

7.不知道

Redisson 实现分布式锁原理分析 - 知乎

这篇文章说了加锁的源码,跟之前排队,存在线程池理解不对。

先看下这个业务方法

 if (lock.tryLock(RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.expired, TimeUnit.SECONDS)) {

这个方法,是接口Lock的方法

实现

现在回想一下,tryLock中时间参数是什么含义,对应下面waitTime即该线程的等待时间,

leaseTime这个参数,上面默认-1,这个时间是看门狗默认的过期时间。

注意区分一个等待时间一个key过期时间。

       注意:从以上源码我们看到 leaseTime 必须是 -1 才会开启 Watch Dog 机制,也就是如果你想开启 Watch Dog 机制必须使用默认的加锁时间为 30s。如果你自己自定义时间,超过这个时间,锁就会自定释放,并不会延长。

怎么手动设置过期时间?

通过这个方法

lock.lock(1,TimeUnit.SECONDS);
也就是说下吗两个方法没有设置过期时间,的默认leaseTime = -1,实现了看门口机制。
lock.lock();
if (lock.tryLock(RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.expired, TimeUnit.SECONDS)) {

如何等待获取锁的呢?

第一个持有锁的线程后,其他线程进来会订阅释放锁的消息,当持有锁线程释放锁,会发布消息,其他持有锁的会订阅消息,尝试获得锁。总体通过while死循环来获得锁,可以试想一下,不会一直无条件的循环尝试获取锁,这样效率太低。详细参考上文与下文

Redis进阶- Redisson分布式锁实现原理及源码解析-腾讯云开发者社区-腾讯云

7.1多线程不还是存在线程池吗?

8.是否可以理解为就是分布式锁,分布式锁如果并发访问(key相同),第二个请求是不会执行方法的,这里是会排队的,还是会执行,有区别的。

相当于这里解决的还是并发时重复的问题,排队访问,所以还需要通过redis校验是否重复访问。

假如不是重复下发不是并发场景,有时间间隔,还是需要redis校验是否重复下发。

那redission就是控制重复消息排队的问题了,且真正控制重复下发的是redis,那是否可以直接把

redisson直接去掉,只用redis控制重复消费。

因为key不同,redission也就无法控制并发消费。

且当key相同时,是排队的,也无法实现分布式锁的目的。

分布式锁概念是什么:

1、互斥

在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。

再想一下,同一时刻只能有一个线程获得锁,这是否有前提,key要相同,还是与key无关?

@这里有一个前提,key相同

参考文档

【精选】Redis:Redisson分布式锁的使用(推荐使用)_redisson分布式锁使用_穿城大饼的博客-CSDN博客

该文章说明了加锁的详细过程,可得出结论,key不同可以加锁成功,mq下发如果key是不同的,那么每次都可以加锁成功,即是并发处理的,粒度较小,符合高效率,高并发的要求。

那么他的意义是什么,@上面第8点的分析,好像可以去掉。

@那么控制重复消费的任务完全落在了redis了,过期时间很重要,设置一个合理的过期时间,控制这段时间的重复消费问题。如果过了这段时间还会重复,要么不会,要么给出原因。

继续提问

9.或者说mq体现不了,分布式锁的意思,场景不适合,哪些场景符合,请举例

10.mq高并发访问,会不会存在问题,比如更新失败,共享资源异常

三,解决redssion实现机制,参考如下

Redisson分布式锁入门使用(可重入锁(lock))_redissonclient.getlock_Upstream LV@菜哥的博客-CSDN博客

四,关于环绕通知原理,如何控制方法的执行,参考如下

Spring AOP 中的 @Around 通知执行原理_aop around 控制方法执行-CSDN博客

五,实现代码

结合以上知识,在分析代码

package com.yonghui.yh.rme.srm.ordercenter.service.annotation;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yonghui.redis.utils.RedisUtils;
import com.yonghui.yh.rme.srm.ordercenter.common.enums.RedisEnum;

import lombok.extern.slf4j.Slf4j;

/**
 * 解决什么问题
 * 1,控制并发访问,实现多线程并发访问时,排队
 * 2,幂等,防止MQ重复消费,不通过反射调用(joinPoint.proceed()),不执行原方法
 */
@Aspect
@Component
@Slf4j
public class MqConsumerAspect {

    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisUtils redisUtils;


    @Pointcut("@annotation(com.yonghui.yh.rme.srm.ordercenter.service.annotation.MqConsumer)")
    public void point(){}

    /**
     * @Around的作用
     * 既可以在目标方法之前织入增强动作,也可以在执行目标方法之后织入增强动作;
     *
     * 可以决定目标方法在什么时候执行,如何执行,甚至可以完全阻止目标目标方法的执行;
     *
     * 可以改变执行目标方法的参数值,也可以改变执行目标方法之后的返回值; 当需要改变目标方法的返回值时,只能使用Around方法;
     *
     * 虽然Around功能强大,但通常需要在线程安全的环境下使用。因此,如果使用普通的Before、AfterReturing增强方法就可以解决的事情,就没有必要使用Around增强处理了。
     *
     * 注解方式:如果需要对某一方法进行增强,只需要在相应的方法上添加上自定义注解即可
     * @param joinPoint
     * @return
     * @throws Throwable
     */
    @Around("point()")
    public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
        Object o = joinPoint.getArgs()[0];
        JSONObject jsonObject;
        if (o instanceof String) {
            jsonObject = JSON.parseObject(o.toString());
        } else {
            jsonObject = JSON.parseObject(JSON.toJSONString(o));
        }
        String messageKey = jsonObject.getString("messageKey");
        if (StringUtils.isBlank(messageKey)) {
            // 方法是否会继续执行?
            // @ 答: 会执行 //利用反射调用目标方法,就是method.invoke() proceed = joinPoint.proceed(args);
            log.info("MqConsumerAspect mq消息消费 messageKey 为空");
            return joinPoint.proceed();
        }
        // 虽然Around功能强大,但通常需要在线程安全的环境下使用。这是使用Lock的原因,保证现在安全,多线程排队访问。
        String lockKey = RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.toKey(messageKey);
        RLock lock = redissonClient.getLock(lockKey);
        try {
            if (lock.tryLock(RedisEnum.ORDER_TRACE_MQ_LOCK_KEY.expired, TimeUnit.SECONDS)) {

                // 并发请求,排队等待,在这里写能控制吗? @应该能,决定方法是否执行,调用joinPoint.proceed()方法
                // @ 所以会排队
                // @ 所以,重复消费,返回true的结果,相当于,原方法直接返回true
                String key = RedisEnum.ORDER_TRACE_MQ_REPEAT_CONSUMER_KEY.toKey(messageKey);
                // 幂等,这里什么用,能控制方法幂等,不再执行吗?还是只是记录重复记录。
                if (redisUtils.exists(key)) {
                    log.info("MqConsumerAspect messageKey={} 重复消费",key);
                    return true;
                }

                // 这里只是记录消费完成日志?
                Object proceed = joinPoint.proceed();
                if (proceed.equals(Boolean.TRUE)) {
                    log.info("MqConsumerAspect messageKey={} 消费完成",key);
                    redisUtils.set(key, "1", RedisEnum.ORDER_TRACE_MQ_REPEAT_CONSUMER_KEY.expired);
                }
                return proceed;
            }
            // 这里排队,直接返回false为什么?todo,原方法没有执行,这里直接返回false,代表原方法返回false?
            return false;
            // 上面返回true / proceed / false,有什么影响?
        } catch (Exception e) {
            log.error("MqConsumerAspect error param={}", JSON.toJSONString(o), e);
            throw e;
        } finally {
            lock.unlock();
        }
    }

}

参考文档

Redis 分布式锁的正确实现原理演化历程与 Redission 实战总结 - 知乎

这篇文章说明了红锁redLock,不同方法区别,是否有看门狗机制,并通过举例说明分布式锁应用场景。

Redisson 实现分布式锁原理分析 - 知乎

这个文章,通过源码讲解了如何发布订阅的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1140893.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode 740.删除并获得点数---->打家劫舍

前言:简单写写自己对这道题的拙见,如有意见或者建议可以联系笔者owo 首先,看看完整题目描述: 给你一个整数数组 nums ,你可以对它进行一些操作。 每次操作中,选择任意一个 nums[i] ,删除它并获…

归并排序(java)

大家好我是苏麟 , 今天说说归并排序 . 归并排序 递归 正式学习归并排序之前,我们得先学习一下递归算法。 定义: 定义方法时,在方法内部调用方法本身,称之为递归. public void show(){System.out.println("aaaa")…

SEACALL海外呼叫中心系统的优势包括

SEACALL海外呼叫中心系统的优势包括 封卡封号问题解决 海外呼叫中心系统通过API开放平台能力,定制电话营销系统,提供多项功能如自动拨打、智能应答、真人语音交互等,帮助企业克服员工离职率高、客户资源流失严重等挑战。 - 高级管理者操控 …

《实现领域驱动设计》

DDD入门 1.1 DDD是什么? DDD是一种软件开发方法 DDD将领域专家和开发人员聚集到一起,开发的软件能够反映出领域专家的思维模型。目标是:交付最具业务价值的软件。DDD关注业务战略:指引我们如何实现面向服务架构(ser…

开题报告怎么写?-案例+模板保姆级)

以前导师让们带本科生开题报告,我深知开题报告在学术研究中的重要性。一个出色的开题报告能够展示学生的科学思维、研究能力和创新潜力。 在本篇博客中,我将为大家详细介绍如何撰写史上最强开题报告。 将从课题的 科学意义国内外研究概况和发展趋势应…

SourceTree 使用

如何拉取远程仓库?如何拉去远程分支?如何创建本地分支?如何删除本地分支?如何删除远端分支? 删除了远程分支,如果本地还有此分支,那么是可以通过推送本地分支来还原远端分支。如何合并本地分支&…

2023中国计算机大会:蚂蚁集团连发两支百万级科研基金

10月26日,中国计算机学会(CCF)主办的第二十届中国计算机大会(CNCC2023)在沈阳举行。在“CCF-蚂蚁科研基金及产学研合作交流活动”上,蚂蚁集团发布了2023年度“CCF-蚂蚁科研基金”绿色计算及隐私计算两支百万级专项基金&#xff0c…

动态代理IP怎么设置?动态代理IP有哪些应用场景?

动态代理IP是指代理服务器会根据实际IP地址的变化而变化,可以帮助用户隐藏真实的IP地址,同时可以在不同的网络环境下使用不同的代理IP地址。下面是动态代理IP的设置和应用场景: 一、动态代理IP怎么设置? 1. 手动设置代理IP地址 用…

基于Arduino的物流分拣控制系统设计

欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 技术交流认准下方 CSDN 官方提供的联系方式 文章目录 概要 一、控制系统设计1.1系统方案1.2 系统工作原理1.3方案设计1.3.1快递检测电路方案设计1.3.2控制电路方案设计 二、硬件…

【Java基础】File类与IO流

File类与IO流 文章目录 File类与IO流1. java.io.File类的使用1.1 概述1.2 构造器1.3 作用 2. IO流原理及流的分类2.1 流的分类2.3 流的API 3. 节点流之一:FileReader\FileWriter3.1 Reader与Writer3.3 关于flush(刷新) 4. 节点流之二&#xf…

GPIO常见名词——推挽、开漏、浮空、上拉、下拉、高阻态

🙌秋名山码民的主页 😂oi退役选手,Java、大数据、单片机、IoT均有所涉猎,热爱技术,技术无罪 🎉欢迎关注🔎点赞👍收藏⭐️留言📝 获取源码,添加WX 目录 1、前言…

VTK OrientationMarker 方向 三维坐标系 相机坐标轴 自定义坐标轴

本文 以 Python 语言开发 我们在做三维软件开发时,经常会用到相机坐标轴,来指示当前空间位置; 坐标轴效果: 相机方向坐标轴 Cube 正方体坐标轴 自定义坐标轴: Code: Axes def main():colors vtkNamedC…

以太网通讯与485通讯哪个好?

随着工业自动化、物联网和信息技术的快速发展,数据传输成为了各个领域越来越关注的问题。在众多通讯技术中,以太网通讯和485通讯被广泛应用于各种场景。那么,这两种通讯技术究竟哪个更好呢?接下来,小编就来为大家一一讲…

使用Terraform管理已经存在的kubernates和默认的节点池

背景: 通过terraform resource "alicloud_cs_managed_kubernetes" "k8s" {...}创建集群时,会产生一个默认的节点池default-nodepool,但是如何去修改这个默认节点池的信息呢? 解决思路: 因为Ter…

LLM系列-大模型技术汇总

LLM系列-大模型技术汇总 1. 大模型技术汇总-参数高效迁移学习方法2. 千亿模型并行训练技术2.1. 数据并行(Data Parallelism,DP)2.2. 模型并行(Model Parallelism,MP)2.2.1. 流水线并行(Pipeline…

Ubuntu16.04 python matplotlib 输出轴标签出现中文乱码

问题:坐标轴打印中文时,显示会乱码 import matplotlib.pyplot as plt plt.ylabel(时间刻度)原因:matplotlib里面没有中文字体解决方法:下载SimHei字体,快捷方法是使用everything直接在windows搜索simhei.ttf&#xff…

【0230】PG内核底层事务(transaction)实现原理之基础篇

1. 事务上层 事务是如何工作? 接下来让我们通过gdb方式来跟踪一下PG内核中事务的底层工作原理。 (1)psql登录PG服务(2)psql登录界面输入:begin(3)gdb跟踪backend进程读取用户的输入,并进入exec_simple_query()函数函数exec_simple_query()的实现如下: static void …

JAVA实现学生日常行为评分管理系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、系统设计2.1 功能模块设计2.2.1 登录注册模块2.2.2 用户管理模块2.2.3 评分项目模块2.2.4 评分数据模块2.2.5 数据字典模块 2.3 可行性设计2.4 用例设计2.5 数据库设计2.5.1 整体 E-R 图2.5.2 用户2.5.3 评分项目2.5.4 评分数据2.5.…

PyTorch入门教学——torchvision中数据集的使用

1、torchvision.datasets datasets是torchvision工具集中的一个工具。可以理解为调用官方数据集的一种方式,其中有很多开源的数据集,可供我们学习使用。datasets官网:Datasets — Torchvision 0.16 documentation (pytorch.org) 2、使用 …

【漏洞复现】用友OA用户信息泄露

漏洞描述 通过该接口可下载oa用户信息文件 免责声明 技术文章仅供参考,任何个人和组织使用网络应当遵守宪法法律,遵守公共秩序,尊重社会公德,不得利用网络从事危害国家安全、荣誉和利益,未经授权请勿利用文章中的技…