【RabbitMQ三】——RabbitMQ工作队列模式(Work Queues)

news2024/9/24 1:17:08

RabbitMQ工作队列模式

  • 为什么要有工作队列模式
  • 如何使用工作队列模式
  • 轮询
  • 消息确认
    • 验证消息确认
  • 消息持久化
  • 公平调度
    • 验证公平调度
      • **现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);不添加公平调度相关代码进行测试。**
      • 现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);添加公平调度相关代码进行测试。**
  • 总结
  • 如果博主的文章对您有所帮助,可以评论、点赞、收藏,支持一下博主!!!

为什么要有工作队列模式

在这里插入图片描述
1.rabbitmq所有模式都具备的优点:避免立即执行资源密集型型任务,并且不得不等待它完成。我们可以将这个资源密集型任务安排再以后完成。将任务封装为消息并将其发送到队列中,后台运行工作进行将从队列中获取任务,并执行任务。
2.在工作队列模式中可以有多个worker时,任务将在他们之间共享,可以并行处理任务。例如队列中一共有10个任务,有两个worker,他们将共同完成10个任务。可以是第一个worker完成4个任务,第二个worker完成6个任务。又或者是第一个worker完成5个任务,第二个worker完成5个任务。并行处理任务,大大缩短了时间,提高了执行效率。

如何使用工作队列模式

示例业务:一个生产者,生产10条消息(任务)并放入工作队列中。两个消费者(也可以理解为工人)共同执行消息(任务)

RabbitMQUtils工具类,用于创建连接和信道

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : RabbitMQUtils
 * @description : [rabbitmq工具类]
 * @createTime : [2023/1/17 8:49]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:49]
 * @updateRemark : [描述说明本次修改内容]
 */
public class RabbitMQUtils {

    /*
     * @version V1.0
     * Title: getConnection
     * @author Wangwei
     * @description 创建rabbitmq连接
     * @createTime  2023/1/17 8:52
     * @param []
     * @return com.rabbitmq.client.Connection
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //RabbitMQ服务器ip地址
        factory.setHost("ip");
        //端口号  默认值为5672
        factory.setPort(5672);
        //为用户分配的虚拟主机 默认值为/
        factory.setVirtualHost("/");
        //用户名  默认为guest
        factory.setUsername("用户名");
        //密码   默认为guest
        factory.setPassword("密码");
        //创建连接
        Connection connection=factory.newConnection();
        return  connection;
    }

    /*
     * @version V1.0
     * Title: getChannel
     * @author Wangwei
     * @description 创建信道
     * @createTime  2023/1/17 8:55
     * @param []
     * @return com.rabbitmq.client.Channel
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        Connection connection=getConnection();
        Channel channel=connection.createChannel();
        return channel;
    }
}

Producer生产者:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者1]
 * @createTime : [2023/1/17 8:48]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:48]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //连接RabbitMQ
        RabbitMQUtils.getConnection();
        //获取信道
        Channel channel = RabbitMQUtils.getChannel();

        /*
        * TASK_QUEUE_NAME 队列名称
        * durable 队列是否持久化,true表示队列为持久化, 持久化的队列会存盘,在服务器重启的时候会保证不丢失相关信息
        * exclusive 设置是否排他true表示队列为排他的, 如果一个队列被设置为排他队列,该队列仅对首次声明它的连接可见, 并在连接断开时自动删除,
            (这里需要注意三点:1.排他队列是基于连接Connection可见的,
            同一个连接的不同信道Channel是可以同时访问同一连接创建的排他队列;
            "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,
            这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,
            该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景)

        *autoDelete 设置是否自动删除。为true 则设置队列为自动删除。自动删除的前提是, 至少有一个消费者连接到这个队列,
            之后所有与这个队列连接的消费者都断开时,才会自动删除。
            不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这个队列自动删除",
            因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列
        *arguments 可以设置队列其它参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
        *
        * */

        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        //发送10条消息
        for (int i = 0; i <10 ; i++) {
            String message="廊坊师范..."+i;
            /*
            *交换机命名,不填写使用默认的交换机
            * routingKey -路由键道具-消息的其他属性-路由头等正文
            * 消息正文
            * **/
            //推送消息
            channel.basicPublish("",TASK_QUEUE_NAME, null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }


    }

}

ConsumerOne消费者1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        

        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }

    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

ConsumerTwo消费者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/1/17 9:10]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:10]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        

        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

运行程序
先启动两个消费者,再启动生产者
可以看到生产者发送了10条消息
消费者1和消费者2分别消费了5条消息
在这里插入图片描述在这里插入图片描述
在这里插入图片描述

轮询

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。
从上面我们的示例中就可以看出,两个消费者分别消费了5条消息,并且都是消息的消费都是按照顺序进行消费。

消息确认

执行一个任务可能需要几秒钟的时间,您可能想知道如果使用者启动了一个较长的任务,并且在完成之前终止了该任务会发生什么。在我们当前的代码中,一旦RabbitMQ将消息传递给用户,它就会立即将其标记为删除。在这种情况下,如果终止一个消费者,它刚刚处理的消息就会丢失。发送给这个特定消费者但尚未处理的消息也会丢失。

但是我们不想失去任何消息。如果一个消费者死了,我们希望将任务传递给另一个消费者。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。一个确认信息被消费者发送回来,告诉RabbitMQ已经接收、处理了一个特定的消息,并且RabbitMQ可以自由删除它。

如果一个消费者死了(它的通道关闭了,连接关闭了,或者TCP连接丢失了)而没有发送ack, RabbitMQ会理解消息没有被完全处理,并将其重新排队。如果同时有其他消费者在线,它将迅速将其重新发送给另一个消费者。这样你就可以确保没有信息丢失,即使消费者偶尔死亡。

对消费者交付确认强制执行时间(默认为30分钟)。这有助于检测从不确认交付的错误(卡住)消费者。您可以根据交付确认超时增加此超时时间。

默认情况下,手动消息确认是打开的。一旦我们完成了一个任务,是时候将这个标志设置为false并从worker发送一个适当的确认。
示例代码:

boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

消费者完整代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        

        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }

    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

使用这段代码,您可以确保即使使用CTRL+C终止一个正在处理消息的消费者,也不会丢失任何东西。在worker终止后不久,所有未确认的消息都将被重新传递。

确认必须在接收传递的同一通道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。

验证消息确认

验证步骤:启动加入消息确认的两个消费者,并启动生产者。再中断消费者1进行观察控制台输出结果。
消费者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        

        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }

    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

生产者:
在这里插入图片描述
消费者1
可以看到消费者1在收到第六条消息之后再处理过程中,消费者1被中断了。第六条消息并没有执行完。
在这里插入图片描述
消费者2
可以看到第六条消息被重新传递给了消费者2,第六条消息并没有丢失。
在这里插入图片描述
再次验证:将两个消费者的消息确认代码除去再进行验证。启动两个消费者,并启动生产者。再中断消费者1进行观察控制台输出结果。

消费者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/1/17 9:10]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:10]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        

        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

生产者:在这里插入图片描述
消费者1:
消费者1接收到第七条消息时消费者1被中断。
在这里插入图片描述
消费者2:
第七条消息并没有被重新发送给消费者2,第七条消息丢失了。
在这里插入图片描述

消息持久化

我们已经学会了如何确保即使消费者死亡,消息也不会丢失。但是如果RabbitMQ服务器停止,我们的消息仍然会丢失。

当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。为了确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。

首先,我们需要确保队列在RabbitMQ节点重启后仍然存在。为了做到这一点,我们需要声明它是持久的

boolean durable = true;
channel.queueDeclare("task_queue2", durable, false, false, null);

注意:

RabbitMQ不允许你用不同的参数重新定义一个现有的队列,并且会向任何试图这样做的程序返回一个错误。
这个queueDeclare更改需要同时应用于生产者和使用者代码。

此时我们可以确定task_queue队列不会丢失,即使RabbitMQ重新启动。现在我们需要将消息标记为持久消息——通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue2",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

公平调度

您可能已经注意到,调度仍然没有完全按照我们想要的方式工作。例如,在有两个工作人员的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作人员将一直很忙,而另一个几乎不做任何工作。RabbitMQ对此一无所知,它仍然会均匀地分发消息。

这是因为RabbitMQ只是在消息进入队列时分派消息。它不会查看消费者的未确认消息的数量。它只是盲目地将每n个消息发送给第n个消费者。

为了克服这个问题,我们可以使用basicQos方法,设置prefetchCount = 1。这告诉RabbitMQ一次不要给一个消费者提供超过一条消息。或者,换句话说,在消费者处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的工作人员。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

消费者完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        int prefetchCount = 1;
		channel.basicQos(prefetchCount);

        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }

    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

验证公平调度

现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);不添加公平调度相关代码进行测试。

生产者完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者1]
 * @createTime : [2023/1/17 8:48]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:48]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer {
    private static final String TASK_QUEUE_NAME = "task_queue1";

    public static void main(String[] args) throws IOException, TimeoutException {
        //连接RabbitMQ
        RabbitMQUtils.getConnection();
        //获取信道
        Channel channel = RabbitMQUtils.getChannel();

        /*
        * TASK_QUEUE_NAME 队列名称
        * durable 队列是否持久化,true表示队列为持久化, 持久化的队列会存盘,在服务器重启的时候会保证不丢失相关信息
        * exclusive 设置是否排他true表示队列为排他的, 如果一个队列被设置为排他队列,该队列仅对首次声明它的连接可见, 并在连接断开时自动删除,
            (这里需要注意三点:1.排他队列是基于连接Connection可见的,
            同一个连接的不同信道Channel是可以同时访问同一连接创建的排他队列;
            "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,
            这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,
            该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景)

        *autoDelete 设置是否自动删除。为true 则设置队列为自动删除。自动删除的前提是, 至少有一个消费者连接到这个队列,
            之后所有与这个队列连接的消费者都断开时,才会自动删除。
            不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这个队列自动删除",
            因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列
        *arguments 可以设置队列其它参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
        *
        * */

        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        //发送10条消息
        for (int i = 0; i <10 ; i++) {
            String message="廊坊师范..."+i;
            /*
            *交换机命名,不填写使用默认的交换机
            * routingKey -路由键道具-消息的其他属性-路由头等正文
            * 消息正文
            * **/
            //推送消息
            channel.basicPublish("",TASK_QUEUE_NAME, null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }


    }

}

消费者1完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/1/17 9:07]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:07]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String TASK_QUEUE_NAME = "task_queue1";

    public static void main(String[] args) throws IOException, TimeoutException {
     RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //channel.basicQos(1);

        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }

    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

消费者2完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/1/17 9:10]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:10]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String TASK_QUEUE_NAME = "task_queue1";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //channel.basicQos(1);

        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

执行结果:
消费者1:
在这里插入图片描述
消费者2:
在这里插入图片描述
结论:
有两个消费者的情况下,一个消费者1一直很忙(完成一条消息需要3秒),消费者2一致很轻松(完成一条消息需要1秒)。
RabbitMQ对此一无所知,它仍然会均匀地分发消息。导致消费者1和消费者2执行了同样的消息数量。

现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);添加公平调度相关代码进行测试。**

消费者1完整代码

        import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * @author : [WangWei]
     * @version : [v1.0]
     * @className : ConsumerOne
     * @description : [消费者1]
     * @createTime : [2023/1/17 9:07]
     * @updateUser : [WangWei]
     * @updateTime : [2023/1/17 9:07]
     * @updateRemark : [描述说明本次修改内容]
     */
    public class ConsumerOne {
        private static final String TASK_QUEUE_NAME = "task_queue1";

        public static void main(String[] args) throws IOException, TimeoutException {
            RabbitMQUtils.getConnection();
            Channel channel = RabbitMQUtils.getChannel();

            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            channel.basicQos(1);

            //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");

                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            boolean autoAck = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
        }

        /*
         * @version V1.0
         * Title: doWork
         * @author Wangwei
         * @description 模拟任务的执行时间
         * @createTime  2023/1/18 9:21
         * @param [task]
         * @return void
         */
        private static void doWork(String task) {
            for (char ch : task.toCharArray()) {
                //如果消息中存在.则1秒之后继续,有几个.停止几秒
                if (ch == '.') {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException _ignored) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }


消费者2完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/1/17 9:10]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 9:10]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String TASK_QUEUE_NAME = "task_queue1";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        channel.basicQos(1);

        //使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }
    /*
     * @version V1.0
     * Title: doWork
     * @author Wangwei
     * @description 模拟任务的执行时间
     * @createTime  2023/1/18 9:21
     * @param [task]
     * @return void
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            //如果消息中存在.则1秒之后继续,有几个.停止几秒
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}


执行结果:
消费者1:
在这里插入图片描述

消费者2:
在这里插入图片描述
结论:
置prefetchCount = 1。这告诉RabbitMQ一次不要给一个消费者提供超过一条消息。或者,换句话说,在消费者处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的消费者。

总结

学习一门知识需要亲自动手去验证去证明这种方式是可行了,这样对于这个知识点才算是理解的更深。按照这样做一定行,而不是应该可以,大概可以吧。

如果博主的文章对您有所帮助,可以评论、点赞、收藏,支持一下博主!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/171266.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BC即将登录Coinbase Institutional,2023年以全新姿态出发

以支付为最初定位的加密资产&#xff0c;在支付领域的发展始终停滞不前&#xff0c;尤其是在2022年&#xff0c;加密行业经历了几次“至暗时刻”&#xff0c;导致加密市场资金不断出逃市场全面转熊&#xff0c;越来越多的人对加密资产市场的发展前景失去信心。 而在2021年年底开…

【GD32F427开发板试用】移植CoreMark验证0等待区Flash大小

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;Doravmon 引言 非常荣幸能够参与到此次GD32F427开发板试用的活动中来&#xff0c;在拿到开发板之前就翻了翻手册&#xff0c;一直有个疑问困惑…

APM/STM32F072RB基于HAL库配置USB CDC虚拟串口功能

APM/STM32F072RB基于HAL库配置USB CDC虚拟串口功能&#x1f4e2;采用的自制开发板&#xff0c;开源PCB工程详情放在《极海APM32F072RB开发环境测试》✨本案例基于STM32CubeMX工具配置。&#x1f4fa;使用STM32CubeMX工具配置工程改为APMF072RB型号过程如下&#xff1a; ⛳注意…

性能测试实战 | 电商业务的性能测试(一): 必备基础知识

本文为霍格沃兹测试学院优秀学员课程学习系列笔记&#xff0c;想一起系统进阶的同学文末加群交流。 1.1 测试步骤总览 需求分析与测试设计&#xff08;性能需求目标业务模型拆解&#xff09; 测试数据准备和构造(基于模型的数据准备) 性能指标预期(性能需求目标) 发压工具配…

vue2 使用@vue/composition-api依赖包 编译、打包各种报错

vue2 使用vue/composition-api依赖包 编译、打包各种报错问题来源解决办法最近在维护以前&#xff08;大概一年前&#xff09;的项目时&#xff0c;遇到个这种问题&#xff1a; 项目本身是用 vue-cli 创建的 vue 2.x.xx 版本的项目&#xff0c;然后引入 vue/composition-api 依…

MIT6.830-2022-lab5实验思路详细讲解

文章目录前言一、实验背景二、实验正文Exercise 1 &#xff1a;SearchExercise 2 &#xff1a;Insert - Splitting PagesExercise 3 &#xff1a;Delete - Redistributing pagesExercise 4&#xff1a;Delete - Redistributing pages总结前言 Datebase中很重要的一部分就是ind…

【Java面试】SpringBoot篇

注&#xff1a;本文通篇将SpringBoot以sb代替。 文章目录Spring和SpringBoot的关系和区别&#xff1f;谈谈你对SpringBoot的理解&#xff0c;它有哪些特性&#xff1f;SpringBoot的核心注解说说你对SpringBoot自动配置的理解为什么SpringBoot的jar包可以直接运行&#xff1f;Sp…

uboot启动流程分析(基于i.m6ull)

一、uboot的makefile 1.1 makefile整体解析过程 为了生成u-boot.bin这个文件&#xff0c;首先要生成构成u-boot.bin的各个库文件、目标文件。为了各个库文件、目标文件就必须进入各个子目录执行其中的Makefile。由此&#xff0c;确定了整个编译的命令和顺序。 1.2 makefile整…

10.2 初始泛型算法

文章目录只读算法find()count()accumulate()equal()写容器元素算法fill()fill_n()back_inserter()copy()copy_backward()replace()replace_copy()next_permutation()prev_permutation()重排容器元素算法sort()unique()stable_sort()除了少数例外&#xff0c;标准库算法都对一个…

pandas数据聚合与分组运算

文章目录数据聚合与分组运算分组与聚合的原理通过groupby()方法将数据拆分成组按列名进行分组按Series对象进行分组按字典进行分组按函数进行分组数据聚合与分组运算 对数据集进行分组并对各组应用一个函数&#xff08;无论是聚合还是转换&#xff09;&#xff0c;通常是数据分…

哈佛结构和冯诺依曼结构?STM32属于哈佛结构还是冯诺依曼结构?

现代的CPU基本上归为冯诺伊曼结构&#xff08;也成普林斯顿结构&#xff09;和哈佛结构。 冯诺依曼体系 冯诺依曼体系结构图如下 冯诺依曼结构也称普林斯顿结构&#xff0c;是一种将程序指令存储器和数据存储器合并在一起的存储器结构。数据与指令都存储在同一存储区中&…

大数据技术架构(组件)5——Hive:流程剖析2

1.1.2、Stage division&#xff08;不够细致&#xff0c;需要例子&#xff09;Stage理解&#xff1a;结合对前面讲到的Hive对查询的一系列执行流程的理解&#xff0c;那么在一个查询任务中会有一个或者多个Stage.每个Stage之间可能存在依赖关系。没有依赖关系的Stage可以并行执…

IIS部署应用程序连接 LocalDB 数据库

使用.net core框架创建ASP.NET Core API应用程序&#xff0c;利用Entity Framework core实体进行MS LocalDB数据库进行连接操作&#xff08;增/删/改/查运行&#xff09;。 问题&#xff1a; 在Visual Studio 2022 开发工具可以正常运行 Web API 应用程序连接 LocalDB 数据库…

R语言基于poLCA包进行潜类别分析

潜在类别分析是一种分析多元分类数据的统计技术。当观测数据以一系列分类响应的形式出现时- -例如&#xff0c;在民意调查、个人层面的投票数据、人与人之间可靠性的研究或消费者行为和决策中- -通常感兴趣的是调查观测变量之间的混淆来源&#xff0c;识别和表征相似案例的集群…

初步了解高性能队列——Disruptor(Java)

高性能队列——Disruptor ① 概述 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列&#xff0c;研发的初衷是解决内部的内存队列的延迟问题&#xff0c;而不是分布式队列。基于Disruptor开发的系统单线程能支撑每秒600万订单&#xff0c;2010年在QCon演讲后&#xff0c…

DevOps利器之二(Git,Gitlab)

一、背景Git&#xff0c;Gitlab在DevOps中主要解决持续集成源码管控部分&#xff0c;本文主要从基本概念&#xff0c;实施部署两部分介绍。二、git概述https://git-scm.com/book/zh/v2 --推荐官方电子书 Git - 它是一个源代码分布式版本控制系统&#xff0c;可让开发人员在本地…

行业分享:锂电池4大生产难题,视觉检测即可有效解决

导语&#xff1a;机器视觉检测已在锂电池生产的各个环节中&#xff0c;为产品产量与质量提供可靠保障。维视智造作为锂电池视觉检测系统提供商&#xff0c;为企业提供专业、系统、稳定的锂电行业解决方案&#xff0c;可保证0漏检&#xff0c;确保安全生产&#xff0c;全面提升生…

Java总结(运算符)

1.算数运算符short s12;s1s12; &#xff08;编译不能运行)short s12;s1 2 ; (编译能运行&#xff0c;不改变变量本身的数据类型)2.逻辑运算符区分&和&&相同点&#xff1a;运算结果相同&#xff1b;当符号左边是true时&#xff0c;两者都会执行符号右边的运算不同点…

医疗数据安全实力派 | 美创科技品牌案例入选《2022年医疗行业网络安全报告》

近日&#xff0c;网络安全产业机构“数说安全”正式发布《2022年医疗行业网络安全报告》&#xff08;以下简称“报告”&#xff09;。报告对我国医疗行业信息化现状和政策、医疗行业市场发展、医疗行业需求侧及供给侧进行深度剖析。美创科技作为专业数据安全代表厂商入选医疗网…

你应该知道的机器学习模型部署细节和实施步骤

机器学习操作 (MLOps&#xff0c;Machine Learning Operations ) 是“机器学习”和“工程”的组合&#xff0c;涵盖了与生产 ML 生命周期管理有关的所有内容。 ML模型生命周期可大致分为三个阶段 文章目录技术交流设计模型开发操作步骤1&#xff1a;确定部署环境命令行终端Cond…