【初始RabbitMQ】工作队列的实现

news2024/12/27 12:44:41

工作队列

工作队列(又称为任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

轮训分发消息

我们启动两个工作线程,一个消息发送线程,一个用来接受线程,我们来看看它们两个工作线程是如何工作的

抽取工具类

我们将获取信道这个重复的代码封装为一个类,当时用的时候直接调用

/**
 * 连接工厂创建信道工具类
 */
public class RabbitMqUtils {
    public static Channel getChannel(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("118.31.6.132");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = null;
        try {
            try {
                connection = factory.newConnection();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        Channel channel = null;
        try {
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return channel;
    }
}

这里使用try...catch防止之后每次调用都需要抛出异常

生产者代码

/**
 * 生产者 发送大量消息
 */
public class Task01 {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    //发送大量的消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /*
         *生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
         * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
         * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数 延迟消息等
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台接受消息
        Scanner scanner = new Scanner(System.in);
        /**
         * 发送一个消息
         * 1.发送到那个交换机
         * 2.路由的KEY值是哪个 本次是队列的名称
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("发送"+message+"完成");
        }
    }
}

消费者代码

/**
 * 这是一个工作线程(消费者)
 */
public class Worker01 {
    //队列名称
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //消息接受
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        //消息接受被取消时
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消息取消消费接口回调");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        System.out.println("C2等待接受消息......................");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

启动两个工作线程

在运行之前我们要修改一个选项,这样我们就不需要重复的写消费者2了

启动一个发送线程 

启动程序,用生产者发送四条消息,

结果分析

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且 是按照有序的一个接收一次消息

 消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

消息应答的方法

  1. Channel.basicAck(用于肯定确认)
    1. RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  2. Channel.basicNack(用于否定确认)
  3. Channel.basicReject(用于否定确认)
    1. 与 Channel.basicNack 相比少一个参数(批量应答参数) 不处理该消息了直接拒绝,可以将其丢弃了

Multiple(批量应答)的解释

手动应答的一个好处就是可以批量应答并且减少网络拥堵

multiple 的 true 和 false 代表不同意思:

true:代表批量应答channel 上未应答的消息,比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答

false:同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息

消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改 为手动应答,消费者在上面代码的基础上增加下面画红色部分代码

睡眠工具类:

public class SleepUtils {
    public static void sleep(int second){
        try {
            Thread.sleep(1000*second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

生产者:

/*
* 消息再手动应答不丢失、放回消息队列重新消费
 */
public class Task2 {
    //队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //从控制台中输入信息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

消费者01:

public class Work01 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息短");
        //消息消费的时候如何处置消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1.消息标记tag
             * 2.是否批量应答未应答的消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

消费者02:

public class Work02 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息长");
        //消息消费的时候如何处置消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            /**
             * 1.消息标记tag
             * 2.是否批量应答未应答的消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

手动应答效果演示

在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1452713.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

.NET Core WebAPI中使用Log4net 日志级别分类并记录到数据库

一、效果 记录日志为文档 记录日志到数据库 二、添加NuGet包 三、代码配置 <?xml version"1.0" encoding"utf-8" ?> <log4net><!-- Debug日志 --><appender name"RollingFileDebug" type"log4net.Appender.Roll…

【RL】Value Iteration and Policy Iteration(利用迭代算法求解贝尔曼最优等式)

Lecture 4: Value Iteration and Policy Iteration Value Iteration Algorithm 对于Bellman最优公式&#xff1a; v f ( v ) m a x π ( r γ P π v ) \mathbf{v} f(\mathbf{v}) max_{\pi}(\mathbf{r} \gamma \mathbf{P}_{\pi} \mathbf{v}) vf(v)maxπ​(rγPπ​v) …

Midjourney绘图欣赏系列(一)

Midjourney介绍 Midjourney 是生成式人工智能的一个很好的例子,它根据文本提示创建图像。它与 Dall-E 和 Stable Diffusion 一起成为最流行的 AI 艺术创作工具之一。与竞争对手不同,Midjourney 是自筹资金且闭源的,因此确切了解其幕后内容尚不清楚。我们知道它严重依赖机器学…

Python教程(26)——Python迭代器和生成器详解

迭代器 Python中的迭代器是一种对象&#xff0c;它可以迭代&#xff08;遍历&#xff09;一个可迭代对象&#xff08;比如列表、元组或字符串&#xff09;的元素。迭代器用于实现迭代器协议&#xff0c;即包含 __iter__() 方法和 __next__() 方法。 迭代器的工作原理是每次调…

2.11题目

#include <stdio.h> int main() { char a; while((a getchar()) ! -1) { if(a > A && a < Z) a32; putchar(ch); } return 0;} ———————————————— 版权声明&#xff1a;本文为博主原创文章…

2024年危险化学品经营单位主要负责人证模拟考试题库及危险化学品经营单位主要负责人理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年危险化学品经营单位主要负责人证模拟考试题库及危险化学品经营单位主要负责人理论考试试题是由安全生产模拟考试一点通提供&#xff0c;危险化学品经营单位主要负责人证模拟考试题库是根据危险化学品经营单位主…

【大模型 数据增强】LLMAAA:使用 LLMs 作为数据标注器

【大模型 数据增强】LLMAAA&#xff1a;使用 LLMs 作为数据标注器 提出背景算法步骤1. LLM作为活跃标注者&#xff08;LLMAAA&#xff09;2. k-NN示例检索与标签表述化3. 活跃学习策略4. 自动重权技术 LLMAAA 框架1. LLM Annotator2. Active Acquisition3. Robust Training 总结…

数据结构对链表的初步认识(一)

已经两天没有更新了&#xff0c;今天就写一篇数据结构的链表吧&#xff0c;巩固自己也传授知识&#xff0c;不知道各位是否感兴趣看看这一篇有关联表的文章。 目录 链表的概念与结构 单向链表的实现 链表各个功能函数 首先我在一周前发布了一篇有关顺序表的文章&#xff0c;…

【Linux系统化学习】缓冲区

目录 缓冲区 一个样例 现象解释 缓冲区存在的位置 缓冲区 在刚开始学习C语言的时候我们就听过缓冲区这个名词&#xff0c;很是晦涩难懂&#xff1b;在Linux下进程退出时也包含缓冲区&#xff0c;因此缓冲区到底是什么&#xff1f;有什么作用&#xff1f; 让我们先从一个小…

斯坦福大学全能家政服务机器人Mobile ALOHA以及“小群体大智慧”Zooids集群机器人

斯坦福大学成功研发出低成本自主进化克隆人类行为和任务的能力全能型家政服务机器人。 原文标题: 【Mobile ALOHA-Learning Bimanual Mobile Manipulation with Low-Cost Whole-Body Teleoperation】 论文链接:【Mobile ALOHA (mobile-aloha.github.io)】。 以及由斯坦福大学…

51单片机项目(32)——基于51单片机的温度检测及控制装置的proteus仿真

1.功能设定 使用DS18B20测定当前温度并实时显示在LCD1602屏幕&#xff0c;使用四个按键设定温度的上限、下限。当温度低于下限时&#xff0c;蜂鸣器报警同时开启升温装置&#xff1b;当温度大于上限时&#xff0c;蜂鸣器报警同时启动降温装置。 仿真图如下&#xff1a; 2.软件设…

【无标题】管理kvm 虚拟机

管理kvm 虚拟机 点击虚拟机 创建新的虚拟机 安装操作系统 设置root密码

中小学信息学奥赛CSP-J认证 CCF非专业级别软件能力认证-入门组初赛模拟题第三套(选择题)

CSP-J入门组初赛模拟练习题第三套 1、以下不是属于国家顶级域名的是 A、.au B、.cn C、.com D、.jp 答案&#xff1a;C 考点分析&#xff1a;主要考查域名相关知识&#xff0c;au是澳大利亚、cn是中国&#xff0c;jp是日本&#xff0c;答案C 2、2个10进制数1111和1010的异…

蓝桥杯 星期计算

思路1 由于2022太大&#xff0c;用double来存储&#xff0c;即(52022 % 7) % 7即可 int num 5;int t (int)(Math.pow(20,22)%7);num t;num%7;System.out.println(num1);思路2 你需要知道 (a * b ) % p a % p * b % p Scanner scan new Scanner(System.in);int num 1;for…

计算机设计大赛 深度学习YOLO抽烟行为检测 - python opencv

文章目录 1 前言1 课题背景2 实现效果3 Yolov5算法3.1 简介3.2 相关技术 4 数据集处理及实验5 部分核心代码6 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于深度学习YOLO抽烟行为检测 该项目较为新颖&#xff0c;适合作为竞赛课…

【感知机】感知机(perceptron)学习策略

感知机( perceptron )是二类分类的线性分类模型&#xff0c;其输入为实例的特征向量&#xff0c;输出为实例的类别&#xff0c;取1 和-1二值。感知机对应输入空间(特征空间)中将实例划分为正负两类的分离超平面&#xff0c;是一种判别模型。感知机是神经网络与支持向量机的基础…

Linux系统——拓展LVM逻辑卷分区与磁盘配额

一、LVM逻辑卷分区 1.检测并确认新硬盘 1.1fdisk 查看或管理硬盘分区 fdisk -l&#xff08;小写的L&#xff09; &#xff08;硬盘设备&#xff09; 或 fdisk 硬盘设备 1.2实际操作 1.2.1fdisk查询结果详解 Device&#xff1a;分区的设备文件名称Boot&#xff1a;是否…

WordPress主题YIA在广告位添加图片广告时下方有空白怎么办?

YIA主题设置中默认有4个广告位&#xff0c;而侧边栏的广告位由站长自行添加。boke112百科在这些广告位添加图片广告后发现图片下方有空白&#xff0c;导致下方的两个角没有变圆角&#xff0c;看起来也有点不好看。具体如下图所示&#xff1a; 其实&#xff0c;这个问题就是典型…

PyTorch-线性回归

已经进入大模微调的时代&#xff0c;但是学习pytorch&#xff0c;对后续学习rasa框架有一定帮助吧。 <!-- 给出一系列的点作为线性回归的数据&#xff0c;使用numpy来存储这些点。 --> x_train np.array([[3.3], [4.4], [5.5], [6.71], [6.93], [4.168],[9.779], [6.1…

【开源图床】使用Typora+PicGo+Github+CDN搭建个人博客图床

准备工作&#xff1a; 首先电脑得提前完成安装如下&#xff1a; 1. nodejs环境(node ,npm):【安装指南】nodejs下载、安装与配置详细教程 2. Picgo:【安装指南】图床神器之Picgo下载、安装与配置详细教程 3. Typora:【安装指南】markdown神器之Typora下载、安装与无限使用详细教…