RabbitMQ消息可靠性保证机制4--消费端限流

news2025/1/14 1:08:52

7.7 消费端限流

在类似如秒杀活动中,一开始会有大量并发写请求到达服务端,城机对消息进行削峰处理,如何做?

当消息投递的速度远快于消费的速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应可能会引起系统大范围的宕机,这就很悲剧了

7.7.1 资源限制限流

在RabbitMQ中可对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接的客户端的套接字读取数据。连接心中监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以或者被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止将收到通知。

/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:

# 磁盘限制阈值设置
# 设置磁盘的可用空间大小,单位字节。当磁盘可用空间低于这个值的时候,发生磁盘告警,触发限流。
# 如果设置了相对大小,则忽略此绝对大小。
disk_free_limit.absolute = 2000

# 使用计量单位,从RabbitMQ 3.6.0开始有效,对vm_memory_high_watemark同样有效。
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB

# Alternatively, we can set a limit relative to total avaliable RAM.
# Values lower than 1.0 can be dangerous and should be used carefully.
# 还可以使用相对于总可用内存来设置。注意,此值不要低于1.0!
# 当磁盘可用空间低于总可用内存的2.0倍的时候,触发限流
# disk_free_limit.relative = 2.0

# 内存限流阈值设置
# 0.4表示阈值总可用内存的比值。 总可用内存表示操作系统给每个进程分配的大小,或者实际内存大小。
# 如32位的Windows,系统给每个进程最大2GB的内存,则此比值表示阈值为820MB
vm_memory_high_watermark.relative = 0.4

# 还可以直接通过绝对值限制可用内存大小,单位字节
vm_memory_high_watermark.absolute = 1073741824

# 从RabbitMQ 3.6.0开始,绝对值支持计量单位。如果设置了相对值,则忽略此设置值
vm_memory_high_watermark.absolute = 1024MiB



k, kiB : kibibytes(2^10 - 1024 bytes)
M, MiB : mibibytes(2^20 - 1024576 bytes)
G, GiB : gibibytes(2^30 - 1073741824 bytes)

KB: kilobytes (10^3 - 1000 bytes)
MB: megabytes (10^6 - 1000000 bytes)
GB: gigabytes (10^9 - 1000000000 bytes)

可以通过两种来设置生效

  1. 临时生效

    此配制仅当前生效在重启后将失效。

# 硬盘资源限制
rabbitmqctl set_disk_free_limit 68996808704
# 内存资源限制
rabbitmqctl set_vm_memory_high_watermark 0.4

样例:

[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...
  1. 长期生效

在rabbitmq.conf的配制文件中加入

# 硬盘限制 
disk_free_limit.absolute=68455178240

# 内存限制
vm_memory_high_watermark.relative = 0.4

样例:

[root@nullnull-os rabbitmq]# vi  /etc/rabbitmq/rabbitmq.conf 
# 加入以下内容,注意单位到字节
disk_free_limit.absolute=68455178240

[root@nullnull-os rabbitmq]# cat /etc/rabbitmq/rabbitmq.conf 
disk_free_limit.absolute=68455178240

[root@nullnull-os rabbitmq]# systemctl restart rabbitmq-server
[root@nullnull-os rabbitmq]# 

注意,此需要重启rabbitMQ才能生效。

磁盘限制配制参考

Configuring Disk Free Space Limit

The disk free space limit is configured with the disk_free_limit setting. By default 50MB is required to be free on the database partition (see the description of file locations for the default database location). This configuration file sets the disk free space limit to 1GB:

disk_free_limit.absolute = 1000000000

Or you can use memory units (KB, MB GB etc.) like this:

disk_free_limit.absolute = 1GB

It is also possible to set a free space limit relative to the RAM in the machine. This configuration file sets the disk free space limit to the same as the amount of RAM on the machine:

disk_free_limit.relative = 1.0

The limit can be changed while the broker is running using the rabbitmqctl set_disk_free_limit command or rabbitmqctl set_disk_free_limit mem_relative command. This command will take effect until next node restart.

The corresponding configuration setting should also be changed when the effects should survive a node restart.

来自:https://www.rabbitmq.com/disk-alarms.html

内存配制限制参考

https://www.rabbitmq.com/memory.html

Configuring the Memory Threshold

The memory threshold at which the flow control is triggered can be adjusted by editing the configuration file.

The example below sets the threshold to the default value of 0.4:

\# new style config format, recommended
vm_memory_high_watermark.relative = 0.4

The default value of 0.4 stands for 40% of available (detected) RAM or 40% of available virtual address space, whichever is smaller. E.g. on a 32-bit platform with 4 GiB of RAM installed, 40% of 4 GiB is 1.6 GiB, but 32-bit Windows normally limits processes to 2 GiB, so the threshold is actually to 40% of 2 GiB (which is 820 MiB).

Alternatively, the memory threshold can be adjusted by setting an absolute limit of RAM used by the node. The example below sets the threshold to 1073741824 bytes (1024 MiB):

vm_memory_high_watermark.absolute = 1073741824

Same example, but using memory units:

vm_memory_high_watermark.absolute = 1024MiB

If the absolute limit is larger than the installed RAM or available virtual address space, the threshold is set to whichever limit is smaller.

The memory limit is appended to the log file when the RabbitMQ node starts:

2019-06-10 23:17:05.976 [info] <0.308.0> Memory high watermark set to 1024 MiB (1073741824 bytes) of 8192 MiB (8589934592 bytes) total

The memory limit may also be queried using the rabbitmq-diagnostics memory_breakdown and rabbitmq-diagnostics status commands.

The threshold can be changed while the broker is running using the

rabbitmqctl set_vm_memory_high_watermark <fraction>

command or

rabbitmqctl set_vm_memory_high_watermark absolute <memory_limit>

For example:

rabbitmqctl set_vm_memory_high_watermark 0.6

and

rabbitmqctl set_vm_memory_high_watermark absolute "4G"

When using the absolute mode, it is possible to use one of the following memory units:

  • M, MiB for mebibytes (2^20 bytes)
  • MB for megabytes (10^6 bytes)
  • G, GiB for gibibytes (2^30 bytes)
  • GB for gigabytes (10^9 bytes)

中文配制可参考:https://www.cnblogs.com/kaishirenshi/p/12132703.html

更多配制可参见:https://www.rabbitmq.com/configure.html#config-file

样例程序:

maven导入

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.9.0</version>
            </dependency>

生产程序:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;

public class ResourceLimitProduct {
  public static void main(String[] args) throws Exception {
    // 资源限制
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel(); ) {

      // 定义交换器、队列和绑定
      channel.exchangeDeclare("res.limit.ex", BuiltinExchangeType.DIRECT, true, false, null);
      channel.queueDeclare("res.limit.qu", true, false, false, null);
      channel.queueBind("res.limit.qu", "res.limit.ex", "res.limit.rk");

      // 开启发送方确认机制
      AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();

      ConfirmCallback confirm =
          new ConfirmCallback() {
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
              if (multiple) {
                System.out.println("【批量确认】:小于" + deliveryTag + "已经确认");
              } else {
                System.out.println("【单条确认】:等于" + deliveryTag + "已经确认");
              }
            }
          };

      ConfirmCallback nackConfirm =
          new ConfirmCallback() {
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
              if (multiple) {
                System.out.println("【批量不确认】:小于" + deliveryTag + "已经确认");
              } else {
                System.out.println("【单条不确认】:等于" + deliveryTag + "已经确认");
              }
            }
          };

      channel.addConfirmListener(confirm, nackConfirm);

      for (int i = 0; i < 100000000; i++) {
        String msg = getKbMessage(i);
        long sequence = channel.getNextPublishSeqNo();
        System.out.println("【发送】成功了序列消息:" + sequence);

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.contentType("text/plain");
        // 发送的消息持久化
        builder.deliveryMode(2);
        AMQP.BasicProperties properties = builder.build();

        channel.basicPublish(
            "res.limit.ex", "res.limit.rk", properties, msg.getBytes(StandardCharsets.UTF_8));

        Thread.sleep(ThreadLocalRandom.current().nextInt(5, 100));
      }
    } catch (IOException e) {
      e.printStackTrace();
    } catch (TimeoutException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private static String getKbMessage(int i) {
    StringBuilder msg = new StringBuilder("发送确认消息:" + i + "--");
    for (int j = 0; j < 102400; j++) {
      msg.append(j);
    }
    return msg.toString();
  }
}

设置硬盘资源限制

[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...

运行生产者的应用程序,查看控制台的输出

【发送】成功了序列消息:1
【单条确认】:等于1已经确认
【发送】成功了序列消息:2
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【发送】成功了序列消息:4
【单条确认】:等于3已经确认
【发送】成功了序列消息:5
......
【单条确认】:等于702已经确认
【单条确认】:等于703已经确认
【发送】成功了序列消息:704
【发送】成功了序列消息:705
【发送】成功了序列消息:706
【发送】成功了序列消息:707
【发送】成功了序列消息:708
【发送】成功了序列消息:709
【发送】成功了序列消息:710
【发送】成功了序列消息:711

到此使用硬盘空间限制的测试完成。

内存资源限制

编辑配制文件rabbitmq.conf

vi /etc/rabbitmqrabbitmq.conf 

# 添加配制
vm_memory_high_watermark.absolute=120M

重启让其生效

systemctl restart rabbitmq-server

检查配制生效情况

[root@nullnull-os rabbitmq]# rabbitmqctl environment

......
      {trace_vhosts,[]},
      {vhost_restart_strategy,continue},
      {vm_memory_calculation_strategy,rss},
      {vm_memory_high_watermark,{absolute,"120MB"}},
      {vm_memory_high_watermark_paging_ratio,0.5},
      {writer_gc_threshold,1000000000}]},
 {rabbit_common,[]},
......

查看到如下配制说明生效。

运行生产者

观察客户端输出

【发送】成功了序列消息:1
【发送】成功了序列消息:2
【单条确认】:等于1已经确认
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【单条确认】:等于3已经确认
【发送】成功了序列消息:4
【发送】成功了序列消息:5
【发送】成功了序列消息:6
【单条确认】:等于4已经确认
【单条确认】:等于5已经确认
【单条确认】:等于6已经确认
【发送】成功了序列消息:7
【单条确认】:等于7已经确认
......
【发送】成功了序列消息:174
【单条确认】:等于174已经确认
【发送】成功了序列消息:175
【单条确认】:等于175已经确认
【发送】成功了序列消息:176
【单条确认】:等于176已经确认
【发送】成功了序列消息:177
【发送】成功了序列消息:178
【发送】成功了序列消息:179
【发送】成功了序列消息:180
【发送】成功了序列消息:181
【发送】成功了序列消息:182
【发送】成功了序列消息:183
【发送】成功了序列消息:184
【发送】成功了序列消息:185
【发送】成功了序列消息:186
【发送】成功了序列消息:187

观察网页端的情况

在这里插入图片描述

到此内存资源限制而导致的限流测试完成。

7.7.2 默认的credit flow流控

RabbitMQ Credit Flow Mechanism (信用流控制机制) 是 RabbitMQ 使用的一种流量控制机制,旨在确保生产者(publishers)不会发送太多的消息给消费者(consumers),从而导致系统超载或资源耗尽。这个机制主要是为了保护消费者免受生产者发送太多消息的影响。

以下是 RabbitMQ Credit Flow 机制的基本工作原理:

  1. 信用计数器(Credit Counter):对于每个消费者,RabbitMQ 维护一个称为信用计数器的值。这个计数器表示消费者当前可以接收多少条消息。
  2. 初始信用额度(Initial Credit):当一个消费者连接到队列并开始消费消息时,RabbitMQ 为该消费者分配一个初始信用额度。这个额度通常与队列中的未确认消息数量有关。
  3. 消费者确认(Consumer Acknowledgments):当消费者成功处理一条消息并确认它时,它将会恢复一定数量的信用,这允许 RabbitMQ 将更多的消息发送给消费者。
  4. 信用降低(Decreasing Credit):当消费者未确认消息超出其信用额度时,其信用额度将降低。这会导致生产者无法继续发送消息给该消费者,直到其信用额度恢复。
  5. 自动降低的消费者(Auto-decrease Consumers):RabbitMQ 还可以配置为自动降低某些消费者的信用,以避免某个消费者占用太多资源。这通常用于处理慢速或长时间处理的消费者。

这个机制有助于平衡生产者和消费者之间的消息流量,防止生产者发送大量消息导致队列爆满,从而提高系统的稳定性和可靠性。

要注意的是,RabbitMQ 的信用流控制机制是可配置的,您可以根据您的需求来调整信用额度和其他参数,以满足特定的应用场景。此外,RabbitMQ 还提供了一些工具和插件,用于监控和管理流量控制,以确保系统的正常运行。

可以通过查看队列的状态信息来了解 Credit Flow 机制的当前状态。以下是一些常见的方式来查看 Credit Flow 状态:

  1. RabbitMQ Management UI:RabbitMQ 提供了一个基于 Web 的管理界面,您可以通过该界面查看队列的状态和统计信息,包括队列的消息数量、未确认消息数量以及消费者的状态。要访问管理界面,请确保已启用 RabbitMQ Management 插件。默认情况下,它通常在 http://localhost:15672/ 上运行。

    在管理界面中,您可以选择特定的队列,然后查看其状态和相关的统计信息,包括未确认消息数量。这可以帮助您了解 Credit Flow 是否生效,是否有消费者的信用已用尽。

  2. 命令行工具:您还可以使用 RabbitMQ 的命令行工具来查看队列的状态。以下是一个示例命令,用于查看队列的状态:

    rabbitmqctl list_queues name messages consumers messages_unacknowledged
    

    这将显示队列的名称、消息数量、消费者数量以及未确认消息数量。未确认消息数量表示消费者尚未确认的消息数量,这可以用于判断 Credit Flow 是否生效。

  3. 监控工具:您可以使用监控工具(如Prometheus和Grafana)来设置自定义监控和警报,以便实时跟踪队列的状态和信用流控制情况。通过这些工具,您可以创建仪表板来显示队列的各种指标,包括未确认消息数量和消费者的信用。

通过以上方法,您可以监视 RabbitMQ 中队列的状态和 Credit Flow 机制的工作情况,以确保系统的稳定性和可靠性。

在这里插入图片描述

7.7.3 Qos机制

RabbitMQ中有一种Qos保证机制,可以限制channel上接收到的未被Ack的消息数量,如果过这个数量限制RabbitMQ将不会再往消费端推送消息。是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)需要注意的是Qos机制仅对消费端推模式有效,对拉模式无效。而且不支持NONE-ACK模式。

执行channel.basicConsume方法之前通过channel.basicQos方法可以设置该数量。消息的发送是异步的,消息的确认也是异步的。在消费慢的时候可以设置Qos的prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个.broker就发送一个,确认两个就发送两个,换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个。

如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了。

生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快县城超过了下游的消费速度时就容易出现消息积压、堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等手段,避免超过broker端的极限承载能力或者压垮下游消费者。

再讲消费者,我们期望消费者能够尽快的消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端能够处理速度是最快、最稳定而且还相对均匀(比较理想化)

提供应用吞吐量和缩短消费过程的耗时,主要以下几种方式:

  1. 优化应用程序的性能,缩短响应时间
  2. 增加消费节点实例。
  3. 调整并发消费的线程数。

测试

maven导入:

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.9.0</version>
            </dependency>

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;

public class QosProduct {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 定义交换机
    channel.exchangeDeclare(
        "qos.ex",
        BuiltinExchangeType.DIRECT,
        // 持久化标识
        false,
        // 是否自动删除
        false,
        // 属性信息
        null);

    for (int i = 0; i < 100; i++) {
      String msg = "这是发送的消息:" + i;
      channel.basicPublish("qos.ex", "qos.rk", null, msg.getBytes(StandardCharsets.UTF_8));
    }
  }
}

消费者 :

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;


public class QosConsumer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");
      
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 定义交换器、队列和绑定
    channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);
    channel.queueDeclare("qos.qu", false, false, false, null);
    channel.queueBind("qos.qu", "qos.ex", "qos.rk");

    // 设置Qos为5,未被确认ACK的为5,还有一个参数,即是否为全局,true为全局
    channel.basicQos(5);

    channel.basicConsume(
        "qos.qu",
        false,
        new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(
              String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
              throws IOException {

            LocalDateTime time = LocalDateTime.now();
            System.out.println(
                "[消费]" + time + "+收到的消息:" + new String(body, StandardCharsets.UTF_8));

            int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);
            try {
              Thread.sleep(randomSleep);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }

            if (envelope.getDeliveryTag() % 3 == 0) {
              // 进行消息确认
              channel.basicAck(envelope.getDeliveryTag(), true);
            }
          }
        });
  }
}

测试

先启动消费都,再启动生产者,查看控制台输出

[消费]2023-08-25T12:08:13.143+收到的消息:这是发送的消息:0
[消费]2023-08-25T12:08:13.765+收到的消息:这是发送的消息:1
[消费]2023-08-25T12:08:14.127+收到的消息:这是发送的消息:2
[消费]2023-08-25T12:08:14.892+收到的消息:这是发送的消息:3
......
[消费]2023-08-25T12:08:57.437+收到的消息:这是发送的消息:96
[消费]2023-08-25T12:08:57.530+收到的消息:这是发送的消息:97
[消费]2023-08-25T12:08:57.566+收到的消息:这是发送的消息:98
[消费]2023-08-25T12:08:57.649+收到的消息:这是发送的消息:99

查看队列的情况:

[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name                                      │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:59116 -> 10.0.4.16:5672 (1) │ 5              │ 0                     │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]# 

网页端查看

在这里插入图片描述

并行消费者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;

public class QosThreadConsumer {
  public static void main(String[] args) throws Exception {

    // 资源限制
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    // 设置channel并发请求最大数
    factory.setRequestedChannelMax(5);

    // 自定义线程池工厂
    ThreadFactory thsFactory = Executors.privilegedThreadFactory();
    factory.setThreadFactory(thsFactory);

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 定义交换器、队列和绑定
    channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);
    channel.queueDeclare("qos.qu", false, false, false, null);
    channel.queueBind("qos.qu", "qos.ex", "qos.rk");

    // 设置每秒处理2个
    channel.basicQos(5, true);

    channel.basicConsume(
        "qos.qu",
        false,
        new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(
              String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
              throws IOException {

            LocalDateTime time = LocalDateTime.now();
            long threadId = Thread.currentThread().getId();
            System.out.println(
                "[消费]"
                    + time
                    + ",线程:"
                    + threadId
                    + ",收到的消息:"
                    + new String(body, StandardCharsets.UTF_8));

            int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);
            try {
              Thread.sleep(randomSleep);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }

            if (envelope.getDeliveryTag() % 3 == 0) {
              // 进行消息确认
              channel.basicAck(envelope.getDeliveryTag(), true);
            }
          }
        });
  }
}

控制台输出:

[消费]2023-08-26T09:37:21.430,线程:24,收到的消息:这是发送的消息:0
[消费]2023-08-26T09:37:21.866,线程:25,收到的消息:这是发送的消息:1
[消费]2023-08-26T09:37:22.434,线程:25,收到的消息:这是发送的消息:2
[消费]2023-08-26T09:37:22.847,线程:25,收到的消息:这是发送的消息:3
[消费]2023-08-26T09:37:23.685,线程:25,收到的消息:这是发送的消息:4
[消费]2023-08-26T09:37:23.847,线程:26,收到的消息:这是发送的消息:5
......
[消费]2023-08-26T09:39:10.684,线程:28,收到的消息:这是发送的消息:526
[消费]2023-08-26T09:39:10.695,线程:32,收到的消息:这是发送的消息:527
[消费]2023-08-26T09:39:10.767,线程:32,收到的消息:这是发送的消息:528
......
[消费]2023-08-26T09:39:58.270,线程:27,收到的消息:这是发送的消息:996
[消费]2023-08-26T09:39:58.405,线程:27,收到的消息:这是发送的消息:997
[消费]2023-08-26T09:39:58.575,线程:27,收到的消息:这是发送的消息:998
[消费]2023-08-26T09:39:58.671,线程:27,收到的消息:这是发送的消息:999

如果Qos设置为全局,则可以看到到

[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name                                      │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60591 -> 10.0.4.16:5672 (1) │ 0              │ 5                     │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60610 -> 10.0.4.16:5672 (1) │ 0              │ 0                     │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]# 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2244389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

抽象java入门1.5.3.2——类的进阶(中)

前期回顾&#xff1a;抽象java入门1.5.3.1——类的进阶https://blog.csdn.net/c_yanxin_ru/article/details/140858898?spm1001.2014.3001.5501 总结&#xff1a; 在代码溯源中&#xff0c;我发现了一个奇怪的东西&#xff0c;就是OUT不是类中类&#xff08;不是常规类的写法…

题解 洛谷 Luogu P1873 [COCI 2011/2012 #5] EKO / 砍树 二分答案 C/C++

题目传送门&#xff1a; P1873 [COCI 2011/2012 #5] EKO / 砍树 - 洛谷 | 计算机科学教育新生态https://www.luogu.com.cn/problem/P1873思路&#xff1a; 很简单的二分答案 每次找区间中点 m&#xff0c;判断以 m 为高度砍下的木头是否够 h 即可 代码&#xff1a; #defin…

蓝桥杯第22场小白入门赛2~5题

这场比赛开打第二题就理解错意思了&#xff0c;还以为只能用3个消除和5个消除其中一种呢&#xff0c;结果就是死活a不过去&#xff0c;第三题根本读不懂题意&#xff0c;这蓝桥杯的题面我只能说出的是一言难尽啊。。第四题写出来一点但是后来知道是错了&#xff0c;不会正解&am…

PyQt天天酷跑游戏(附下载地址)

欢迎下载体验&#xff01; 文件大小&#xff1a;25.7 M 下载地址&#xff1a;链接&#xff1a;https://wwrr.lanzoul.com/ifOvc2fe163c 观看演示视频~ Pyqt-跑酷游戏 一&#xff0e;前言 天天酷跑大家都玩过吧&#xff0c;这是我们学生时代的回忆&#xff0c;目前这款游戏还…

跨平台WPF框架Avalonia教程 十五

ListBox 列表框 列表框从元素源集合中显示多行元素&#xff0c;并允许选择单个或多个。 列表中的元素可以组合、绑定和模板化。 列表的高度会扩展以适应所有元素&#xff0c;除非特别设置&#xff08;使用高度属性&#xff09;&#xff0c;或由容器控件设置&#xff0c;例如…

python蓝桥杯刷题2

1.最短路 题解&#xff1a;这个采用暴力枚举&#xff0c;自己数一下就好了 2.门牌制作 题解&#xff1a;门牌号从1到2020&#xff0c;使用for循环遍历一遍&#xff0c;因为range函数无法调用最后一个数字&#xff0c;所以设置成1到2021即可&#xff0c;然后每一次for循环&…

基于YOLOv8深度学习的独居老人情感状态监护系统(PyQt5界面+数据集+训练代码)

本研究提出了一种创新的独居老人情感状态监护系统&#xff0c;基于YOLOV8深度学习模型&#xff0c;旨在通过对老年人面部表情的实时监测与分析&#xff0c;来精准识别其情感变化&#xff0c;从而提高独居老人的生活质量&#xff0c;确保其心理健康。本系统通过整合先进的YOLOV8…

基于SSM的农家乐管理系统+论文示例参考

1.项目介绍 功能模块&#xff1a;管理员&#xff08;农家乐管理、美食信息管理、住宿信息管理、活动信息、用户管理、活动报名、论坛等&#xff09;&#xff0c;普通用户&#xff08;注册登录、活动报名、客房预订、用户评价、收藏管理、模拟支付等&#xff09;技术选型&#…

小米顾此失彼:汽车毛利大增,手机却跌至低谷

科技新知 原创作者丨依蔓 编辑丨蕨影 三年磨一剑的小米汽车毛利率大增&#xff0c;手机业务毛利率却出现下滑景象。 11月18日&#xff0c;小米集团发布 2024年第三季度财报&#xff0c;公司实现营收925.1亿元&#xff0c;同比增长30.5%&#xff0c;预估902.8亿元&#xff1b;…

【环境搭建】使用IDEA远程调试Docker中的Java Web

有时候要对Docker的Java Web远程调试其功能&#xff0c;于是就需要使用IDEA的远程调试功能&#xff0c;记录一下简单配置方法。 以Kylin4.0.0为例&#xff0c;首先拉取镜像并启动容器&#xff1a; $ docker pull apachekylin/apache-kylin-standalone:4.0.0$ docker run -d \-…

【AI图像生成网站Golang】项目架构

AI图像生成网站 目录 一、项目介绍 二、雪花算法 三、JWT认证与令牌桶算法 四、项目架构 五、图床上传与图像生成API搭建 六、项目测试与调试(等待更新) 四、项目架构 本项目的后端基于Golang和Gin框架开发&#xff0c;主要包括的模块有&#xff1a; backend/ ├── …

Centos7安装Jenkins脚本一键部署

公司原先Jenkins二进制安装&#xff0c;自己闲来无事在测试主机优化了一下&#xff0c;一键部署&#xff0c;jenkins2.426版本jdk11版本 #!/bin/bashjenkins_file"jenkins-2.426.3-1.1.noarch.rpm"# 更新软件包列表 echo "更新软件包列表..." sudo yum up…

【WPF】Prism学习(五)

Prism Commands 1.错误处理&#xff08;Error Handling&#xff09; Prism 9 为所有的命令&#xff08;包含AsyncDelegateCommand&#xff09;提供了更好的错误处理。 避免用try/catch包装每一个方法根据不同遇到的异常类型来提供特定的逻辑处理可以在多个命令之间共享错误处…

Ubuntu 18.04 配置sources.list源文件(无法安全地用该源进行更新,所以默认禁用该源)

如果你 sudo apt update 时出现诸如 无法安全地用该源进行更新&#xff0c;所以默认禁用该源 的错误&#xff0c;那就换换源吧&#xff0c;链接&#xff1a; https://mirror.tuna.tsinghua.edu.cn/help/ubuntu/ 注意版本&#xff1a; 修改源文件&#xff1a; sudo nano /etc…

C++ —— 剑斩旧我 破茧成蝶—C++11

江河入海&#xff0c;知识涌动&#xff0c;这是我参与江海计划的第2篇。 目录 1. C11的发展历史 2. 列表初始化 2.1 C98传统的{} 2.2 C11中的{} 2.3 C11中的std::initializer_list 3. 右值引用和移动语义 3.1 左值和右值 3.2 左值引用和右值引用 3.3 引用延长生命周期…

04 - Clickhouse-21.7.3.14-2单机版安装

目录 一、准备工作 1、确定防火墙处于关闭状态 2、CentOS 取消打开文件数限制 3、安装依赖 4、CentOS取消SELINUX 二、单机安装 2.1、下载安装 2.2、安装这4个rpm包 2.3、修改配置文件 2.4、启动服务 2.5、关闭开机自启 2.6、使用Client连接server 一、准备工作 1…

STM32设计学生宿舍监测控制系统-分享

目录 前言 一、本设计主要实现哪些很“开门”功能&#xff1f; 二、电路设计原理图 电路图采用Altium Designer进行设计&#xff1a; 三、实物设计图 四、程序源代码设计 五、获取资料内容 前言 本项目旨在利用STM32单片机为核心&#xff0c;结合传感器技术、无线通信技…

macOS 的目录结构

文章目录 根目录 (/)常见目录及其用途示例目录结构注意事项根目录 (/)主要目录及其含义其他目录总结 macOS 的目录结构无论是在 Intel 架构还是 ARM 架构的 Mac 电脑上都是相同的。macOS 的目录结构遵循 Unix 和 BSD 的传统&#xff0c;具有许多标准目录。以下是一些主要目录及…

日常ctf

15&#xff0c; [MoeCTF 2021]Web安全入门指北—小饼干 直接改就行了 16&#xff0c; [MoeCTF 2021]2048 传入参数就获取到flag了 /flag.php?score500000000 17&#xff0c; [SWPUCTF 2022 新生赛]funny_web 账户密码是 NSS 2122693401 登录进去查看源码 考intval缺陷&…

【java】java入门

盘符名称冒号---------盘符切换 dir---------------查看当前路径下的内容 cd目录--------进入单级目录 cd..----------回退到上一级目录 cd \----------回退到盘符目录 cls----------清屏 exit 为什么要配环境变量&#xff1f; 在任意的目录下都可以打开指定的软件。把软件的路…