RabbitMQ ---- Work Queues

news2025/1/9 17:15:22

RabbitMQ ---- Work Queues

  • 1. 轮训分发消息
    • 1.1 抽取工具类
    • 1.2 启动两个工作线程
    • 1.3 启动一个发送线程
    • 1.4 结果展示
  • 2. 消息应答
    • 2.1 概念
    • 2.2 自动应答
    • 2.3 消息应答的方法
    • 2.4 Multiple 的解释
    • 2.5 消息自动重新入队
    • 2.6 消息手动应答代码
    • 2.7 手动应答效果演示
  • 3. RabbitMQ 持久化
    • 3.1 概念
    • 3.2 队列如何实现持久化
    • 3.3 消息实现持久化
    • 3.4 不公平分发
    • 3.5 预取值

工作队列(又称任务队列)的注销思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

1. 轮训分发消息

实现两个工作线程,一个消息发送线程,看看他们两个工作线程是如何工作的。

1.1 抽取工具类

/**
 * RabbitMQ 的工具类
 * @author dell
 * @date 2023/7/7 16:49
 */

public class RabbitMqUtils {

    public static Channel getChannel() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.100");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }

}

1.2 启动两个工作线程

/**
 * 工作线程
 * @author dell
 * @date 2023/7/7 16:56
 */

public class Worker01 {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息: " + receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消息消费被中断");
        };
        System.out.println("C1消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

}

在这里插入图片描述
在这里插入图片描述

1.3 启动一个发送线程

/**
 * 发送线程
 * @author dell
 * @date 2023/7/7 17:07
 */

public class Task01 {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("发送消息完成: " + message);
            }
        }
    }

}

1.4 结果展示

通过产生者总共发送 4 个消息,消费者1和消费者2分别分得两个消息,并且按照有序的一个接收一次消息。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2. 消息应答

2.1 概念

消费者完成一个任务可能需要一段时间,如何其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

2.2 自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

2.3 消息应答的方法

Channel.basicAck(用于肯定确认): RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

Channel.basicNack(用于否定确认)

Channel.basicReject(用于否定确认): 与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了

2.4 Multiple 的解释

**手动应答的好处是可以批量应答并且减少网络拥堵 **

在这里插入图片描述

true:代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答

false:同上面相比
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

在这里插入图片描述

2.5 消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

在这里插入图片描述

2.6 消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,消费者在上面代码的基础上增加下面画红色部分代码。

消息生产者

/**
 * 消息生产者
 *
 * @author dell
 * @date 2023/7/7 22:53
 */

public class Task02 {

    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入信息");
        while (scanner.hasNext()) {
            String message = scanner.nextLine();
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息: " + message);
        }
    }

}

消费者01

/**
 * 消费者01
 * @author dell
 * @date 2023/7/7 23:00
 */

public class Work03 {

    public static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息处理时间较短");
        DeliverCallback deliverCallback = (consumeTag, delivery) -> {
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息: " + message);
            /**
             * 1. 消息标记tag
             * 2. 是否批量应答未应答消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        });
    }

}

消费者02

/**
 * 消费者02
 * @author dell
 * @date 2023/7/7 23:07
 */

public class Work04 {

    public static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息处理时间较长");
        DeliverCallback deliverCallback = (consumeTag, delivery) -> {
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息: " + message);
            /**
             * 1. 消息标记tag
             * 2. 是否批量应答未应答消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        });
    }

}

睡眠工具类

/**
 * 睡眠工具类
 * @author dell
 * @date 2023/7/7 16:49
 */

public class SleepUtils {
    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

2.7 手动应答效果演示

正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理

在这里插入图片描述

在发送者发送消息 dd,发出消息之后把 C2 消费者停掉,按理说该 C2 来处理该消息,但是它处理的时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理。

在这里插入图片描述

在这里插入图片描述

3. RabbitMQ 持久化

3.1 概念

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。

3.2 队列如何实现持久化

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化

在这里插入图片描述

但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

在这里插入图片描述

以下为控制台中持久化与非持久化队列的 UI 显示区

在这里插入图片描述

这个时候即使重启 rabbitmq 队列也依然存在

3.3 消息实现持久化

要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。

在这里插入图片描述

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边课件发布确认章节。

3.4 不公平分发

在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数 channel.basicQos(1);

在这里插入图片描述

在这里插入图片描述

意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

3.5 预取值

本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能 限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。 这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。 该值定义通道上允许的未确认消息的最大数量。 一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。 虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗 (随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/746908.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RT-Thread 互补滤波器 (STM32 + 6 轴 IMU)

作者:wuhanstudio 原文链接:https://zhuanlan.zhihu.com/p/611568999 最近在看无人驾驶的 Prediction 部分,可以利用 EKF (Extended Kalman Filter) 融合不同传感器的数据,例如 IMU, Lidar 和 GNSS,从而给出更加准确的…

Go——基础语法

目录 Hello World! 变量和常量 变量交换 匿名变量 常量 iota——特殊常量 基本数据类型 数据类型转换 运算符 算数运算符 关系运算符 逻辑运算符 位运算符号 ​编辑 赋值运算符 输入输出方法 流程控制 函数 可变参数类型 值传递和引用传递 Hello Wor…

性能测试 jmeter 的 beanshell 脚本的 2 个常用例子

目录 前言: Bean Shell 内置变量大全 例子 1 例子 2 技巧 前言: JMeter是一个功能强大的性能测试工具,而Beanshell是JMeter中用于编写脚本的一种语言。 在利用 jmeter 进行接口测试或者性能测试的时候,我们需要处理一些复杂…

使用GithubAction自动构建部署项目

GitHub Actions 是一种持续集成和持续交付(CI/CD) 平台,可用于自动执行生成、测试和部署管道。 您可以创建工作流程来构建和测试存储库的每个拉取请求,或将合并的拉取请求部署到生产环境。 GitHub Actions 不仅仅是DevOps,还允许您在存储库中…

基于linux下的高并发服务器开发(第一章)-GCC(2)1.3

04 / gcc 和 g的区别 gcc 和 g都是GNU(组织)的一个编译器 【误区一】:gcc只能编译 C 代码,g 只能编译 c 代码。两者都可以,请注意: 后缀为 .c 的,gcc 把它当做是 C 程序,而 g 当做是…

Debezium系列之:prometheus采集debezium的jmx数据,grafana通过dashboard展示debezium的jmx数据

Debezium系列之:prometheus采集debezium的jmx数据,grafana通过dashboard展示debezium的jmx数据 一、需求背景二、实现的效果三、导出debezium jmx四、debezium jmx重要指标五、部署prometheus和grafana六、Debezium MySQL Connector的dashboard七、debezium-dashboard.json八…

二叉树(上)——“数据结构与算法”

各位CSDN的uu们好呀,好久没有更新我的数据结构与算法专栏啦,今天,小雅兰继续来更新二叉树的内容,下面,让我们进入链式二叉树的世界吧!!! 二叉树链式结构的实现 二叉树链式结构的实现…

性能测试工具 Jmeter 测试 Dubbo 接口脚本编写

目录 前言: 1、背景 2、工具准备 3、创建一个 maven 项目,此处可以创建一个 quickstart,参考截图 4、以上配置完毕后,开始撸代码 5、上面那个类是不需要从 jmeter 中获取参数,如果要从 jmeter 中获取相关的参数&…

低代码在边缘计算工业软件中的应用

近年来,边缘计算给工业现场带来了许多新的变化。由于计算、储存能力的大幅提升,边缘计算时代的新设备往往能够胜任多个复杂任务。另外,随着网络能力的提升,边缘设备与设备之间、边缘设备与工业互联网云平台之间的通讯延迟与带宽都…

Flowable边界事件-信号边界事件

信号边界事件 信号边界事件一、定义1. 图形标记2. 设置信号 选择信号3. XML标记 二、测试用例2.1 定时边界事件xml文件2.2 信号边界事件测试用例 总结 信号边界事件 一、定义 接收到信号触发事件 1. 图形标记 2. 设置信号 选择信号 3. XML标记 定时边界事件的XML <signal…

JMeter进行WebSocket压力测试

背景 之前两篇内容介绍了一下 WebSocket 和 SocketIO 的基础内容。之后用 Netty-SocketIO 开发了一个简单的服务端&#xff0c;支持服务端主动向客户端发送消息&#xff0c;同时也支持客户端请求&#xff0c;服务端响应方式。本文主要想了解一下服务端的性能怎么样&#xff0c;…

驱动开发-day9

驱动代码&#xff1a; #include <linux/cdev.h> #include <linux/device.h> #include <linux/fs.h> #include <linux/gpio.h> #include <linux/init.h> #include <linux/interrupt.h> #include <linux/module.h> #include <linu…

Hystrix熔断器

雪崩 当山坡积雪内部的内聚力抗拒不了它所受到的重力拉引时&#xff0c;积雪便向下滑动&#xff0c;引起⼤量雪体崩塌&#xff0c;人们把这种自然现象称作雪崩 微服务中&#xff0c;一个请求可能需要多个微服务接口才能实现&#xff0c;会形成复杂的调用链路 …

在Linux下通过MySQL二进制包安装MySQL5.7

在Linux下通过通用压缩包安装MySQL5.7 卸载MySQL 如果是第一次安装MySQL&#xff0c;在安装MySQL前&#xff0c;知道如何卸载MySQL是很有必要的。因为在安装过程中可能会 遇到各种各样的问题&#xff0c;自己玩的话 卸载重装即可。 1. find / -name mysql 查看MySQL相关包…

Layui之动态树 左侧树形菜单栏 详细全面

⭐ฅʕ•̫͡•ʔฅ本期看点&#xff1a;该篇是运用Layui框架来编写后台树形菜单栏&#xff0c;并且结合MySql来编写完成 目录 一.效果图 二.具体步骤 2.1 数据库 2.2 树形导航栏 第一个类&#xff1a;Treevo 第二个类&#xff1a;BuildTree&#xff1a; 2.3 Dao方法 2.3.…

【自我提升】Spring Data JPA之Specification动态查询详解

写在前面&#xff1a;刷完Spring Data JPA的课后&#xff0c;发现Specification动态查询还挺有意思的&#xff0c;还应用到了规约设计模式&#xff0c;在此记录下学习过程和见解。 目录 一、应用场景 二、源码解析 三、规约模式 四、实际应用 一、应用场景 1. 简介 有时我…

Linux中安装Tomcat

前提条件&#xff1a; 虚拟机中已经提前安装好jdk1.8。 安装步骤&#xff1a; 1.下载安装包 首先去Apache官网下载&#xff08;Apache Tomcat - Apache Tomcat 9 Software Downloads&#xff09; 2.上传到 linux 中&#xff0c;我这里上传的目录是&#xff1a; /opt 3. 解压…

element-plus坑总结

reactive和ref对比 // 定义变量 import { reactive } from vue; const person reactive({name: "John",age: 25, });// 赋值修改 person.name "Tom"; person.age 26;// 使用变量 <div>{{ person.name }}</div> <button click"perso…

layui介绍以及登录功能的实现

一. layui简介 1.1 layui介绍 Layui 是一套开源免费的 Web UI 组件库&#xff0c;采用自身轻量级模块化规范&#xff0c;遵循原生态的 HTML/CSS/JavaScript 开发模式&#xff0c;非常适合网页界面的快速构建。Layui 区别于一众主流的前端框架&#xff0c;它更多是面向于后端开…

3、Linux-进程管理类

进程管理类 进程是正在执行的一个程序或命令&#xff0c;每一个进程都是一个运行的实体&#xff0c;都有自己的地址空间&#xff0c;并占用一定的系统资源。 7.10.1 ps 查看当前系统进程状态 ps:process status 进程状态 1&#xff09;基本语法 ps aux | grep xxx &#xff08…