RabbitMQ 教程 | 第3章 客户端开发向导

news2025/1/20 3:55:08

👨🏻‍💻 热爱摄影的程序员
👨🏻‍🎨 喜欢编码的设计师
🧕🏻 擅长设计的剪辑师
🧑🏻‍🏫 一位高冷无情的编码爱好者
大家好,我是 DevOps 工程师
欢迎分享 / 收藏 / 赞 / 在看!

这篇 RabbitMQ 教程为学习者提供了全面的内容,从 RabbitMQ 的简介开始,涵盖了消息中间件的概念、RabbitMQ 的安装与使用,以及交换机、队列、路由键等相关概念的介绍。进一步深入,教程探讨了 AMQP 协议、客户端开发向导,以及消息的发送和消费方式。同时,学习者还可以了解消息传输保障、高级特性如死信队列、延迟队列、优先级队列、RPC 实现等。此外,教程还涵盖了 RabbitMQ 的管理、配置、运维、监控和集群管理等重要主题,帮助学习者充分掌握 RabbitMQ 的应用。整篇教程丰富内容详实,适合初学者和有经验的开发者参考学习。

全篇共 11 章,9 万余字。本文:第3章 客户端开发向导。

第3章 客户端开发向导

3.1 连接 RabbitMQ

在本节中,我们将学习如何使用 RabbitMQ 的客户端库与 RabbitMQ 服务器建立连接。

在 Spring Boot 中,连接 RabbitMQ 可以通过 Spring AMQP 库来实现。Spring AMQP 是 Spring 对 AMQP 协议的封装,使得在 Spring Boot 应用中连接和使用 RabbitMQ 变得非常方便。下面是连接 RabbitMQ 的步骤:

  1. 添加依赖: 在 pom.xml 文件中添加 Spring Boot 和 Spring AMQP 的依赖。确保你已经正确配置了 Maven 或 Gradle 来管理依赖。
<!-- Spring Boot Starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置 RabbitMQ 连接信息: 在 application.properties(或 application.yml)文件中配置 RabbitMQ 的连接信息,包括主机名、端口、用户名、密码等。
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

可以根据实际情况修改上述配置信息,确保连接到正确的 RabbitMQ 服务器。

  1. 创建 RabbitMQ 连接工厂: 在 Spring Boot 中,通过 ConnectionFactory 来创建 RabbitMQ 连接。可以直接使用 CachingConnectionFactory,它是 ConnectionFactory 的实现,会缓存连接,提高连接的复用性和性能。
@Configuration
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
}
  1. 创建 RabbitTemplate: RabbitTemplate 是 Spring AMQP 提供的用于与 RabbitMQ 交互的模板类。它封装了发送和接收消息的方法,简化了与 RabbitMQ 的交互过程。
@Configuration
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        // ...
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}
  1. 发送和接收消息: 现在,你可以在应用中使用 RabbitTemplate 来发送和接收消息了。通过调用 convertAndSend 方法发送消息,调用 receiveAndConvert 方法接收消息。
@RestController
public class MessageController {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public MessageController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @PostMapping("/send")
    public String sendMessage(@RequestBody String message) {
        rabbitTemplate.convertAndSend("exchange", "routingKey", message);
        return "Message sent successfully!";
    }
}

这是一个简单的示例,通过发送 POST 请求,将消息发送到名为"exchange"的交换机,并使用"routingKey"进行路由。

以上就是在 Spring Boot 中连接 RabbitMQ 的基本步骤。通过 Spring AMQP 的封装,你可以轻松地在应用中与 RabbitMQ 进行交互,实现可靠的消息传递。

3.2 使用交换机和队列

在 RabbitMQ 中,交换机(Exchange)和队列(Queue)是两个重要的组件,它们一起协同工作,实现消息的传递和路由。以下是 RabbitMQ 使用交换机和队列的基本流程:

  1. 声明交换机: 首先,生产者或消费者需要声明一个交换机。交换机是消息的接收和路由中心,负责接收来自生产者的消息,并根据消息的路由键将消息路由到一个或多个队列中。声明交换机时,需要指定交换机的名称、类型和其他相关参数。
  2. 声明队列: 接下来,生产者或消费者需要声明一个队列。队列用于存储消息,生产者发送的消息会被交换机路由到队列中,而消费者从队列中接收消息进行处理。声明队列时,需要指定队列的名称、是否持久化、是否独占等参数。
  3. 绑定交换机和队列: 在声明了交换机和队列之后,需要将队列绑定到交换机上。绑定是指将队列与交换机关联起来,使得交换机可以将消息路由到指定的队列中。在绑定时,需要指定交换机的名称、队列的名称以及绑定键(Binding Key)等参数。
  4. 生产者发送消息: 生产者使用指定的交换机和路由键将消息发送到 RabbitMQ 服务器。消息发送到交换机后,交换机会根据消息的路由键将消息路由到绑定到它的队列中。
  5. 消费者接收消息: 消费者订阅队列,开始接收消息。当队列中有消息到达时,RabbitMQ 服务器将消息传递给消费者的消息回调函数。消费者可以在回调函数中处理收到的消息。

通过交换机和队列的灵活组合,RabbitMQ 可以实现不同类型的消息传递模式,如点对点(Point-to-Point)和发布-订阅(Publish-Subscribe)。生产者将消息发送到交换机,交换机根据消息的路由键将消息路由到相应的队列中,消费者订阅队列并接收消息进行处理。这种灵活的消息传递机制使得 RabbitMQ 非常适用于构建可靠的消息传递系统。

3.2.1 exchangeDeclare 方法详解

exchangeDeclare 方法是用于在 RabbitMQ 中声明交换机(Exchange)的方法。在使用该方法前,需要先连接到 RabbitMQ 服务器,并创建一个通道(Channel)。exchangeDeclare 方法的详细参数和含义如下:

void exchangeDeclare(
    String exchangeName,    // 交换机名称
    String exchangeType,    // 交换机类型
    boolean durable,        // 是否持久化
    boolean autoDelete,     // 是否自动删除
    boolean internal,       // 是否是内部使用的交换机
    Map<String, Object> arguments // 其他参数
) throws IOException;

参数解释:

  1. exchangeName(String):要声明的交换机的名称,是一个字符串。交换机名称在 RabbitMQ 中必须是唯一的。
  2. exchangeType(String):交换机的类型,是一个字符串。RabbitMQ 支持不同类型的交换机,常用的类型包括"direct"、"fanout"、"topic"和"headers"。不同类型的交换机有不同的消息路由规则。
  3. durable(boolean):是否持久化交换机。如果设置为 true,交换机会在 RabbitMQ 服务器重启后仍然存在,否则会在服务器重启时删除。
  4. autoDelete(boolean):是否自动删除交换机。如果设置为 true,当交换机不再被使用时(没有队列绑定到该交换机),RabbitMQ 会自动删除该交换机。
  5. internal(boolean):是否是内部使用的交换机。如果设置为 true,该交换机只能被 RabbitMQ 内部使用,客户端无法直接发送消息到该交换机。
  6. arguments(Map<String, Object>):其他参数,是一个键值对的 Map。可以通过该参数设置一些额外的参数,例如交换机的备份参数、TTL 参数等。

示例使用代码:

public class ExchangeDeclareExample {

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String exchangeName = "myExchange";
            String exchangeType = "fanout";
            boolean durable = true;
            boolean autoDelete = false;
            boolean internal = false;
            Map<String, Object> arguments = new HashMap<>();

            channel.exchangeDeclare(exchangeName, exchangeType, durable, autoDelete, internal, arguments);
            System.out.println("Exchange declared successfully!");
        }
    }
}

上述示例中,创建了一个名为"myExchange"的 fanout 类型的持久化交换机。你可以根据需要选择不同类型的交换机,并根据业务需求设置其他参数。成功执行 exchangeDeclare 方法后,交换机将在 RabbitMQ 服务器中声明并可用于消息的路由和传递。

3.2.2 queueDeclare 方法详解

queueDeclare 方法是用于在 RabbitMQ 中声明队列(Queue)的方法。在使用该方法前,需要先连接到 RabbitMQ 服务器,并创建一个通道(Channel)。queueDeclare 方法的详细参数和含义如下:

AMQP.Queue.DeclareOk queueDeclare(
    String queueName,        // 队列名称
    boolean durable,         // 是否持久化
    boolean exclusive,       // 是否独占队列
    boolean autoDelete,      // 是否自动删除
    Map<String, Object> arguments // 其他参数
) throws IOException;

参数解释:

  1. queueName(String):要声明的队列的名称,是一个字符串。队列名称在 RabbitMQ 中必须是唯一的。
  2. durable(boolean):是否持久化队列。如果设置为 true,队列会在 RabbitMQ 服务器重启后仍然存在,否则会在服务器重启时删除。
  3. exclusive(boolean):是否独占队列。如果设置为 true,该队列只能被当前连接使用,其他连接无法访问。通常用于临时队列。
  4. autoDelete(boolean):是否自动删除队列。如果设置为 true,当队列不再被使用时(没有消费者订阅该队列),RabbitMQ 会自动删除该队列。
  5. arguments(Map<String, Object>):其他参数,是一个键值对的 Map。可以通过该参数设置一些额外的参数,例如队列的 TTL 参数、死信队列参数等。

示例使用代码:

public class QueueDeclareExample {

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String queueName = "myQueue";
            boolean durable = true;
            boolean exclusive = false;
            boolean autoDelete = false;
            Map<String, Object> arguments = new HashMap<>();

            channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments);
            System.out.println("Queue declared successfully!");
        }
    }
}

上述示例中,创建了一个名为"myQueue"的持久化队列。你可以根据需要设置队列的持久性、独占性、自动删除和其他参数。成功执行 queueDeclare 方法后,队列将在 RabbitMQ 服务器中声明并可用于消息的接收和处理。

3.2.3 queueBind 方法详解

queueBind 方法用于在 RabbitMQ 中将队列(Queue)绑定到交换机(Exchange)。在使用该方法前,需要先连接到 RabbitMQ 服务器,并创建一个通道(Channel)。queueBind 方法的详细参数和含义如下:

void queueBind(
    String queueName,        // 队列名称
    String exchangeName,     // 交换机名称
    String routingKey,       // 绑定键(Binding Key)
    Map<String, Object> arguments // 其他参数
) throws IOException;

参数解释:

  1. queueName(String):要绑定的队列的名称,是一个字符串。
  2. exchangeName(String):要绑定的交换机的名称,是一个字符串。
  3. routingKey(String):绑定键(Binding Key),用于指定交换机将消息路由到队列的规则。不同类型的交换机对绑定键的匹配规则有所不同。
  4. arguments(Map<String, Object>):其他参数,是一个键值对的 Map。可以通过该参数设置一些额外的参数,例如绑定的头信息(Headers)等。

示例使用代码:

public class QueueBindExample {

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String queueName = "myQueue";
            String exchangeName = "myExchange";
            String routingKey = "myRoutingKey";
            Map<String, Object> arguments = new HashMap<>();

            channel.queueBind(queueName, exchangeName, routingKey, arguments);
            System.out.println("Queue bound to exchange successfully!");
        }
    }
}

上述示例中,将名为"myQueue"的队列绑定到名为"myExchange"的交换机,绑定键为"myRoutingKey"。根据实际情况,你需要设置正确的队列名称、交换机名称和绑定键。成功执行 queueBind 方法后,交换机将根据绑定键的规则将消息路由到队列中,从而实现消息的传递和处理。

3.2.4 exchangeBind 方法详解

exchangeBind 方法用于在 RabbitMQ 中将一个交换机(Exchange)绑定到另一个交换机。在使用该方法前,需要先连接到 RabbitMQ 服务器,并创建一个通道(Channel)。exchangeBind 方法的详细参数和含义如下:

void exchangeBind(
    String destinationExchange,   // 目标交换机名称
    String sourceExchange,        // 源交换机名称
    String routingKey,            // 绑定键(Binding Key)
    Map<String, Object> arguments // 其他参数
) throws IOException;

参数解释:

  1. destinationExchange(String):要绑定到的目标交换机的名称,是一个字符串。
  2. sourceExchange(String):要绑定的源交换机的名称,是一个字符串。
  3. routingKey(String):绑定键(Binding Key),用于指定交换机将消息从源交换机路由到目标交换机的规则。不同类型的交换机对绑定键的匹配规则有所不同。
  4. arguments(Map<String, Object>):其他参数,是一个键值对的 Map。可以通过该参数设置一些额外的参数,例如绑定的头信息(Headers)等。

示例使用代码:

public class ExchangeBindExample {

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String destinationExchange = "destinationExchange";
            String sourceExchange = "sourceExchange";
            String routingKey = "myRoutingKey";
            Map<String, Object> arguments = new HashMap<>();

            channel.exchangeBind(destinationExchange, sourceExchange, routingKey, arguments);
            System.out.println("Exchange bound successfully!");
        }
    }
}

上述示例中,将名为"destinationExchange"的交换机绑定到名为"sourceExchange"的交换机,绑定键为"myRoutingKey"。根据实际情况,你需要设置正确的目标交换机名称、源交换机名称和绑定键。成功执行 exchangeBind 方法后,消息从源交换机会被路由到目标交换机,进而实现更灵活的消息传递和路由。

3.2.5 何时创建

在 RabbitMQ 中,创建交换机和队列的时机取决于你的应用需求和消息传递模式。不同的场景可能需要不同的处理方式。以下是一些常见的场景和处理建议:

  1. 确定消息传递模式: 在创建交换机和队列之前,首先要明确你的消息传递模式。是点对点传输还是发布-订阅模式?根据消息传递模式,选择合适的交换机类型和绑定方式。
  2. 静态创建交换机和队列: 如果你的交换机和队列在应用启动时就已经确定,且不会动态变化,可以在应用启动时静态创建它们。这样做可以确保交换机和队列在应用运行期间一直可用。
  3. 动态创建交换机和队列: 如果你的交换机和队列是根据实际情况动态变化的,可以在需要时动态创建它们。例如,根据用户的订阅行为,动态创建队列用于订阅特定类型的消息。这样做可以节省资源,避免不必要的队列和交换机占用空间。
  4. 创建时机:
    • 对于持久化的交换机和队列,建议在应用启动时创建,确保它们在服务器重启后仍然存在。
    • 对于临时的交换机和队列,可以在需要时创建,使用完毕后再自动删除,节省资源。
  1. 错误处理:
    • 如果创建交换机或队列失败,应该处理创建失败的情况,例如记录日志、重试或通知管理员。
    • 在动态创建交换机和队列时,还需要考虑并发访问和资源竞争的情况,确保创建过程的线程安全性。
  1. 防止重复创建:
    • 在动态创建交换机和队列时,需要注意防止重复创建相同名称的交换机和队列。可以使用缓存或数据库记录已创建的交换机和队列,避免重复创建。

创建交换机和队列的时机和处理方式应该根据你的业务需求和消息传递模式来确定。灵活地根据实际情况选择静态或动态创建,以及持久化或临时创建,确保消息传递的高效性和可靠性。同时,还要注意错误处理和资源竞争等情况,保证应用的稳定性和可靠性。

3.3 发送消息

使用 RabbitMQ 的客户端库向交换机发送消息通常涉及以下步骤:

  1. 建立连接和创建通道: 首先,你需要建立到 RabbitMQ 服务器的连接,并创建一个通道(Channel)。RabbitMQ 的连接和通道是发送和接收消息的基础。
  2. 声明交换机: 在发送消息之前,你需要先声明要使用的交换机(Exchange)。交换机负责将消息路由到队列。
  3. 发布消息: 使用 basicPublish 方法将消息发布到交换机。在这里,你需要指定交换机的名称、路由键(Routing Key)以及要发送的消息内容。
  4. 关闭连接和通道: 当发送完所有消息后,记得及时关闭通道和连接,以释放资源。

下面是一个使用 RabbitMQ 的 Java 客户端库实现向交换机发送消息的简单示例:

public class MessagePublisher {

    private static final String EXCHANGE_NAME = "myExchange";
    private static final String ROUTING_KEY = "myRoutingKey";
    private static final String MESSAGE = "Hello RabbitMQ!";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // 发布消息
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
            System.out.println("Message sent successfully!");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意事项:

  1. 确保交换机已经存在: 在发送消息之前,确保你要使用的交换机已经在 RabbitMQ 服务器上声明过。否则,消息将无法正确路由到队列。
  2. 确认路由键和交换机类型匹配: 确保发送消息时指定的路由键和交换机类型匹配,否则消息可能无法正确路由到队列。
  3. 处理异常: 在发送消息时,可能会出现网络异常或其他错误。在实际应用中,建议捕获异常并处理,例如记录日志或重试。
  4. 避免阻塞: 在发送消息时,不要在主线程中执行阻塞操作。如果发送大量消息,考虑使用异步发送消息,避免主线程阻塞。
  5. 注意消息序列化: 在实际应用中,你可能需要将复杂对象转换为字节流进行传输。在这种情况下,需要考虑消息的序列化和反序列化。

使用 RabbitMQ 的客户端库向交换机发送消息是一个相对简单的过程。需要注意交换机和队列的声明,正确设置路由键和交换机类型,处理异常情况,以及避免阻塞操作。合理地使用异步发送消息可以提高系统性能和吞吐量。同时,根据实际需求进行消息序列化和反序列化,确保消息的正确传递和处理。

3.4 消费消息

使用 RabbitMQ 的客户端库从队列中消费消息通常涉及以下步骤:

  1. 建立连接和创建通道: 首先,你需要建立到 RabbitMQ 服务器的连接,并创建一个通道(Channel)。RabbitMQ 的连接和通道是接收和处理消息的基础。
  2. 声明队列: 在消费消息之前,你需要先声明要消费的队列(Queue)。如果队列不存在,RabbitMQ 会自动创建一个新的队列。
  3. 创建消费者: 使用 DefaultConsumer 类创建一个消费者,该类是 RabbitMQ 客户端库中提供的默认消费者实现。
  4. 注册消费者: 使用 basicConsume 方法将消费者注册到队列上,从而开始接收和处理消息。
  5. 处理消息: 在消费者的 handleDelivery 方法中,你可以自定义对接收到的消息进行处理的逻辑。
  6. 关闭连接和通道: 当消费消息的任务完成后,记得及时关闭通道和连接,以释放资源。

下面是一个使用 RabbitMQ 的 Java 客户端库实现从队列中消费消息的简单示例:

public class MessageConsumer {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 创建消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                }
            };

            // 注册消费者
            channel.basicConsume(QUEUE_NAME, true, consumer);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

不同消费模式的选择:

  1. 自动确认模式(Automatic Acknowledgment):
    • 设置 autoAck 参数为 true,表示自动确认消息。
    • 适用于简单、不需要保证消息处理一次性的场景。
    • 只要消息被消费者接收,RabbitMQ 就会将消息从队列中删除,不管消费者是否处理成功。
  1. 手动确认模式(Manual Acknowledgment):
    • 设置 autoAck 参数为 false,表示手动确认消息。
    • 需要在消费者处理完消息后,调用 basicAck 方法手动确认消息,告知 RabbitMQ 消息已被处理。
    • 适用于需要确保消息处理的可靠性和一次性处理的场景。
  1. 消费者预取(Consumer Prefetch):
    • 使用 basicQos 方法设置 prefetchCount 参数,表示消费者一次性从队列中预取的消息数量。
    • 通过合理设置预取数量,可以提高消息处理的吞吐量和性能。

注意事项:

  1. 防止消息丢失:
    • 确保在处理消息时,发生异常时不要丢失消息。可以捕获异常后重新处理或记录日志。
  1. 防止消息阻塞:
    • 避免在消费者的 handleDelivery 方法中执行耗时的操作,避免阻塞其他消息的处理。
  1. 并发处理:
    • 在多线程环境中,需要确保消费者的线程安全性,避免并发问题。

使用 RabbitMQ 的客户端库从队列中消费消息是一个相对简单的过程。根据实际需求选择自动确认模式或手动确认模式,注意消息的处理可靠性和防止阻塞,合理设置消费者预取参数以提高性能。在实际应用中,还需要考虑异常处理、并发问题和消息丢失等情况,确保消息的可靠传递和处理。

3.4.1 推模式

推模式是一种消息消费方式,其中消费者主动从队列中取出消息并进行处理。相比于拉模式,推模式更加主动和实时。

推模式的消费方式可以通过以下步骤实现:

  1. 建立连接和创建通道: 首先,你需要建立到 RabbitMQ 服务器的连接,并创建一个通道(Channel)。RabbitMQ 的连接和通道是发送和接收消息的基础。
  2. 声明队列: 在消费消息之前,你需要先声明要消费的队列(Queue)。如果队列不存在,RabbitMQ 会自动创建一个新的队列。
  3. 注册消费者: 使用 basicConsume 方法将消费者注册到队列上,从而开始接收和处理消息。需要设置autoAck参数为false,以使用手动确认模式。
  4. 获取消息: 使用 basicGet 方法主动从队列中获取一条消息。该方法会立即返回,无论队列中是否有消息。如果队列为空,返回的消息对象为 null。
  5. 处理消息: 对于获取的消息对象,你可以进行相应的处理操作,例如解析消息内容、执行业务逻辑等。
  6. 手动确认: 在消息处理完成后,调用 basicAck 方法手动确认消息。通过向 RabbitMQ 发送确认信息,告知它消息已被处理。
  7. 重复获取和处理消息: 重复执行步骤 4 至步骤 6,即循环获取和处理队列中的消息。

下面是一个使用 RabbitMQ 的 Java 客户端库实现推模式消费消息的简单示例:

public class PushConsumer {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 注册消费者
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);

                    // 处理消息

                    // 手动确认消息
                    try {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });

            // 不断循环获取和处理消息
            while (true) {
                // 获取消息
                GetResponse response = channel.basicGet(QUEUE_NAME, false);

                if (response != null) {
                    // 处理消息
                    String message = new String(response.getBody(), "UTF-8");
                    System.out.println("Received message: " + message);

                    // 手动确认消息
                    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意事项:

  1. 避免阻塞: 在获取消息和处理消息的过程中,避免阻塞操作,以允许消费者能够及时获取到消息并进行处理。
  2. 处理异常: 在实际应用中,可能会出现网络异常或其他错误。在处理消息时,建议捕获异常并进行适当的处理,例如记录日志、重试或通知管理员。
  3. 控制消费速度: 使用合适的方式控制消费速度,避免消费者处理消息的速度过快或过慢。
  4. 注意消息处理的幂等性: 由于消息的推送和处理是异步的,确保消息处理的幂等性,以防止重复处理相同的消息。

推模式的消费方式可以更实时地获取消息并进行处理,适用于需要快速响应和实时性要求较高的场景。然而,需要注意消费者的阻塞和异常处理,以及消息处理的幂等性问题。根据具体的业务需求,选择合适的消费模式,推模式和拉模式都有各自的适用场景。

3.4.2 拉模式

拉模式是一种消息消费方式,其中消费者根据需要主动从队列中拉取消息进行处理。相比于推模式,拉模式更加灵活,消费者可以根据自身的处理能力和需求主动控制消息获取的频率。

拉模式的消费方式可以通过以下步骤实现:

  1. 建立连接和创建通道: 首先,你需要建立到 RabbitMQ 服务器的连接,并创建一个通道(Channel)。RabbitMQ 的连接和通道是发送和接收消息的基础。
  2. 声明队列: 在消费消息之前,你需要先声明要消费的队列(Queue)。如果队列不存在,RabbitMQ 会自动创建一个新的队列。
  3. 获取消息: 使用 basicGet 方法主动从队列中获取一条消息。该方法会立即返回,无论队列中是否有消息。如果队列为空,返回的消息对象为 null。
  4. 处理消息: 对于获取的消息对象,你可以进行相应的处理操作,例如解析消息内容、执行业务逻辑等。
  5. 手动确认: 在消息处理完成后,调用 basicAck 方法手动确认消息。通过向 RabbitMQ 发送确认信息,告知它消息已被处理。
  6. 重复获取和处理消息: 通过循环不断地执行步骤 3 至步骤 5,即可实现拉模式的消息消费。

下面是一个使用 RabbitMQ 的 Java 客户端库实现拉模式消费消息的简单示例:

public class PullConsumer {

    private static final String QUEUE_NAME = "myQueue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 循环获取和处理消息
            while (true) {
                // 获取消息
                GetResponse response = channel.basicGet(QUEUE_NAME, true);

                if (response != null) {
                    // 处理消息
                    String message = new String(response.getBody(), "UTF-8");
                    System.out.println("Received message: " + message);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意事项:

  1. 控制消息获取频率: 在拉模式下,消费者可以根据自身的处理能力和需求主动控制消息获取的频率。可以使用合适的等待时间,以避免过于频繁地获取消息。
  2. 处理异常: 在实际应用中,可能会出现网络异常或其他错误。在处理消息时,建议捕获异常并进行适当的处理,例如记录日志、重试或通知管理员。
  3. 注意消息处理的幂等性: 由于消息的拉取和处理是异步的,确保消息处理的幂等性,以防止重复处理相同的消息。

拉模式的消费方式相对于推模式更加灵活,适用于需要根据消费者自身需求主动控制消息获取的场景。然而,需要注意控制消息获取频率和异常处理,以及消息处理的幂等性问题。根据具体的业务需求,选择合适的消费模式,拉模式和推模式都有各自的适用场景。

3.5 消费端的确认与拒绝

在 RabbitMQ 中,消费者可以通过手动确认和拒绝消息来确保消息的处理可靠性。当消费者成功处理了一条消息时,可以发送确认消息给 RabbitMQ,告知它该消息已经被处理。如果在处理消息时出现异常或处理失败,消费者可以发送拒绝消息给 RabbitMQ,要求重新投递或将消息转移到死信队列。以下是手动确认和拒绝消息的方法:

  1. 手动确认消息: 在消费者处理消息成功后,调用 basicAck 方法手动确认消息。这会告诉 RabbitMQ 该消息已被成功处理,可以从队列中删除。
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        try {
            // 处理消息
            processMessage(body);

            // 手动确认消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理异常
            e.printStackTrace();
            // 如果处理消息失败,可以选择拒绝消息并重新投递或转移到死信队列
            // channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
});
  1. 手动拒绝消息: 在消费者处理消息失败时,可以调用 basicReject 方法手动拒绝消息。第三个参数 requeue 指定是否重新将消息放回队列。如果 requeue 为 false,则消息将会被删除或进入死信队列;如果为 true,则消息会重新投递到队列中。
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        try {
            // 处理消息(假设出现异常)
            throw new Exception("Something went wrong!");

        } catch (Exception e) {
            // 处理异常
            e.printStackTrace();

            // 手动拒绝消息,并不重新投递
            channel.basicReject(envelope.getDeliveryTag(), false);
            // 或手动拒绝消息,并重新投递
            // channel.basicReject(envelope.getDeliveryTag(), true);
        }
    }
});

在消费端出现异常时的处理方式取决于业务需求和消息处理策略:

  • 如果消息处理失败,且不希望重新处理该消息,可以使用 basicReject 方法拒绝消息,并设置 requeue 参数为 false,告诉 RabbitMQ 将该消息丢弃或转移到死信队列。
  • 如果消息处理失败,但希望重新处理该消息,可以使用 basicReject 方法拒绝消息,并设置 requeue 参数为 true,将消息重新放回队列。
  • 如果消息处理失败,但希望等待一段时间后再重新处理该消息,可以使用 basicNack 方法拒绝消息,并设置 requeue 参数为 true,并结合消息的过期时间或延迟队列来实现延迟重试。
  • 如果消息处理失败,但希望将该消息保存起来以供稍后处理,可以使用 basicReject 方法拒绝消息,并将消息内容持久化到数据库或其他存储介质中。
  • 在处理异常时,建议记录日志,以便后续排查问题和分析失败原因。

总之,手动确认和拒绝消息能够确保消息的处理可靠性,并根据业务需求和消息处理策略选择适当的处理方式。在消费端出现异常时,可以选择拒绝消息并重新投递、拒绝消息并将其丢弃或转移到死信队列,或者将消息持久化到数据库中等方式来处理异常情况。

3.6 关闭连接

正确地关闭与 RabbitMQ 服务器的连接是很重要的,这样可以释放资源并避免可能的内存泄漏或其他问题。在关闭连接时,需要注意以下事项:

  1. 关闭通道(Channel): 在关闭连接之前,先关闭所有的通道。通道是进行消息传递的实际渠道,关闭通道可以确保所有的消息都已被处理或传递。
  2. 停止消费者: 如果存在消费者,确保在关闭连接之前先停止消费者。这样可以防止消费者在连接关闭后继续尝试消费消息,从而导致资源浪费或错误。
  3. 关闭连接: 最后,关闭与 RabbitMQ 服务器的连接。

在Java客户端库中,关闭连接的操作如下所示:

public class ConnectionManager {
    private Connection connection;

    // 建立连接
    public void connect() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            connection = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 关闭连接
    public void closeConnection() {
        try {
            if (connection != null) {
                // 关闭所有通道
                for (Channel channel : connection.getChannels()) {
                    channel.close();
                }
                // 关闭连接
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

需要注意的事项:

  1. 在关闭连接前确保所有的通道都已关闭。忽略关闭通道的步骤可能导致资源泄漏。
  2. 在关闭连接时,要确保所有需要处理的消息都已被消费或确认。如果有未处理的消息,它们可能会在连接关闭后重新投递给其他消费者,或者进入死信队列,具体取决于消费端的处理策略。
  3. 在连接关闭后,不要再试图使用已关闭的连接或通道。这可能会导致意外错误。
  4. 如果在连接关闭前出现异常,要进行适当的异常处理,例如记录日志或尝试重新关闭连接。

总结起来,正确地关闭与 RabbitMQ 服务器的连接是确保应用程序稳定性和性能的重要一步。遵循上述步骤,先关闭通道,再停止消费者,最后关闭连接。同时,注意异常处理,以及在连接关闭后不再使用已关闭的连接或通道。这样可以避免资源泄漏和其他可能的问题,并保证应用程序的正常运行。

3.7 小结

本章介绍了 RabbitMQ 客户端的开发向导,包括连接 RabbitMQ 服务器、使用交换机和队列、发送和消费消息等操作。在下一章中,我们将进一步学习 RabbitMQ 的高级特性,包括消息何去何从、过期时间、死信队列、延迟队列等功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/813396.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第1章 JavaScript简史

JavaScript的起源 JavaScript是Netscape公司与Sun公司合作开发的在JavaScript诞生之前游览器就是显示超文本文档的简单的软件&#xff0c;JavaScript为此增加了交互行为ECMAScript是JavaScript的标准化&#xff0c;本质上是同一个语言JavaScript是一门脚本语言通常只能运行在游…

VCS ICO - Intelligent Coverage Optimization

ico是vcs提供的用于优化覆盖率的feature&#xff1b;一般用户通过dist solver bofore等约束了变量的随机概率&#xff0c;而ico会在用户约束的基础上&#xff0c;做一些自动“修正”&#xff0c;以此来优化随机激励&#xff0c;提高随机多样性&#xff0c;加速覆盖率收敛&#…

【腾讯云 Cloud Studio 实战训练营】通过云IDE构建Web3项目

文章目录 背景一、 前言二、 Cloud Studio 主要功能三、Cloud Studio 实验前期准备3.1. 注册平台 四、构建Web3项目项目中技术栈 五、其他功能演示六、常见问题及注意事项七、总结八、相关链接 ​ Cloud Studio 是基于浏览器的集成式开发环境&#xff08;IDE&#xff09;&#…

《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(12)-Fiddler设置IOS手机抓包,你知多少???

1.简介 Fiddler不但能截获各种浏览器发出的 HTTP 请求&#xff0c;也可以截获各种智能手机发出的HTTP/ HTTPS 请求。 Fiddler 能捕获Android 和 Windows Phone 等设备发出的 HTTP/HTTPS 请求。同理也可以截获iOS设备发出的请求&#xff0c;比如 iPhone、iPad 和 MacBook 等苹…

每日一题——只出现一次的数字

只出现一次的数字 题目链接 思路 要求为线性时间复杂度&#xff0c;即时间复杂度为O(n)&#xff0c;那么我们就不能用简单的两层循环来解决问题 要求只能使用常量额外空间&#xff0c;即空间复杂度为O(1)&#xff0c;那么我们就不能额外开辟一个数组来记录每个元素出现的次数…

Cpp学习——通过日期类来了解Cpp中的运算符重载

目录 一&#xff0c;日期类 二&#xff0c;运算符重载 运算符重载1(比较&#xff09; 1.< 2. 复用 3.> 4.! 5.< 6.> 运算符重载2(日期加减&#xff09; 0.准备条件------计算每月的日期函数 1. 2. 3.- 4.- 5.前置 6.后置 7前置-- 6.后置-- 7.计…

「BLIP 微调指南」以 Image-Text Captioning 任务为例

前言&#xff1a;近日需要用到 BLIP 微调下游任务&#xff0c;搜索发觉如今并无 BLIP 微调教程&#xff0c;下面就以 Image-Text Captioning 任务为例&#xff0c;演示如何完成 BLIP 模型在自己数据集上的微调。 目录 1. BLIP 介绍2. 关键代码定位3. 关键参数赋值4. 模型定义&a…

Scratch 教程 之 如何四舍五入保留一个小数到指定的数位

有些时候&#xff0c;我们需要四舍五入一个多位小数到指定的位&#xff0c;但scratch并没有这个积木&#xff0c;怎么做呢&#xff1f;我来教你&#xff5e; 我们创建一个函数&#xff0c;需要时调用就行了&#xff5e; 如图&#xff0c;创建一个带参函数&#xff0c;勾选"…

《GreenPlum系列-部署维护》GreenPlum数据库Standby故障处理

一、Standby故障 1.检查监控中心数据库状态 2.查看master节点数据库状态 su - gpadmin gpstate -f二、重启数据库 1.快速关闭数据库 [gpadminmdw pg_log]$ gpstop -M fast ... Continue with Greenplum instance shutdown Yy|Nn (defaultN): > y ...2.开启数据库 [gpad…

[SSM]Spring对事务的支持

目录 十六、Spring对事务的支持 16.1事务概述 16.2引入事务场景 16.3Spring对事务的支持 Spring实现事务的两种方式 Spring事务管理API 声明式事务之注解实现方式 事务属性 事务的全注解式开发 声明式事务之XML实现方式 十六、Spring对事务的支持 16.1事务概述 什么是…

stm32 mpu6050 cubemx DMP法读取角度

文章目录 前言一、相关文件二、cubemx配置三、代码变量初始化主循环 总结 前言 文件 记录使用dmp库来读取mpu6050的角度。 这是参考文件 参考1–主要参考 github参考 参考2 参考三 一、相关文件 相关文件在这里下载&#xff08;未填&#xff0c;不过可以在上面的git中下载&a…

(树) 剑指 Offer 28. 对称的二叉树 ——【Leetcode每日一题】

❓ 剑指 Offer 28. 对称的二叉树 难度&#xff1a;简单 请实现一个函数&#xff0c;用来判断一棵二叉树是不是对称的。如果一棵二叉树和它的镜像一样&#xff0c;那么它是对称的。 例如&#xff0c;二叉树 [1,2,2,3,4,4,3] 是对称的。 1/ \2 2/ \ / \3 4 4 3但是下面这个…

在Python中导入gurobipy模块显示ModuleNotFoundError: No module named ‘gurobipy‘的解决办法

笔者在学习用pythonAnacondagurobi求解优化问题时遇到了这个问题&#xff0c;看了很多帖子的方法都没有解决&#xff0c;所以这里分享一下自己的解决办法。 首先找到自己用的Anaconda3所在位置&#xff0c;这里不再赘述。 下拉文件夹找到envs&#xff0c;双击PythonProject&…

【雕爷学编程】MicroPython动手做(18)——掌控板之声光传感器

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

深入探索Linux文件系统与挂载点:掌握分区、挂载技巧

Linux 文件系统就是在 Linux 操作系统里咱们用来组织和管理文件的工具。它规定了文件和文件夹的结构&#xff0c;还确定了它们在存储设备上的储存方式。大致上来说&#xff0c;Linux 有很多种文件系统&#xff0c;比如 ext4、XFS、Btrfs 等。 挂载点其实就是把一个文件系统与 …

VS Code环境配置问题

VS Code 环境配置问题 文章目录 VS Code 环境配置问题配置 C问题解决不乱码只显示结果避免闪退&#xff0c;中文乱码 配置 Java下载 JDKJDK 环境配置安装插件 配置 C 跟着官网教程&#xff08;英文版&#xff09;和其他博客配置了一遍&#xff0c;却遇到了很多小问题&#xff…

2023“Java 基础 - 中级 - 高级”面试集结,已奉上我的膝盖

Java 基础&#xff08;对象线程字符接口变量异常方法&#xff09; 面向对象和面向过程的区别&#xff1f; Java 语言有哪些特点&#xff1f; 关于 JVM JDK 和 JRE 最详细通俗的解答 Oracle JDK 和 OpenJDK 的对比 Java 和 C的区别&#xff1f; 什么是 Java 程序的主类&…

Metabase 远程代码执行(CVE-2023-38646)

漏洞描述 Metabase是一款开源数据分析及可视化工具。它可允许用户连接至各种不同类型数据源,未经身份认证的攻击者可利用本漏洞在服务器上以运行 Metabase服务器的权限进行任意命令执行。 免责声明 技术文章仅供参考,任何个人和组织使用网络应当遵守宪法法律,遵守公共秩…

第133页的gtk+编程例子——计算器应用改写网上的例子用gtk4编译

第133页的gtk编程例子——计算器应用改写网上的例子用gtk4编译 来源&#xff1a;《GTK的计算器》 https://blog.csdn.net/zhouzhouzf/article/details/17097999 例子程序是在gtk2.0编译的&#xff0c;之前修改许多地方才能在gtk3.0编译通过&#xff0c;如今再修改能在gtk4编译…

进阶高级测试专项,Pytest自动化测试框架总结(三)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、pytest前置条件…