RabbitMQ 工作队列模型(任务队列)

news2025/1/17 17:56:32

任务模型

概念

​ 当消息处理比较耗时的时候,可能生产消息的速度大于消费的速度,长此以往,就会导致消息堆积,无法及时处理,此时可以使用任务模型,当多个消费者绑定一个队列,共同消费其中的消息。

image-20211122115837571

代码

  1. 生产者

    /**
     * 生产者
     */
    public class Provider {
    
        /**
         * 生产消息
         */
        @Test
        public void sendMessage() throws IOException, TimeoutException {
    
            //获取连接对象
            Connection connection = RabbitMQUtils.getConnection();
    
            //获取连接中通道
            Channel channel = connection.createChannel();
    
            /**
             * 通过通道声明队列
             * 队列名称 不存在自动创建
             * 是否持久化
             * 是否独占队列
             * 是否在消费完成后自动删除队列
             * 附加参数
             */
            channel.queueDeclare("work",true,false,false,null);
    
            /**
             * 生产消息
             * 1. 交换机名称
             * 2. 队列名称
             * 3. 传递消息额外设置
             * 4. 具体消息
             */
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("","work",null,("我是工作模型中的消息" + i).getBytes());
            }
    
            RabbitMQUtils.close(connection,channel);
        }
    
    }
    
  2. 消费者1

    /**
     * 消费者1
     */
    public class Customer1 {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //获取连接对象
            Connection connection = RabbitMQUtils.getConnection();
    
            //获取连接中通道
            Channel channel = connection.createChannel();
    
            /**
             * 通道绑定队列
             * 队列名称 不存在自动创建
             * 是否持久化
             * 是否独占队列
             * 是否在消费完成后自动删除队列
             * 附加参数
             */
            channel.queueDeclare("work",true,false,false,null);
    
            /**
             * 消费消息
             * 1. 消费的队列
             * 2. 开始消息的自动确认机制
             * 3. 回调接口 重写该接口的handleDelivery方法 处理消息
             */
            channel.basicConsume("work",true,new DefaultConsumer(channel){
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1消费的消息:"+ new String(body) );
                }
            });
    
        }
    
    }
    
  3. 消费者2

    /**
     * 消费者2
     */
    public class Customer2 {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //获取连接对象
            Connection connection = RabbitMQUtils.getConnection();
    
            //获取连接中通道
            Channel channel = connection.createChannel();
    
            /**
             * 通道绑定队列
             * 队列名称 不存在自动创建
             * 是否持久化
             * 是否独占队列
             * 是否在消费完成后自动删除队列
             * 附加参数
             */
            channel.queueDeclare("work",true,false,false,null);
    
            /**
             * 消费消息
             * 1. 消费的队列
             * 2. 开始消息的自动确认机制
             * 3. 回调接口 重写该接口的handleDelivery方法 处理消息
             */
            channel.basicConsume("work",true,new DefaultConsumer(channel){
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2消费的消息:"+ new String(body) );
                }
            });
    
        }
    
    }
    
  4. 先启动连个消费者,然后启动生产者

    image-20211122142932726

    image-20211122142942182

官方总结

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询算法。

  1. 消息自动处理机制

​ 默认情况下,通道是自动确认的,拿到消息之后不论是否消费完毕就会向Rabbit MQ发送确认。这样的话,如上两个消费者有一个有延迟也是平均分配。我们希望有延迟的消费者少消费,没有延迟的消费者多消费。可以关闭其自动确认机制。完成消费之后手动确认即可。

消费者代码

 public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();

        //每次消费一个消息
        channel.basicQos(1);

        /**
         * 通道绑定队列
         * 队列名称 不存在自动创建
         * 是否持久化
         * 是否独占队列
         * 是否在消费完成后自动删除队列
         * 附加参数
         */
        channel.queueDeclare("work",true,false,false,null);

        /**
         * 消费消息
         * 1. 消费的队列
         * 2. 自动确认机制
         * 3. 回调接口 重写该接口的handleDelivery方法 处理消息
         */
        channel.basicConsume("work",false,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2消费的消息:"+ new String(body) );
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });

    }

此时,我们的消费者1加入sleep模拟延迟,消费者2 正常执行。这样的话,结果如下

image-20211122145143763

image-20211122145149910

即达到了我们想要的效果。能者多劳。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/487639.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

认识JSP

什么是JSP? JSP&#xff08;Java Server Pages&#xff09;是一种类似于HTML的标记语言&#xff0c;用于创建动态Web页面。与HTML不同的是&#xff0c;JSP页面中可以嵌入Java代码&#xff0c;由Web服务器在动态页面中生成HTML代码&#xff0c;从而实现Web应用程序的前端交互效…

scratch求和 中国电子学会图形化编程 少儿编程 scratch编程等级考试四级真题和答案解析2023年3月

目录 scratch求和 一、题目要求 1、准备工作 2、功能实现 二、案例分析

node之Express

目录 Express简介 安装 创建基本的Web服务器 托管静态资源 以上述案例为基础访问静态资源的路径为&#xff1a;http://127.0.0.1:8080/xxx.html 托管多个静态资源目录 路由的概念 路由的匹配过程 模块化路由 模块化路由案例 中间件的概念 定义中间件函数例 全局生…

AI助阵,领先创新丨智能评价分析·数字员工,开启顶级消费者体验新篇章!

GPT-4作为人工智能时代发展道路上的里程碑&#xff0c;以其惊人的多模态大模型能力吸引了众多零售消费品牌的关注&#xff0c;利用AI技术解决客户复杂业务问题&#xff0c;成为推动行业提质增效的重要途径。实在智能是国内首个生成式AI标准编写单位&#xff0c;始终坚持AI赋能商…

各种常用字符编码详解【图文教程】,Unicode 是一个编码规范,并不是一个具体的编码

文章目录 常见编码的比较&#xff0c;各个编码中各字符占用(字节数):简体中文编码发展史&#xff0c;编码出现时间和涵盖范围UTF-8、unicode与GBK编码转化一问一答其他ASCII表 参考文档 常见编码的比较&#xff0c;各个编码中各字符占用(字节数): 编码英文中文&#xff08;简体…

数说热点 | 跟着《长月烬明》起飞,今年各地文旅主打的就是一个听劝

近日&#xff0c;随着热播剧《长月烬明》的爆火&#xff0c;蚌埠、宣城、敦煌等多个与剧情梦幻联动的宝藏城市被带飞&#xff0c;各地热心网友也纷纷催促自家文旅局赶紧“蹭飞”&#xff0c;《长月烬明》以一己之力打造了影视文旅融合的新样板。 仙偶剧特效天花板&#xff0c;…

SpringCloud------代码demo(二)

SpringCloud------代码demo&#xff08;二&#xff09; 编码实操 以订单——支付微服务模块作为基础&#xff0c;开始逐渐扩充 微服务架构编码构建 1.约定 > 配置 > 编码 2.IDEA新建project工作空间 3.Rest微服务工程构建 总父工程 POM project module 首先创建maven项…

FPGA入门系列14--VGA

文章简介 本系列文章主要针对FPGA初学者编写&#xff0c;包括FPGA的模块书写、基础语法、状态机、RAM、UART、SPI、VGA、以及功能验证等。将每一个知识点作为一个章节进行讲解&#xff0c;旨在更快速的提升初学者在FPGA开发方面的能力&#xff0c;每一个章节中都有针对性的代码…

06 - 3 事件驱动架构模式——EDA

何为EDA 事件驱动架构是一种异步分发事件的架构模式用于高扩展且低耦合的系统以事件为核心&#xff0c;一系列解耦的、单一功能的事件处理器 Notification 源系统发送消息通知其他系统状态改变接收方响应非必须发送 Event 逻辑与处理 Event 逻辑无依赖&#xff0c;独立变化解…

华为路由WS5200 四核版使用体验

文章目录 前言一、主界面和上网设置二、网络WIFI设置三、安全和系统总结 前言 其实我是看不上这种除了能上网&#xff0c;没任何用的东东的。除了上古时代用过类似的简单设备&#xff0c;已经十数年没再看一眼这种东西了&#xff0c;当然更不会去买这种东西&#xff0c;别误会…

vscode配置latex

reference&#xff1a;https://zhuanlan.zhihu.com/p/166523064 1 texlive卸载 找到texlive\2019\tlpkg\installer下的uninst.bat文件并点击运行。 删除环境变量 2 texlive安装 打开https://tug.org/texlive/acquire-iso.html点击下载iso文件 3 vscode 安装 4 latex插件…

Linux系统编程 多线程基础

文章目录 前言一、线程概念二、线程的创建三、线程的退出四、pthread_join函数总结 前言 本篇文章作为多线程的入门讲解将带大家先创建几个线程来感受一下什么是多线程&#xff0c;了解一下多线程到底有什么作用。 一、线程概念 线程&#xff08;Thread&#xff09;是程序执…

C++类与对象(二)——构造函数与析构函数

文章目录 一.类的默认6个成员函数二.构造函数1.引例2.构造函数的概念及特性 三.析构函数&#x1f60b;析构函数的特性 前言&#xff1a; 上篇文章初步认识了类以及类的相关知识&#xff0c;本篇将继续深入学习类与对象——类的默认6个成员函数&#xff1a; 一.类的默认6个成员函…

制造企业如何跨越大规模定制鸿沟?中国最大减速机企业的答案来了

导读&#xff1a;传统制造企业如何深度用云&#xff1f; 在中国制造向中国智造的转型中&#xff0c;长三角地区一直扮演着急先锋的角色。总部位于常州的江苏国茂减速机股份有限公司(简称国茂股份)&#xff0c;就是中国制造上云转型的典型代表。 国茂股份成立于1993年&#xff0…

Microelectronic学习章节总结(2)-- data path和control unit设计

文章目录 part1. Data path设计1.1 logic unit1.2 shifter1.3 adder1.4 comparator1.5 multiplier1.6 divider1.7 register file part2. Control unit设计part3. CPU SoC上的其它部件 (TODO&#xff1a;理解总结)3.1 总线AMBA&#xff08;parallel&#xff09; 3.2 Memorymemor…

[C++基础]-类和对象(下)

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正。 目录 一、深入学…

推荐算法实战项目:AFM 原理以及案例实战(附完整 Python 代码)

本文要介绍的是由浙江大学联合新加坡国立大学提出的AFM模型。通过名字也可以看出&#xff0c;此模型又是基于FM模型的改进&#xff0c;其中A代表”Attention“&#xff0c;即AFM模型实际上是在FM模型中引入了注意力机制改进得来的。 之所以要在FM模型中引入注意力机制&#xf…

PC3-管理员操作

token无效可能&#xff0c;就是token过期了需要配置&#xff1a;&#xff1a;&#xff1a; history 安装可以跳路由在ts文件中&#xff1a;因为在ts文件中还需要store&#xff0c;清空token // 安装可以跳路由在ts文件中npm i history 防止接口出现 token 无效&#xff0c;登…

【C++】AVL平衡二叉树源码剖析

目录 概述 算法 左单旋 右单旋 左右双旋 右左双旋 源码 AVLTree.h test.cpp 概述 AVL树也叫平衡二叉搜索树&#xff0c;是二叉搜索树的进化版&#xff0c;设计是原理是弥补二叉搜索树的缺陷&#xff1a;当插入的数据接近于有序数列时&#xff0c;二叉搜索树的性能严重…

20天能拿下PMP吗?

新版大纲&#xff0c;专注于人员、过程、业务环境三个领域&#xff0c;内容贯穿价值交付范围&#xff08;包括预测、敏捷和混合的方法&#xff09;。除了考试时间由240分钟变更为230分钟、200道单选题变为180道&#xff08;包含单选和多选&#xff09;之外&#xff0c;新考纲还…