掌握RabbitMQ:全面知识点汇总与实践指南

news2025/1/8 6:36:02

前言

RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。

特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。

作用:服务间异步通信;顺序消费;定时任务;请求削峰;

1、AMQP协议定义

AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一个高效的、跨平台的应用层协议
MQTT(Message Queuing Telemetry Transport)消息队列遥测传输

特性AMQPMQTT
适用场景大型企业级应用、金融交易、云服务物联网、移动应用、智能家居
通信模式生产者-消费者发布-订阅
消息大小较大,适合复杂的消息结构小型,适合简单的消息
QoS 级别支持,但不如 MQTT 精细详细的 QoS 级别,特别是针对 IoT 场景
性能要求对性能有一定要求,但更注重可靠性和安全性极低的带宽消耗和资源占用
安全性强调端到端的安全性支持基本的安全特性,适用于资源受限环境

2、AMQP机制

1>AMQP生产者、消费者工作机制
AMQP高级消息队列协议,基于生产者消费者模式,消息基于交换器Exchange、队列Queue、绑定Binding进行路由。

  • 生产者发送消息到Broker消息代理服务
  • 交换器接收生产者发送的消息,根据预定义规则,分发给一个或多个队列
  • 队列存储消息,直到消费者取走消息
  • 消费者,读取队列中的消息

AMQP定义了严格的消息结构,使用了类型化数据表示法描述消息内容来兼容不同的系统。

类型化数据表示法(Typed Representation of Data)是指在计算机编程语言中,数据和其相关联的类型信息一起被表示的方法。

2>AMQP消息传递方式

特性点对点模式 (P2P)发布/订阅模式 (Pub/Sub)
消息传递方式每条消息仅被一个消费者处理一条消息可以被多个消费者同时接收
队列数量单个队列每个消费者有自己的队列
生产者行为直接发送到队列发送到交换器,由交换器负责路由
消费者行为从同一队列中竞争消费各自独立消费自己的队列中的消息
适用场景任务分配、工作流管理广播通知、日志记录、事件驱动架构
扩展性受限于单个队列的吞吐量可以通过增加更多的消费者来提高整体吞吐量
复杂度较低,易于理解和实现需要考虑交换器类型、路由规则等因素,稍微复杂

在这里插入图片描述

  • 1、点对点
    生产者将消息发送到一个特定的队列中,而消费者则从该队列中获取消息
    每个消息只会被一个消费者处理,即使有多个消费者监听同一个队列。

竞争消费:多个实例尝试处理同一个消息时,可能出现重复消费或消息未及时得到处理的情况。
(1)竞争消费问题
在k8s部署多实例场景下,虽然提升了系统的吞吐量,通过调度器实现了负载均衡,多个实例从一个队列中读取消息,但是并发场景客观存在竞争消费的情况,导致重复消费消息。
(2)解决建议
合理配置消息队列、业务方法幂等性设计、分布式锁控制、增加监控告警和自动恢复动作。

// 生产者代码片段
Channel channel = connection.createChannel();
channel.queueDeclare("task_queue", true, false, false, null);
String message = "Task to be processed";
channel.basicPublish("", "task_queue", null, message.getBytes());

// 消费者代码片段
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    // 执行任务...
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> {});

  • 2、发布订阅
    生产者将消息发送到一个交换器(Exchange),而不是直接发送到队列。交换器根据预定义的路由规则(Binding Key)将消息转发给一个或多个匹配的队列。每个队列可以有多个消费者订阅,所有订阅者都能收到相同的消息副本

(1)主题分区
为不同类型的时间,创建不同的主题或分区,来减少不必要的复制。实例只订阅感兴趣的主题,降低资源开销
(2)限流
避免过载,限制单位之间内消费的最大消息

// 生产者代码片段
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_exchange", "fanout");
String message = "Info log message";
channel.basicPublish("logs_exchange", "", null, message.getBytes());

// 消费者代码片段
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs_exchange", "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    // 处理日志...
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

3、AMQP消息只被消费一次

  • 1、合理配置消息队列ACK机制
    大多数消息队列都提供了手动确认(ACK)的功能,允许消费者成功处理后,主动通知消息代理
// 使用 RabbitMQ 的手动确认示例
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
// 处理完成后发送 ACK
channel.basicAck(envelope.getDeliveryTag(), false);
  • 2、合理配置消息队列预取数量
    防止一次性去除较多的未处理消息。
// 设置预取计数
channel.basicQos(prefetchCount);
  • 3、消费者幂等性设计
    针对消息全局唯一的ID,入库,每次收到消息时先检查是否已入库
    确保同一条消息多次处理的结果是一致的,避免重复的消息执行两次结果不一致
    增加补偿机制,比如退款,退积分等概念的操作
  • 4、分布式锁
    借助Redis 的 Redlock 算法协调多个消费者实例之间的消息处理,只有获取到锁的消息可以处理,其他的放弃或等待。
  • 5、监控告警机制
    监控消息队列服务健康情况,针对可能重复消费的消息及时告警到服务负责人介入处理。
  • 6、事务性消息
    指的是消息和业务操作,一起成功或一起失败的机制。
    (1)本地事务+补偿机制
    (2)二阶段提交
    引入协调者和参与者的概念。
    客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出提交事务命令,否则全部回滚。每个参与者返回ack结果,协调者汇总执行结果,释放占用的资源。
    (3)三阶段提交
    针对二阶段提交完善事务性消息机制。
    首先客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出预执行事务命令。各参与者收到命令,执行事务,但不提交。并返回ack,等待最终命令。协调者收到全部准备好,则发出提交事务命令。

4、AMQP 消息顺序消费

  • 单实例独占队列,可以保证顺序消费,但是分布式高可用场景一般都是多实例部署,独占队列无法解决消息顺序消费的问题。
  • 为了保证顺序消费,通常建议针对预取消息数量Prefetch Count设置为1:channel.basicQos(1);
  • 可以使用分布式锁确保消息消费是同步操作,并发安全,在成功处理消息后,手动发送ack确认到消息代理。
  • 另外使用幂等性设计来避免重复消费。
  • 增加补偿机制来处理幂等性设计无法保证的场景,比如退款等操作
  • 增加监控告警到服务负责人。
  • 可以对消息根据业务类型或特定的前缀规则,将不同的消息分到不同的分区或队列中,每个队列和分区内部是遵循先进先出规则来保证顺序消费的。

5、AMQP消息可靠性

  • 事务支持
    允许一组操作作为一个整体提交或回滚。
  • ACK确认机制
    当消息成功投递后,接收方会向发送方发送 ACK 确认;如果发生错误,则发送 NACK 拒绝。
  • 持久化选项
    可以选择是否将消息保存到磁盘上,以防服务中断时丢失重要数据。

6、RabbitMQ配置ACK

1>rabbitmq.conf或rabbitmq.ini开启配置

# 启用自动恢复功能,确保在网络中断后能够自动重连
connection.cached = true
# 设置心跳检测间隔,防止长时间无通信导致连接断开
heartbeat = 60
# 启用 Publisher Confirms,允许生产者收到消息确认
publisher_confirms = on

2>消费者手动确认
声明队列,确保队列存在
设置预取计数,限制每次从队列中拉取的消息数量为 1,以避免过载
开启手动确认模式,通过 channel.basicConsume 方法中的第二个参数 false 来关闭自动确认,改为手动确认
发送 ACK 确认,在成功处理完消息后,调用 channel.basicAck 方法发送确认

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class RabbitMQConsumer {
    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列,确保它存在
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 设置预取计数为 1,确保每次只处理一条消息
            channel.basicQos(1);

            // 开启手动确认模式
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    // 模拟任务处理时间
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println(" [x] Done");
                    // 手动发送 ACK 确认
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };

            // 开始消费消息
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        }
    }
}

3>配置 Publisher Confirms和Transaction
允许生产者在发送消息后等待消息代理的确认

// 开启 Publisher Confirms 模式
channel.confirmSelect();
try {
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    if (!channel.waitForConfirmsOrDie(timeout)) {
        // 处理未确认的消息...
    }
} catch (Exception e) {
    // 处理异常情况...
}
Channel channel = connection.createChannel();
// 开启 Publisher Confirms 模式
channel.confirmSelect();
// 发送消息并等待确认
try {
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    if (!channel.waitForConfirmsOrDie(timeout)) {
        // 处理未确认的消息...
    }
} catch (Exception e) {
    // 处理异常情况...
}

// 开启事务模式
channel.txSelect();
try {
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    channel.txCommit();
} catch (Exception e) {
    channel.txRollback();
}

7、RabbitMQ配置协议

1>rabbitmq.conf
RabbitMQ默认是AMQP 0-9-1协议。支持设置监听端口。
支持启用SSL认证提高安全性。
支持设置心跳保证客户端和服务端连接保持活跃。

# 设置 AMQP 0-9-1 的监听端口
listeners.tcp.default = 5672
# 确保 AMQP 插件已启用,AMQP 0-9-1 是默认启用的
enabled_plugins = [rabbitmq_amqp1_0]

# 启用 SSL/TLS 支持
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile   = /path/to/server_certificate.pem
ssl_options.keyfile    = /path/to/private_key.pem
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = true
# 设置 SSL/TLS 监听端口
listeners.ssl.default = 5671

# 设置心跳间隔时间为 60 秒
heartbeat = 60

8、RabbitMQ消息持久化

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PersistentExample {
    private final static String QUEUE_NAME = "persistent_queue";
    private final static String EXCHANGE_NAME = "persistent_exchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 创建持久化的交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            // 创建持久化的队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 绑定队列到交换器
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing_key");

            // 发送持久化的消息
            String message = "Persistent message!";
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) // 2 表示持久化
                    .build();
            channel.basicPublish(EXCHANGE_NAME, "routing_key", props, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
  • 1、持久化队列
    channel.queueDeclare("durable_queue", true, false, false, null);
  • 2、交换器持久化
    确保在 RabbitMQ 启动时已经预声明了所有必要的交换器和队列绑定,以避免消息丢失
    channel.exchangeDeclare("durable_exchange", "direct", true);
  • 3、消息持久化
    delivery_mode 参数:设置为 2 表示持久化消息;设置为 1(默认)则表示非持久化消息
    channel.basicPublish("exchange_name", "routing_key", new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());

9、RabbitMQ自动重连

网络中断或其他异常情况下自动重新连接到 RabbitMQ 并恢复之前的连接状态

ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);

10、RabbitMQ组件

组件名称说明
Producer生产者负责生成并发送消息的应用程序。
Consumer消费者负责接收并处理消息的应用程序。
Message消息承载业务数据的基本单元,包含消息体(Body)、属性(Properties)等信息。
Exchange交换机用于接收来自生产者的消息,并根据路由规则将其分发到一个或多个队列中。
Queue队列存储待处理消息的地方,消费者从中拉取消息进行处理。
Binding绑定定义了交换机和队列之间的关系,包括路由键等参数。
Virtual Host虚拟主机类似于命名空间的概念,用于隔离不同的应用环境,每个虚拟主机都有自己独立的一套用户、权限、交换机、队列等资源。

11、RabbitMQ核心组件交换器和路由键

交换器(Exchange)和路由键(Routing Key)是消息传递系统的核心组件,它们共同决定了消息如何从生产者传递到正确的队列。

消息提供方生产消息,根据预定规则,路由至匹配的一个或多个队列。

消息创建时设定路由键,消息发布到交换器时,通过队列路由键,把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配。

若队列至少有一个消费者订阅,消息将以轮询方式发给消费者。

交换器说明应用场景
Direct精确匹配路由键只有当路由键完全匹配时,消息才会被发送到对应的队列。适用于一对一的消息分发。
Topic基于通配符模式匹配路由键适用于灵活的消息过滤和多条件匹配。
Fanout广播所有消息给所有绑定的队列适用于需要将相同消息发送给多个消费者的场景。
Headers根据消息头属性进行路由适用于复杂的消息路由需求,例如根据多个字段组合来决定消息去向。

1>Direct Exchange 精准匹配路由键交换器
根据路由键完全匹配队列,如果找不到匹配的队列,则消息会被丢弃。

  • 生产者
// 创建 Direct Exchange
channel.exchangeDeclare("direct_logs", "direct");

// 绑定队列到 Exchange,并指定 Binding Key
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "info");
channel.queueBind(queueName, "direct_logs", "warning");
channel.queueBind(queueName, "direct_logs", "error");

// 发送消息时指定 Routing Key
channel.basicPublish("direct_logs", "info", null, "Info log message".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class DirectConsumer {
    private final static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange,并指定 Binding Key
            if (argv.length < 1) {
                System.err.println("Usage: DirectConsumer [info] [warning] [error]");
                System.exit(1);
            }
            for (String severity : argv) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

2>Fanout Exchange广播交换器
广播所有消息给所有绑定的队列

  • 生产者
// 创建 Fanout Exchange
channel.exchangeDeclare("logs", "fanout");

// 绑定队列到 Exchange
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");

// 发送消息时不指定 Routing Key
channel.basicPublish("logs", "", null, "Broadcast log message".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class FanoutConsumer {
    private final static String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

3>Topic Exchange 通配符路由器
*:匹配一个单词;#:匹配零个或多个单词

  • 生产者
// 创建 Topic Exchange
channel.exchangeDeclare("topic_logs", "topic");

// 绑定队列到 Exchange,并指定 Binding Key 模式
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "topic_logs", "*.orange.*");
channel.queueBind(queueName, "topic_logs", "*.*.rabbit");
channel.queueBind(queueName, "topic_logs", "lazy.#");

// 发送消息时指定符合模式的 Routing Key
channel.basicPublish("topic_logs", "quick.orange.rabbit", null, "Quick orange rabbit".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class TopicConsumer {
    private final static String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange,并指定 Binding Key 模式
            if (argv.length < 1) {
                System.err.println("Usage: TopicConsumer [binding_key_pattern]");
                System.exit(1);
            }
            for (String bindingKey : argv) {
                channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            }

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

4>Headers Exchange 根据消息头属性进行路由
不依赖路由键,当消息的 headers 完全匹配时,才会将消息发送到对应的队列。

  • 生产者
// 创建 Headers Exchange
channel.exchangeDeclare("headers_exchange", "headers");

// 绑定队列到 Exchange,并指定 Headers 匹配规则
Map<String, Object> headers = new HashMap<>();
headers.put("user_id", "12345");
headers.put("order_status", "pending");
AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, "headers_exchange", "", headers);

// 发送带有 Headers 的消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .headers(headers)
        .build();
channel.basicPublish("headers_exchange", "", props, "Message with specific headers".getBytes());
  • 消费者
import com.rabbitmq.client.*;

public class HeadersConsumer {
    private final static String EXCHANGE_NAME = "headers_exchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明 Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "headers");

            // 创建临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列到 Exchange,并指定 Headers 匹配规则
            Map<String, Object> headers = new HashMap<>();
            headers.put("user_id", "12345");
            headers.put("order_status", "pending");
            AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, EXCHANGE_NAME, "", headers);

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

12、RabbitMQ核心方法及参数说明

1>newConnection 创建连接工程并开启连接

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();

2>createChannel 创建信道
RabbitMQ 使用信道的方式来传输数据

信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接可以创建多个信道,每个信道都是独立的通信线路,可以并发地发送和接收消息。

Channel channel = connection.createChannel();

3>exchangeDeclare 交换器声明

channel.exchangeDeclare("my_exchange", "direct", true, false, null);

exchange: 交换器名称。
type: 交换器类型(如 “direct”, “fanout”, “topic”, “headers”)。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
autoDelete: 自动删除标志,true 表示当最后一个队列断开时自动删除交换器。
internal: 内部交换器标志,true 表示该交换器只能被其他交换器使用,不能直接由生产者发布消息。
arguments: 其他可选参数,例如死信交换器、过期时间等。

4>queueDeclare 队列声明

// 创建临时队列
String queueName = channel.queueDeclare().getQueue();

queue: 队列名称,为空字符串时表示创建临时队列。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
exclusive: 排他性标志,true 表示仅当前连接可用,连接关闭后自动删除。
autoDelete: 自动删除标志,true 表示当最后一个消费者断开时自动删除队列。
arguments: 其他可选参数,例如死信队列、过期时间等。

5>queueBind 队列绑定
将队列绑定到指定的交换器上,并提供路由键或匹配规则

channel.queueBind(queueName, "my_exchange", "routing_key");

queue: 队列名称。
exchange: 交换器名称。
routingKey: 路由键,对于某些类型的交换器(如 Direct 或 Topic),这个值是必须的;对于 Fanout 类型,通常为空字符串。
arguments: 可选参数,主要用于 Headers Exchange 的匹配规则

6>basicPublish 发布消息
向指定的交换器发布一条消息

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .contentType("text/plain")
        .deliveryMode(2) // 2 表示持久化
        .build();
channel.basicPublish("my_exchange", "routing_key", props, message.getBytes());

exchange: 交换器名称。
routingKey: 路由键。
props: 消息属性,包括内容类型、编码、持久化模式等。
body: 消息体,即要发送的数据。

7>basicConsume 消费消息
费来自指定队列的消息

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

queue: 队列名称。
autoAck: 自动确认标志,true 表示收到消息后立即确认,false 表示手动确认。
deliverCallback: 回调函数,用于处理接收到的消息。
cancelCallback: 取消回调函数,当消费者的取消通知到达时调用

8>basicAck 消息确认
手动确认模式下,当消费者成功处理完消息后,需要调用此方法来确认消息已被消费

channel.basicAck(envelope.getDeliveryTag(), false);

9>basicNack 消息丢弃
当消费者无法处理某条消息时,可以拒绝这条消息,并决定是否重新入队或者丢弃

// 第三个参数表示是否重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);

13、RabbitMQ镜像集群模式

搭建RabbitMQ保证消息队列的高可用。
创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue。
优点:高可用,单个节点挂掉,其他节点仍可用
缺点:高负载,如果某个队列消息很重,则镜像复制的实例下也会很重,性能开销大。

参考博客:消息队列中点对点与发布订阅区别
Powered by niaonao

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2272478.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

react构建项目报错 `npm install --no-audit --save @testing-l

这应该是我们想构建 react18 的项目&#xff0c;但是 通过 npx create-react-app my-app进行构建时&#xff0c;给我们安装的依赖是 react 19 下面提供一下我的解决方法&#xff1a; 第一步&#xff1a;在 package.json 中把依赖 react19 改为 react 18 第二步&#xff1a;添…

App窗口创建流程(Android12 )

有关的窗口对象 PhoneWindowActivityThread#performLaunchActivity {Activity.attach}Surface new ViewRootImpl 创建null对象mSurface.transferFrom(getOrCreateBLASTSurface())//填充内容创建native层的SurfaceLayerSurfaceFlinger::createLayerRenderSurfaceSurfaceFlinger…

LLM之Agent(十三)| 使用 PydanticAI 框架构建多代理LLM 系统(保姆教程)

Pydantic 是 Python 生态系统中的强大平台,每月下载量超过 2.85 亿次。现在,Pydantic的创始人也正在通过 Pydantic AI 涉足 AI 的前沿领域,Pydantic AI 是一个专为构建由生成式 AI 提供支持的生产级应用程序的框架。在本文中,我们将深入探讨 Pydantic AI 的独特之处、它的主…

常用的数据结构API概览

List ArrayList 1、在初始化一个ArrayList的时候&#xff0c;如果我想同时set一些值 比如存放int[ ] List<int[]> list new ArrayList(Arrays.asList(new int[]{intervals[0][0],intervals[0][1]}));//或者int[] temp new int[]{intervals[0][0],intervals[0][1]}…

年会游戏大全 完整版见考试宝

企业年会游戏大全&#xff08;35个&#xff09; 1.泡泡糖 游戏准备&#xff1a;主持人召集若干人上台&#xff0c;人数最好是奇数。 游戏规则&#xff1a;当大家准备好时&#xff0c;主持人喊“泡泡糖”大家要回应“粘什么”&#xff0c;主持人随机想到身体的某个部位&#x…

用豆包去除文章Ai味和重复率,实操教程

用AI生成的文章总是有“AI味”或者重复率高的问题&#xff1f; 今天就教你如何使用豆包轻松去除这些问题 让你的文章更自然、更具个人风格&#xff01;✍️✨ 详细版指令教程都整理了&#xff0c;纯粹F享啦~

【论文复现】改进麻雀搜索算法优化冷水机组的最优负载调配问题

目录 1.摘要2.麻雀搜索算法SSA原理3.改进策略4.结果展示5.参考文献6.代码获取 1.摘要 为了应对暖通空调&#xff08;HVAC&#xff09;系统由于不当负荷分配导致的高能源消耗问题&#xff0c;本文提出了一种改进麻雀搜索算法&#xff08;ISSA&#xff09;。ISSA算法旨在在满足负…

分布式ID生成-雪花算法实现无状态

雪花算法这里不再赘述&#xff0c;其缺点是有状态&#xff08;多副本隔离时&#xff0c;依赖手动配置workId和datacenterId&#xff09;&#xff0c;代码如下&#xff1a; /*** 雪花算法ID生成器*/ public class SnowflakeIdWorker {/*** 开始时间截 (2017-01-01)*/private st…

四、对象图

对象图 、对象图概述 含义&#xff1a; 对象图显示了某一时刻的一组对象及它们之间的关系。 作用&#xff1a; 对象图可以看做是类图的实例&#xff0c;用来表达各个对象在某一时刻的状态。 组成&#xff1a; 对象图中的建模元素主要有对象和链&#xff0c;对象是类的实…

2025/1/4期末复习 密码学 按老师指点大纲复习

我们都要坚信&#xff0c;道路越是曲折&#xff0c;前途越是光明。 --------------------------------------------------------------------------------------------------------------------------------- 现代密码学 第五版 杨波 第一章 引言 1.1三大主动攻击 1.中断…

【已解决】Django连接mysql报错Did you install mysqlclient?

解决报错&#xff1a;from err django.core.exceptions.ImproperlyConfigured: Error loading MySQLdb module. Did you install mysqlclient&#xff1f; 在终端执行python manage.py makemigrations报错问题汇总 错误1&#xff1a;已安装mysqlclient&#xff0c;提示Did yo…

【C语言】可移植性陷阱与缺陷(七): 除法运算时发生的截断

在C语言编程中&#xff0c;除法运算可能会引发一些与可移植性相关的问题&#xff0c;特别是当涉及到整数除法时发生的截断&#xff08;truncation&#xff09;。不同平台对于整数除法的行为和处理方式可能会有所不同&#xff0c;这可能导致代码在不同编译器或硬件平台上的行为不…

有限元分析学习——Anasys Workbanch第一阶段笔记(7)对称问题预备水杯案例分析

目录 1 序言 2 水杯案例 2.1 添加新材料 2.2 水压设置 2.3 约束边界条件设置及其结果 2.3.1 全约束固定(压缩桌面、Fixed support固定水杯底面) 2.3.2 单方面位移约束(压缩桌面、Displacement约束软弹簧) 2.3.3 接触约束(不压缩桌面、Fixed support 固定桌面、Frictional…

Spring Boot(4)使用 IDEA 搭建 Spring Boot+MyBatis 项目全流程实战

文章目录 一、⚡搞个引言二、⚡开始搭建 Spring Boot 项目吧&#xff01;2.1 启动 IDEA 并创建新项目2.2 选择项目依赖2.3 完成项目创建 三、&#x1f4d8;项目结构剖析四、✍配置数据库连接五、✍ 创建 MyBatis 相关组件5.1 实体类&#xff08;Entity&#xff09;5.2 Mapper 接…

[服务器][教程]Ubuntu24.04 Server开机自动挂载硬盘教程

1. 查看硬盘ID ls -l /dev/disk/by-uuid可以看到对应的UUID所对应的分区 2. 创建挂载文件夹 创建好文件夹即可 3. 修改配置文件 sudo vim /etc/fstab把对应的UUID和创建的挂载目录对应即可 其中# Personal mount points下面的是自己新添加的 &#xff1a;分区定位&#xff…

抢先体验:人大金仓数据库管理系统KingbaseES V9 最新版本 CentOS 7.9 部署体验

一、简介 KingbaseES 是中国人大金仓信息技术股份有限公司自主研发的一款通用关系型数据库管理系统&#xff08;RDBMS&#xff09;。 作为国产数据库的杰出代表&#xff0c;它专为中国市场设计&#xff0c;广泛应用于政府、金融、能源、电信等关键行业&#xff0c;以高安全性…

家教老师预约平台小程序系统开发方案

家教老师预约平台小程序系统将连接学生/家长与家教老师&#xff0c;提供一站式的家教服务预约体验。 一、用户需求分析1、家教老师&#xff1a;希望获得更多的学生资源&#xff0c;通过平台展示自己的教学特长和经验&#xff0c;管理个人日程&#xff0c;接收并确认预约请求&a…

基于Python的音乐播放器 毕业设计-附源码73733

摘 要 本项目基于Python开发了一款简单而功能强大的音乐播放器。通过该音乐播放器&#xff0c;用户可以轻松管理自己的音乐库&#xff0c;播放喜爱的音乐&#xff0c;并享受音乐带来的愉悦体验。 首先&#xff0c;我们使用Python语言结合相关库开发了这款音乐播放器。利用Tkin…

云架构Web端的工业MES系统设计之区分工业过程

云架构Web端的工业MES系统设计之区分工业过程 在当今数字化浪潮席卷全球的背景下,制造业作为国家经济发展的重要支柱产业,正面临着前所未有的机遇与挑战。市场需求的快速变化、客户个性化定制要求的日益提高以及全球竞争的愈发激烈,都促使制造企业必须寻求更加高效、智能的生产…

TCP协议:三次握手、四次挥手

文章目录 三次握手1. 什么是三次握手&#xff1f;2. 为什么是三次握手&#xff1f; 四次挥手1. 什么是四次挥手&#xff1f;2. 为什么是四次挥手&#xff1f; 引用 三次握手 1. 什么是三次握手&#xff1f; 三次握手是TCP协议中用于建立连接的过程。 第一次&#xff0c;表示请…