MQ专题:顺序消息落地方案

news2024/11/14 3:26:40

一、什么是顺序消息

投递消息的顺序和消费消息的顺序一致。

比如生产者按顺序投递了1/2/3/4/5 这 5 条消息,那么消费的时候也必须按照1到5的顺序消费这些消息。

二、顺序消息如何实现?(2种方案)

  1. 方案1:生产者串行发送+同一个队列+消费者单线程消费
  2. 方案2:生产方消息带上连续递增编号+同一个队列+消费按编号顺序消费消息

2.1 方案1:生产者串行发送+同一个队列+消费者单线程消费

2.1.1 生产者串行发送消息

  • 使用分布式锁,让消息投递串行执行
  • 确保顺序消息到达同一个队列

2.1.2 消费者单线程消费

  • 消费者这边只能有一个线程,拉取一个消费一个,消费完成后再拉取下一个消费

2.2 方案2:生产方消息带上连续递增编号+同一个队列+消费按编号顺序消费消息

2.2.1 生产方消息带上连续递增编号

  • 顺序消息携带连续递增的编号,从1开始,连续递增,比如发送了3条消息,编号分别是1、2、3,后面再投递消息,编号就从4开始了
  • 确保顺序消息到达同一个队列

2.2.2 消费方按照编号顺序消费消息

  • 消费方需要记录消息消费的位置:当前轮到哪个编号的消息了
  • 收到消息后,将消息的编号和当前消费的位置对比下,是不是轮着这条消息消费了,如果是则进行消费,如果不是,则排队等待,等待前一个到达后,且消费成功后,将自己唤醒进行消费

这里举个例子,如下

  • 生产者发送了编号为看1、2、3 的3条消息

  • 到达的消费端顺序刚好相反,3先到,发现没有轮到自己,会进行排队

  • 然后2到了,发现也没有轮到自己,也会排队

  • 然后过了一会1到了,发现轮到自己了,然后1被消费了

  • 1消费后,会唤醒下一个编号的2进行消费

  • 2消费完了,会唤醒下一个编号的3进行消费。

本文我们会落地方案2。

三、代码

-- 创建订单表
drop table if exists t_order_lesson034;
create table if not exists t_order_lesson034
(
    id    varchar(32)    not null primary key comment '订单id',
    goods varchar(100)   not null comment '商品',
    price decimal(12, 2) comment '订单金额'
) comment '订单表';

-- 创建本地消息表
drop table if exists t_msg_lesson034;
create table if not exists t_msg_lesson034
(
    id               varchar(32) not null primary key comment '消息id',
    exchange         varchar(100) comment '交换机',
    routing_key      varchar(100) comment '路由key',
    body_json        text        not null comment '消息体,json格式',
    status           smallint    not null default 0 comment '消息状态,0:待投递到mq,1:投递成功,2:投递失败',
    expect_send_time datetime    not null comment '消息期望投递时间,大于当前时间,则为延迟消息,否则会立即投递',
    actual_send_time datetime comment '消息实际投递时间',
    create_time      datetime comment '创建时间',
    fail_msg         text comment 'status=2 时,记录消息投递失败的原因',
    fail_count       int         not null default 0 comment '已投递失败次数',
    send_retry       smallint    not null default 1 comment '投递MQ失败了,是否还需要重试?1:是,0:否',
    next_retry_time  datetime comment '投递失败后,下次重试时间',
    update_time      datetime comment '最近更新时间',
    key idx_status (status)
) comment '本地消息表';


-- 创建消息和消费者关联表
drop table if exists t_msg_consume_lesson034;
create table if not exists t_msg_consume_lesson034
(
    id              varchar(32)  not null primary key comment '消息id',
    producer        varchar(100) not null comment '生产者名称',
    producer_bus_id varchar(100) not null comment '生产者这边消息的唯一标识',
    consumer_class_name        varchar(300) not null comment '消费者完整类名',
    queue_name      varchar(100) not null comment '队列名称',
    body_json       text         not null comment '消息体,json格式',
    status          smallint     not null default 0 comment '消息状态,0:待消费,1:消费成功,2:消费失败',
    create_time     datetime comment '创建时间',
    fail_msg        text comment 'status=2 时,记录消息消费失败的原因',
    fail_count      int          not null default 0 comment '已投递失败次数',
    consume_retry   smallint     not null default 1 comment '消费失败后,是否还需要重试?1:是,0:否',
    next_retry_time datetime comment '投递失败后,下次重试时间',
    update_time     datetime comment '最近更新时间',
    key idx_status (status),
    unique uq_msg (producer, producer_bus_id, consumer_class_name)
) comment '消息和消费者关联表';

drop table if exists t_msg_consume_log_lesson034;
create table if not exists t_msg_consume_log_lesson034
(
    id              varchar(32)  not null primary key comment '消息id',
    msg_consume_id        varchar(32) not null comment '消息和消费者关联记录',
    status          smallint     not null default 0 comment '消费状态,1:消费成功,2:消费失败',
    create_time     datetime comment '创建时间',
    fail_msg        text comment 'status=2 时,记录消息消费失败的原因',
    key idx_msg_consume_id (msg_consume_id)
) comment '消息消费日志';

-- 幂等辅助表
drop table if exists t_idempotent_lesson034;
create table if not exists t_idempotent_lesson034
(
    id             varchar(50) primary key comment 'id,主键',
    idempotent_key varchar(500) not null comment '需要确保幂等的key',
    unique key uq_idempotent_key (idempotent_key)
) comment '幂等辅助表';

-- 顺序消息编号生成器
drop table if exists t_sequential_msg_number_generator_lesson034;
create table if not exists t_sequential_msg_number_generator_lesson034
(
    id        varchar(50) primary key comment 'id,主键',
    group_id  varchar(256) not null comment '组id',
    numbering bigint       not null comment '消息编号',
    version   bigint       not null default 0 comment '版本号,每次更新+1',
    unique key uq_group_id (group_id)
) comment '顺序消息排队表';

-- 顺序消息消费信息表,(group_id、queue_name)中的消息消费到哪里了?
drop table if exists t_sequential_msg_consume_position_lesson034;
create table if not exists t_sequential_msg_consume_position_lesson034
(
    id         varchar(50) primary key comment 'id,主键',
    group_id   varchar(256) not null comment '组id',
    queue_name varchar(100) not null comment '队列名称',
    consume_numbering  bigint   default 0   not null comment '当前消费位置的编号',
    version   bigint       not null default 0 comment '版本号,每次更新+1',
    unique key uq_group_queue (group_id, queue_name)
) comment '顺序消息消费信息表';


-- 顺序消息排队表
drop table if exists t_sequential_msg_queue_lesson034;
create table if not exists t_sequential_msg_queue_lesson034
(
    id          varchar(50) primary key comment 'id,主键',
    group_id    varchar(256) not null comment '组id',
    numbering   bigint       not null comment '消息编号',
    queue_name  varchar(100) not null comment '队列名称',
    msg_json    text         not null comment '消息json格式',
    create_time datetime comment '创建时间',
    unique key uq_group_number_queue (group_id, numbering, queue_name)
) comment '顺序消息排队表';

3.1 发送顺序消息

如下模拟发送订单相关的5条顺序消息

com.itsoku.lesson034.controller.TestController#sendSequential

@PostMapping("/sendSequential")
public Result<Void> sendSequential() {
    String orderId = IdUtil.fastSimpleUUID();

    List<String> list = Arrays.asList("订单创建消息",
            "订单支付消息",
            "订单已发货",
            "买家确认收货",
            "订单已完成");
    for (String type : list) {
        msgSender.sendSequentialWithBody(orderId,
                RabbitMQConfiguration.Order.EXCHANGE,
                RabbitMQConfiguration.Order.ROUTING_KEY,
                OrderMsg.builder().orderId(orderId).type(type).build());

    }
    return ResultUtils.success();
}

在这里插入图片描述

3.2 消息按顺序消费

3.2.1 消费者拉取消息时,先把消息连同顺序的编号保存在表t_sequential_msg_queue里

在这里插入图片描述

3.2.2 从t_sequential_msg_queue表里取出消息进行消费
在这里插入图片描述

有个疑问:

  1. 为什么加锁失败要去触发重试?比如消费者1,2,3,4,5分别拿到了编号为1,2,3,4,5的消息,1,2,3,4,5都已经入库了,这时候只需要一个加锁成功的线程就能消费全部消息了,因为有个循环。
  2. 假如有5个线程,那么每次加锁势必会有至多4个线程获取不到锁,那么消息会重新投递4次,需要这样做吗?
  3. 比如消费者1,2,3,4,5分别拿到了编号为1,2,3,4,5的消息,2,3,4,5都已经入库了,编号为1的消息还没有入库。这时候消费者2获得了锁,1,3,4,5都还没有获得锁,但消息队列表里编号最小的是2,所以比较编号的时候没轮到消费者2,直接break退出循环了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2099768.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV颜色空间转换(1)颜色空间转换函数cvtColor()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 将图像从一个颜色空间转换到另一个颜色空间。 此函数将输入图像从一个颜色空间转换到另一个颜色空间。在进行 RGB 颜色空间之间的转换时&#x…

在Ubuntu/Linux下重温FC游戏——超级玛丽奥

文章目录 在Ubuntu/Linux下重温FC游戏——超级玛丽奥1 概述2 安装 FCEUX 模拟器3 下载 FC ROMS4 重温时光 在Ubuntu/Linux下重温FC游戏——超级玛丽奥 1 概述 FC 游戏机&#xff0c;是任天堂生产、发行和销售的 8 位第三世代家用游戏机&#xff0c;日本版官方名称为家庭电脑&…

pod基础和镜像拉取策略

目录 pod概念 pod的分类 1.基础容器 pause 2.初始化容器 init 实验&#xff1a;定义初始化容器 init容器的作用 实验&#xff1a;如何在容器内部进行挂载 镜像拉取策略 pod概念 pod是k8s里面的最小单位&#xff0c;pod也是最小化运行容器的资源对象。容器是基于pod在k…

flink---window

Window介绍 DataStream: https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/datastream/operators/windows/ SQL: https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sql/queries/window-tvf/ 1、为什么需要Window?…

机械学习—零基础学习日志(概率论总笔记2)

正态分布 高斯分布也叫做正态分布。假定事件A经过n次试验后发生了k次&#xff0c;把k的概率分布图画一下&#xff0c;就得到了一个中间鼓起&#xff0c;像倒扣的钟一样的对称图形。 18世纪&#xff0c;数学家棣莫弗和拉普拉斯把这种中间大&#xff0c;两头小的分布称为正态分布…

社交媒体的智能变革:Facebook AI优化用户体验

Facebook作为全球领先的社交平台&#xff0c;一直致力于通过人工智能&#xff08;AI&#xff09;技术提升用户体验。AI技术在Facebook的应用涵盖了推荐系统、自然语言处理、广告投放和用户反馈等多个方面&#xff0c;使平台的互动和内容体验更加智能和个性化。 推荐系统的智能化…

四、材料与制造工艺 笔记

目录 四、材料与制造工艺 4.1 常见聚合物类材料&#xff08;塑料&#xff09; 4.1.1 聚丙烯塑料发泡材料&#xff08;EPP&#xff09; 4.1.2 尼龙 &#xff08;1&#xff09;PA66 4.1.3光固化树脂 4.1.4 KT板 4.1.5 术板 4.1.6 EDA 3D打印 &#xff08;1&#xf…

基于VUE的校园二手物品交易管理系统的设计与实现 (含源码+sql+视频导入教程)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1 、功能描述 基于VUE的校园二手物品交易管理系统8拥有两种角色 管理员&#xff1a;闲置物品管理、订单管理、用户管理 用户&#xff1a;登录注册、购物车、发布闲置物品、评论、发货、收货地址管理等…

C++string类(1)

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 Cstring类(1) 收录于专栏【C语法基础】 本专栏旨在分享学习C的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 目录 1. 为什么学习stri…

旋转编码器模块(软件消抖)

目录 旋转编码器简介 模块接线 正交编码器控制原理 模块代码 Encoder.h Encoder.c main.c 消抖代码 Encoder.c 旋转编码器简介 旋转编码器是一种将旋转角位移转换为一连串数字脉冲信号的旋转式电位器。当编码器的旋转轴旋转时&#xff0c;其输出端可以输出与旋转…

C语言 strlen求字符串长度

目前主要分为三个专栏&#xff0c;后续还会添加&#xff1a; 专栏如下&#xff1a; C语言刷题解析 C语言系列文章 我的成长经历 感谢阅读&#xff01; 初来乍到&#xff0c;如有错误请指出&#xff0c;感谢&#xff01; C 标准库 - <string.h…

OpenCV中的颜色映射函数applyColorMap的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 OpenCV 中应用类似于 GNU Octave 或 MATLAB 中的颜色映射&#xff0c;虽然 OpenCV 中的颜色映射类型与 GNU Octave 或 MATLAB 中的颜色映射类型名…

Windows Server 安装MySQL教程(图文)

本篇教程&#xff0c;在服务器Windows Server 2016 &#xff08;中文版&#xff09;上安装MySQL8.0&#xff0c;并记录详细的安装步骤。 1、下载安装包 在mysql官网上下载安装包 下载地址&#xff1a;https://dev.mysql.com/downloads/installer/ 2、安装步骤 下载之后&#xf…

常见概念 -- WSS光层环回

光层环回&#xff0c;即业务收发经过同一块WSS单板&#xff0c;在WSS单板的DM/AM层面或内部端口层面进行环回&#xff0c;用于定位问题。 目前&#xff0c;光层环回有两种实现方式。 方式一&#xff1a;需要人工进行物理连纤&#xff0c;将WSS单板的DM和AM接口用光纤环回&…

3.2 寻址方式

&#x1f393; 微机原理考点专栏&#xff08;通篇免费&#xff09; 欢迎来到我的微机原理专栏&#xff01;我将帮助你在最短时间内掌握微机原理的核心内容&#xff0c;为你的考研或期末考试保驾护航。 为什么选择我的视频&#xff1f; 全程考点讲解&#xff1a;每一节视频都…

Datawhale x李宏毅苹果书AI夏令营深度学习详解入门 Task3

在深度学习中&#xff0c;模型偏差、优化问题和过拟合是我们经常会遇到的挑战。理解这些问题并找到合适的解决方法对于提高模型的性能至关重要。 第一章&#xff1a;模型偏差 1.1 模型过于简单可能导致模型偏差 在应用机器学习算法时&#xff0c;如果模型过于简单&#xff0c;就…

前端三件套配合MarsCode实现钉钉官网动画 # 豆包MarsCode

文章目录 如何固定动画区域创建项目MarsCode 设置样式MarsCode 优点1MarsCode 缺点MarsCode 优点2 js实现动画实现获取动画曲线的函数为什么实现这个函数&#xff1f;根据当前滚动位置&#xff0c;计算每一个元素不同的数值更新 dom 的 style更新 animationMapgetDomAnimation …

| AutoDL租服务器 |AutoDL租服务器保姆级教程

&#x1f411; | AutoDL租服务器 |AutoDL租服务器保姆级教程 &#x1f411; 文章目录 &#x1f411; | AutoDL租服务器 |AutoDL租服务器保姆级教程 &#x1f411;&#x1f411; 前言&#x1f411;&#x1f411; 实例创建&#x1f411;&#x1f411; 环境配置&#x1f411;&am…

C# DLL已定义类或方法,但是编译报错未定义

现有应用程序1个&#xff0c;动态链接库3个分别称为A、B、C。 应用程序输出在目录P1&#xff0c;动态链接库输出在目录P2。 应用程序引用A、B、C动态链接库&#xff0c;动态链接库A引用B&#xff0c;B引用C。 此时修改动态链接库C&#xff0c;在VS中开发应用程序时可以识别到…

MQ专题:消息积压相关问题和解决思路

一、如何发现消息出现了堆积 二、常见的原因及解决方案 三、消息出现了大量堆积&#xff0c;如何解决 四、建议