2工作队列

news2025/1/6 20:14:52

工作队列

逻辑图

image-20210810220747032

<!-- SpringBoot 消息队列的起步依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

轮询分发 Round-robin

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;

public class WorkQueueProducer {
    /**
     * 生产者 → 消息队列
     * 创建连接工厂,并设置参数
     * 创建连接 Connection
     * 创建通道 Channel
     * 创建队列
     * 发送消息
     **/
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        if(true){
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建队列
        /**
         * String queue                     队列名称
         * boolean durable                  是否持久化,
         * boolean exclusive                含义一:是否独占,是否只能有一个消费者监听
         *                                  含义二:connection 关闭是否删除队列
         * boolean autoDelete               是否自动删除,当没有消费者的时候是否自动删除
         * Map<String, Object> arguments    参数
         */
        channel.queueDeclare("WorkQueues",true,false,false,null);

        //发送消息
        /**
         * String exchange          : 交换机名称,简单模式不使用交换机
         * String routingKey        : 路由规则,当不使用交换机时,路由键需要和队列名称相同
         * BasicProperties props    : 配置参数
         * byte[] body              : 消息体,真实的数据
         */
        for (int i = 0; i < 20; i++) {
            String str = "WorkQueues is so easy!\t" + i + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());;
            System.out.println("发送消息:\t" + str);
            channel.basicPublish("","WorkQueues",null,str.getBytes());
        }

        //释放资源
        channel.close();
        connection.close();

        System.out.println("消息发送成功");
    }
}
  • 与简单队列几乎没有什么不同

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WorkQueueConsumerA {

    /**
     * 消息队列 ← 消费者
     * 创建连接工厂,并设置参数
     * 创建连接 Connection
     * 创建通道 Channel
     * 订阅队列
     * 接收消息
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂,并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        //创建连接 Connection
        Connection connection = factory.newConnection();
        //创建通道 Channel
        Channel channel = connection.createChannel();


        /**
         * consumerTag  消费信息标签
         * delivery     回执
         */
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            byte[] body = delivery.getBody();
            System.out.println("消费消息:\t" + new String(body));
        };
        /**
         * basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * String queue                         :   队列名称
         * boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
         * DeliverCallback deliverCallback      :   回调函数
         * CancelCallback cancelCallback        :   消费者取消订阅时的回调函数
         */
        channel.basicConsume("WorkQueues", true, deliverCallback, consumerTag -> {
        });
    }
}
  • 再创建一个类 WorkQueueConsumerB,代码与 WorkQueueConsumerA 一样,只是类型不同

测试

  • 先启动生产者,查看 RabbitMQ 网页控制台

  • 先将2个消费者启动

    • 第一个消费者启动的时候,会将所有的都消费掉
    • 将两个都启动之后,再启动生产者
  • 再启动生产者

image-20210810224949350

image-20210810225050947image-20210810225104104

公平分发 Fair

如果机器 A 性能很好,一下子就处理完了,其他时间一直空闲,而机器 B 性能很差,很久都不能处理完一条,但是队列还是一人一条的轮询分发,这就造成 A 性能浪费,B 处理慢

我们采用公平分发

采用 basicQos(prefetchCount=1) ,来限制 MQ 只发不超过1条的消息给同一个消费者,当消费者处理完消息,给 MQ 反馈了,MQ 才会进行第二次发送

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;

public class WorkQueueProducer {
    /**
     * 生产者 → 消息队列
     * 创建连接工厂,并设置参数
     * 创建连接 Connection
     * 创建通道 Channel
     * 创建队列
     * 发送消息
     **/
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        if(true){
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建队列
        /**
         * String queue                     队列名称
         * boolean durable                  是否持久化,
         * boolean exclusive                含义一:是否独占,是否只能有一个消费者监听
         *                                  含义二:connection 关闭是否删除队列
         * boolean autoDelete               是否自动删除,当没有消费者的时候是否自动删除
         * Map<String, Object> arguments    参数
         */
        channel.queueDeclare("WorkQueues",true,false,false,null);

        //发送消息
        /**
         * String exchange          : 交换机名称,简单模式不使用交换机
         * String routingKey        : 路由规则,当不使用交换机时,路由键需要和队列名称相同
         * BasicProperties props    : 配置参数
         * byte[] body              : 消息体,真实的数据
         */
        for (int i = 0; i < 20; i++) {
            String str = "WorkQueues is so easy!\t" + i + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());;
            System.out.println("发送消息:\t" + str);
            channel.basicPublish("","WorkQueues",null,str.getBytes());
        }

        //释放资源
        channel.close();
        connection.close();

        System.out.println("消息发送成功");
    }
}

消费者

消费者A
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WorkQueueConsumerA {

    /**
     * 消息队列 ← 消费者
     * 创建连接工厂,并设置参数
     * 创建连接 Connection
     * 创建通道 Channel
     * 订阅队列
     * 接收消息
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂,并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        //创建连接 Connection
        Connection connection = factory.newConnection();
        //创建通道 Channel
        Channel channel = connection.createChannel();


        /**
         * prefetchCount 设为 1
         * MQ 发送小于等于 1 的数据给消费者
         * 当消费者消费完这几条数据,就会给 MQ 一个反馈,MQ 再次发送
         */
        channel.basicQos(1);


        /**
         * consumerTag  消费信息标签
         * delivery     回执
         */
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            byte[] body = delivery.getBody();
            System.out.println("消费消息:\t" + new String(body));

            try {
                /**
                 * 睡眠 1 秒,模拟等待
                 */
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /**
             * 手动回执
             * long deliveryTag
             * boolean multiple
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        /**
         * basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * String queue                         :   队列名称
         * boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
         * DeliverCallback deliverCallback      :   回调函数
         * CancelCallback cancelCallback        :   消费者取消订阅时的回调函数
         */
        channel.basicConsume("WorkQueues", false, deliverCallback, consumerTag -> {
        });
    }
}
消费者B
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WorkQueueConsumerB {

    /**
     * 消息队列 ← 消费者
     * 创建连接工厂,并设置参数
     * 创建连接 Connection
     * 创建通道 Channel
     * 订阅队列
     * 接收消息
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂,并设置参数
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        //创建连接 Connection
        Connection connection = factory.newConnection();
        //创建通道 Channel
        Channel channel = connection.createChannel();

        /**
         * prefetchCount 设为 1
         * MQ 发送小于等于 1 的数据给消费者
         * 当消费者消费完这几条数据,就会给 MQ 一个反馈,MQ 再次发送
         */
        channel.basicQos(1);


        /**
         * consumerTag  消费信息标签
         * delivery     回执
         */
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            byte[] body = delivery.getBody();
            System.out.println("消费消息:\t" + new String(body));

            try {
                /**
                 * 睡眠 1 秒,模拟等待
                 */
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /**
             * 手动回执
             * long deliveryTag
             * boolean multiple
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        /**
         * basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * String queue                         :   队列名称
         * boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
         * DeliverCallback deliverCallback      :   回调函数
         * CancelCallback cancelCallback        :   消费者取消订阅时的回调函数
         */
        channel.basicConsume("WorkQueues", false, deliverCallback, consumerTag -> {
        });
    }
}
  • 与轮询的区别

    • channel.basicQos(1);
      
    • try {
          /**
          * 睡眠 1 秒,模拟等待
          */
          TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      
    • channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      
    • 自动确认的 autoAck 改为 false
      channel.basicConsume("WorkQueues", false, deliverCallback, consumerTag -> {});   
      

测试

image-20210810232114774

image-20210810232129482image-20210810232142274

SpringBoot整合

小结

2 个消费者监听同一个队列,消息被平均分配到 2 个消费者,提高了处理效率,3个4个消费者效率更高

轮询分发:假设有100条消息,A消费者消费50条,B消费者消费50条,但是 A 机器是8核32G的,B机器是1核1G的,显然 B机器消费慢,A机器一直空闲

公平分发:性能好的机器多消费一点,性能差的少消费一点,负载均衡

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2111825.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[数据集][目标检测]人脸口罩佩戴目标检测数据集VOC+YOLO格式8068张3类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;8068 标注数量(xml文件个数)&#xff1a;8068 标注数量(txt文件个数)&#xff1a;8068 标注…

Fortran程序辅助构建(Python)

目的 Visual Studio用不明白&#xff0c;于是我找了一个Fortran解释器&#xff08;大概&#xff09;&#xff0c;接着了解到cmd也是可以直接运行Fortran的&#xff0c;于是VScode就又得1分。但是每次构建都得敲命令&#xff0c;后来我就写了一个脚本&#xff0c;专门解决这个痒…

【人工智能/机器学习/机器人】数学基础-学习笔记

函数 奇偶性&#xff1a; 偶函数&#xff1a; f ( − x ) f ( x ) f(-x)f(x) f(−x)f(x)   y轴对称 f ( x ) x 2 f(x)x^2 f(x)x2     f ( − x ) ( − x ) 2 x 2 f ( x ) f(-x)(-x)^2x^2f(x) f(−x)(−x)2x2f(x) 奇函数&#xff1a; f ( − x ) − f ( x ) f(-…

如何制作新生资料收集系统?

新学年伊始&#xff0c;学校需要高效收集学生信息和证件照。易查分提供了一个便捷的解决方案&#xff0c;通过创建一个集成信息和图片的收集系统&#xff0c;可以快速完成这项工作&#xff0c;并将信息导出为PDF&#xff0c;方便打印和存档。 制作步骤如下&#xff1a; 1. 准备…

Android Studio打开Modem模块出现:The project ‘***‘ is not a Gradle-based project

花了挺长时间处理该问题&#xff0c;特记录如下&#xff1a;1.背景&#xff1a; 在Android studio 下导入一个新增的modem模块&#xff0c;如MPSS.DE.3.1.1\modem_proc\AAA, 目的是看代码方便一些&#xff0c;可以自由搜索各种关键字。但导入该项目时出现了如下错误&#xff1a…

C++ 封装 DLL 供 Unity 调用

一&#xff1a;封装DLL 开发工具最好使用 Visual Studio 20XX 来制作&#xff0c;因为VS Code 需要配置很多东西&#xff0c;环境搭建过程比较复杂。 a、我安装的是 Visual Studio 2022&#xff0c;安装的时候&#xff0c;【工作负荷】记得勾选 【使用C的桌面开发】和【使用C的…

dubbo 服务消费原理分析之引用服务配置

文章目录 前言一、服务监听ContextRefreshedEvent1、AbstractApplicationContext.refresh2、AbstractApplicationContext.finishRefresh3、DubboDeployApplicationListener.onApplicationEvent4、DefaultModuleDeployer .referServices5、SimpleReferenceCache.get 二、引用服务…

SRT库介绍

文章目录 简介SRT协议介绍FFmpegSRS推拉流测试SRT库介绍apps示例程序srt-file-transmitsrt-live-transmitsrt-test-multiplexsrt-test-relaysrt-tunnel docs/buildsrtcoreexamples编译 安装错误处理 API说明初始化、回收创建配置套接字连接管理Socket Group属性设置传输数据统计…

CNC数控加工如何开启个性化制造新时代?

在现代制造业中&#xff0c;CNC 数控加工定做正以其独特的特点和显著的优势&#xff0c;成为满足各种复杂、高精度加工需求的首选方式。与时利和一起了解CNC 数控加工定做是如何开启个性化制造新时代! 一、CNC 数控加工定做的特点 1.高精度加工 CNC 数控加工依靠先进的计算机控…

Java并发编程实战 04 | 使用WaitNotify时要注意什么?

在 Java 中&#xff0c;wait()、notify() 和 notifyAll() 方法在多线程编程中主要用于线程间的协作和同步。理解这些方法的使用特点对于编写稳定的多线程程序至关重要。我们将从以下三个问题入手深入探讨它们的使用&#xff1a; 为什么必须在 synchronized 代码块中使用 wait(…

字体反爬(一)

网址 http://xxfb.mwr.cn/sq_djdh.html?v1.0 获取相关数据 解决 F12 先找接口吧&#xff0c; 搜索一下表格的数据 直接从表格中复制 复制过来乱码&#xff0c;基本锁定有字体反爬处理 先点进去看看 {"addvnm": "#GkcERlldm4_1725629424756otltag㯼㢴#Fon…

Linux 技巧汇编

10个重要的Linux ps命令实战 显示所有当前进程 根据用户过滤进程 通过cpu和内存使用来过滤进程 通过进程名和PID过滤 根据线程来过滤进程 树形显示进程 显示安全信息 格式化输出root用户&#xff08;真实的或有效的UID&#xff09;创建的进程 使用PS实时监控进程状态 …

泛型列表相关知识

集合 C#中集合是指在system.Collection下的类型&#xff0c;他们大多数是通过实现此命名空间下的接口来实现的。 C#集合是来维护一组对象的数据结构&#xff0c;与数组不同&#xff0c;集合包含更多的功能。如&#xff1a;自动添加元素到指定位置&#xff0c;排序等。 泛型集…

企业级WEB应用服务器---TOMACT

一、WEB技术介绍 1.1 Http和B/S结构 操作系统一般都有子进程系统&#xff0c;使用多进程就可以充分利用硬件资源&#xff0c;提高效率。在前面的学习中我们了解到进程中可以有多个线程&#xff0c;且每一个线程都可以被CPU调度执行&#xff0c;这样就可以让程序并行执行。一台…

深入浅出孪生神经网络,高效训练模型

大家好&#xff0c;在深度学习领域&#xff0c;神经网络几乎能处理各种任务&#xff0c;但通常需要依赖于海量数据来达到最佳效果。然而&#xff0c;对于像面部识别和签名验证这类任务&#xff0c;我们不可能总是有大量的数据可用。由此产生了一种新型的神经网络架构&#xff0…

【自考zt】【数据结构】【22.04】

一、单选 二、填空 三、解答 四、算法阅读 五、算法设计

【Flutter】解决第一次运行项目很慢(gradle需要下载依赖)

配置gradle默认下载路径 默认下C盘谁顶得住 配置环境变量 名称: GRADLE_USER_HOME 值: D:\Develop\gradle 自己创建一个 下边是重点 配置gradle远端下载地址 后边版本号自己换 https://mirrors.cloud.tencent.com/gradle/ https://mirrors.cloud.tencent.com/gradle/gradl…

Matlab 一维层状声子晶体振动传输特性

一维声子晶体的传递矩阵法是一种用于研究声波在一维周期性结构中传播的方法。这种方法基于‌波动方程和周期性边界条件&#xff0c;通过计算声波在不同介质中的传播特性&#xff0c;进而分析声子晶体的带隙结构。传递矩阵法可以有效地预测声波在一维声子晶体中的传播行为&#…

利用AI大语言模型和Langchain开发智能车算法训练知识库(上篇)

今天小李哥将介绍亚马逊云科技的Jupyter Notebook机器学习托管服务Amazon SageMaker上&#xff0c;通过AI大语言模型、向量知识库和LangChain Agent&#xff0c;创建用于AI 智能车模型训练的RAG问答知识库。整个项目的架构图如下&#xff1a; 本系列共分为上下两篇。在上篇内容…

Java中的强引用、软引用、弱引用和虚引用于JVM的垃圾回收机制

参考资料 https://juejin.cn/post/7123853933801373733 在 Java 中&#xff0c;引用类型分为四种&#xff1a;强引用&#xff08;Strong Reference&#xff09;、软引用&#xff08;Soft Reference&#xff09;、弱引用&#xff08;Weak Reference&#xff09;和虚引用&#xf…