WorkQueue模型

news2025/1/17 17:42:05

        WorkQueues,也被称为任务队列模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时的处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因为任务是不会被重复执行的。

P:生产者

C1:消费者-1,领取任务并且完成任务,假设完成速度较慢。

C2:消费者-2,领取任务并且完成任务,假设完成速度快。

1.生产者

public class Provider {
    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for(int i = 1;i<=200;i++){
            channel.basicPublish("","work",null,(i+"hello rabbitmq").getBytes());
        }
        RabbitMqUtil.closeConnectionAndChannel(channel,connection);
    }
}

2.消费者-1

 

public class Consumer1 {
    //消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = RabbitMqUtil.getConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                    System.out.println("consumer1得到:"+new String(body));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //注意这里不能关闭通道和连接,因为要一直监听
    }
}

 3.消费者-2

public class Consumer2 {
    //消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = RabbitMqUtil.getConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer2得到:"+new String(body));
            }
        });
        //注意这里不能关闭通道和连接,因为要一直监听
    }
}

4.结果分析

通过运行结果我们发现消费者1和消费2是平均处理消息的,就比如1000个消息一人处理一半。而且现在的机制是,队列中的消息会一次性全部分配给两个消费者,囤积到两个消费者处然后让两个消费者去各自慢慢消化。这样就会产生一些问题:

1.由于我们设置了消息的自动确认机制,两个消费者刚得到大量消息都还没开始消费其实就已经告诉队列我们确认完了,这样显然是不合理的。

2.消费者那边一次性囤积了大量未处理的消息,如果处理中宕机了,囤积的消息会丢失。

而且假如消费者2执行的很快,而另一个消费者1执行的很慢,这样消费者2很快执行完就空闲了,而消费者1一直迟迟执行不完。能不能改进为能者多劳的机制呢?

  • 消费者一次只接收一条未确认的消息。

  • 关闭自动确认消息。

  • 消费者处理完一条,要手动确认消息。

5.能者多劳

消费者改进:

public class Consumer1Improve {
    //消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = RabbitMqUtil.getConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicQos(1);//一次只接收一条
        //参数2:关闭自动确认
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                    System.out.println("consumer1得到:"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //注意这里不能关闭通道和连接,因为要一直监听
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1348464.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IDEA错误: 找不到或无法加载主类 com.atguigu.springcloud.EurekaServer7001_App

第一种方法&#xff1a; 可以手动点击maven中的compile编译一下&#xff0c;如下图&#xff1a; 第二种方法&#xff1a; 在pom.xml文件中加入编译插件&#xff1a; <build><plugins><!-- 编译插件 --><plugin><artifactId>maven-compiler-plu…

matlab概率论例子

高斯概率模型&#xff1a; [f,xi] ksdensity(x): returns a probability density estimate, f, for the sample in the vector x. The estimate is based on a normal kernel function, and is evaluated at 100 equally spaced points, xi, that cover the range of the da…

如何在Linux系统中安装Redis

原本Redis官网提供了Windows和Linux两个版本&#xff0c;但从 2011-12-29 以后不再更新Windows版本&#xff08;https://github.com/dmajkic/redis/downloads&#xff09;&#xff0c;加之企业生产环境通常使用Linux系统&#xff0c;所以这里在Linux系统中演示如何安装Redis。 …

typescript,eslint,prettier的引入

typescript 首先用npm安装typescript&#xff0c;cnpm i typescript 然后再tsc --init生成tsconfig.json配置文件&#xff0c;这个文件在package.json同级目录下 最后在tsconfig.json添加includes配置项&#xff0c;在该配置项中的目录下&#xff0c;所有的d.ts中的类型可以在…

11 HAL库的硬件I2C驱动SI7006和AP3216C

引言&#xff1a; 本片文章想给大家分享一下使用HAL库驱动SI7006和AP3216C&#xff0c; 这两款常见的芯片的手册会在文章的末尾提供给大家。 一、SI7006和AP3216C简介 SI7006 SI7006是一款数字湿度和温度传感器&#xff0c;由Silicon Labs&#xff08;全称Silicon Laboratories…

【AI视频领域展望】未来视频行业:人工智能、5G和VR技术将如何改变视频制作和观看方式?

5G技术 5G技术的商用将会进一步推动物联网和视频行业的融合。通过5G技术&#xff0c;可以实现高清视频的实时传输和播放&#xff0c;为用户提供更加流畅和快速的观看体验。 5G视频的优势主要体现在以下几个方面&#xff1a; 更低的延迟&#xff1a;5G网络的延迟时间相比4G降低…

Plantuml之EBNF语法介绍(二十七)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

App.vue中引入自定义组件

components目录中定义组件&#xff1a;Person.vue 目录截图&#xff1a; Person.vue文件中内容&#xff1a; <template><div class"person"><h2>姓名&#xff1a;{{name}}</h2><h2>年龄&#xff1a;{{age}}</h2><!--定义了…

OSCHINA Gitee 联合呈现,《2023 中国开源开发者报告》正式发布,总结分非常帮,可以免费看的报告!

《2023 中国开源开发者报告》 详细地址&#xff1a; https://talk.gitee.com/report/china-open-source-2023-annual-report.pdf 不需要收费下载&#xff01;&#xff01; 其中大模型的部分总结的非常棒 gietee 也支持 AI 模型托管了 如何在 Gitee 上托管 AI 模型 https://…

使用WAZUH检测LD_PRELAOD劫持、SQL注入、主动响应防御

目录 1、检查后门 使用工具检测后门 1.chkrootkit 2.rkhunter 手动检查文件 检查ld.so.preload文件 2、检测LD_PRELOAD ubuntu配置 wazuh配置 3、检测SQL注入 ubuntu配置 攻击模拟 4、主动响应 wauzh的安装以及设置代理可以参考本篇&#xff1a;WAZUH的安装、设置…

【Qt之Quick模块】6. QML语法详解_3 QML对象特性

概述 每一个QML对象类型都包含一组已定义的特性。当进行实例时都会包含一组特性&#xff0c;这些特性是在对象类型中定义的。 一个QML文档中的对象类型声明了一个新的类型&#xff0c;即实例出一个类型。 其中包含以下特性。 the id attribute &#xff1a; id特性property a…

《教育观察》是什么级别的期刊?是正规期刊吗?能评职称吗?

教育类&#xff5c;《教育观察》知网收录 《教育观察》始终秉持“ 立足教育实践&#xff0c;展望教育未来”&#xff0c;致力于在教育实践中以“观察”为方法&#xff0c;以“观察者”为主体&#xff0c;以“新观察”为旨趣&#xff0c;打造从教育实践中洞察教育未来的教育研究…

深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第四节 参数传递对堆栈的影响 2

深入浅出图解C#堆与栈 C# Heaping VS Stacking 第四节 参数传递对堆栈的影响 2 [深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第一节 理解堆与栈](https://mp.csdn.net/mdeditor/101021023)[深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第二节 栈基本工作原理](htt…

c语言:打印平行四边形|练习题

一、题目 输入平行四边形的边数&#xff0c;用星号打印平行四边形 如图&#xff1a; 二、思路分析 图形分为两部分 1、左边的空格 2、右边的星号 因此&#xff0c;把空格和星号合起来&#xff0c;就是要求的图形 三、代码图片【带注释】 四、源代码【带注释】 #include <s…

你逛过凌晨四点的校园吗?--大四毕业生的年终总结

前言&#xff1a; Hello大家好&#xff0c;我是Dream。 又是一年的年终总结&#xff0c;我也迎来了自己的毕业季&#xff0c;没错&#xff0c;我马上要毕业啦&#xff01;不知道大家是什么时候认识我的呢&#xff0c;又或者是第一次发现我~这一年&#xff0c;迎接过朝阳、拍下过…

手摸手系列之SpringBoot+Vue2项目整合高德地图实现车辆实时定位功能

前言 最近在做一个物流内陆运输的项目&#xff0c;其中的一个关键功能是根据车辆的GPS数据在页面上实时显示车辆位置信息。由于我们已经获得了第三方提供的GPS数据&#xff0c;所以接下来的任务是将这些数据整合到我们的系统中&#xff0c;并利用高德地图API来展示车辆的实时位…

机器学习分类

1. 监督学习 监督学习指的是人们给机器一大堆标记好的数据&#xff0c;比如&#xff1a; 一大堆照片&#xff0c;标记出哪些是猫的照片&#xff0c;哪些是狗的照片 让机器自己学习归纳出算法或模型 使用该算法或模型判断出其他没有标记的照片是否是猫或狗 上述流程如下图所…

解决windows系统找不到msvcr100.dll问题,vcomp100.dll缺失的5个解决方法

在日常使用计算机的过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;其中之一就是“找不到vcomp100.dll”的错误。那么&#xff0c;vcomp100.dll究竟是什么文件&#xff1f;为什么会出现丢失的情况&#xff1f;本文将为您详细解析vcomp100.dll的作用、丢失原因以及提…

C++的面向对象学习(9):文件操作

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、类的封装的多文件实现回顾二、文件操作1.对文件进行操作需要头文件<fstream>2.操作文件的三大类方法&#xff1a;读、写、读写 三、实现文本文件的读、写…

【STM32F103】SysTick系统定时器延时函数

SysTick SysTick是Cortex-M3内核中的一个外设&#xff0c;内嵌在NVIC中&#xff0c;叫系统定时器。 当处理器在调试期间被喊停时&#xff0c;SysTick也将暂停运作。 一共有四个寄存器&#xff0c;不过我们通常用前三个&#xff0c;不需要校准。下图出自《STM32F10xxx Cortex…