三、work queues(多进程消费一个队列)

news2025/1/10 10:27:50

1、轮训分发消息

在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。

1.1 抽取工具类

public class RabbitMqUtils {
    //得到一个连接的channel
    public static Channel getChannel() throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.126.10");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

1.2 启动两个工作线程

/**
 * 消费线程1
 */
public class Work01 {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:" + receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        System.out.println("Work01 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

public class Work02 {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:" + receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        System.out.println("Work02 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

1.3 启动一个发送线程

public class Task1 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("发送消息完成:" + message);
            }
        }
    }
}

在这里插入图片描述

1.4 消费结果

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且
是按照有序的一个接收一次消息
在这里插入图片描述
在这里插入图片描述

2、消息应答

2.1 为什么需要消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况?

RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制
消息应答就是: 消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

2.2 自动应答

什么是自动应答?
消息发送后立即被认为是已经传送成功,这种模式需要在高吞吐量和数据传输安全方面做权衡

  • 为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了
  • 面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死

适用场景:
适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用!!!

2.3 消息应答的代码

  • Channel.basicAck(用于肯定确认) : RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack(用于否定确认) : 消费者对消息进行否定确认后,队列中的消息从unchecked中重新入队
  • Channel.basicReject(用于否定确认) : 表明消费者不处理该消息了直接拒绝,可以将其丢弃了

2.4 Multiple 的解释

手动应答的好处是可以批量应答并且减少网络拥堵
在这里插入图片描述

multiple 的 true 和 false 代表不同意思

  • true 代表批量应答 channel 上未应答的消息。比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这 些还未应答的消息都会被确认收到消息应答
  • false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

2.5 消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息

2.6 消息手动应答代码

消费者消费消息时,autoAck属性设置为 false

// 消费者1:消费处理时间短
public class Work01 {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:" + receivedMessage);
            //当工作线程对消息处理完毕后,手动对消息进行应答
            //第一个参数:消息标记
            //第二个参数:是否批量应答当前消息之前所有未应答的消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        System.out.println("Work01 消费者启动等待消费......");
        // 是否开启手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

// 消费者2:消费处理时间长,sleep30秒后,再进行应答
public class Work02 {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:" + receivedMessage);
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        //设置不公平分发
//        channel.basicQos(1);
        System.out.println("Work02 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}

2.7 手动应答效果演示

在这里插入图片描述

2.8 不公平分发

在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是
RabbitMQ 并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,我们可以设置参数 channel.basicQos(1);

public class Work01 {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:" + receivedMessage);
            //当工作线程对消息处理完毕后,手动对消息进行应答
            //第一个参数:消息标记
            //第二个参数:是否批量应答当前消息之前所有未应答的消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        System.out.println("Work01 消费者启动等待消费......");
        //设置不公平分发
        channel.basicQos(1);
        // 是否开启手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

在这里插入图片描述
在这里插入图片描述

3、RabbitMQ的持久化

3.1 概念

刚刚我们已经了解到,当消息发送给了消费者,但是消费者在消费的过程中发生异常时,如何保证消息的不丢失。
但是如何保证,RabbitMQ服务退出或者因为其他原因崩溃时,队列中的消息依然存在 !!!

确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

3.2 如何实现队列的持久化

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化
在这里插入图片描述
以下为控制台中持久化与非持久化队列的 UI 显示区
在这里插入图片描述

3.3 消息实现持久化

要想使的消息实现持久化,需要在生产者发送消息时,给消息设置持久化的属性:MessageProperties.PERSISTENT_TEXT_PLAIN

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘但是还没有存储完的时候,RabbitMQ出现异常,此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边课件发布确认章节。

3.4 预取值

本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息;另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。

这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/383696.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探寻世界:用Python获取照片的地理定位信息

目录 步骤: 源代码: 代码说明: 报错1: 解决方法1: 报错2: 解决方法2: 效果如下所示: 验证效果如下: 一、步骤: 要从 JPEG 图像中获取经纬度信息&…

投票页面制作线上投票活动制作网络投票制作关注投票制作

现在来说,公司、企业、学校更多的想借助短视频推广自己。通过微信投票小程序,网友们就可以通过手机拍视频上传视频参加活动,而短视频微信投票评选活动既可以给用户发挥的空间激发参与的热情,又可以让商家和企业实现推广的目的&…

2023上海国际电商物流包装产业展览会相约上海

2023年7月5-7日 | 上海新国际博览中心 同期举办:2023上海国际快递物流产业博览会 指导单位:上海市邮政管理局 中国快递协会 主办单位:上海市快递行业协会 上海市仓储与配送行业协会 上海市物流协会 承办单位:上海信世展览服务有…

【NLP相关】GPT-X合集:GPT类模型介绍(附相关论文和Github项目地址)

❤️觉得内容不错的话,欢迎点赞收藏加关注😊😊😊,后续会继续输入更多优质内容❤️👉有问题欢迎大家加关注私戳或者评论(包括但不限于NLP算法相关,linux学习相关,读研读博…

哈希一致性算法(分布式服务器落点算法)

场景预设和一般hash算法: 先预设一个场景,有10000份文件,需要缓存到五台缓存服务器之上 那么按照最常规,每个服务器平均分配2000份文件 那么用一个取余操作就可以完成 比如说是第2513的图片,那么用一个公式 需要缓…

HTML概述与基本标签

🌟所属专栏:HTML只因变凤凰之路🐔作者简介:rchjr——五带信管菜只因一枚😮前言:该系列将持续更新HTML的相关学习笔记,欢迎和我一样的小白订阅,一起学习共同进步~👉文章简…

simplifyEnrichment | 让我来做你的富集结果的瘦身教练吧!~

1写在前面 最近真是烦心啊,事事不顺,找个日子我要找大师算一卦。😂 大家基本都会做富集分析,但有时候terms实在太多,读起来真是累,也搞不清到底谁是其中相对重要的。🥲 之前有一些R包通过计算基…

【告别篇】大家好,再见了,我转行了,在筹备创业

前言 相信大家也一直看到我的博客没有更新过了,我其实很久没有打开过博客了,也就意味着我很长一段时间都在停滞不前,没有了学习的动力。 现在我上来是想跟大家告个别 : 很多粉丝宝宝的私信我看了,但是没有回&#xf…

并查集结构

文章目录并查集特点构建过程查找两个元素是否是同一集合优化查找领头元素设置两个元素为同一集合构建结构应用场景并行计算集合问题并查集特点 对于使用并查集构建的结构,可以使得查询两个元素是否在同一集合,以及合并集合的操作无限接近O(1) 构建过程…

Intellij idea使用Statistic统计代码行数的方法

一、安装Statistic1、打开IDEA2、打开settings进行设置3、选择plugins,搜索Statistic并安装4、下载完成之后,重启IDEA,此时Statistic就安装好了二、使用Statistic1、安装好Statistic之后我们可以通过以下步骤 将Statistic插件的控制台展示出来…

2023年Dubbo常见面试题

2023年Dubbo常见面试题 Dubbo 中 zookeeper 做注册中心,如果注册中心集群都挂掉,发布者和订阅者之间还能通信么? 可以通信的,启动 dubbo 时,消费者会从 zk 拉取注册的生产者的地址接口等数据,缓存在本地。…

3/2考试总结

时间安排 7:30–7:50 读题,T1 貌似是个构造,T2 应该是个圆方树 dp 加上一些神秘的暴力,T3 不知道是啥。 7:50–9:00 T1,发现没法暴力。考虑能不能构造什么的,好像也不好构造。可能是个别的什么东西。手玩样例有一些结论&#xff…

【UE4 Cesium】加载离线地图

主体思路:先使用水经注软件下载瓦片数据,再使用Python转换瓦片数据格式(TMS),使用Nginx发布网络服务,最后将网络服务加载到UE中。步骤:使用水经注下载瓦片数据,这里下载的是全球七级…

JavaSE22-集合2-map

文章目录一、集合概念二、map集合1、Map集合的特点2、HashMap2.1 HashMap特点2.2 创建对象2.3 常用方法2.4 遍历2.4.1 使用entrySet遍历2.4.2 使用keySet遍历3、HashMap的key去重原理一、集合概念 集合就是用于存储多个数据的容器。相对于具有相同功能的数组来说,集…

神垕古镇景区三方背后的博弈,争夺许昌第一家5A景区主导权

钧 瓷 内 参 第37期(总第368期) 2023年3月2日 神垕古镇景区景域,建业,孔家三方背后的博弈,争夺许昌第一家5A景区主导权 在博弈论(Game Theory)经济学中,“智猪博弈”是一个著名的…

Delphi 中 FireDAC 数据库连接(脱线连接 )

参见:Delphi 中 FireDAC 数据库连接(总览)述了如何使用FireDAC离线模式,它允许你在没有与数据库持久连接的情况下处理数据。一、概述FireDAC的离线模式类似于多层客户端,大部分时间客户端与数据库断开连接。只有当客户…

给深度学习研究生的入门建议(未完待续ing)

诸神缄默不语-个人CSDN博文目录 本文将系统性介绍深度学习方向(准)研究生可供参考的入门建议。 我的背景是浙江大学人工智能专业在读硕士,研究方向是GNN、NLP、司法智能。 (我的CSDN博文基本涵盖了我所有的深度学习知识&#xff…

pytorch-多层感知机,最简单的深度学习模型,将非线性激活函数引入到模型中。

多层感知机,线性回归和softmax回归在内的单层神经网络。然而深度学习主要关注多层模型。在本节中,我们将以多层感知机(multilayer perceptron,MLP)为例,介绍多层神经网络的概念。 隐藏层 多层感知机在单层…

Vue环境的搭建和在vscode上的应用(Window10)

Vue环境的搭建 1.安装: 从官网下载安装包,解压到指定位置,就相当于安装完成了。 2.配置环境变量 找到node.js的文件夹,在里面找到src,把路径复制一下。 我在E盘建立了一个文件夹放node,如图找到bin的路径&…

vuecli3打包项目上线之后报错怎么使用本地的sourcemap文件定位调试?

问题 我们上线的时候一般都不会添加sourcemap文件&#xff0c;一方面为了加快构建速度&#xff0c;另一方面避免源码泄漏。所以有时出现报错的时候很难定位问题所在。 例子&#xff1a;比如我写了一个错误的代码&#xff0c;点击 <template><div class"hello&…