RabbitMQ之顺序消费

news2024/11/17 22:36:21

什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。

不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,

虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。

如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现


private Queue creatQueue(String name){
        // 创建一个 单活模式 队列
        HashMap<String, Object> args=new HashMap<>();
        args.put("x-single-active-consumer",true);
        return new Queue(name,true,false,false,args);
        }

创建之后,我们可以在控制台看到 消费者的激活状态
在这里插入图片描述

=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {
    @Bean
    public Queue queue15_0() {
        return creatQueue(Message15.QUEUE_0);
    }


    @Bean
    public Queue queue15_1() {
        return creatQueue(Message15.QUEUE_1);
    }

    @Bean
    public Queue queue15_2() {
        return creatQueue(Message15.QUEUE_2);
    }

    @Bean
    public Queue queue15_3() {
        return creatQueue(Message15.QUEUE_3);
    }

    @Bean
    public DirectExchange exchange15() {
        // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
        return new DirectExchange(Message15.EXCHANGE, true, false);
    }


    @Bean
    public Binding binding15_0() {
        return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");
    }

    @Bean
    public Binding binding15_1() {
        return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");
    }

    @Bean
    public Binding binding15_2() {
        return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");
    }

    @Bean
    public Binding binding15_3() {
        return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");
    }

    /**
     * 创建一个 单活 模式的队列
     * 注意 :
     * <p>
     * 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除
     *
     * @param name
     * @return queue
     */
    private Queue creatQueue(String name) {
        // 创建一个 单活模式 队列
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-single-active-consumer", true);
        return new Queue(name, true, false, false, args);
    }
=================================》生产者
@Component
@Slf4j
public class Producer15 {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 这里的发送是 拟投递到多个队列中
     *
     * @param id  业务id
     * @param msg 业务信息
     */
    public void syncSend(int id, String msg) {
        Message15 message = new Message15(id, msg);
        rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);
    }

    /**
     * 根据 id 取余来决定丢到那个队列中去
     *
     * @param id id
     * @return routingKey
     */
    private String getRoutingKey(int id) {
        return String.valueOf(id % Message15.QUEUE_COUNT);
    }
}
============================》消费者
/**
 * 要想保证消息的顺序,每个队列只能有一个消费者
 *
 * @author 深漂码农@明哥
 * @date 2024-03-18
 */
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {

    @RabbitHandler
    public void onMessage(Message15 message) throws InterruptedException {
        log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);
        // 这里随机睡一会,模拟业务处理时候的耗时
        long l = new Random(1000).nextLong();
        TimeUnit.MILLISECONDS.sleep(l);
    }
}
==============================》测试类
@Test
    void mock() throws InterruptedException {
        // 先启动这个测试类,模拟多个副本情况下,看如何消费
        new CountDownLatch(1).await();
    }

    @Test
    void syncSend() throws InterruptedException {
        // 模拟每个队列中扔 10 个数据,看看效果
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 4; j++) {
                producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");
            }
        }

        TimeUnit.SECONDS.sleep(20);
    }
}

ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1642766.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】进程exec函数族以及守护进程

一.exec函数族 1.exec函数族的应用 在shell下敲shell的命令都是在创建shell的子进程。而我们之前学的创建父进程和子进程代码内容以及通过pid与0的关系来让父子进程执行不同的代码内容都是在一个代码文件里面&#xff0c;而shell是如何做到不在一个文件里面写代码使之成为子进…

4- 29

五六月安排 5.12江苏CPC 6.2、6.16、6.30三场百度之星省赛 6月蓝桥杯国赛 7.15 睿抗编程赛道省赛 5 6月两个科创需要申请完软著。 网络技术挑战赛过了资格赛&#xff0c;下面不知道怎么搞&#xff0c;如果参加需要花费很多的时间。 1.100个英语单词一篇阅读&#xff0c;讲了文…

Docker Compose 部署若依前后端分离版

准备一台服务器 本次使用虚拟机&#xff0c;虚拟机系统 Ubuntu20.04&#xff0c;内存 4G&#xff0c;4核。 确保虚拟机能连接互联网。 Ubuntu20.04 安装 Docker 添加 Docker 的官方 GPG key&#xff1a; sudo apt-get update sudo apt-get install ca-certificates curl su…

Hibernate的QBC与HQL查询

目录 1、Hibernate的QBC查询 2、Hibernate的HQL查询 3、NatvieSQL原生查询 1、Hibernate的QBC查询 Hibernate具有一个直观的、可扩展的条件查询API public class Test { /** * param args */ public static void main(String[] args) { Session sessio…

【八股】AQS,ReentrantLock实现原理

AQS 概念 AQS 的全称是 AbstractQueuedSynchronized &#xff08;抽象队列同步器&#xff09;&#xff0c;在java.util.concurrent.locks包下面。 AQS是一个抽象类&#xff0c;主要用来构建锁和同步器&#xff0c;比如ReentrantLock, Semaphore, CountDownLatch&#xff0c;里…

安卓LayoutParams浅析

目录 前言一、使用 LayoutParams 设置宽高二、不设置 LayoutParams2.1 TextView 的 LayoutParams2.2 LinearLayout 的 LayoutParams 三、getLayoutParams 的使用四、setLayoutParams 的作用五、使用 setWidth/setHeight 设置宽高 前言 先来看一个简单的布局&#xff0c;先用 x…

Jackson-jr 对比 Jackson

关于Jackson-jr 对比 Jackson 的内容&#xff0c;有人在做了一张下面的图。 简单点来说就 Jackson-jr 是Jackson 的轻量级应用&#xff0c;因为我们在很多时候都用不到 Jackson 的很多复杂功能。 对很多应用来说&#xff0c;我们可能只需要使用简单的 JSON 读写即可。 如我们…

手撕spring框架(5)

手撕spring框架(5) 相关系列 手撕spring框架&#xff08;1&#xff09; 手撕spring框架&#xff08;2&#xff09; 手撕spring框架&#xff08;3&#xff09; 手撕spring框架&#xff08;4&#xff09; 这是本专题最后一节了&#xff0c;主要是讲述自定义一个注解&#xff0c;实…

QT中的容器

Qt中的容器 关于Qt中的容器类&#xff0c;下面我们来进行一个总结&#xff1a; Qt的容器类比标准模板库&#xff08;STL&#xff09;中的容器类更轻巧、安全和易于使用。这些容器类是隐式共享和可重入的&#xff0c;而且他们进行了速度和存储的优化&#xff0c;因此可以减少可…

HackTheBox_knote

前言 最近打算刷一些内核利用的 CTF 的题目~~~ 题目分析 内核版本&#xff1a;v5.8.3&#xff0c;但是没有开启 cg 隔离smap/smep/kpti/kaslr 全关&#xff0c;可以 ret2usr&#xff0c;所以应该是比较老的题目了&#xff08;&#xff1a;这里很奇怪的是就算设置 kaslr 但是…

虚拟化技术 使用Vsphere Client管理ESXi服务器系统

使用Vsphere Client管理ESXi服务器系统 一、实验目的与要求 1.掌握使用vSphere Client管理ESXi主机 2.掌握将CentOS的安装介质ISO上传到ESXi存储 3.掌握在VMware ESXi中创建虚拟机 4.掌握在所创建的虚拟机中安装CentOS6.5操作系统 5.掌握给CentOS6.5安装VMware Tools 6.掌…

RabbitMQ(Docker 单机部署)

序言 本文给大家介绍如何使用 Docker 单机部署 RabbitMQ 并与 SpringBoot 整合使用。 一、部署流程 拉取镜像 docker pull rabbitmq:3-management镜像拉取成功之后使用下面命令启动 rabbitmq 容器 docker run \# 指定用户名-e RABBITMQ_DEFAULT_USERusername \# 指定密码-e R…

python数据可视化:显示两个变量间的关系散点图scatterplot()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 python数据可视化&#xff1a; 显示两个变量间的关系 散点图 scatterplot() [太阳]选择题 请问关于以下代码表述错误的选项是&#xff1f; import seaborn as sns import matplotlib.pyplot …

EPAI手绘建模APP编辑模型2

⑩ 桥接&#xff0c;选择两个面。桥接完成后&#xff0c;在选择的两个面之间生成了一个边界放样模型&#xff0c;边界放样模型和原模型合并成一个新的模型。 图 213 桥接 ⑪ 移除特征&#xff0c;选择倒圆角面、倒直角面、挖孔面、凸起面&#xff0c;移除。移除特征后&#xff…

图像处理ASIC设计方法 笔记21 标记ASIC的顶层状态机

目录 (一)标记ASIC的工作流程1 ASIC首先从控制寄存器内读出待标记图像的基本参数2若写入了有效的启动命令,则进入下面一帧图像的标记过程。3 ASIC通过接口模块从FIFO1中读取待标记的图像4一帧图像初步标记完成后进行等价表的整理压缩5从临时标记存储器中读取临时标记送入标记…

【iOS】KVC

文章目录 前言一、KVC常用方法二、key与keypath区别key用法keypath用法 三、批量存值操作四、字典与模型相互转化五、KVC底层原理KVC设值底层原理KVC取值底层原理 前言 KVC的全称是Key-Value Coding&#xff0c;翻译成中文叫做键值编码 KVC提供了一种间接访问属性方法或成员变…

数据结构练习题---环形链表详解

链表成环&#xff0c;在力扣中有这样的两道题目 https://leetcode.cn/problems/linked-list-cycle/ https://leetcode.cn/problems/linked-list-cycle-ii/description/ 这道题的经典解法是利用快慢指针&#xff0c;如果链表是一个环形链表&#xff0c;那么快指针(fast)和慢指…

AI图书推荐:AI在语言学习教育领域的应用和挑战

这本书《AI在语言学习教育领域的应用和挑战》&#xff08;AI in Language Teaching, Learning, and Assessment&#xff09;由Fang Pan编辑&#xff0c;出版于IGI Global&#xff0c;主要探讨了人工智能&#xff08;AI&#xff09;在语言教育领域的应用、挑战以及潜在的益处。 …

【苍穹外卖】项目实战Day04

&#x1f525; 本文由 程序喵正在路上 原创&#xff0c;CSDN首发&#xff01; &#x1f496; 系列专栏&#xff1a;苍穹外卖项目实战 &#x1f320; 首发时间&#xff1a;2024年5月5日 &#x1f98b; 欢迎关注&#x1f5b1;点赞&#x1f44d;收藏&#x1f31f;留言&#x1f43e…

TwinCAT3 实时内核调度算法

前言 TwinCAT3 支持多核心CPU并行运行实时任务&#xff0c;根据官方网站的帮助信息“实时”定义取自DIN44300&#xff0c;而且实时任务的调度算法默认是 RMS算法&#xff08;速率单调调度算法&#xff09; RMS算法 来看一下百度百科的解释&#xff1a; RMS&#xff08;单调速…