rabbitmq 使用SAC队列实现顺序消息

news2024/11/20 4:35:04

rabbitmq 使用SAC队列实现顺序消息

前提

SAC: single active consumer, 是指如果有多个实例,只允许其中一个实例消费,其他实例为空闲

目的

实现消息顺序消费,操作:

  • 创建4个SAC队列,
  • 消息的路由key 取队列个数模,这里是4
  • 发送消息到每个队列,保证每个队列只有一个消费者!!

实现

定义消息 SeqMessage
@Data
@AllArgsConstructor
public class SeqMessage implements Serializable {

    //消息id
    private String requestNo;
    //消息中顺序,1,2,3,4
    private int order;
}
创建 队列 绑定
@Configuration
public class OrderQueueConfiguration {

    public static final String EXCHANGE = "order-ex";
    public static final String RK_PREFIX = "rk-";
    public static final String ONE_QUEUE = "one-queue";
    public static final String TWO_QUEUE = "two-queue";
    public static final String THREE_QUEUE = "three-queue";
    public static final String FOUR_QUEUE = "four-queue";

    @Bean
    public DirectExchange exchange() { // 使用直连的模式
        return new DirectExchange(EXCHANGE, true, false);
    }

    @Bean
    public Binding oneBinding() {
        return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + 1);
    }
    @Bean
    public Binding twoBinding() {
        return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + 2);
    }
    @Bean
    public Binding threeBinding() {
        return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + 3);
    }
    @Bean
    public Binding fourBinding() {
        return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + 3);
    }


    @Bean
    public Queue oneQueue() {
        return createSacQueue(ONE_QUEUE);
    }

    @Bean
    public Queue twoQueue() {
        return createSacQueue(TWO_QUEUE);
    }

    @Bean
    public Queue threeQueue() {
        return createSacQueue(THREE_QUEUE);
    }

    @Bean
    public Queue fourQueue() {
        return createSacQueue(FOUR_QUEUE);
    }

    private static Queue createSacQueue(String queueName) {
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-single-active-consumer", true);
        return new Queue(queueName, true, false, false, arguments);
    }

}

重要的是 x-single-active-consumer 这个属性, 只有一个实例生效

在这里插入图片描述

创建 消费者

为每个队列创建一个监听消费者

@Slf4j
@Component
public class OrderListener {


    @RabbitListener(bindings = @QueueBinding(
                    exchange = @Exchange(value = EXCHANGE,declare = "false"),
                    value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 1))
    public void onMessage1(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", ONE_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE,declare = "false"),
            value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 2))
    public void onMessage2(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", TWO_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE,declare = "false"),
            value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 3))
    public void onMessage3(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", THREE_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE,declare = "false"),
            value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 4))
    public void onMessage4(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", FOUR_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

}
生产者发送消息
@GetMapping("/send/seq/messqge")
 public String sendSeqMessage() throws JsonProcessingException {
     int cnt = 100;
     int mod = 4;
     int seqSize = 6;
     for (int i = 0; i < cnt; i++) {
         for (int j = 0; j < seqSize; j++) {
             int rk = i % mod + 1;
             SeqMessage seqMessage = new SeqMessage("seq-" + i, j);
             String s = objectMapper.writeValueAsString(seqMessage);
             log.info("routeKey: {}, send msg: {}", rk, s);
             rabbitTemplate.convertAndSend(EXCHANGE, RK_PREFIX + rk, s);
         }
     }
     return "success";
 }

运行结果:

two-queue recv: {"requestNo":"seq-1","order":0}
two-queue recv: {"requestNo":"seq-1","order":1}
two-queue recv: {"requestNo":"seq-1","order":2}
two-queue recv: {"requestNo":"seq-1","order":3}
two-queue recv: {"requestNo":"seq-1","order":4}
two-queue recv: {"requestNo":"seq-1","order":5}
two-queue recv: {"requestNo":"seq-5","order":0}
two-queue recv: {"requestNo":"seq-5","order":1}
two-queue recv: {"requestNo":"seq-5","order":2}
two-queue recv: {"requestNo":"seq-5","order":3}
two-queue recv: {"requestNo":"seq-5","order":4}
two-queue recv: {"requestNo":"seq-5","order":5}

three-queue recv: {"requestNo":"seq-2","order":0}
three-queue recv: {"requestNo":"seq-2","order":1}
three-queue recv: {"requestNo":"seq-2","order":2}
three-queue recv: {"requestNo":"seq-2","order":3}
three-queue recv: {"requestNo":"seq-2","order":4}
three-queue recv: {"requestNo":"seq-2","order":5}
three-queue recv: {"requestNo":"seq-6","order":0}
three-queue recv: {"requestNo":"seq-6","order":1}
three-queue recv: {"requestNo":"seq-6","order":2}
three-queue recv: {"requestNo":"seq-6","order":3}
three-queue recv: {"requestNo":"seq-6","order":4}
three-queue recv: {"requestNo":"seq-6","order":5}

可以发现,消息消费是顺序的

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1609882.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Marin说PCB之Via 的 Z--AXIS--delay知多少?

周末宅在家刷抖音的时候&#xff0c;看刷到了一条很有趣味的视频&#xff0c;主要讲的是让你如何从一个allegro菜鸟一个月变成大神的&#xff0c;一个月包教会&#xff0c;这不是妥妥地大骗子嘛。现在的整个市场行情不好啊&#xff0c;各大汽车新能源门派都在紧锣密鼓地搞着“裁…

AI预测福彩3D第40弹【2024年4月19日预测--第8套算法开始计算第8次测试】

今天咱们继续测试第8套算法和模型&#xff0c;今天是第8次测试&#xff0c;目前的测试只是为了记录和验证&#xff0c;为后续的模型修改和参数调整做铺垫&#xff0c;所以暂时不建议大家盲目跟买~废话不多说了&#xff0c;直接上结果&#xff01; 2024年4月19日3D的七码预测结果…

最小生成树算法的实现c++

最小生成树算法的实现c 题目链接&#xff1a;1584. 连接所有点的最小费用 - 力扣&#xff08;LeetCode&#xff09; 主要思路&#xff1a;使用krusal算法&#xff0c;将边的权值进行排序&#xff08;从小到大排序&#xff09;&#xff0c;每次将权值最小且未加入到连通分量中…

施耐德 PLC 及模块 ModbusTCP 通信配置方法

1. 通过【I/O扫描器】服务进行读写 相关文档&#xff1a;各模块说明书仅 NOE 网卡模块、部分 CPU 自带的网口支持 优点&#xff1a;不需要额外编程&#xff0c;系统自动周期型读写数据缺点&#xff1a;扫描周期不定&#xff0c;程序无法控制数据刷新的时序 2. 通过内部程序…

C语言---贪吃蛇(一)---准备工作

文章目录 前言1.Win32 API介绍1.1.Win32 API1.2. 控制台程序1.3.控制台屏幕上的坐标[COORD](https://learn.microsoft.com/zh-cn/windows/console/coord-str)1.4.[GetStdHandle](https://learn.microsoft.com/zh-cn/windows/console/getstdhandle)1.5.[GetConsoleCursorInfo](h…

Navicat 干货 | 了解 PostgreSQL 规则

PostgreSQL 是一个强大的开源关系型数据库管理系统&#xff0c;为增强数据管理和操作提供了丰富的功能。这些功能中包含了规则&#xff0c;这是一种用于控制数据库内部查询和命令处理方式的机制。本文将探讨 PostgreSQL 规则的工作原理&#xff0c;以及它们与触发器的区别&…

替代普通塑料吸头的PFA移液吸头

目前市场上的规格&#xff1a;0.01ml、0.05ml、0.1ml、0.2ml、0.5ml、1ml、2ml、5ml、10ml等均可定制加工PFA材质枪头&#xff0c;可以适配市场上大部分移液枪&#xff0c;普兰德&#xff0c;大龙&#xff0c;赛默飞&#xff0c;赛多利斯&#xff0c;力辰、吉尔森&#xff0c;瑞…

Flask中的JWT认证构建安全的用户身份验证系统

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 Flask中的JWT认证&#xff1a;构建安全的用户身份验证系统 随着Web应用程序的发展&#xf…

粤嵌—2024/4/19—三数之和

代码实现&#xff1a; 方法一&#xff1a;排序 回溯——超时 有错误 /*** Return an array of arrays of size *returnSize.* The sizes of the arrays are returned as *returnColumnSizes array.* Note: Both returned array and *columnSizes array must be malloced, assu…

Scanpy(2)多种可视化

本篇内容为scanpy的可视化方法&#xff0c;可以分为三部分&#xff1a; embedding的散点图&#xff1b;用已知marker genes的聚类识别&#xff08;Identification of clusters&#xff09;&#xff1b;可视化基因的差异表达&#xff1b; 我们使用10x的PBMC数据集&#xff08;…

「泰雷兹」新合作推进南美太空安全,量子加密守护卫星系统

在第23届国际航空航天博览会&#xff08;FIDAE&#xff09;期间&#xff0c;泰雷兹与SeQure Quantum签署了一份谅解备忘录&#xff0c;SeQure Quantum是一家专门从事加密和密码学量子技术的智利公司。二者联手探索和制定与智利太空项目相关的联合战略、技术和知识转让。 在一个…

docker安装并跑通QQ机器人实践(2)-签名服务器bs-qsign搭建

在前文中&#xff0c;我们详尽阐述了QQ机器人的搭建过程及其最终实现的各项功能展示。接下来&#xff0c;我们将转向探讨该项目基于Docker构建服务的具体实践。本篇将以QQ机器人签名服务——qsign为起点&#xff0c;逐步展开论述。 1 获取和运行 xzhouqd/qsign:8.9.63 镜像 1.…

Java开发从入门到精通(二十):Java的面向对象编程OOP:IO流文件操作的读取和写入

Java大数据开发和安全开发 &#xff08;一&#xff09;Java的IO流文件读写1.1 IO流前置知识1.1.1 ASCII字符集1.1.2 GBK字符集1.1.3 Unicode字符集1.1.4 UTF-8字符集1.1.4 Java的编码解码 1.2 IO流的基础知识1.2.1 认识I0流1.2.2 应用场景1.2.3 如何学I0流1.2.3.1 先搞清楚I0流…

移除离群点------PCL

statisticalOutlierRemoval滤波器移除离群点 /// <summary> /// 使用statisticalOutlierRemoval滤波器移除离群点 /// </summary> /// <param name"cloud">被过滤的点云</param> /// <param name"meank"></param> //…

lementui el-menu侧边栏占满高度且不超出视口

做了几次老是忘记&#xff0c;这次整理好逻辑做个笔记方便重复利用&#xff1b; 问题&#xff1a;elementui的侧边栏是占不满高度的&#xff1b;但是使用100vh又会超出视口高度不美观&#xff1b; 解决办法&#xff1a; 1.获取到侧边栏底部到视口顶部的距离 2.获取到视口的高…

实验室三大常用仪器1---示波器的基本使用方法(笔记)

目录 示波器的作用 示波器的基础操作方法 示波器测量突变脉冲 示波器的作用 示波器能帮助我们干什么&#xff1f; 比如说某个电源用万用表测量是稳定的5V输出 但是用示波器一看确实波涛汹涌 这样的电源很可能回导致系统异常工作 又比如电脑和单片机进行串口通信时&#xf…

c 多文件编程

1.结构目录 声明类:用于声明方法,方便方法管理和调用&#xff1b; 实现类:用于实现声明的方法; 应用层:调用方法使用 写过java代码的兄弟们可以这么理解&#xff1a; 声明类 为service层 实现类 为serviceimpl层 应用层 为conlloter层 2.Dome 把函数声明放在头文件xxx.h中&…

什么是 GitHub Wiki 以及如何使用它?

GitHub Wiki 是你项目文档的一个很好的地方。你可以使用 wiki 来创建、管理和托管你的存储库的文档&#xff0c;以便其他人可以使用并为你的项目做出贡献。 GitHub Wiki 很容易开始使用&#xff0c;无需安装任何其他软件。最好的部分是 wiki 与你的 GitHub 存储库集成在一起。…

汇编语言——输入4位以内的16进制数,存进BX

data segment data ends stack segment stackdw 100 dup (?)top label word stack ends code segmentassume cs:code,ds:data,ss:stack main proc farmov ax,datamov ds,axmov ax,stackmov ss,axlea sp,topmov bx,0mov cx,4 ;最多输入4位16进制数 L1: mov ah,7 ;用7号功能…

进程互斥的实现

目录 一. 进程同步二. 进程互斥三. 进程互斥软件实现四. 进程互斥硬件实现4.1 中断屏蔽方法4.2 test and set 指令4.3 Swap 指令 五. 互斥锁六. 信号量机制6.1 整型信号量6.2 记录型信号量6.3 信号量机制实现进程互斥6.4 信号量机制实现进程同步6.5 信号量机制实现进程前驱关系…