RabbitMQ---work消息模型

news2024/12/25 1:44:36

1、work消息模型

工作队列或者竞争消费者模式
在这里插入图片描述

在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。
工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:

o P:生产者:任务的发布者
o C1:消费者,领取任务并且完成任务,假设完成速度较快
o C2:消费者2:领取任务并完成任务,假设完成速度慢

面试题:避免消息堆积?

1)采用workqueue,多个消费者监听同一队列。
2)接收到消息以后,而是通过线程池,异步消费。

1.1、生产者

生产者与案例1中的几乎一样:

public class Send {
   private final static String QUEUE_NAME = "test_work_queue";
   public static void main(String[] argv) throws Exception {
       // 获取到连接
       Connection connection = ConnectionUtil.getConnection();
       // 获取通道
       Channel channel = connection.createChannel();
       // 声明队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       // 循环发布任务
       for (int i = 0; i < 50; i++) {
           // 消息内容
           String message = "task .. " + i;
           channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
           System.out.println(" [x] Sent '" + message + "'");
       }
       // 关闭通道和连接
       channel.close();
       connection.close();
   }
}

不过这里我们是循环发送50条消息。

1.2、消费者1

// 消费者1
public class Recv {
    private final static String QUEUE_NAME = "test_work_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者1] received : " + msg + "!");
                try {
                    // 模拟完成任务的耗时:1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                // 手动ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列。
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

1.3、消费者2

//消费者2
public class Recv2 {
    private final static String QUEUE_NAME = "test_work_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者2] received : " + msg + "!");
                try {
                    // 模拟完成任务的耗时:200ms
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                }
                // 手动ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列。
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

与消费者1基本类似,就是没有设置消费耗时时间。
这里是模拟有些消费者快,有些比较慢。
接下来,两个消费者一同启动,然后发送50条消息:
在这里插入图片描述
在这里插入图片描述

可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。

1.4、能者多劳

• 刚才的实现有问题吗?
o 消费者1比消费者2的效率要低,一次任务的耗时较长
o 然而两人最终消费的消息数量是一样的
o 消费者2大量时间处于空闲状态,消费者1一直忙碌
• 现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
• 怎么实现呢?
o 我们可以使用basicQos方法和prefetchCount = 1设置。
o 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。
o 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。
o 相反,它会将其分派给不是仍然忙碌的下一个工作人员。
在这里插入图片描述

再次测试:

  ![在这里插入图片描述](https://img-blog.csdnimg.cn/a25bdfcd50bf41f9a6e51076560cd15f.png)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/919535.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32电源名词解释

STM32电源架构 常用名词 VCC Ccircuit 表示电路&#xff0c;即接入电路的电压。 VDD Ddevice 表示器件&#xff0c; 即器件内部的工作电压。 VSS Sseries 表示公共连接&#xff0c;通常指电路公共接地端电压。 VDDA Aanalog 表示模拟&#xff0c;是模拟电路部分的电源。主要为…

七大排序算法详解

1.概念 1.排序的稳定性 常见的稳定的排序有三种&#xff1a;直接插入排序&#xff0c;冒泡排序&#xff0c;归并排序 对于一组数据元素排列&#xff0c;使用某种排序算法对它进行排序&#xff0c;若相同数据之间的前后位置排序后和未排序之前是相同的&#xff0c;我们就成这种…

stm32之5.长按按键(使用时钟源)调整跑马灯速度

------------------------------ 源码 #include <stm32f4xx.h> #include "led.h" #include "delay.h" #include "my_str.h" #include "beep.h" #include "key.h" int main(void) { key_init(); Led_init();…

Java【手撕双指针】LeetCode 57. “两数之和“, 图文详解思路分析 + 代码

文章目录 前言一、两数之和1, 题目2, 思路分析3, 代码展示 前言 各位读者好, 我是小陈, 这是我的个人主页, 希望我的专栏能够帮助到你: &#x1f4d5; JavaSE基础: 基础语法, 类和对象, 封装继承多态, 接口, 综合小练习图书管理系统等 &#x1f4d7; Java数据结构: 顺序表, 链表…

M1使用android模拟器的办法

问题背景 在产品的某一期需求中&#xff0c;是关于对于模拟器使用app的用户进行一定的管控&#xff0c;比如说封禁。那么这时候测试就需要模拟器了。对于Windows平台&#xff0c;网上有很多的平台可供选择。使用M1的我&#xff0c;只能不断的找可以用的模拟器。 解决过程 首…

uniapp-form表单

<template><view class"ptb-20 plr-30 bg min100"><view class"bg-white radius-20 pd-30"><view class"bold mt-30 mb-50 size-32">选择方式&#xff1a;</view><u--form labelPosition"left" :mod…

Orchestrator自身高可用性方案

目录 获得 HA 的方法 一 没有高可用性 &#xff08;No high availability&#xff09; 使用场景 架构组成 架构图 二 半高可用性&#xff08;Semi HA&#xff09; 三 基于共享数据库后端高可用&#xff08;HA via shared backend&#xff09; 四 基于Raft协议高可用 五…

python中的matplotlib画折线图(数据分析与可视化)

先导包&#xff08;必须安装了numpy 、pandas 和matplotlib才能导包&#xff09;&#xff1a; import numpy as np import pandas as pd import matplotlib.pyplot as plt核心代码&#xff1a; import numpy as np import pandas as pd import matplotlib.pyplot as pltpd.se…

什么是软件压力测试?软件压力测试工具和流程有哪些?

软件压力测试 一、含义&#xff1a;软件压力测试是一种测试应用程序性能的方法&#xff0c;通过模拟大量用户并发访问&#xff0c;测试应用程序在压力情况下的表现和响应能力。软件压力测试的目的是发现系统潜在的问题&#xff0c;如内存泄漏、线程锁、资源泄漏等&#xff0c;…

小猫爪:嵌入式小知识15-XCP基础简介

小猫爪&#xff1a;嵌入式小知识15-XCP基础简介 0 目录1 前言2 XCP的由来3 XCP基础简介3.1 XCP的协议组成3.2 XCP的通信模式3.2.1 Standard communication model3.2.2 Block Transfer communication model3.2.3 Interleaved communication model 3.3 XCP帧格式3.4 XCP的专业术语…

又一个国产操作系统将现身,基于AOSP,兼容安卓应用

日前媒体报道指又一家手机企业似乎自研自主操作系统&#xff0c;名字为*IOS&#xff0c;已申请域名备案&#xff0c;这被认为是它开始自研手机操作系统的迹象&#xff0c;自研操作系统似乎已成为国产手机努力的方向。 关于*IOS最早在2014年就曾传出&#xff0c;但是后来因故而最…

【Unity】Text文本组件的一些操作

Unity的Text组件的几种常见的操作方法 Text组件是Unity中用于在UI界面上显示文本的组件。它包含了一些常见的属性和方法&#xff0c;可以用来控制文本的内容、外观和交互。以下是一些常见的Text组件的操作&#xff1a; 设置文本内容&#xff1a;通过直接在Unity编辑器中的Text…

计算机竞赛 基于GRU的 电影评论情感分析 - python 深度学习 情感分类

文章目录 1 前言1.1 项目介绍 2 情感分类介绍3 数据集4 实现4.1 数据预处理4.2 构建网络4.3 训练模型4.4 模型评估4.5 模型预测 5 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于GRU的 电影评论情感分析 该项目较为新颖&#xff0c;适合作为竞…

VM——获取图像中的圆环区域

、需求&#xff1a;下图是圆柱形铝罐&#xff0c;需要获取图像中的罐沿区域。 2、方法如下&#xff1a; (1)通过找外圆&#xff0c;提取圆形区域 &#xff08;2&#xff09;利用“拷贝填充”模块&#xff0c;绘制外圆ROI&#xff0c;选择“输出掩膜” &#xff08;3&#xff09…

【排错经验】树莓派4B摄像头问题集锦(USB摄像头)

1、E: Unable to locate package luvcview 这款软件不是在任意版本的操作系统下都适用的&#xff0c;要查看自己Ubuntu版本支持的uvcview。 方法&#xff1a;输入命令&#xff1a;sudo apt-cache search uvcview 由图可知&#xff0c;我这个版本的操作系统支持的是guvcview 所…

GMS基本模块TIN、Solids、Modflow2000/2005、MT3DMS、MODPATH。及其在地下水流动、溶质运移、粒子追踪方面的应用

解决地下水数值模拟技术实施过程中遇到的困难&#xff0c;从而提出切实可行的环境保护措施&#xff0c;达到有效保护环境、防治地下水污染&#xff0c;推动经济社会可持续发展的目的。 &#xff08;1&#xff09;水文地质学&#xff0c;地下水数值模拟基础理论&#xff1b;&am…

WPF中手写地图控件(3)——动态加载地图图片

瓦片增加一个Loading动画 可以查看我的另一个博客WPF中自定义Loading图 从中心扩散 进行从里到外的扩散&#xff0c;方向是上左下右。如下图所示 于是我们可以定义一个拥有坐标X跟Y的集合&#xff0c;他允许这个集合&#xff0c;内部使用枚举器的MoveNext自动排序&#xf…

K8S如何部署ZooKeeper以及如何进行ZooKeeper的平滑替换

前言 在之前的章节中&#xff0c;我们已经成功地将Dubbo项目迁移到了云环境。在这个过程中&#xff0c;我们选择了单机ZooKeeper作为注册中心。接下来&#xff0c;我们将探讨如何将单机ZooKeeper部署到云端&#xff0c;以及在上云过程中可能遇到的问题及解决方案。 ZooKeeper…

最优的家电设备交互方式是什么?详解家电设备交互的演进之旅

家电&#xff0c;在人们的日常生活中扮演着不可或缺的角色&#xff0c;也是提升人们幸福感的重要组成部分&#xff0c;那你了解家电的发展史吗&#xff1f; 70年代 结婚流行“四大件”&#xff1a;手表、自行车、缝纫机&#xff0c;收音机&#xff0c;合成“三转一响”。 80年…

精进语言模型:探索LLM Training微调与奖励模型技术的新途径

大语言模型训练&#xff08;LLM Training&#xff09; LLMs Trainer 是一个旨在帮助人们从零开始训练大模型的仓库&#xff0c;该仓库最早参考自 Open-Llama&#xff0c;并在其基础上进行扩充。 有关 LLM 训练流程的更多细节可以参考 【LLM】从零开始训练大模型。 使用仓库之…