【RabbitMQ】消息分发、事务

news2025/1/20 14:49:49

消息分发

概念

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

在工作模式一文中,书写RPC模式的代码时,已经写了一行代码channel.basicQos(1),来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以,我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。

比如,消费端调用了channel.basicQos(5),RabbitMQ就会为该消费者计数,发送一条消息计数加一,消费一条消息计数减一。当到达了设定的上限之后,RabbitMQ就不会再向该消费者发送消息了,知道消费者确认了某条消息之后,才会继续发送。

当channel.basicQos(int prefetchCount)中的形参个数为0时,表示的是没有上限。

应用场景

  1. 限流
  2. 非公平分发(负载均衡)

限流

在学习消息分发之前,当消息到达队列之后,如果有对应的消费者存在,那么队列就会一股脑把所有消息全部发送过去,从而造成瞬间压力,进而可能造成服务宕机,产生严重的影响。因此我们就要进行限流,限制消费者接收消息的数量。

限流通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答。

spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
    listener:
      simple:
        acknowledge-mode: manual # 消息确认机制为手动确认
        prefetch: 5 # 最多拉取5条消息
@Configuration
public class QosConfig {

    @Bean("qosQueue")
    public Queue qosQueue() {
        return QueueBuilder.durable(Constants.QOS_QUEUE).build();
    }

    @Bean("qosExchange")
    public Exchange qosExchange() {
        return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();
    }

    @Bean("qosQueueBind")
    public Binding qosQueueBind(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
    }

}
@RestController
@RequestMapping("/qos")
public class QosController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void qosQueue() {
        for (int i = 0; i < 10; i++) {
            this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "hello qos " + i);
            System.out.println("第" + i + "次发送消息成功!");
        }
    }

}
@Configuration
public class QosListener {

    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void qosListener(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收的消息为:" + msg);
        // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

 

启动程序之后,可以看到出现如上结果,明显看到,我们发送了10条信息,但是由于限流的原因,当消费者接收了5条消息之后,并且没有去应答,因此程序就不再继续接收消息,而是等待这5条消息应答之后,才会去继续接收消息。

负载均衡

在有两个消费者的情况下,一个消费者处理任务非常快,一个消费者处理任务非常慢,就会造成一个消费者会一直很忙,而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息,他不考虑消费者未确认消息的数量。我们可以使用prefetch=1的方式来进行设置,告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前,都不向其发送新的消息。这样做就可以使得有消息时,所有消费者都处理忙碌的状态。

实现负载均衡功能的代码和实现限流的代码类似,只需要将配置文件中的prefetch修改为1即可。

事务

RabbitMQ也实现了事务机制,允许开发者确保消息的接收和发送是原子性的,要么全部成功,要把全部失败。

@Component
public class RabbitTemplateConfig {

    @Bean("transactionRabbitTemplate")
    public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true); // 开启事务
        return rabbitTemplate;
    }

}
@Configuration
public class TransactionConfig {

    @Bean("transactionQueue")
    public Queue transactionQueue() {
        return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();
    }

    @Bean("transactionExchange")
    public Exchange transactionExchange() {
        return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();
    }

    @Bean("transactionQueueBind")
    public Binding transactionQueueBind(@Qualifier("transactionQueue") Queue queue,
                                       @Qualifier("transactionExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("transaction").noargs();
    }

}
@RestController
@RequestMapping("/transaction")
public class TransactionController {

    @Resource(name = "transactionRabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    @Transactional
    @RequestMapping
    public void transactionQueue() {
        System.out.println("发送成功");
         this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");
         int i = 1 / 0;
        this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");
    }

}

RabbitMQ和Redis中的事务相对来说,都是比较简单的,并不和MySQL,包含那么多的性质。因此,在对事务的介绍中,并没有大幅度进行介绍。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2153690.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker desktop windows stop

服务docker改为启动 cmd下查看docker版本 {"builder": {"gc": {"defaultKeepStorage": "20GB","enabled": true}},"experimental": false,"registry-mirrors": ["https://hub.atomgit.com/"]…

详解c++:new和delete

文章目录 前言一、new和mallocnew的用法&#xff08;爽点&#xff09;自动构造 delete和freedelete的用法&#xff08;爽点&#xff09; 提醒 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 在C中&#xff0c;new 和 delete 是两个非常重要的操作符&am…

Python面相对象案例熟悉对MySQL的操作(案例一、学生管理系统,案例二、模拟注册与用户登录)有源码

Python面相对象案例熟悉对MySQL的操作 案例一&#xff0c;学生管理系统 对数据表的要求&#xff1a; 在mysql中创建数据库gamedb,创建用户表userinfo,字段如下&#xff1a; 用户编号uid(int,主键&#xff0c;自动增长) 用户姓名uname(varchar(20),非空) 用户昵称nickname(…

【Delphi】Delphi 中的 LiveBindings 使用场景与概念

LiveBindings 是 Delphi 提供的一种数据绑定机制&#xff0c;用于将 UI 控件与数据源&#xff08;如数据库字段、对象属性等&#xff09;进行动态连接。LiveBindings 允许开发人员通过可视化的方式绑定数据&#xff0c;省去了大量的手动编写代码&#xff0c;使 UI 更新和数据同…

RAG+Agent人工智能平台:RAGflow实现GraphRA知识库问答,打造极致多模态问答与AI编排流体验

1.RAGflow简介 全面优化的 RAG 工作流可以支持从个人应用乃至超大型企业的各类生态系统。大语言模型 LLM 以及向量模型均支持配置。基于多路召回、融合重排序。提供易用的 API&#xff0c;可以轻松集成到各类企业系统。支持丰富的文件类型&#xff0c;包括 Word 文档、PPT、exc…

『玉竹』基于Laravel 开发的博客、微博客系统和Android App

基于 Laravel 和 Filament 开发, 使用 Filament 开发管理后台&#xff0c;前端比较简洁。 博客大家都清楚是什么东西&#xff0c;微博客类似于微博之类的吧&#xff0c;有时候想要写的东西可能只有几句话&#xff0c;想要起个标题都不好起。 为了是微博客功能更好用&#xff0c…

Navicat导入Sql文件至Mysql数据库,事务失效

Mysql 版本&#xff1a;8.0.39 Navicat 版本&#xff1a;17.x、16.x 结论&#xff1a; Navicat 导入sql文件&#xff0c;事务不会生效&#xff0c;无论怎么设置 mysql.exe 导入sql文件&#xff0c;事务生效 测试 准备一张表 name约束不能为空&#xff0c;用于测试事务失败…

Qemu开发ARM篇-2、uboot交叉编译

文章目录 1、交叉编译工具安装2、uboot交叉编译3、FAQ 在继上一篇 Qemu开发ARM篇-1、环境搭建篇中&#xff0c;我们搭建安装了qemu虚拟机&#xff0c;在本节中&#xff0c;我们将演示如何安装交叉编译工具并交叉编译 uboot,在下一节中&#xff0c;我们将演示如何使用 qemu运…

如何快速找回Finalshell中VPS的SSH密码

买了vps亦或者重装了系统&#xff0c;就会更新SSH的连接密码&#xff0c;如果忘记保存或者遗忘&#xff0c;在邮箱里也找不到&#xff0c;再重装系统会非常麻烦。这时就需要在Finalshell中找回SHH的密码了。方法如下&#xff1a; 第一步&#xff1a;无认哪一种方法&#xff0c…

嵌入式入门小工程

此代码基于s3c2440 1.点灯 //led.c void init_led(void) {unsigned int t;t GPBCON;t & ~((3 << 10) | (3 << 12) | (3 << 14) | (3 << 16));t | (1 << 10) | (1 << 12) | (1 << 14) | (1 << 16);GPBCON t; }void le…

window下idea中scala的配置

目录 Scala安装步骤&#xff1a; 1.下载scala安装包 2.配置环境变量&#xff1a; 3.检查scala是否安装成功&#xff1a; 4.idea安装scala插件 5.导入scala-sdk 6.新建scala文件 Scala安装步骤&#xff1a; 1.下载scala安装包 访问Scala官网&#xff1a;https://www.sca…

MySQL高阶1907-按分类统计薪水

目录 题目 准备数据 分析数据 总结 题目 结果表 必须 包含所有三个类别。 如果某个类别中没有帐户&#xff0c;则报告 0 。 按 任意顺序 返回结果表。 查询每个工资类别的银行账户数量。 工资类别如下&#xff1a; "Low Salary"&#xff1a;所有工资 严格低于…

YOLOv8改进 | 特征融合篇,YOLOv8添加iAFF(多尺度通道注意力模块),并与C2f结构融合,提升小目标检测能力

摘要 特征融合,即来自不同层或分支的特征的组合,是现代网络架构中无处不在的一部分。虽然它通常通过简单的操作(如求和或拼接)来实现,但这种方式可能并不是最佳选择。在这项工作中,提出了一种统一且通用的方案,即注意力特征融合(Attentional Feature Fusion),适用于…

刷题训练之栈

> 作者&#xff1a;დ旧言~ > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;熟练掌握字符串算法。 > 毒鸡汤&#xff1a;学习&#xff0c;学习&#xff0c;再学习 ! 学&#xff0c;然后知不足。 > 专栏选自&#xff1a;刷题…

【C++】C++库:如何链接外部库、静态链接和动态链接,以及如何自建库并使用

十三、C库&#xff1a;如何链接外部库、静态链接和动态链接&#xff0c;以及如何自建库并使用 本篇讲C库&#xff0c;先讲如何在项目中使用外部库&#xff0c;包括静态链接和动态链接的实现&#xff1b;再讲如何在VisualStudio中自建模块或库项目&#xff0c;让所有项目都能使…

大数据实验2.Hadoop 集群搭建(单机/伪分布式/分布式)

实验二&#xff1a; Hadoop安装和使用 一、实验目的 实现hadoop的环境搭建和安装Hadoop的简单使用&#xff1b; 二、实验平台 操作系统&#xff1a;Linux&#xff08;建议Ubuntu16.04或者18.04&#xff09;&#xff1b;Hadoop版本&#xff1a;3.1.3&#xff1b;JDK版本&…

C#解决方案的各种操作

C#开发编程软件下载安装 C#开发编程软件下载安装_c#下载安装-CSDN博客文章浏览阅读208次。。。。_c#下载安装https://rxxw-control.blog.csdn.net/article/details/140879228 C#和S7-1200PLC S7.NET通信 C#和S7-1200PLC S7.NET通信_c# s1200 s7协议设置-CSDN博客文章浏览阅读…

Linux开发工具(git、gdb/cgdb)--详解

目录 一、Linux 开发工具分布式版本控制软件 git1、背景2、使用 git&#xff08;1&#xff09;预备工作——安装 git&#xff1a;&#xff08;2&#xff09;克隆远程仓库到本地&#xff08;3&#xff09;把需要提交的代码拷贝到本地仓库&#xff08;4&#xff09;提交本地仓库文…

JavaScript ---案例(统计字符出现次数)

统计字符出现次数 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-w…

在Linux中运行flask项目

准备 这里我准备了一个GitHub上某个大佬写的留言板的Flask项目&#xff0c;就用这个来给大家做示范了。 查看留言板的目录结构 查看主程序所用的库函数 只有一个第三方库 Flask 安装pip sudo apt install python3-pip -y测试 pip 安装成功 修改pip镜像源 修改pip的默认下载…